9 Commits

Author SHA1 Message Date
Sam Perry 86b512234a Hack not working yet... 2016-12-19 11:21:16 +00:00
Sam Perry 100aab773a HAck 2016-12-16 16:05:26 +00:00
Sam Perry e04b71cbd5 Merge branch 'paul-dev' into sam-dev 2016-12-16 14:06:30 +00:00
Sam Perry 8253e01584 Merge branch 'sam-dev' of github.com:Pezz89/Big_Data_Assignment_2 into sam-dev 2016-12-16 14:04:42 +00:00
Sam Perry d3f067bf6a Converted DataFrames to RDDs 2016-12-16 14:03:24 +00:00
Paul Campbell cbb470dd73 Improved XML Parser 2016-12-16 13:58:09 +00:00
Paul Campbell d6262b84ca Merge branch 'sam-dev' into paul-dev 2016-12-16 13:44:05 +00:00
Paul Campbell 81b6e47e26 FUCK 2016-12-16 13:42:24 +00:00
Paul Campbell abc8437620 Improved XML Parser 2016-12-16 13:36:02 +00:00
4 changed files with 93 additions and 18 deletions
+5
View File
@@ -93,5 +93,10 @@
<artifactId>spark-sql_2.11</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-xml_2.11</artifactId>
<version>0.4.0</version>
</dependency>
</dependencies>
</project>
+1 -1
View File
@@ -1,4 +1,4 @@
#!/usr/bin/env bash
cd target
spark-submit --class ClusterSOData.Main --master local KMeans-0.0.1.jar
spark-submit --packages com.databricks:spark-xml_2.11:0.4.0 --class ClusterSOData.Main --master local KMeans-0.0.1.jar
+24 -5
View File
@@ -8,6 +8,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import com.databricks.spark.xml.XmlReader;
/*
* Run KMeans clustering on the StackOverflow dataset
*/
@@ -17,24 +18,42 @@ object Main {
val sc = new SparkContext(new SparkConf().setAppName("Spark KMeans Clustering"))
val sqlContext= new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val spark = SparkSession.builder.
master("local")
.appName("tester")
.getOrCreate()
import spark.implicits._
// Main function for task execution
def main(args: Array[String]) {
// Retrieve data from StackOverflow dataset XMLs. Format into DataFrames
// for easy access to data elements.
val dataFrames = DataParser.ParseData()
//val dataFrames = DataParser.ParseData()
val customSchema = StructType(Array(
StructField("_Reputation", StringType, nullable = true)))
val localxml="../stackoverflow_dataset/users.txt";
val booksFileTag = "row";
val df = sqlContext.read
.format("com.databricks.spark.xml")
.load(localxml)
.schema(customSchema)
df.printSchema();
// get the users XML file
val users = dataFrames("users")
users.persist()
//val users = dataFrames("users")
//users.persist()
/*
// Show 20 entries from the user dataset
users.show()
// Show types for the user dataset
users.printSchema()
users.show()
*/
// create new dataframe with only the reputation of the users
users.select("CreationDate").show()
//val a = users.select("Reputation").rdd.map(r => r(0)).persist()
// Info on using DataFrames here: https://www.mapr.com/blog/using-apache-spark-dataframes-processing-tabular-data
}
+63 -12
View File
@@ -1,5 +1,6 @@
package ClusterSOData
import scala.collection.mutable.ListBuffer
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark._
@@ -9,16 +10,36 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import scala.xml.Elem
import scala.xml.factory.XMLLoader
import javax.xml.parsers.SAXParser
/**
 * XML loader whose SAX parser is built with namespace awareness and
 * validation switched off, and with DTD / external-entity loading features
 * disabled.
 */
object MyXML extends XMLLoader[Elem] {

  // Parser features that are explicitly switched off, in the order they are
  // applied to the factory.
  private val disabledFeatures: List[String] = List(
    "http://xml.org/sax/features/validation",
    "http://apache.org/xml/features/nonvalidating/load-dtd-grammar",
    "http://apache.org/xml/features/nonvalidating/load-external-dtd",
    "http://xml.org/sax/features/external-general-entities",
    "http://xml.org/sax/features/external-parameter-entities"
  )

  /** Build a fresh, non-validating, non-namespace-aware SAX parser. */
  override def parser: SAXParser = {
    val factory = javax.xml.parsers.SAXParserFactory.newInstance()
    factory.setNamespaceAware(false)
    disabledFeatures.foreach(feature => factory.setFeature(feature, false))
    factory.setValidating(false)
    factory.newSAXParser()
  }
}
/*
* Format and parse XML data to datasets, ready for further processing using
* spark
*/
object DataParser {
val xParse = scala.xml.XML
/*
* Generate array of DataFrames from XML content
*/
def ParseData() : Map[String, DataFrame] = {
def ParseData() : Map[String, RDD[List[String]]] = {
// Define XML file locations and a string of attribute tags to retrieve
// from each xml element.
@@ -30,7 +51,7 @@ object DataParser {
("postHistory", "/data/stackoverflow/PostHistory","Id PostHistoryTypeId PostId RevisionGUID CreationDate UserId UserDisplayName Comment Text CloseReasonId", Array[DataType](IntegerType, IntegerType, IntegerType,IntegerType, DateType, IntegerType, StringType, StringType, StringType, IntegerType)),
("postLinks", "data/stackoverflow/PostLinks", "Id CreationDate PostId RelatedPostId PostLinkTypeId", Array[DataType](IntegerType, DateType, IntegerType, IntegerType, IntegerType)),
*/
("users", "/data/stackoverflow/Users", "Reputation CreationDate DisplayName EmailHash LastAccessDate WebsiteUrl Location Age AboutMe Views UpVotes DownVotes", Array[DataType](IntegerType, DateType, StringType, StringType, DateType, StringType, StringType, IntegerType, StringType, IntegerType, IntegerType, IntegerType))
("users", "../stackoverflow_dataset/users2.txt", "Reputation CreationDate DisplayName EmailHash LastAccessDate WebsiteUrl Location Age AboutMe Views UpVotes DownVotes", Array[DataType](IntegerType, DateType, StringType, StringType, DateType, StringType, StringType, IntegerType, StringType, IntegerType, IntegerType, IntegerType))
/*
("votes", "/data/stackoverflow/Votes", "Id PostId VoteTypeId UserId CreationDate", Array[DataType](IntegerType, IntegerType, IntegerType, IntegerType, DateType))
*/
@@ -42,7 +63,7 @@ object DataParser {
return parsedData
}
private def ParseXMLInfo(xmlInfo: (String, String, Array[DataType])) : DataFrame = {
private def ParseXMLInfo(xmlInfo: (String, String, Array[DataType])) : RDD[List[String]] = {
// Get the XML attributes used for generating the table columns
var schemaString = xmlInfo._2
val schemaType = xmlInfo._3
@@ -51,9 +72,10 @@ object DataParser {
// Generate RDD of data from the XML file
var rdd = ParseInput(xmlInfo._1, schemaString, schemaType)
// Convert RDD to DataFrame for easier processing
var data = Main.sqlContext.createDataFrame(rdd, schema)
//var data = Main.sqlContext.createDataFrame(rdd, schema)
rdd.persist()
return data
return rdd
}
@@ -77,13 +99,13 @@ object DataParser {
* inputFilepath: Filepath to XML file
* schemaString: Space-separated attribute values
*/
private def ParseInput(inputFilepath: String, schemaString: String, schemaType: Array[DataType]) : RDD[Row] = {
private def ParseInput(inputFilepath: String, schemaString: String, schemaType: Array[DataType]) : RDD[List[String]] = {
// Create spark text file object
val inputFile = Main.sc.textFile(inputFilepath)
// Map the input file data to an RDD
val Data = inputFile.collect {
case line if !SantizeLine(line) => ParsingFunc(line, schemaString, schemaType)
case line if true => ParsingFunc(line, schemaString, schemaType)
}
return Data
}
@@ -95,19 +117,48 @@ object DataParser {
* schemaString: Space-separated attribute values
*/
/*
private def SantizeLine(line: String) : Boolean = {
val invalidLines = Array("<?xml version=\"1.0\" encoding=\"utf-8\"?>", "<users>", "</users>")
return invalidLines contains line
}
*/
private def ParsingFunc(line: String, schemaString: String, schemaType: Array[DataType]) : Row = {
/**
 * Extract a fixed set of attribute values from one raw line of XML text.
 *
 * The line is split on the double-quote character, so the resulting
 * fragments alternate between " Name=" pieces and the quoted values that
 * follow them. The returned list always has eight entries, in this order:
 *
 *   0: the first quoted value on the line (presumably the row Id - confirm
 *      against the dataset)
 *   1: Reputation
 *   2: CreationDate   (first 10 characters only)
 *   3: LastAccessDate (first 10 characters only)
 *   4: Views
 *   5: UpVotes
 *   6: DownVotes
 *   7: Age
 *
 * An attribute that is not present on the line yields an empty string.
 * Lines that contain no quoted value at all (for example a bare opening or
 * closing tag) yield a list of eight empty strings instead of throwing, and
 * date values shorter than 10 characters are returned unchanged.
 *
 * @param line one line of text from the XML input file
 * @return the eight extracted values, "" for any that are absent
 */
private def getFeatures(line: String): List[String] = {
  val fragments = line.split("\"")
  val featureIDs = List(" Reputation=", " CreationDate=", " LastAccessDate=", " Views=", " UpVotes=", " DownVotes=", " Age=")

  // Value stored immediately after the fragment at `index`, or "" when the
  // fragment was not found (index < 0) or nothing follows it.
  def valueAfter(index: Int): String =
    if (index >= 0 && index + 1 < fragments.length) fragments(index + 1) else ""

  // fragments(0) is the text before the first quote, so valueAfter(0) is the
  // first quoted value on the line.
  val features = valueAfter(0) :: featureIDs.map(id => valueAfter(fragments.indexOf(id)))

  // Positions 2 and 3 hold CreationDate and LastAccessDate; keep only their
  // first 10 characters. `take` is safe on empty or short strings, unlike
  // substring(0, 10).
  features.zipWithIndex.map {
    case (value, 2 | 3) => value.take(10)
    case (value, _) => value
  }
}
private def ParsingFunc(line: String, schemaString: String, schemaType: Array[DataType]) : List[String] = {
// Parse line of XML using Scala's built in XML library
val xmlLine = scala.xml.XML.loadString(line)
var schemaPairs = schemaString.split(" ") zip schemaType
val blargh = xParse.loadString(line)
val xmlLine = getFeatures(line)
//var schemaPairs = schemaString.split(" ") zip schemaType
// Create array of values with element for each attribute in schemaString
var lineData = schemaPairs.map { case (fieldName: String, dType: DataType) => castToDType(getXMLAttribute(xmlLine, fieldName), dType) }
//var lineData = schemaPairs.map { case (fieldName: String, dType: DataType) => castToDType(getXMLAttribute(xmlLine, fieldName), dType) }
return Row.fromSeq(lineData)
//return lineData
return xmlLine
}
/*