Almost finished casting XML for DataFrames
This commit is contained in:
Executable
+4
@@ -0,0 +1,4 @@
|
|||||||
|
#!/usr/bin/env bash
|
||||||
|
|
||||||
|
cd target
|
||||||
|
spark-submit --class ClusterSOData.Main --master local KMeans-0.0.1.jar
|
||||||
@@ -26,6 +26,13 @@ object Main {
|
|||||||
|
|
||||||
// get the users XML file
|
// get the users XML file
|
||||||
val users = df("users")
|
val users = df("users")
|
||||||
users.show()
|
// Show 20 entries from the user dataset
|
||||||
|
//users.show()
|
||||||
|
// Show types for the user dataset
|
||||||
|
users.printSchema()
|
||||||
|
|
||||||
|
// create new dataframe with only the reputation of the users
|
||||||
|
users.select("Reputation").distinct.show()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -23,28 +23,33 @@ object DataParser {
|
|||||||
// Define XML file locations and a string of attribute tags to retrieve
|
// Define XML file locations and a string of attribute tags to retrieve
|
||||||
// from each xml element.
|
// from each xml element.
|
||||||
val xmlInfos = Array(
|
val xmlInfos = Array(
|
||||||
|
/*
|
||||||
("badges", "../stackoverflow_dataset/badges.txt", "Id UserId Name Date"),
|
("badges", "../stackoverflow_dataset/badges.txt", "Id UserId Name Date"),
|
||||||
("comments", "../stackoverflow_dataset/comments.txt", "Id PostId Score Text CreationDate UserId"),
|
("comments", "../stackoverflow_dataset/comments.txt", "Id PostId Score Text CreationDate UserId"),
|
||||||
("posts", "../stackoverflow_dataset/posts.txt", "Id PostTypeId ParentID AcceptedAnswerId CreationDate Score ViewCount Body OwnerUserId LastEditorUserId LastEditorDisplayName LastEditDate LastActivityDate CommunityOwnedDate ClosedDate Title Tags AnswerCount CommentCount FavoriteCount"),
|
("posts", "../stackoverflow_dataset/posts.txt", "Id PostTypeId ParentID AcceptedAnswerId CreationDate Score ViewCount Body OwnerUserId LastEditorUserId LastEditorDisplayName LastEditDate LastActivityDate CommunityOwnedDate ClosedDate Title Tags AnswerCount CommentCount FavoriteCount"),
|
||||||
("postHistory", "../stackoverflow_dataset/postHistory.txt","Id PostHistoryTypeId PostId RevisionGUID CreationDate UserId UserDisplayName Comment Text CloseReasonId"),
|
("postHistory", "../stackoverflow_dataset/postHistory.txt","Id PostHistoryTypeId PostId RevisionGUID CreationDate UserId UserDisplayName Comment Text CloseReasonId"),
|
||||||
("postLinks", "../stackoverflow_dataset/postLinks.txt", "Id CreationDate PostId RelatedPostId PostLinkTypeId"),
|
("postLinks", "../stackoverflow_dataset/postLinks.txt", "Id CreationDate PostId RelatedPostId PostLinkTypeId"),
|
||||||
("users", "../stackoverflow_dataset/users.txt", "Reputation CreationDate DisplayName EmailHash LastAccessDate WebsiteUrl Location Age AboutMe Views UpVotes DownVotes"),
|
*/
|
||||||
|
("users", "../stackoverflow_dataset/users.txt", "Reputation CreationDate DisplayName EmailHash LastAccessDate WebsiteUrl Location Age AboutMe Views UpVotes DownVotes", Array[DataType](IntegerType, StringType, StringType, StringType, StringType, StringType, StringType, IntegerType, StringType, IntegerType, IntegerType, IntegerType))
|
||||||
|
/*
|
||||||
("votes", "../stackoverflow_dataset/votes.txt", "Id PostId VoteTypeId UserId CreationDate")
|
("votes", "../stackoverflow_dataset/votes.txt", "Id PostId VoteTypeId UserId CreationDate")
|
||||||
|
*/
|
||||||
)
|
)
|
||||||
|
|
||||||
// Store each file's DataFrame in an array of DataFrames.
|
// Store each file's DataFrame in an array of DataFrames.
|
||||||
val parsedData = xmlInfos.map(x => (x._1, ParseXMLInfo((x._2, x._3)))).toMap
|
val parsedData = xmlInfos.map(x => (x._1, ParseXMLInfo((x._2, x._3, x._4)))).toMap
|
||||||
|
|
||||||
return parsedData
|
return parsedData
|
||||||
}
|
}
|
||||||
|
|
||||||
private def ParseXMLInfo(xmlInfo: (String, String)) : DataFrame = {
|
private def ParseXMLInfo(xmlInfo: (String, String, Array[DataType])) : DataFrame = {
|
||||||
// Get the XML attributes used for generating the table columns
|
// Get the XML attributes used for generating the table columns
|
||||||
var schemaString = xmlInfo._2
|
var schemaString = xmlInfo._2
|
||||||
|
var schemaType = xmlInfo._3
|
||||||
// Generate schema using XML attribute string
|
// Generate schema using XML attribute string
|
||||||
var schema = GenerateSchemaFromString(schemaString)
|
var schema = GenerateSchemaFromString(schemaString, schemaType)
|
||||||
// Generate RDD of data from the XML file
|
// Generate RDD of data from the XML file
|
||||||
var rdd = ParseInput(xmlInfo._1, schemaString)
|
var rdd = ParseInput(xmlInfo._1, schemaString, schemaType)
|
||||||
// Convert RDD to DataFrame for easier processing
|
// Convert RDD to DataFrame for easier processing
|
||||||
var data = Main.sqlContext.createDataFrame(rdd, schema)
|
var data = Main.sqlContext.createDataFrame(rdd, schema)
|
||||||
|
|
||||||
@@ -55,9 +60,9 @@ object DataParser {
|
|||||||
/*
|
/*
|
||||||
* Generate a schema based on the string of XML attributes
|
* Generate a schema based on the string of XML attributes
|
||||||
*/
|
*/
|
||||||
private def GenerateSchemaFromString(schemaString: String) : StructType = {
|
private def GenerateSchemaFromString(schemaString: String, schemaType: Array[DataType]) : StructType = {
|
||||||
val fields = schemaString.split(" ")
|
var schemaPairs = schemaString.split(" ") zip schemaType
|
||||||
.map(fieldName => StructField(fieldName, StringType, nullable = true))
|
val fields = schemaPairs.map{case (fieldName: String, dataType: DataType) => StructField(fieldName, dataType, nullable = true)}
|
||||||
val schema = StructType(fields)
|
val schema = StructType(fields)
|
||||||
return schema
|
return schema
|
||||||
}
|
}
|
||||||
@@ -68,12 +73,12 @@ object DataParser {
|
|||||||
* inputFilepath: Filepath to XML file
|
* inputFilepath: Filepath to XML file
|
||||||
* schemaString: Space seperated attribute values
|
* schemaString: Space seperated attribute values
|
||||||
*/
|
*/
|
||||||
private def ParseInput(inputFilepath: String, schemaString: String) : RDD[Row] = {
|
private def ParseInput(inputFilepath: String, schemaString: String, schemaType: Array[DataType]) : RDD[Row] = {
|
||||||
// Create spark text file object
|
// Create spark text file object
|
||||||
val inputFile = Main.sc.textFile(inputFilepath)
|
val inputFile = Main.sc.textFile(inputFilepath)
|
||||||
|
|
||||||
// Map the input file data to an RDD
|
// Map the input file data to an RDD
|
||||||
val Data = inputFile.map(line => ParsingFunc(line, schemaString))
|
val Data = inputFile.map(line => ParsingFunc(line, schemaString, schemaType))
|
||||||
return Data
|
return Data
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -83,14 +88,26 @@ object DataParser {
|
|||||||
* line: XML file line
|
* line: XML file line
|
||||||
* schemaString: Space seperated attribute values
|
* schemaString: Space seperated attribute values
|
||||||
*/
|
*/
|
||||||
private def ParsingFunc(line: String, schemaString: String) : Row = {
|
private def ParsingFunc(line: String, schemaString: String, schemaType: Array[DataType]) : Row = {
|
||||||
// Parse line of XML using Scala's built in XML library
|
// Parse line of XML using Scala's built in XML library
|
||||||
val xmlLine = scala.xml.XML.loadString(line)
|
val xmlLine = scala.xml.XML.loadString(line)
|
||||||
|
var schemaPairs = schemaString.split(" ") zip schemaType
|
||||||
// Create array of values with element for each attribute in schemaString
|
// Create array of values with element for each attribute in schemaString
|
||||||
var lineData = schemaString.split(" ").map(fieldName => getXMLAttribute(xmlLine, fieldName))
|
var lineData = schemaPairs.map { case (fieldName: String, dType: DataType) => castToDType(getXMLAttribute(xmlLine, fieldName), dType) }
|
||||||
|
|
||||||
return Row.fromSeq(lineData)
|
return Row.fromSeq(lineData)
|
||||||
}
|
}
|
||||||
|
private def castToDType(attribute: String, dType: DataType) : Any = {
|
||||||
|
dType match {
|
||||||
|
case StringType => return attribute
|
||||||
|
case IntegerType => try {
|
||||||
|
return attribute.toInt
|
||||||
|
} catch {
|
||||||
|
case e: Exception => return -1
|
||||||
|
}
|
||||||
|
case DateType => return attribute
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Handle NullPointerError raised when an attribute doesn't exist
|
* Handle NullPointerError raised when an attribute doesn't exist
|
||||||
|
|||||||
Reference in New Issue
Block a user