diff --git a/.gitignore b/.gitignore index 2934343..4dd33cb 100644 --- a/.gitignore +++ b/.gitignore @@ -35,3 +35,5 @@ bin/ external/ CMakeFiles/ + +mlib.scala diff --git a/.gittrack b/.gittrack index e69de29..afcdcd1 100644 --- a/.gittrack +++ b/.gittrack @@ -0,0 +1 @@ +*.scala diff --git a/pom.xml b/pom.xml index 2e864f4..2de7cfc 100644 --- a/pom.xml +++ b/pom.xml @@ -88,5 +88,10 @@ spark-core_2.11 2.0.2 + + org.apache.spark + spark-sql_2.11 + 2.0.2 + diff --git a/src/main/scala/KMeans.scala b/src/main/scala/KMeans.scala index c77460b..5fbc8ef 100644 --- a/src/main/scala/KMeans.scala +++ b/src/main/scala/KMeans.scala @@ -1,15 +1,18 @@ +package ClusterOSData + import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark._ object KMeans { - /* This is my first java program. - * This will print 'Hello World' as the output + /** + * Run KMeans clustering on an input RDD vector */ - def main(args: Array[String]) { - val sc = new SparkContext(new SparkConf().setAppName("Spark KMeans Clustering")) - val inputfile = sc.textFile("../stackoverflow_dataset/badges.txt") - val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_); - counts.saveAsTextFile("output") - } + def run( + //data: RDD[Vector] + ) + { + // val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_); + // counts.saveAsTextFile("output") + } } diff --git a/src/main/scala/Main.scala b/src/main/scala/Main.scala new file mode 100644 index 0000000..7d07b71 --- /dev/null +++ b/src/main/scala/Main.scala @@ -0,0 +1,81 @@ +package ClusterOSData +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark._ +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.SQLContext._ +import org.apache.spark.rdd.RDD +import org.apache.spark.sql._ +import org.apache.spark.sql.types._ + +object Main { + val sc = new SparkContext(new SparkConf().setAppName("Spark KMeans Clustering")) + val sqlContext= new org.apache.spark.sql.SQLContext(sc) + import sqlContext.implicits._ + + def main(args: Array[String]) { + KMeans.run() + DataParser.ParseData() + } + + object DataParser { + + // case class BadgeRow(i: String, j: String, k: String, m: String) + // case class VotesRow(i: String, j: String, k: String, m: String, m: String) + // case class CommentsRow(i: String, j: String, k: String, m: String) + //case class BadgeRow(i: String, j: String, k: String, m: String) + //case class BadgeRow(i: String, j: String, k: String, m: String) + + def ParseData() { + // The schema is encoded in a string + var schemaString = "Id UserId Name Date" + + // Generate the schema based on the string of schema + var fields = schemaString.split(" ") + .map(fieldName => StructField(fieldName, StringType, nullable = true)) + var schema = StructType(fields) + var rdd = ParseInput("../stackoverflow_dataset/badges.txt", schemaString) + val badgeData = sqlContext.createDataFrame(rdd, schema) + badgeData.show() + + // The schema is encoded in a string + schemaString = "Id PostId VoteTypeId UserId CreationDate" + + // Generate the schema based on the string of schema + fields = schemaString.split(" ") + .map(fieldName => StructField(fieldName, StringType, nullable = true)) + schema = StructType(fields) + rdd = ParseInput("../stackoverflow_dataset/votes.txt", schemaString) + val voteData = sqlContext.createDataFrame(rdd, schema) + voteData.show() + // ParseInput("../stackoverflow_dataset/comments.txt") + // ParseInput("../stackoverflow_dataset/postHistory.txt") + // ParseInput("../stackoverflow_dataset/postLinks.txt") + } + + private def ParseInput(inputFilepath: String, schemaString: String) : RDD[Row] = { + val inputFile = Main.sc.textFile(inputFilepath) + + val Data = inputFile.map(line => ParsingFunc(line, schemaString)) + return Data + } + + private def ParsingFunc(line: String, schemaString: String) : Row = { + val xmlLine = scala.xml.XML.loadString(line) + var lineData = schemaString.split(" ").map(fieldName => getXMLAttribute(xmlLine, fieldName)) + + println(lineData) + return Row.fromSeq(lineData) + } + + private def getXMLAttribute(xmlLine: scala.xml.Elem, attribute: String) : String = { + try { + return xmlLine.attributes(attribute).text + } catch { + case npe: NullPointerException => return "" + } + } + } +} + +