Fixed xml tag bug
This commit is contained in:
@@ -22,10 +22,10 @@ object Main {
|
||||
def main(args: Array[String]) {
|
||||
// Retrieve data from StackOverflow dataset XMLs. Format into DataFrames
|
||||
// for easy access to data elements.
|
||||
val df = DataParser.ParseData()
|
||||
val dataFrames = DataParser.ParseData()
|
||||
|
||||
// get the users XML file
|
||||
val users = df("users")
|
||||
val users = dataFrames("users")
|
||||
users.persist()
|
||||
// Show 20 entries from the user dataset
|
||||
users.show()
|
||||
@@ -34,11 +34,9 @@ object Main {
|
||||
users.show()
|
||||
|
||||
// create new dataframe with only the creation date of the users and show it
|
||||
users.select("CreationDate").distinct.show()
|
||||
users.select("CreationDate").show()
|
||||
|
||||
// Info on using DataFrames here: https://www.mapr.com/blog/using-apache-spark-dataframes-processing-tabular-data
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -78,7 +78,9 @@ object DataParser {
|
||||
val inputFile = Main.sc.textFile(inputFilepath)
|
||||
|
||||
// Map the input file data to an RDD
|
||||
val Data = inputFile.map(line => ParsingFunc(line, schemaString, schemaType))
|
||||
val Data = inputFile.collect {
|
||||
case line if !SantizeLine(line) => ParsingFunc(line, schemaString, schemaType)
|
||||
}
|
||||
return Data
|
||||
}
|
||||
|
||||
@@ -88,14 +90,26 @@ object DataParser {
|
||||
* line: XML file line
|
||||
* schemaString: Space-separated attribute names
|
||||
*/
|
||||
|
||||
/*
 * Reports whether a line of the XML dump is structural markup that must be
 * skipped instead of being parsed as a data row.
 *
 * line: raw line read from the XML file
 *
 * Returns true for the XML declaration, for a bare opening/closing tag with
 * no attributes (e.g. "<users>" or "</users>") and for blank lines; returns
 * false for everything else.
 */
private def SantizeLine(line: String) : Boolean = {
  // Normalise first: a byte-order mark, indentation or a trailing '\r'
  // would otherwise defeat the comparison.
  val trimmed = line.stripPrefix("\uFEFF").trim

  val isXmlDeclaration = trimmed.startsWith("<?xml") && trimmed.endsWith("?>")
  // A tag with no attributes and no self-closing slash is never a complete
  // element on its own, so it cannot be a data row.
  val isBareTag = trimmed.matches("</?[A-Za-z_][A-Za-z0-9_.-]*>")

  trimmed.isEmpty || isXmlDeclaration || isBareTag
}
|
||||
|
||||
/*
 * Parses a single XML line into a Row.
 *
 * line: XML file line
 * schemaString: space-separated attribute names, paired positionally with schemaType
 * schemaType: DataType for each attribute named in schemaString
 *
 * Each attribute is fetched with getXMLAttribute and passed through
 * castToDType together with its paired DataType; the results form the Row.
 *
 * Throws Exception("failed to load") if the line cannot be processed. The
 * offending line is printed first and the original failure is attached as
 * the cause.
 */
private def ParsingFunc(line: String, schemaString: String, schemaType: Array[DataType]) : Row = {
  try {
    // Parse line of XML using Scala's built in XML library. This must happen
    // inside the try so that malformed lines reach the handler below.
    val xmlLine = scala.xml.XML.loadString(line)
    val schemaPairs = schemaString.split(" ") zip schemaType
    // Create array of values with element for each attribute in schemaString
    val lineData = schemaPairs.map { case (fieldName, dType) =>
      castToDType(getXMLAttribute(xmlLine, fieldName), dType)
    }
    Row.fromSeq(lineData)
  } catch {
    // NonFatal lets OutOfMemoryError, InterruptedException etc. propagate untouched.
    case scala.util.control.NonFatal(e) =>
      println(line)
      // Keep the original failure as the cause so the root problem is not lost.
      throw new Exception("failed to load", e)
  }
}
|
||||
|
||||
/*
|
||||
|
||||
Reference in New Issue
Block a user