Finished basic xml parser for Badges.txt
Begun implementation for all txt files. Added .scala files to tracking git hook Added missing main.scala file.
This commit is contained in:
@@ -35,3 +35,5 @@ bin/
|
|||||||
|
|
||||||
external/
|
external/
|
||||||
CMakeFiles/
|
CMakeFiles/
|
||||||
|
|
||||||
|
mlib.scala
|
||||||
|
|||||||
@@ -88,5 +88,10 @@
|
|||||||
<artifactId>spark-core_2.11</artifactId>
|
<artifactId>spark-core_2.11</artifactId>
|
||||||
<version>2.0.2</version>
|
<version>2.0.2</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.spark</groupId>
|
||||||
|
<artifactId>spark-sql_2.11</artifactId>
|
||||||
|
<version>2.0.2</version>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
</project>
|
</project>
|
||||||
|
|||||||
@@ -1,15 +1,18 @@
|
|||||||
|
package ClusterOSData
|
||||||
|
|
||||||
import org.apache.spark.SparkContext
|
import org.apache.spark.SparkContext
|
||||||
import org.apache.spark.SparkContext._
|
import org.apache.spark.SparkContext._
|
||||||
import org.apache.spark._
|
import org.apache.spark._
|
||||||
|
|
||||||
object KMeans {
|
object KMeans {
|
||||||
/* This is my first java program.
|
/**
|
||||||
* This will print 'Hello World' as the output
|
* Run KMeans clustering on an input RDD vector
|
||||||
*/
|
*/
|
||||||
def main(args: Array[String]) {
|
def run(
|
||||||
val sc = new SparkContext(new SparkConf().setAppName("Spark KMeans Clustering"))
|
//data: RDD[Vector]
|
||||||
val inputfile = sc.textFile("../stackoverflow_dataset/badges.txt")
|
)
|
||||||
val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);
|
{
|
||||||
counts.saveAsTextFile("output")
|
// val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);
|
||||||
}
|
// counts.saveAsTextFile("output")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,81 @@
|
|||||||
|
package ClusterOSData
|
||||||
|
import org.apache.spark.SparkContext
|
||||||
|
import org.apache.spark.SparkContext._
|
||||||
|
import org.apache.spark._
|
||||||
|
import org.apache.spark.sql.SQLContext
|
||||||
|
import org.apache.spark.sql.SQLContext._
|
||||||
|
import org.apache.spark.rdd.RDD
|
||||||
|
import org.apache.spark.sql._
|
||||||
|
import org.apache.spark.sql.types._
|
||||||
|
|
||||||
|
object Main {
|
||||||
|
val sc = new SparkContext(new SparkConf().setAppName("Spark KMeans Clustering"))
|
||||||
|
val sqlContext= new org.apache.spark.sql.SQLContext(sc)
|
||||||
|
import sqlContext.implicits._
|
||||||
|
|
||||||
|
def main(args: Array[String]) {
|
||||||
|
KMeans.run()
|
||||||
|
DataParser.ParseData()
|
||||||
|
}
|
||||||
|
|
||||||
|
object DataParser {
|
||||||
|
|
||||||
|
// case class BadgeRow(i: String, j: String, k: String, m: String)
|
||||||
|
// case class VotesRow(i: String, j: String, k: String, m: String, m: String)
|
||||||
|
// case class CommentsRow(i: String, j: String, k: String, m: String)
|
||||||
|
//case class BadgeRow(i: String, j: String, k: String, m: String)
|
||||||
|
//case class BadgeRow(i: String, j: String, k: String, m: String)
|
||||||
|
|
||||||
|
def ParseData() {
|
||||||
|
// The schema is encoded in a string
|
||||||
|
var schemaString = "Id UserId Name Date"
|
||||||
|
|
||||||
|
// Generate the schema based on the string of schema
|
||||||
|
var fields = schemaString.split(" ")
|
||||||
|
.map(fieldName => StructField(fieldName, StringType, nullable = true))
|
||||||
|
var schema = StructType(fields)
|
||||||
|
var rdd = ParseInput("../stackoverflow_dataset/badges.txt", schemaString)
|
||||||
|
val badgeData = sqlContext.createDataFrame(rdd, schema)
|
||||||
|
badgeData.show()
|
||||||
|
|
||||||
|
// The schema is encoded in a string
|
||||||
|
schemaString = "Id PostId VoteTypeId UserId CreationDate"
|
||||||
|
|
||||||
|
// Generate the schema based on the string of schema
|
||||||
|
fields = schemaString.split(" ")
|
||||||
|
.map(fieldName => StructField(fieldName, StringType, nullable = true))
|
||||||
|
schema = StructType(fields)
|
||||||
|
rdd = ParseInput("../stackoverflow_dataset/votes.txt", schemaString)
|
||||||
|
val voteData = sqlContext.createDataFrame(rdd, schema)
|
||||||
|
voteData.show()
|
||||||
|
// ParseInput("../stackoverflow_dataset/comments.txt")
|
||||||
|
// ParseInput("../stackoverflow_dataset/postHistory.txt")
|
||||||
|
// ParseInput("../stackoverflow_dataset/postLinks.txt")
|
||||||
|
}
|
||||||
|
|
||||||
|
private def ParseInput(inputFilepath: String, schemaString: String) : RDD[Row] = {
|
||||||
|
val inputFile = Main.sc.textFile(inputFilepath)
|
||||||
|
|
||||||
|
val Data = inputFile.map(line => ParsingFunc(line, schemaString))
|
||||||
|
return Data
|
||||||
|
}
|
||||||
|
|
||||||
|
private def ParsingFunc(line: String, schemaString: String) : Row = {
|
||||||
|
val xmlLine = scala.xml.XML.loadString(line)
|
||||||
|
var lineData = schemaString.split(" ").map(fieldName => getXMLAttribute(xmlLine, fieldName))
|
||||||
|
|
||||||
|
println(lineData)
|
||||||
|
return Row.fromSeq(lineData)
|
||||||
|
}
|
||||||
|
|
||||||
|
private def getXMLAttribute(xmlLine: scala.xml.Elem, attribute: String) : String = {
|
||||||
|
try {
|
||||||
|
return xmlLine.attributes(attribute).text
|
||||||
|
} catch {
|
||||||
|
case npe: NullPointerException => return ""
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
Reference in New Issue
Block a user