11 Commits

Author SHA1 Message Date
Paul Campbell 3d648b26f7 first draft of report 2016-12-17 20:12:21 +00:00
Paul Campbell cbb470dd73 Improved XML Parser 2016-12-16 13:58:09 +00:00
Paul Campbell d6262b84ca Merge branch 'sam-dev' into paul-dev 2016-12-16 13:44:05 +00:00
Paul Campbell 81b6e47e26 FUCK 2016-12-16 13:42:24 +00:00
Paul Campbell abc8437620 Improved XML Parser 2016-12-16 13:36:02 +00:00
Sam Perry f63e2cd4b2 Finished XML parsing sanetization 2016-12-15 17:49:47 +00:00
Sam Perry c9c718dbe8 Fixed xml tag bug 2016-12-15 17:31:13 +00:00
Sam Perry f4e555ab9a Commented XML Parser 2016-12-15 15:00:54 +00:00
Sam Perry 8ef2828723 Finished XML Parser data casting 2016-12-15 14:13:33 +00:00
Sam Perry 32e774819e Almost finished casting XML for DataFrames 2016-12-15 13:07:18 +00:00
Sam Perry b41d085184 Pre-master merge commit 2016-12-14 14:41:37 +00:00
6 changed files with 97 additions and 33 deletions
Binary file not shown.
+1 -1
View File
@@ -72,7 +72,7 @@ stuff the first time you run it...)
To run the compiled application:
cd target
spark-submit --class KMeans --master local KMeans-0.0.1.jar
spark-submit --class ClusterSOData.Main --master local KMeans-0.0.1.jar
That should run without errors, producing an output folder. Check that
something has been generated by running:
Executable
+4
View File
@@ -0,0 +1,4 @@
#!/usr/bin/env bash
cd target
spark-submit --class ClusterSOData.Main --master local KMeans-0.0.1.jar
+5 -4
View File
@@ -3,16 +3,17 @@ package ClusterSOData
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark._
import org.apache.spark.sql._
object KMeans {
/**
* Run KMeans clustering on an input RDD vector
*/
def run(
//data: RDD[Vector]
def train(
//data: DataSet
)
{
// val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);
// counts.saveAsTextFile("output")
//Normalise data using Euclidean normalisation
}
}
+15 -2
View File
@@ -22,8 +22,21 @@ object Main {
def main(args: Array[String]) {
// Retrieve data from StackOverflow dataset XMLs. Format into DataFrames
// for easy access to data elements.
val df = DataParser.ParseData()
val dataFrames = DataParser.ParseData()
KMeans.run()
// get the users XML file
val users = dataFrames("users")
users.persist()
// Show 20 entries from the user dataset
users.show()
// Show types for the user dataset
users.printSchema()
users.show()
// create new dataframe with only the reputation of the users
users.select("CreationDate").show()
// Info on using DataFrames here: https://www.mapr.com/blog/using-apache-spark-dataframes-processing-tabular-data
}
}
+72 -26
View File
@@ -9,6 +9,18 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import scala.xml.Elem
import scala.xml.factory.XMLLoader
import javax.xml.parsers.SAXParser
object MyXML extends XMLLoader[Elem] {
override def parser: SAXParser = {
val f = javax.xml.parsers.SAXParserFactory.newInstance()
f.setNamespaceAware(false)
f.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true);
f.newSAXParser()
}
}
/*
* Format and parse XML data to datasets, ready for further processing using
* spark
@@ -18,38 +30,38 @@ object DataParser {
/*
* Generate array of DataFrames from XML content
*/
def ParseData() : Array[DataFrame] = {
def ParseData() : Map[String, DataFrame] = {
// Define XML file locations and a string of attribute tags to retrieve
// from each xml element.
val xmlInfos = Array(
("../stackoverflow_dataset/badges.txt", "Id UserId Name Date"),
("../stackoverflow_dataset/comments.txt", "Id PostId Score Text CreationDate UserId"),
("../stackoverflow_dataset/posts.txt", "Id PostTypeId ParentID AcceptedAnswerId CreationDate Score ViewCount Body OwnerUserId LastEditorUserId LastEditorDisplayName LastEditDate LastActivityDate CommunityOwnedDate ClosedDate Title Tags AnswerCount CommentCount FavoriteCount"),
("../stackoverflow_dataset/postHistory.txt","Id PostHistoryTypeId PostId RevisionGUID CreationDate UserId UserDisplayName Comment Text CloseReasonId"),
("../stackoverflow_dataset/postLinks.txt", "Id CreationDate PostId RelatedPostId PostLinkTypeId"),
("../stackoverflow_dataset/users.txt", "Reputation CreationDate DisplayName EmailHash LastAccessDate WebsiteUrl Location Age AboutMe Views UpVotes DownVotes"),
("../stackoverflow_dataset/votes.txt", "Id PostId VoteTypeId UserId CreationDate")
)
/*
("badges", "/data/stackoverflow/Badges", "Id UserId Name Date", Array[DataType](IntegerType, IntegerType, StringType, DateType)),
("comments", "/data/stackoverflow/Comments", "Id PostId Score Text CreationDate UserId", Array[DataType](IntegerType, IntegerType, IntegerType, StringType, DateType, IntegerType)),
("posts", "data/stackoverflow/Posts", "Id PostTypeId ParentID AcceptedAnswerId CreationDate Score ViewCount Body OwnerUserId LastEditorUserId LastEditorDisplayName LastEditDate LastActivityDate CommunityOwnedDate ClosedDate Title Tags AnswerCount CommentCount FavoriteCount", Array[DataType](IntegerType, IntegerType, IntegerType, IntegerType, DateType, IntegerType, IntegerType, StringType, IntegerType, IntegerType, StringType, DateType, DateType, DateType, DateType, StringType, StringType, IntegerType, IntegerType, IntegerType)),
("postHistory", "/data/stackoverflow/PostHistory","Id PostHistoryTypeId PostId RevisionGUID CreationDate UserId UserDisplayName Comment Text CloseReasonId", Array[DataType](IntegerType, IntegerType, IntegerType,IntegerType, DateType, IntegerType, StringType, StringType, StringType, IntegerType)),
("postLinks", "data/stackoverflow/PostLinks", "Id CreationDate PostId RelatedPostId PostLinkTypeId", Array[DataType](IntegerType, DateType, IntegerType, IntegerType, IntegerType)),
*/
("users", "/Users/Work/o/Big_Data_Assignment_2/stackoverflow_dataset/users.txt", "Reputation CreationDate DisplayName EmailHash LastAccessDate WebsiteUrl Location Age AboutMe Views UpVotes DownVotes", Array[DataType](IntegerType, DateType, StringType, StringType, DateType, StringType, StringType, IntegerType, StringType, IntegerType, IntegerType, IntegerType))
/*
("votes", "/data/stackoverflow/Votes", "Id PostId VoteTypeId UserId CreationDate", Array[DataType](IntegerType, IntegerType, IntegerType, IntegerType, DateType))
*/
)
// Store each file's DataFrame in an array of DataFrames.
val parsedData = xmlInfos.map(x => ParseXMLInfo(x))
// Display a subset of each DataFrame's data in a table
for(i <- 0 until parsedData.length){
parsedData(i).show()
}
val parsedData = xmlInfos.map(x => (x._1, ParseXMLInfo((x._2, x._3, x._4)))).toMap
return parsedData
}
private def ParseXMLInfo(xmlInfo: (String, String)) : DataFrame = {
private def ParseXMLInfo(xmlInfo: (String, String, Array[DataType])) : DataFrame = {
// Get the XML attributes used for generating the table columns
var schemaString = xmlInfo._2
val schemaType = xmlInfo._3
// Generate schema using XML attribute string
var schema = GenerateSchemaFromString(schemaString)
var schema = GenerateSchemaFromString(schemaString, schemaType)
// Generate RDD of data from the XML file
var rdd = ParseInput(xmlInfo._1, schemaString)
var rdd = ParseInput(xmlInfo._1, schemaString, schemaType)
// Convert RDD to DataFrame for easier processing
var data = Main.sqlContext.createDataFrame(rdd, schema)
@@ -60,9 +72,13 @@ object DataParser {
/*
* Generate a schema based on the string of XML attributes
*/
private def GenerateSchemaFromString(schemaString: String) : StructType = {
val fields = schemaString.split(" ")
.map(fieldName => StructField(fieldName, StringType, nullable = true))
private def GenerateSchemaFromString(schemaString: String, schemaType: Array[DataType]) : StructType = {
// Replace all DateTypes with Longs as date will now be stored as longs.
val sT = schemaType.map(i => if (i==DateType) LongType else i)
val schemaPairs = schemaString.split(" ") zip sT
// Create schema for columns and set their datatypes for DataFrame based on attribute names.
val fields = schemaPairs.map{case (fieldName: String, dataType: DataType) => StructField(fieldName, dataType, nullable = true)}
val schema = StructType(fields)
return schema
}
@@ -73,12 +89,14 @@ object DataParser {
* inputFilepath: Filepath to XML file
* schemaString: Space seperated attribute values
*/
private def ParseInput(inputFilepath: String, schemaString: String) : RDD[Row] = {
private def ParseInput(inputFilepath: String, schemaString: String, schemaType: Array[DataType]) : RDD[Row] = {
// Create spark text file object
val inputFile = Main.sc.textFile(inputFilepath)
// Map the input file data to an RDD
val Data = inputFile.map(line => ParsingFunc(line, schemaString))
val Data = inputFile.collect {
case line if !SantizeLine(line) => ParsingFunc(line, schemaString, schemaType)
}
return Data
}
@@ -88,15 +106,43 @@ object DataParser {
* line: XML file line
* schemaString: Space seperated attribute values
*/
private def ParsingFunc(line: String, schemaString: String) : Row = {
private def SantizeLine(line: String) : Boolean = {
val invalidLines = Array("<?xml version=\"1.0\" encoding=\"utf-8\"?>", "<users>", "</users>")
return invalidLines contains line
}
private def ParsingFunc(line: String, schemaString: String, schemaType: Array[DataType]) : Row = {
// Parse line of XML using Scala's built in XML library
val xmlLine = scala.xml.XML.loadString(line)
val xmlLine = MyXML.loadString(line)
var schemaPairs = schemaString.split(" ") zip schemaType
// Create array of values with element for each attribute in schemaString
var lineData = schemaString.split(" ").map(fieldName => getXMLAttribute(xmlLine, fieldName))
var lineData = schemaPairs.map { case (fieldName: String, dType: DataType) => castToDType(getXMLAttribute(xmlLine, fieldName), dType) }
return Row.fromSeq(lineData)
}
/*
* Cast attribute data to relevant datatype.
*/
private def castToDType(attribute: String, dType: DataType) : Any = {
dType match {
case StringType => return attribute
case IntegerType =>
try {
return attribute.toInt
} catch {
// If the string was not castable to integer then it is not a number.
// In this case, return a placeholder value of -1.
case e: Exception => return -1
}
case DateType =>
// If the string is a date, convert from date string to long.
var format = new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")
return format.parse(attribute).getTime()
}
}
/*
* Handle NullPointerError raised when an attribute doesn't exist
*