7 Commits

Author SHA1 Message Date
Joe Darby a832da7a77 working feature parsing 2016-12-14 00:12:49 +00:00
Joe Darby 36ee19d7d9 generalise features 2016-12-13 21:31:07 +00:00
Joe Darby 45b97e6155 modify gitignore 2016-12-13 21:13:30 +00:00
Joe Darby c2ceb855d7 get ages 2016-12-13 19:58:08 +00:00
Joe Darby 05c219aa6d testing 2016-12-13 19:42:26 +00:00
Joe Darby f4e7840033 first attempt at age parse 2016-12-13 18:45:35 +00:00
Joe Darby 4d2d031583 modify gitignore 2016-12-13 17:55:17 +00:00
15 changed files with 218 additions and 8 deletions
+2
View File
@@ -37,3 +37,5 @@ external/
CMakeFiles/
mlib.scala
/target
+3 -8
View File
@@ -81,17 +81,12 @@
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
<version>2.10.4</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.0.2</version>
<artifactId>spark-core_2.10</artifactId>
<version>1.0.0-cdh5.1.0</version>
</dependency>
</dependencies>
</project>
@@ -0,0 +1,48 @@
package bdp.spark.KMeans
import scala.collection.mutable.ListBuffer
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object SparkKMeans {
def main(args: Array[String]) {
val sc = new SparkContext(new SparkConf().setAppName("Spark KMeans"))
val lines = sc.textFile(args(0))
val featureSet = lines.map(getFeatures)
val printableFeatureSet = featureSet.map(makeListPrintable)
printableFeatureSet.foreach{println}
}
def getFeatures(line :String) : List[String] = {
val fragments = line.split("\"")
val featureIDs = List(" Reputation="," CreationDate="," LastAccessDate="," Views="," UpVotes="," DownVotes="," Age=")
var features = new ListBuffer[String]()
features += fragments(1)
var a = ""
for (a <- featureIDs) {
if (fragments.contains(a)) {
val index = fragments.indexOf(a)
features += fragments(index + 1)
} else {
features += ""
}
}
features(2) = features(2).substring(0,10)
features(3) = features(3).substring(0,10)
val featuresList = features.toList
return featuresList
}
def makeListPrintable(featureList : List[String]) : String = {
return featureList.mkString(", ")
}
}
Binary file not shown.
+1
View File
@@ -0,0 +1 @@
.
Binary file not shown.
Binary file not shown.
+5
View File
@@ -0,0 +1,5 @@
#Generated by Maven
#Wed Dec 14 00:04:22 GMT 2016
version=0.0.1
groupId=uk.ac.qmul.spark
artifactId=KMeans
+18
View File
@@ -0,0 +1,18 @@
package ClusterSOData
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark._
object KMeans {
/**
* Run KMeans clustering on an input RDD vector
*/
def run(
//data: RDD[Vector]
)
{
// val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);
// counts.saveAsTextFile("output")
}
}
+29
View File
@@ -0,0 +1,29 @@
package ClusterSOData
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.types._
/*
* Run KMeans clustering on the StackOverflow dataset
*/
object Main {
// Initialize spark and SQL to allow for processing of structured data in a
// spark cluster
val sc = new SparkContext(new SparkConf().setAppName("Spark KMeans Clustering"))
val sqlContext= new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
// Main function for task execution
def main(args: Array[String]) {
// Retrieve data from StackOverflow dataset XMLs. Format into DataFrames
// for easy access to data elements.
val df = DataParser.ParseData()
KMeans.run()
}
}
+112
View File
@@ -0,0 +1,112 @@
package ClusterSOData
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.types._
/*
* Format and parse XML data to datasets, ready for further processing using
* spark
*/
object DataParser {
/*
* Generate array of DataFrames from XML content
*/
def ParseData() : Array[DataFrame] = {
// Define XML file locations and a string of attribute tags to retrieve
// from each xml element.
val xmlInfos = Array(
("../stackoverflow_dataset/badges.txt", "Id UserId Name Date"),
("../stackoverflow_dataset/comments.txt", "Id PostId Score Text CreationDate UserId"),
("../stackoverflow_dataset/posts.txt", "Id PostTypeId ParentID AcceptedAnswerId CreationDate Score ViewCount Body OwnerUserId LastEditorUserId LastEditorDisplayName LastEditDate LastActivityDate CommunityOwnedDate ClosedDate Title Tags AnswerCount CommentCount FavoriteCount"),
("../stackoverflow_dataset/postHistory.txt","Id PostHistoryTypeId PostId RevisionGUID CreationDate UserId UserDisplayName Comment Text CloseReasonId"),
("../stackoverflow_dataset/postLinks.txt", "Id CreationDate PostId RelatedPostId PostLinkTypeId"),
("../stackoverflow_dataset/users.txt", "Reputation CreationDate DisplayName EmailHash LastAccessDate WebsiteUrl Location Age AboutMe Views UpVotes DownVotes"),
("../stackoverflow_dataset/votes.txt", "Id PostId VoteTypeId UserId CreationDate")
)
// Store each file's DataFrame in an array of DataFrames.
val parsedData = xmlInfos.map(x => ParseXMLInfo(x))
// Display a subset of each DataFrame's data in a table
for(i <- 0 until parsedData.length){
parsedData(i).show()
}
return parsedData
}
private def ParseXMLInfo(xmlInfo: (String, String)) : DataFrame = {
// Get the XML attributes used for generating the table columns
var schemaString = xmlInfo._2
// Generate schema using XML attribute string
var schema = GenerateSchemaFromString(schemaString)
// Generate RDD of data from the XML file
var rdd = ParseInput(xmlInfo._1, schemaString)
// Convert RDD to DataFrame for easier processing
var data = Main.sqlContext.createDataFrame(rdd, schema)
return data
}
/*
* Generate a schema based on the string of XML attributes
*/
private def GenerateSchemaFromString(schemaString: String) : StructType = {
val fields = schemaString.split(" ")
.map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
return schema
}
/*
* Create RDD from XML file
*
* inputFilepath: Filepath to XML file
* schemaString: Space seperated attribute values
*/
private def ParseInput(inputFilepath: String, schemaString: String) : RDD[Row] = {
// Create spark text file object
val inputFile = Main.sc.textFile(inputFilepath)
// Map the input file data to an RDD
val Data = inputFile.map(line => ParsingFunc(line, schemaString))
return Data
}
/*
* Retrieve XML attributes from a String
*
* line: XML file line
* schemaString: Space seperated attribute values
*/
private def ParsingFunc(line: String, schemaString: String) : Row = {
// Parse line of XML using Scala's built in XML library
val xmlLine = scala.xml.XML.loadString(line)
// Create array of values with element for each attribute in schemaString
var lineData = schemaString.split(" ").map(fieldName => getXMLAttribute(xmlLine, fieldName))
return Row.fromSeq(lineData)
}
/*
* Handle NullPointerError raised when an attribute doesn't exist
*
* Return an empty string if the attribute doesn't exist
*/
private def getXMLAttribute(xmlLine: scala.xml.Elem, attribute: String) : String = {
try {
return xmlLine.attributes(attribute).text
} catch {
case npe: NullPointerException => return ""
}
}
}