Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| a832da7a77 | |||
| 36ee19d7d9 | |||
| 45b97e6155 | |||
| c2ceb855d7 | |||
| 05c219aa6d | |||
| f4e7840033 | |||
| 4d2d031583 |
@@ -37,3 +37,5 @@ external/
|
||||
CMakeFiles/
|
||||
|
||||
mlib.scala
|
||||
|
||||
/target
|
||||
|
||||
@@ -81,17 +81,12 @@
|
||||
<dependency>
|
||||
<groupId>org.scala-lang</groupId>
|
||||
<artifactId>scala-library</artifactId>
|
||||
<version>2.11.8</version>
|
||||
<version>2.10.4</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.spark</groupId>
|
||||
<artifactId>spark-core_2.11</artifactId>
|
||||
<version>2.0.2</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.spark</groupId>
|
||||
<artifactId>spark-sql_2.11</artifactId>
|
||||
<version>2.0.2</version>
|
||||
<artifactId>spark-core_2.10</artifactId>
|
||||
<version>1.0.0-cdh5.1.0</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
|
||||
@@ -0,0 +1,48 @@
|
||||
package bdp.spark.KMeans
|
||||
|
||||
import scala.collection.mutable.ListBuffer
|
||||
|
||||
import org.apache.spark.SparkContext
|
||||
import org.apache.spark.SparkContext._
|
||||
import org.apache.spark.SparkConf
|
||||
|
||||
object SparkKMeans {
|
||||
def main(args: Array[String]) {
|
||||
val sc = new SparkContext(new SparkConf().setAppName("Spark KMeans"))
|
||||
val lines = sc.textFile(args(0))
|
||||
val featureSet = lines.map(getFeatures)
|
||||
val printableFeatureSet = featureSet.map(makeListPrintable)
|
||||
printableFeatureSet.foreach{println}
|
||||
|
||||
}
|
||||
|
||||
def getFeatures(line :String) : List[String] = {
|
||||
|
||||
val fragments = line.split("\"")
|
||||
|
||||
val featureIDs = List(" Reputation="," CreationDate="," LastAccessDate="," Views="," UpVotes="," DownVotes="," Age=")
|
||||
|
||||
var features = new ListBuffer[String]()
|
||||
features += fragments(1)
|
||||
|
||||
var a = ""
|
||||
for (a <- featureIDs) {
|
||||
if (fragments.contains(a)) {
|
||||
val index = fragments.indexOf(a)
|
||||
features += fragments(index + 1)
|
||||
} else {
|
||||
features += ""
|
||||
}
|
||||
}
|
||||
|
||||
features(2) = features(2).substring(0,10)
|
||||
features(3) = features(3).substring(0,10)
|
||||
|
||||
val featuresList = features.toList
|
||||
return featuresList
|
||||
}
|
||||
|
||||
def makeListPrintable(featureList : List[String]) : String = {
|
||||
return featureList.mkString(", ")
|
||||
}
|
||||
}
|
||||
Binary file not shown.
@@ -0,0 +1 @@
|
||||
.
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,5 @@
|
||||
#Generated by Maven
|
||||
#Wed Dec 14 00:04:22 GMT 2016
|
||||
version=0.0.1
|
||||
groupId=uk.ac.qmul.spark
|
||||
artifactId=KMeans
|
||||
@@ -0,0 +1,18 @@
|
||||
package ClusterSOData
|
||||
|
||||
import org.apache.spark.SparkContext
|
||||
import org.apache.spark.SparkContext._
|
||||
import org.apache.spark._
|
||||
|
||||
object KMeans {
|
||||
/**
|
||||
* Run KMeans clustering on an input RDD vector
|
||||
*/
|
||||
def run(
|
||||
//data: RDD[Vector]
|
||||
)
|
||||
{
|
||||
// val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);
|
||||
// counts.saveAsTextFile("output")
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,29 @@
|
||||
package ClusterSOData
|
||||
import org.apache.spark.SparkContext
|
||||
import org.apache.spark.SparkContext._
|
||||
import org.apache.spark._
|
||||
import org.apache.spark.sql.SQLContext
|
||||
import org.apache.spark.sql.SQLContext._
|
||||
import org.apache.spark.rdd.RDD
|
||||
import org.apache.spark.sql._
|
||||
import org.apache.spark.sql.types._
|
||||
|
||||
/*
|
||||
* Run KMeans clustering on the StackOverflow dataset
|
||||
*/
|
||||
object Main {
|
||||
// Initialize spark and SQL to allow for processing of structured data in a
|
||||
// spark cluster
|
||||
val sc = new SparkContext(new SparkConf().setAppName("Spark KMeans Clustering"))
|
||||
val sqlContext= new org.apache.spark.sql.SQLContext(sc)
|
||||
import sqlContext.implicits._
|
||||
|
||||
// Main function for task execution
|
||||
def main(args: Array[String]) {
|
||||
// Retrieve data from StackOverflow dataset XMLs. Format into DataFrames
|
||||
// for easy access to data elements.
|
||||
val df = DataParser.ParseData()
|
||||
|
||||
KMeans.run()
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,112 @@
|
||||
package ClusterSOData
|
||||
|
||||
import org.apache.spark.SparkContext
|
||||
import org.apache.spark.SparkContext._
|
||||
import org.apache.spark._
|
||||
import org.apache.spark.sql.SQLContext
|
||||
import org.apache.spark.sql.SQLContext._
|
||||
import org.apache.spark.rdd.RDD
|
||||
import org.apache.spark.sql._
|
||||
import org.apache.spark.sql.types._
|
||||
|
||||
/*
|
||||
* Format and parse XML data to datasets, ready for further processing using
|
||||
* spark
|
||||
*/
|
||||
object DataParser {
|
||||
|
||||
/*
|
||||
* Generate array of DataFrames from XML content
|
||||
*/
|
||||
def ParseData() : Array[DataFrame] = {
|
||||
|
||||
// Define XML file locations and a string of attribute tags to retrieve
|
||||
// from each xml element.
|
||||
val xmlInfos = Array(
|
||||
("../stackoverflow_dataset/badges.txt", "Id UserId Name Date"),
|
||||
("../stackoverflow_dataset/comments.txt", "Id PostId Score Text CreationDate UserId"),
|
||||
("../stackoverflow_dataset/posts.txt", "Id PostTypeId ParentID AcceptedAnswerId CreationDate Score ViewCount Body OwnerUserId LastEditorUserId LastEditorDisplayName LastEditDate LastActivityDate CommunityOwnedDate ClosedDate Title Tags AnswerCount CommentCount FavoriteCount"),
|
||||
("../stackoverflow_dataset/postHistory.txt","Id PostHistoryTypeId PostId RevisionGUID CreationDate UserId UserDisplayName Comment Text CloseReasonId"),
|
||||
("../stackoverflow_dataset/postLinks.txt", "Id CreationDate PostId RelatedPostId PostLinkTypeId"),
|
||||
("../stackoverflow_dataset/users.txt", "Reputation CreationDate DisplayName EmailHash LastAccessDate WebsiteUrl Location Age AboutMe Views UpVotes DownVotes"),
|
||||
("../stackoverflow_dataset/votes.txt", "Id PostId VoteTypeId UserId CreationDate")
|
||||
)
|
||||
|
||||
// Store each file's DataFrame in an array of DataFrames.
|
||||
val parsedData = xmlInfos.map(x => ParseXMLInfo(x))
|
||||
|
||||
// Display a subset of each DataFrame's data in a table
|
||||
for(i <- 0 until parsedData.length){
|
||||
parsedData(i).show()
|
||||
}
|
||||
|
||||
return parsedData
|
||||
}
|
||||
|
||||
private def ParseXMLInfo(xmlInfo: (String, String)) : DataFrame = {
|
||||
// Get the XML attributes used for generating the table columns
|
||||
var schemaString = xmlInfo._2
|
||||
// Generate schema using XML attribute string
|
||||
var schema = GenerateSchemaFromString(schemaString)
|
||||
// Generate RDD of data from the XML file
|
||||
var rdd = ParseInput(xmlInfo._1, schemaString)
|
||||
// Convert RDD to DataFrame for easier processing
|
||||
var data = Main.sqlContext.createDataFrame(rdd, schema)
|
||||
|
||||
return data
|
||||
|
||||
}
|
||||
|
||||
/*
|
||||
* Generate a schema based on the string of XML attributes
|
||||
*/
|
||||
private def GenerateSchemaFromString(schemaString: String) : StructType = {
|
||||
val fields = schemaString.split(" ")
|
||||
.map(fieldName => StructField(fieldName, StringType, nullable = true))
|
||||
val schema = StructType(fields)
|
||||
return schema
|
||||
}
|
||||
|
||||
/*
|
||||
* Create RDD from XML file
|
||||
*
|
||||
* inputFilepath: Filepath to XML file
|
||||
* schemaString: Space seperated attribute values
|
||||
*/
|
||||
private def ParseInput(inputFilepath: String, schemaString: String) : RDD[Row] = {
|
||||
// Create spark text file object
|
||||
val inputFile = Main.sc.textFile(inputFilepath)
|
||||
|
||||
// Map the input file data to an RDD
|
||||
val Data = inputFile.map(line => ParsingFunc(line, schemaString))
|
||||
return Data
|
||||
}
|
||||
|
||||
/*
|
||||
* Retrieve XML attributes from a String
|
||||
*
|
||||
* line: XML file line
|
||||
* schemaString: Space seperated attribute values
|
||||
*/
|
||||
private def ParsingFunc(line: String, schemaString: String) : Row = {
|
||||
// Parse line of XML using Scala's built in XML library
|
||||
val xmlLine = scala.xml.XML.loadString(line)
|
||||
// Create array of values with element for each attribute in schemaString
|
||||
var lineData = schemaString.split(" ").map(fieldName => getXMLAttribute(xmlLine, fieldName))
|
||||
|
||||
return Row.fromSeq(lineData)
|
||||
}
|
||||
|
||||
/*
|
||||
* Handle NullPointerError raised when an attribute doesn't exist
|
||||
*
|
||||
* Return an empty string if the attribute doesn't exist
|
||||
*/
|
||||
private def getXMLAttribute(xmlLine: scala.xml.Elem, attribute: String) : String = {
|
||||
try {
|
||||
return xmlLine.attributes(attribute).text
|
||||
} catch {
|
||||
case npe: NullPointerException => return ""
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user