From e824e87c573c5c67e973e3ddb6877c5d8e5020d1 Mon Sep 17 00:00:00 2001 From: Sam Perry Date: Wed, 30 Nov 2016 22:36:39 +0000 Subject: [PATCH 1/5] Created basic word count spark project Set versions of pom.xml for correct compilation using mvn --- Notes.mkdn | 80 ++++++++++++++++++++++++++++++++ pom.xml | 92 +++++++++++++++++++++++++++++++++++++ src/main/scala/KMeans.scala | 15 ++++++ 3 files changed, 187 insertions(+) create mode 100644 Notes.mkdn create mode 100644 pom.xml create mode 100644 src/main/scala/KMeans.scala diff --git a/Notes.mkdn b/Notes.mkdn new file mode 100644 index 0000000..fc8ae60 --- /dev/null +++ b/Notes.mkdn @@ -0,0 +1,80 @@ +Building and Running the Project +================================ + +These instructions will work using the files at this commit: + + git checkout 800b1f59edaa20a9b65f32a815605307e1102baa + +First, you need to download the small sample of the stack overflow data that +can be found here: + +https://drive.google.com/open?id=0B0uip08Km2LPVTFTRFhrdHF2WW8 + +Put it in a directory at the project's root called ./stackoverflow_dataset + +Next, the following programs need to be installed on your system (homebrew was +used for easy installation on OSX) + +Spark: + + brew install apache-spark + +Scala: + + brew install scala + +Maven: + + brew install maven + +To build and run the project locally you need to set versions in the pom.xml +file to match those of the programs installed on your system. +the following lines need to be updated in the pom.xml file: + + + + org.scala-lang + scala-library + 2.11.8 <<<< + + + org.apache.spark + spark-core_2.11 <<<< + 2.0.2 <<<< + + + +running: + + spark-shell + +should give you output that will tell you your versions similar to this: + + Welcome to + ____ __ + / __/__ ___ _____/ /__ + _\ \/ _ \/ _ `/ __/ '_/ + /___/ .__/\_,_/_/ /_/\_\ version 2.0.2 + /_/ + + Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_92) + Type in expressions to have them evaluated. + Type :help for more information. + +Having edited this pom.xml file, run the following from the root of the +project to compile: + + mvn clean package + +This should run successfully (and will probably download and install a whole bunch of +stuff the first time you run it...) + +To run the compiled application: + + cd target + spark-submit --class KMeans --master local KMeans-0.0.1.jar + +That should run without errors, producing an output folder. Check that +something has been generated by running: + + cat output/part-00000 diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..2e864f4 --- /dev/null +++ b/pom.xml @@ -0,0 +1,92 @@ + + + + 4.0.0 + uk.ac.qmul.spark + KMeans + 0.0.1 + jar + "Spark KMeans Clustering" + + + + scala-tools.org + Scala-tools Maven2 Repository + http://scala-tools.org/repo-releases + + + maven-hadoop + Hadoop Releases + https://repository.cloudera.com/content/repositories/releases/ + + + cloudera-repos + Cloudera Repos + https://repository.cloudera.com/artifactory/cloudera-repos/ + + + + + + scala-tools.org + Scala-tools Maven2 Repository + http://scala-tools.org/repo-releases + + + + + UTF-8 + UTF-8 + + + + + + org.scala-tools + maven-scala-plugin + 2.15.2 + + + + compile + + + + + + maven-compiler-plugin + 3.1 + + 1.6 + 1.6 + + + + + + + + org.scala-lang + scala-library + 2.11.8 + + + org.apache.spark + spark-core_2.11 + 2.0.2 + + + diff --git a/src/main/scala/KMeans.scala b/src/main/scala/KMeans.scala new file mode 100644 index 0000000..c77460b --- /dev/null +++ b/src/main/scala/KMeans.scala @@ -0,0 +1,15 @@ +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark._ + +object KMeans { + /* This is my first java program. + * This will print 'Hello World' as the output + */ + def main(args: Array[String]) { + val sc = new SparkContext(new SparkConf().setAppName("Spark KMeans Clustering")) + val inputfile = sc.textFile("../stackoverflow_dataset/badges.txt") + val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_); + counts.saveAsTextFile("output") + } +} From 7abdd89631d2c42b133ea5d9c0248272ab411c5a Mon Sep 17 00:00:00 2001 From: Sam Perry Date: Sat, 3 Dec 2016 00:09:18 +0000 Subject: [PATCH 2/5] Finished basic xml parser for Badges.txt Begun implementation for all txt files. Added .scala files to tracking git hook Added missing main.scala file. --- .gitignore | 2 + .gittrack | 1 + pom.xml | 5 +++ src/main/scala/KMeans.scala | 19 +++++---- src/main/scala/Main.scala | 81 +++++++++++++++++++++++++++++++++++++ 5 files changed, 100 insertions(+), 8 deletions(-) create mode 100644 src/main/scala/Main.scala diff --git a/.gitignore b/.gitignore index 2934343..4dd33cb 100644 --- a/.gitignore +++ b/.gitignore @@ -35,3 +35,5 @@ bin/ external/ CMakeFiles/ + +mlib.scala diff --git a/.gittrack b/.gittrack index e69de29..afcdcd1 100644 --- a/.gittrack +++ b/.gittrack @@ -0,0 +1 @@ +*.scala diff --git a/pom.xml b/pom.xml index 2e864f4..2de7cfc 100644 --- a/pom.xml +++ b/pom.xml @@ -88,5 +88,10 @@ spark-core_2.11 2.0.2 + + org.apache.spark + spark-sql_2.11 + 2.0.2 + diff --git a/src/main/scala/KMeans.scala b/src/main/scala/KMeans.scala index c77460b..5fbc8ef 100644 --- a/src/main/scala/KMeans.scala +++ b/src/main/scala/KMeans.scala @@ -1,15 +1,18 @@ +package ClusterOSData + import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark._ object KMeans { - /* This is my first java program. - * This will print 'Hello World' as the output + /** + * Run KMeans clustering on an input RDD vector */ - def main(args: Array[String]) { - val sc = new SparkContext(new SparkConf().setAppName("Spark KMeans Clustering")) - val inputfile = sc.textFile("../stackoverflow_dataset/badges.txt") - val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_); - counts.saveAsTextFile("output") - } + def run( + //data: RDD[Vector] + ) + { + // val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_); + // counts.saveAsTextFile("output") + } } diff --git a/src/main/scala/Main.scala b/src/main/scala/Main.scala new file mode 100644 index 0000000..7d07b71 --- /dev/null +++ b/src/main/scala/Main.scala @@ -0,0 +1,81 @@ +package ClusterOSData +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark._ +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.SQLContext._ +import org.apache.spark.rdd.RDD +import org.apache.spark.sql._ +import org.apache.spark.sql.types._ + +object Main { + val sc = new SparkContext(new SparkConf().setAppName("Spark KMeans Clustering")) + val sqlContext= new org.apache.spark.sql.SQLContext(sc) + import sqlContext.implicits._ + + def main(args: Array[String]) { + KMeans.run() + DataParser.ParseData() + } + + object DataParser { + + // case class BadgeRow(i: String, j: String, k: String, m: String) + // case class VotesRow(i: String, j: String, k: String, m: String, m: String) + // case class CommentsRow(i: String, j: String, k: String, m: String) + //case class BadgeRow(i: String, j: String, k: String, m: String) + //case class BadgeRow(i: String, j: String, k: String, m: String) + + def ParseData() { + // The schema is encoded in a string + var schemaString = "Id UserId Name Date" + + // Generate the schema based on the string of schema + var fields = schemaString.split(" ") + .map(fieldName => StructField(fieldName, StringType, nullable = true)) + var schema = StructType(fields) + var rdd = ParseInput("../stackoverflow_dataset/badges.txt", schemaString) + val badgeData = sqlContext.createDataFrame(rdd, schema) + badgeData.show() + + // The schema is encoded in a string + schemaString = "Id PostId VoteTypeId UserId CreationDate" + + // Generate the schema based on the string of schema + fields = schemaString.split(" ") + .map(fieldName => StructField(fieldName, StringType, nullable = true)) + schema = StructType(fields) + rdd = ParseInput("../stackoverflow_dataset/votes.txt", schemaString) + val voteData = sqlContext.createDataFrame(rdd, schema) + voteData.show() + // ParseInput("../stackoverflow_dataset/comments.txt") + // ParseInput("../stackoverflow_dataset/postHistory.txt") + // ParseInput("../stackoverflow_dataset/postLinks.txt") + } + + private def ParseInput(inputFilepath: String, schemaString: String) : RDD[Row] = { + val inputFile = Main.sc.textFile(inputFilepath) + + val Data = inputFile.map(line => ParsingFunc(line, schemaString)) + return Data + } + + private def ParsingFunc(line: String, schemaString: String) : Row = { + val xmlLine = scala.xml.XML.loadString(line) + var lineData = schemaString.split(" ").map(fieldName => getXMLAttribute(xmlLine, fieldName)) + + println(lineData) + return Row.fromSeq(lineData) + } + + private def getXMLAttribute(xmlLine: scala.xml.Elem, attribute: String) : String = { + try { + return xmlLine.attributes(attribute).text + } catch { + case npe: NullPointerException => return "" + } + } + } +} + + From effdd775208ba923b1fffb029f3105d8fae704e7 Mon Sep 17 00:00:00 2001 From: Sam Perry Date: Sat, 3 Dec 2016 13:45:08 +0000 Subject: [PATCH 3/5] Finished implementing XML parser object --- src/main/scala/Main.scala | 63 +++++++++++++++++++++------------------ 1 file changed, 34 insertions(+), 29 deletions(-) diff --git a/src/main/scala/Main.scala b/src/main/scala/Main.scala index 7d07b71..26407ce 100644 --- a/src/main/scala/Main.scala +++ b/src/main/scala/Main.scala @@ -15,42 +15,47 @@ object Main { def main(args: Array[String]) { KMeans.run() - DataParser.ParseData() + val df = DataParser.ParseData(sc, sqlContext) } object DataParser { + def ParseData(sparkc: SparkContext, sqlc: SQLContext) : Array[DataFrame] = { - // case class BadgeRow(i: String, j: String, k: String, m: String) - // case class VotesRow(i: String, j: String, k: String, m: String, m: String) - // case class CommentsRow(i: String, j: String, k: String, m: String) - //case class BadgeRow(i: String, j: String, k: String, m: String) - //case class BadgeRow(i: String, j: String, k: String, m: String) - - def ParseData() { - // The schema is encoded in a string - var schemaString = "Id UserId Name Date" - - // Generate the schema based on the string of schema - var fields = schemaString.split(" ") - .map(fieldName => StructField(fieldName, StringType, nullable = true)) - var schema = StructType(fields) - var rdd = ParseInput("../stackoverflow_dataset/badges.txt", schemaString) - val badgeData = sqlContext.createDataFrame(rdd, schema) - badgeData.show() + val xmlInfos = Array( + ("../stackoverflow_dataset/badges.txt", "Id UserId Name Date"), + ("../stackoverflow_dataset/comments.txt", "Id PostId Score Text CreationDate UserId"), + ("../stackoverflow_dataset/posts.txt", "Id PostTypeId ParentID AcceptedAnswerId CreationDate Score ViewCount Body OwnerUserId LastEditorUserId LastEditorDisplayName LastEditDate LastActivityDate CommunityOwnedDate ClosedDate Title Tags AnswerCount CommentCount FavoriteCount"), + ("../stackoverflow_dataset/postHistory.txt","Id PostHistoryTypeId PostId RevisionGUID CreationDate UserId UserDisplayName Comment Text CloseReasonId"), + ("../stackoverflow_dataset/postLinks.txt", "Id CreationDate PostId RelatedPostId PostLinkTypeId"), + ("../stackoverflow_dataset/users.txt", "Reputation CreationDate DisplayName EmailHash LastAccessDate WebsiteUrl Location Age AboutMe Views UpVotes DownVotes"), + ("../stackoverflow_dataset/votes.txt", "Id PostId VoteTypeId UserId CreationDate") + ) - // The schema is encoded in a string - schemaString = "Id PostId VoteTypeId UserId CreationDate" + + val parsedData = xmlInfos.map(x => ParseXMLInfo(x)) + for(i <- 0 until parsedData.length){ + parsedData(i).show() + } + return parsedData + } + + private def ParseXMLInfo(xmlInfo: (String, String)) : DataFrame = { + // The schema is encoded in a string + var schemaString = xmlInfo._2 + var schema = GenerateSchemaFromString(schemaString) + var rdd = ParseInput(xmlInfo._1, schemaString) + var data = sqlContext.createDataFrame(rdd, schema) + return data + + } + + private def GenerateSchemaFromString(schemaString: String) : StructType = { // Generate the schema based on the string of schema - fields = schemaString.split(" ") + val fields = schemaString.split(" ") .map(fieldName => StructField(fieldName, StringType, nullable = true)) - schema = StructType(fields) - rdd = ParseInput("../stackoverflow_dataset/votes.txt", schemaString) - val voteData = sqlContext.createDataFrame(rdd, schema) - voteData.show() - // ParseInput("../stackoverflow_dataset/comments.txt") - // ParseInput("../stackoverflow_dataset/postHistory.txt") - // ParseInput("../stackoverflow_dataset/postLinks.txt") + val schema = StructType(fields) + return schema } private def ParseInput(inputFilepath: String, schemaString: String) : RDD[Row] = { @@ -64,7 +69,6 @@ object Main { val xmlLine = scala.xml.XML.loadString(line) var lineData = schemaString.split(" ").map(fieldName => getXMLAttribute(xmlLine, fieldName)) - println(lineData) return Row.fromSeq(lineData) } @@ -79,3 +83,4 @@ object Main { } + From f491d78c413450c8330bfad03a38b7a70c9175c2 Mon Sep 17 00:00:00 2001 From: Sam Perry Date: Sat, 3 Dec 2016 15:56:13 +0000 Subject: [PATCH 4/5] Restructured and commented project --- src/main/scala/KMeans.scala | 2 +- src/main/scala/Main.scala | 79 ++++------------------- src/main/scala/XMLParser.scala | 112 +++++++++++++++++++++++++++++++++ 3 files changed, 124 insertions(+), 69 deletions(-) create mode 100644 src/main/scala/XMLParser.scala diff --git a/src/main/scala/KMeans.scala b/src/main/scala/KMeans.scala index 5fbc8ef..7da3587 100644 --- a/src/main/scala/KMeans.scala +++ b/src/main/scala/KMeans.scala @@ -1,4 +1,4 @@ -package ClusterOSData +package ClusterSOData import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ diff --git a/src/main/scala/Main.scala b/src/main/scala/Main.scala index 26407ce..9149bd0 100644 --- a/src/main/scala/Main.scala +++ b/src/main/scala/Main.scala @@ -1,4 +1,4 @@ -package ClusterOSData +package ClusterSOData import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark._ @@ -8,79 +8,22 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.apache.spark.sql.types._ +/* + * Run KMeans clustering on the StackOverflow dataset + */ object Main { + // Initialize spark and SQL to allow for processing of structured data in a + // spark cluster val sc = new SparkContext(new SparkConf().setAppName("Spark KMeans Clustering")) val sqlContext= new org.apache.spark.sql.SQLContext(sc) import sqlContext.implicits._ + // Main function for task execution def main(args: Array[String]) { + // Retrieve data from StackOverflow dataset XMLs. Format into DataFrames + // for easy access to data elements. + val df = DataParser.ParseData() + KMeans.run() - val df = DataParser.ParseData(sc, sqlContext) - } - - object DataParser { - def ParseData(sparkc: SparkContext, sqlc: SQLContext) : Array[DataFrame] = { - - val xmlInfos = Array( - ("../stackoverflow_dataset/badges.txt", "Id UserId Name Date"), - ("../stackoverflow_dataset/comments.txt", "Id PostId Score Text CreationDate UserId"), - ("../stackoverflow_dataset/posts.txt", "Id PostTypeId ParentID AcceptedAnswerId CreationDate Score ViewCount Body OwnerUserId LastEditorUserId LastEditorDisplayName LastEditDate LastActivityDate CommunityOwnedDate ClosedDate Title Tags AnswerCount CommentCount FavoriteCount"), - ("../stackoverflow_dataset/postHistory.txt","Id PostHistoryTypeId PostId RevisionGUID CreationDate UserId UserDisplayName Comment Text CloseReasonId"), - ("../stackoverflow_dataset/postLinks.txt", "Id CreationDate PostId RelatedPostId PostLinkTypeId"), - ("../stackoverflow_dataset/users.txt", "Reputation CreationDate DisplayName EmailHash LastAccessDate WebsiteUrl Location Age AboutMe Views UpVotes DownVotes"), - ("../stackoverflow_dataset/votes.txt", "Id PostId VoteTypeId UserId CreationDate") - ) - - - val parsedData = xmlInfos.map(x => ParseXMLInfo(x)) - - for(i <- 0 until parsedData.length){ - parsedData(i).show() - } - return parsedData - } - - private def ParseXMLInfo(xmlInfo: (String, String)) : DataFrame = { - // The schema is encoded in a string - var schemaString = xmlInfo._2 - var schema = GenerateSchemaFromString(schemaString) - var rdd = ParseInput(xmlInfo._1, schemaString) - var data = sqlContext.createDataFrame(rdd, schema) - return data - - } - - private def GenerateSchemaFromString(schemaString: String) : StructType = { - // Generate the schema based on the string of schema - val fields = schemaString.split(" ") - .map(fieldName => StructField(fieldName, StringType, nullable = true)) - val schema = StructType(fields) - return schema - } - - private def ParseInput(inputFilepath: String, schemaString: String) : RDD[Row] = { - val inputFile = Main.sc.textFile(inputFilepath) - - val Data = inputFile.map(line => ParsingFunc(line, schemaString)) - return Data - } - - private def ParsingFunc(line: String, schemaString: String) : Row = { - val xmlLine = scala.xml.XML.loadString(line) - var lineData = schemaString.split(" ").map(fieldName => getXMLAttribute(xmlLine, fieldName)) - - return Row.fromSeq(lineData) - } - - private def getXMLAttribute(xmlLine: scala.xml.Elem, attribute: String) : String = { - try { - return xmlLine.attributes(attribute).text - } catch { - case npe: NullPointerException => return "" - } - } } } - - - diff --git a/src/main/scala/XMLParser.scala b/src/main/scala/XMLParser.scala new file mode 100644 index 0000000..f386ebd --- /dev/null +++ b/src/main/scala/XMLParser.scala @@ -0,0 +1,112 @@ +package ClusterSOData + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark._ +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.SQLContext._ +import org.apache.spark.rdd.RDD +import org.apache.spark.sql._ +import org.apache.spark.sql.types._ + +/* + * Format and parse XML data to datasets, ready for further processing using + * spark + */ +object DataParser { + + /* + * Generate array of DataFrames from XML content + */ + def ParseData() : Array[DataFrame] = { + + // Define XML file locations and a string of attribute tags to retrieve + // from each xml element. + val xmlInfos = Array( + ("../stackoverflow_dataset/badges.txt", "Id UserId Name Date"), + ("../stackoverflow_dataset/comments.txt", "Id PostId Score Text CreationDate UserId"), + ("../stackoverflow_dataset/posts.txt", "Id PostTypeId ParentID AcceptedAnswerId CreationDate Score ViewCount Body OwnerUserId LastEditorUserId LastEditorDisplayName LastEditDate LastActivityDate CommunityOwnedDate ClosedDate Title Tags AnswerCount CommentCount FavoriteCount"), + ("../stackoverflow_dataset/postHistory.txt","Id PostHistoryTypeId PostId RevisionGUID CreationDate UserId UserDisplayName Comment Text CloseReasonId"), + ("../stackoverflow_dataset/postLinks.txt", "Id CreationDate PostId RelatedPostId PostLinkTypeId"), + ("../stackoverflow_dataset/users.txt", "Reputation CreationDate DisplayName EmailHash LastAccessDate WebsiteUrl Location Age AboutMe Views UpVotes DownVotes"), + ("../stackoverflow_dataset/votes.txt", "Id PostId VoteTypeId UserId CreationDate") + ) + + // Store each file's DataFrame in an array of DataFrames. + val parsedData = xmlInfos.map(x => ParseXMLInfo(x)) + + // Display a subset of each DataFrame's data in a table + for(i <- 0 until parsedData.length){ + parsedData(i).show() + } + + return parsedData + } + + private def ParseXMLInfo(xmlInfo: (String, String)) : DataFrame = { + // Get the XML attributes used for generating the table columns + var schemaString = xmlInfo._2 + // Generate schema using XML attribute string + var schema = GenerateSchemaFromString(schemaString) + // Generate RDD of data from the XML file + var rdd = ParseInput(xmlInfo._1, schemaString) + // Convert RDD to DataFrame for easier processing + var data = Main.sqlContext.createDataFrame(rdd, schema) + + return data + + } + + /* + * Generate a schema based on the string of XML attributes + */ + private def GenerateSchemaFromString(schemaString: String) : StructType = { + val fields = schemaString.split(" ") + .map(fieldName => StructField(fieldName, StringType, nullable = true)) + val schema = StructType(fields) + return schema + } + + /* + * Create RDD from XML file + * + * inputFilepath: Filepath to XML file + * schemaString: Space seperated attribute values + */ + private def ParseInput(inputFilepath: String, schemaString: String) : RDD[Row] = { + // Create spark text file object + val inputFile = Main.sc.textFile(inputFilepath) + + // Map the input file data to an RDD + val Data = inputFile.map(line => ParsingFunc(line, schemaString)) + return Data + } + + /* + * Retrieve XML attributes from a String + * + * line: XML file line + * schemaString: Space seperated attribute values + */ + private def ParsingFunc(line: String, schemaString: String) : Row = { + // Parse line of XML using Scala's built in XML library + val xmlLine = scala.xml.XML.loadString(line) + // Create array of values with element for each attribute in schemaString + var lineData = schemaString.split(" ").map(fieldName => getXMLAttribute(xmlLine, fieldName)) + + return Row.fromSeq(lineData) + } + + /* + * Handle NullPointerError raised when an attribute doesn't exist + * + * Return an empty string if the attribute doesn't exist + */ + private def getXMLAttribute(xmlLine: scala.xml.Elem, attribute: String) : String = { + try { + return xmlLine.attributes(attribute).text + } catch { + case npe: NullPointerException => return "" + } + } +} From b41d085184f47e0b27f1e93597d2e9c8427d9621 Mon Sep 17 00:00:00 2001 From: Sam Perry Date: Wed, 14 Dec 2016 14:41:25 +0000 Subject: [PATCH 5/5] Pre-master merge commit --- Notes.mkdn | 2 +- src/main/scala/KMeans.scala | 9 +++++---- src/main/scala/Main.scala | 4 +++- src/main/scala/XMLParser.scala | 23 +++++++++-------------- 4 files changed, 18 insertions(+), 20 deletions(-) diff --git a/Notes.mkdn b/Notes.mkdn index fc8ae60..54cfe00 100644 --- a/Notes.mkdn +++ b/Notes.mkdn @@ -72,7 +72,7 @@ stuff the first time you run it...) To run the compiled application: cd target - spark-submit --class KMeans --master local KMeans-0.0.1.jar + spark-submit --class ClusterSOData.Main --master local KMeans-0.0.1.jar That should run without errors, producing an output folder. Check that something has been generated by running: diff --git a/src/main/scala/KMeans.scala b/src/main/scala/KMeans.scala index 7da3587..a603e73 100644 --- a/src/main/scala/KMeans.scala +++ b/src/main/scala/KMeans.scala @@ -3,16 +3,17 @@ package ClusterSOData import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark._ +import org.apache.spark.sql._ object KMeans { /** * Run KMeans clustering on an input RDD vector */ - def run( - //data: RDD[Vector] + def train( + //data: DataSet ) { - // val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_); - // counts.saveAsTextFile("output") + //Normalise data using Euclidean normalisation + } } diff --git a/src/main/scala/Main.scala b/src/main/scala/Main.scala index 9149bd0..8915c3e 100644 --- a/src/main/scala/Main.scala +++ b/src/main/scala/Main.scala @@ -24,6 +24,8 @@ object Main { // for easy access to data elements. val df = DataParser.ParseData() - KMeans.run() + // get the users XML file + val users = df("users") + users.show() } } diff --git a/src/main/scala/XMLParser.scala b/src/main/scala/XMLParser.scala index f386ebd..11c23ab 100644 --- a/src/main/scala/XMLParser.scala +++ b/src/main/scala/XMLParser.scala @@ -18,27 +18,22 @@ object DataParser { /* * Generate array of DataFrames from XML content */ - def ParseData() : Array[DataFrame] = { + def ParseData() : Map[String, DataFrame] = { // Define XML file locations and a string of attribute tags to retrieve // from each xml element. val xmlInfos = Array( - ("../stackoverflow_dataset/badges.txt", "Id UserId Name Date"), - ("../stackoverflow_dataset/comments.txt", "Id PostId Score Text CreationDate UserId"), - ("../stackoverflow_dataset/posts.txt", "Id PostTypeId ParentID AcceptedAnswerId CreationDate Score ViewCount Body OwnerUserId LastEditorUserId LastEditorDisplayName LastEditDate LastActivityDate CommunityOwnedDate ClosedDate Title Tags AnswerCount CommentCount FavoriteCount"), - ("../stackoverflow_dataset/postHistory.txt","Id PostHistoryTypeId PostId RevisionGUID CreationDate UserId UserDisplayName Comment Text CloseReasonId"), - ("../stackoverflow_dataset/postLinks.txt", "Id CreationDate PostId RelatedPostId PostLinkTypeId"), - ("../stackoverflow_dataset/users.txt", "Reputation CreationDate DisplayName EmailHash LastAccessDate WebsiteUrl Location Age AboutMe Views UpVotes DownVotes"), - ("../stackoverflow_dataset/votes.txt", "Id PostId VoteTypeId UserId CreationDate") + ("badges", "../stackoverflow_dataset/badges.txt", "Id UserId Name Date"), + ("comments", "../stackoverflow_dataset/comments.txt", "Id PostId Score Text CreationDate UserId"), + ("posts", "../stackoverflow_dataset/posts.txt", "Id PostTypeId ParentID AcceptedAnswerId CreationDate Score ViewCount Body OwnerUserId LastEditorUserId LastEditorDisplayName LastEditDate LastActivityDate CommunityOwnedDate ClosedDate Title Tags AnswerCount CommentCount FavoriteCount"), + ("postHistory", "../stackoverflow_dataset/postHistory.txt","Id PostHistoryTypeId PostId RevisionGUID CreationDate UserId UserDisplayName Comment Text CloseReasonId"), + ("postLinks", "../stackoverflow_dataset/postLinks.txt", "Id CreationDate PostId RelatedPostId PostLinkTypeId"), + ("users", "../stackoverflow_dataset/users.txt", "Reputation CreationDate DisplayName EmailHash LastAccessDate WebsiteUrl Location Age AboutMe Views UpVotes DownVotes"), + ("votes", "../stackoverflow_dataset/votes.txt", "Id PostId VoteTypeId UserId CreationDate") ) // Store each file's DataFrame in an array of DataFrames. - val parsedData = xmlInfos.map(x => ParseXMLInfo(x)) - - // Display a subset of each DataFrame's data in a table - for(i <- 0 until parsedData.length){ - parsedData(i).show() - } + val parsedData = xmlInfos.map(x => (x._1, ParseXMLInfo((x._2, x._3)))).toMap return parsedData }