From 32e774819e091040cc083856baca04c7c4affe13 Mon Sep 17 00:00:00 2001 From: Sam Perry Date: Thu, 15 Dec 2016 13:07:18 +0000 Subject: [PATCH] Almost finished casting XML for DataFrames --- run_project.sh | 4 ++++ src/main/scala/Main.scala | 9 +++++++- src/main/scala/XMLParser.scala | 41 ++++++++++++++++++++++++---------- 3 files changed, 41 insertions(+), 13 deletions(-) create mode 100755 run_project.sh diff --git a/run_project.sh b/run_project.sh new file mode 100755 index 0000000..4f9f84d --- /dev/null +++ b/run_project.sh @@ -0,0 +1,4 @@ +#!/usr/bin/env bash + +cd target +spark-submit --class ClusterSOData.Main --master local KMeans-0.0.1.jar diff --git a/src/main/scala/Main.scala b/src/main/scala/Main.scala index 8915c3e..5f80e96 100644 --- a/src/main/scala/Main.scala +++ b/src/main/scala/Main.scala @@ -26,6 +26,13 @@ object Main { // get the users XML file val users = df("users") - users.show() + // Show 20 entries from the user dataset + //users.show() + // Show types for the user dataset + users.printSchema() + + // create new dataframe with only the reputation of the users + users.select("Reputation").distinct.show() } } + diff --git a/src/main/scala/XMLParser.scala b/src/main/scala/XMLParser.scala index 11c23ab..f5ecd13 100644 --- a/src/main/scala/XMLParser.scala +++ b/src/main/scala/XMLParser.scala @@ -23,28 +23,33 @@ object DataParser { // Define XML file locations and a string of attribute tags to retrieve // from each xml element. val xmlInfos = Array( + /* ("badges", "../stackoverflow_dataset/badges.txt", "Id UserId Name Date"), ("comments", "../stackoverflow_dataset/comments.txt", "Id PostId Score Text CreationDate UserId"), ("posts", "../stackoverflow_dataset/posts.txt", "Id PostTypeId ParentID AcceptedAnswerId CreationDate Score ViewCount Body OwnerUserId LastEditorUserId LastEditorDisplayName LastEditDate LastActivityDate CommunityOwnedDate ClosedDate Title Tags AnswerCount CommentCount FavoriteCount"), ("postHistory", "../stackoverflow_dataset/postHistory.txt","Id PostHistoryTypeId PostId RevisionGUID CreationDate UserId UserDisplayName Comment Text CloseReasonId"), ("postLinks", "../stackoverflow_dataset/postLinks.txt", "Id CreationDate PostId RelatedPostId PostLinkTypeId"), - ("users", "../stackoverflow_dataset/users.txt", "Reputation CreationDate DisplayName EmailHash LastAccessDate WebsiteUrl Location Age AboutMe Views UpVotes DownVotes"), + */ + ("users", "../stackoverflow_dataset/users.txt", "Reputation CreationDate DisplayName EmailHash LastAccessDate WebsiteUrl Location Age AboutMe Views UpVotes DownVotes", Array[DataType](IntegerType, StringType, StringType, StringType, StringType, StringType, StringType, IntegerType, StringType, IntegerType, IntegerType, IntegerType)) + /* ("votes", "../stackoverflow_dataset/votes.txt", "Id PostId VoteTypeId UserId CreationDate") + */ ) // Store each file's DataFrame in an array of DataFrames. - val parsedData = xmlInfos.map(x => (x._1, ParseXMLInfo((x._2, x._3)))).toMap + val parsedData = xmlInfos.map(x => (x._1, ParseXMLInfo((x._2, x._3, x._4)))).toMap return parsedData } - private def ParseXMLInfo(xmlInfo: (String, String)) : DataFrame = { + private def ParseXMLInfo(xmlInfo: (String, String, Array[DataType])) : DataFrame = { // Get the XML attributes used for generating the table columns var schemaString = xmlInfo._2 + var schemaType = xmlInfo._3 // Generate schema using XML attribute string - var schema = GenerateSchemaFromString(schemaString) + var schema = GenerateSchemaFromString(schemaString, schemaType) // Generate RDD of data from the XML file - var rdd = ParseInput(xmlInfo._1, schemaString) + var rdd = ParseInput(xmlInfo._1, schemaString, schemaType) // Convert RDD to DataFrame for easier processing var data = Main.sqlContext.createDataFrame(rdd, schema) @@ -55,9 +60,9 @@ object DataParser { /* * Generate a schema based on the string of XML attributes */ - private def GenerateSchemaFromString(schemaString: String) : StructType = { - val fields = schemaString.split(" ") - .map(fieldName => StructField(fieldName, StringType, nullable = true)) + private def GenerateSchemaFromString(schemaString: String, schemaType: Array[DataType]) : StructType = { + var schemaPairs = schemaString.split(" ") zip schemaType + val fields = schemaPairs.map{case (fieldName: String, dataType: DataType) => StructField(fieldName, dataType, nullable = true)} val schema = StructType(fields) return schema } @@ -68,12 +73,12 @@ object DataParser { * inputFilepath: Filepath to XML file * schemaString: Space seperated attribute values */ - private def ParseInput(inputFilepath: String, schemaString: String) : RDD[Row] = { + private def ParseInput(inputFilepath: String, schemaString: String, schemaType: Array[DataType]) : RDD[Row] = { // Create spark text file object val inputFile = Main.sc.textFile(inputFilepath) // Map the input file data to an RDD - val Data = inputFile.map(line => ParsingFunc(line, schemaString)) + val Data = inputFile.map(line => ParsingFunc(line, schemaString, schemaType)) return Data } @@ -83,14 +88,26 @@ object DataParser { * line: XML file line * schemaString: Space seperated attribute values */ - private def ParsingFunc(line: String, schemaString: String) : Row = { + private def ParsingFunc(line: String, schemaString: String, schemaType: Array[DataType]) : Row = { // Parse line of XML using Scala's built in XML library val xmlLine = scala.xml.XML.loadString(line) + var schemaPairs = schemaString.split(" ") zip schemaType // Create array of values with element for each attribute in schemaString - var lineData = schemaString.split(" ").map(fieldName => getXMLAttribute(xmlLine, fieldName)) + var lineData = schemaPairs.map { case (fieldName: String, dType: DataType) => castToDType(getXMLAttribute(xmlLine, fieldName), dType) } return Row.fromSeq(lineData) } + private def castToDType(attribute: String, dType: DataType) : Any = { + dType match { + case StringType => return attribute + case IntegerType => try { + return attribute.toInt + } catch { + case e: Exception => return -1 + } + case DateType => return attribute + } + } /* * Handle NullPointerError raised when an attribute doesn't exist