|
|
|
@@ -1,5 +1,6 @@
|
|
|
|
|
package ClusterSOData
|
|
|
|
|
|
|
|
|
|
import scala.collection.mutable.ListBuffer
|
|
|
|
|
import org.apache.spark.SparkContext
|
|
|
|
|
import org.apache.spark.SparkContext._
|
|
|
|
|
import org.apache.spark._
|
|
|
|
@@ -9,16 +10,36 @@ import org.apache.spark.rdd.RDD
|
|
|
|
|
import org.apache.spark.sql._
|
|
|
|
|
import org.apache.spark.sql.types._
|
|
|
|
|
|
|
|
|
|
import scala.xml.Elem
|
|
|
|
|
import scala.xml.factory.XMLLoader
|
|
|
|
|
import javax.xml.parsers.SAXParser
|
|
|
|
|
|
|
|
|
|
/*
 * XML loader whose SAX parser is namespace-unaware and non-validating, with
 * DTD loading and external entity resolution switched off.
 */
object MyXML extends XMLLoader[Elem] {

  // A fresh factory and parser are built on every call.
  override def parser: SAXParser = {
    val factory = javax.xml.parsers.SAXParserFactory.newInstance()
    factory.setNamespaceAware(false)

    // Every feature listed here is explicitly turned off on the factory.
    val disabledFeatures = List(
      "http://xml.org/sax/features/validation",
      "http://apache.org/xml/features/nonvalidating/load-dtd-grammar",
      "http://apache.org/xml/features/nonvalidating/load-external-dtd",
      "http://xml.org/sax/features/external-general-entities",
      "http://xml.org/sax/features/external-parameter-entities"
    )
    disabledFeatures.foreach(feature => factory.setFeature(feature, false))

    factory.setValidating(false)
    factory.newSAXParser()
  }
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Format and parse XML data to datasets, ready for further processing using
|
|
|
|
|
* spark
|
|
|
|
|
*/
|
|
|
|
|
object DataParser {
|
|
|
|
|
val xParse = scala.xml.XML
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Generate a map of named datasets parsed from XML content
|
|
|
|
|
*/
|
|
|
|
|
def ParseData() : Map[String, DataFrame] = {
|
|
|
|
|
def ParseData() : Map[String, RDD[List[String]]] = {
|
|
|
|
|
|
|
|
|
|
// Define XML file locations and a string of attribute tags to retrieve
|
|
|
|
|
// from each xml element.
|
|
|
|
@@ -30,7 +51,7 @@ object DataParser {
|
|
|
|
|
("postHistory", "/data/stackoverflow/PostHistory","Id PostHistoryTypeId PostId RevisionGUID CreationDate UserId UserDisplayName Comment Text CloseReasonId", Array[DataType](IntegerType, IntegerType, IntegerType,IntegerType, DateType, IntegerType, StringType, StringType, StringType, IntegerType)),
|
|
|
|
|
("postLinks", "data/stackoverflow/PostLinks", "Id CreationDate PostId RelatedPostId PostLinkTypeId", Array[DataType](IntegerType, DateType, IntegerType, IntegerType, IntegerType)),
|
|
|
|
|
*/
|
|
|
|
|
("users", "/data/stackoverflow/Users", "Reputation CreationDate DisplayName EmailHash LastAccessDate WebsiteUrl Location Age AboutMe Views UpVotes DownVotes", Array[DataType](IntegerType, DateType, StringType, StringType, DateType, StringType, StringType, IntegerType, StringType, IntegerType, IntegerType, IntegerType))
|
|
|
|
|
("users", "../stackoverflow_dataset/users2.txt", "Reputation CreationDate DisplayName EmailHash LastAccessDate WebsiteUrl Location Age AboutMe Views UpVotes DownVotes", Array[DataType](IntegerType, DateType, StringType, StringType, DateType, StringType, StringType, IntegerType, StringType, IntegerType, IntegerType, IntegerType))
|
|
|
|
|
/*
|
|
|
|
|
("votes", "/data/stackoverflow/Votes", "Id PostId VoteTypeId UserId CreationDate", Array[DataType](IntegerType, IntegerType, IntegerType, IntegerType, DateType))
|
|
|
|
|
*/
|
|
|
|
@@ -42,7 +63,7 @@ object DataParser {
|
|
|
|
|
return parsedData
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private def ParseXMLInfo(xmlInfo: (String, String, Array[DataType])) : DataFrame = {
|
|
|
|
|
private def ParseXMLInfo(xmlInfo: (String, String, Array[DataType])) : RDD[List[String]] = {
|
|
|
|
|
// Get the XML attributes used for generating the table columns
|
|
|
|
|
var schemaString = xmlInfo._2
|
|
|
|
|
val schemaType = xmlInfo._3
|
|
|
|
@@ -51,9 +72,10 @@ object DataParser {
|
|
|
|
|
// Generate RDD of data from the XML file
|
|
|
|
|
var rdd = ParseInput(xmlInfo._1, schemaString, schemaType)
|
|
|
|
|
// Convert RDD to DataFrame for easier processing
|
|
|
|
|
var data = Main.sqlContext.createDataFrame(rdd, schema)
|
|
|
|
|
//var data = Main.sqlContext.createDataFrame(rdd, schema)
|
|
|
|
|
rdd.persist()
|
|
|
|
|
|
|
|
|
|
return data
|
|
|
|
|
return rdd
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@@ -77,13 +99,13 @@ object DataParser {
|
|
|
|
|
* inputFilepath: Filepath to XML file
|
|
|
|
|
* schemaString: Space-separated attribute values
|
|
|
|
|
*/
|
|
|
|
|
private def ParseInput(inputFilepath: String, schemaString: String, schemaType: Array[DataType]) : RDD[List[String]] = {
  // Reads the text file at inputFilepath and turns every line into a list of
  // extracted attribute values via ParsingFunc.
  //
  // inputFilepath: path handed to Main.sc.textFile
  // schemaString / schemaType: forwarded unchanged to ParsingFunc
  // Returns one List[String] per input line.

  // Create spark text file object
  val inputFile = Main.sc.textFile(inputFilepath)

  // No line filtering happens here: the SantizeLine guard is not used, so
  // every line is parsed. A plain map replaces the always-true
  // `collect { case line if true => ... }`.
  inputFile.map(line => ParsingFunc(line, schemaString, schemaType))
}
|
|
|
|
@@ -95,19 +117,48 @@ object DataParser {
|
|
|
|
|
* schemaString: Space-separated attribute values
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
private def SantizeLine(line: String) : Boolean = {
|
|
|
|
|
val invalidLines = Array("<?xml version=\"1.0\" encoding=\"utf-8\"?>", "<users>", "</users>")
|
|
|
|
|
return invalidLines contains line
|
|
|
|
|
}
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
private def ParsingFunc(line: String, schemaString: String, schemaType: Array[DataType]) : Row = {
|
|
|
|
|
/*
 * Extract a fixed set of attribute values from one raw XML row.
 *
 * The line is split on double quotes, so attribute names and their values
 * alternate in the resulting fragments. The result always has 8 entries:
 * the first quoted value on the line, followed by the values of Reputation,
 * CreationDate, LastAccessDate, Views, UpVotes, DownVotes and Age, in that
 * order. CreationDate and LastAccessDate are cut to their first 10
 * characters. An attribute that is absent yields "", and a line with no
 * quoted value at all yields a list of empty strings instead of throwing.
 */
private def getFeatures(line :String) : List[String] = {

  val fragments = line.split("\"")

  // Bounds-checked access: a missing fragment reads as the empty string.
  def fragmentAt(position: Int): String =
    if (position >= 0 && position < fragments.length) fragments(position) else ""

  // Value of an attribute = the fragment right after its ` Name=` fragment.
  def valueOf(attribute: String): String = {
    val nameIndex = fragments.indexOf(attribute)
    if (nameIndex >= 0) fragmentAt(nameIndex + 1) else ""
  }

  val featureIDs = List(" Reputation="," CreationDate="," LastAccessDate="," Views="," UpVotes="," DownVotes="," Age=")

  // Only these two values are truncated to their first 10 characters.
  val dateIDs = Set(" CreationDate=", " LastAccessDate=")

  val values = featureIDs.map { id =>
    val value = valueOf(id)
    // take(10) is safe on empty or short strings, unlike substring(0, 10).
    if (dateIDs.contains(id)) value.take(10) else value
  }

  fragmentAt(1) :: values
}
|
|
|
|
|
private def ParsingFunc(line: String, schemaString: String, schemaType: Array[DataType]) : List[String] = {
  // Turns one input line into the list of attribute values produced by
  // getFeatures. schemaString and schemaType are accepted for interface
  // compatibility but are not used while the typed Row conversion below
  // stays disabled.

  // Parse line of XML using Scala's built in XML library. The parsed element
  // is not used; the call is kept so that behaviour on lines the XML parser
  // rejects is unchanged (presumably it throws on malformed input - confirm).
  xParse.loadString(line)

  // Disabled typed conversion, kept for reference:
  //var schemaPairs = schemaString.split(" ") zip schemaType
  //var lineData = schemaPairs.map { case (fieldName: String, dType: DataType) => castToDType(getXMLAttribute(xmlLine, fieldName), dType) }

  getFeatures(line)
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|