Clustering now working
This commit is contained in:
@@ -81,17 +81,17 @@
|
||||
<dependency>
|
||||
<groupId>org.scala-lang</groupId>
|
||||
<artifactId>scala-library</artifactId>
|
||||
<version>2.10.5</version>
|
||||
<version>2.11.8</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.spark</groupId>
|
||||
<artifactId>spark-core_2.11</artifactId>
|
||||
<version>2.0.2</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.spark</groupId>
|
||||
<artifactId>spark-core_2.10</artifactId>
|
||||
<version>1.6.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.spark</groupId>
|
||||
<artifactId>spark-sql_2.10</artifactId>
|
||||
<version>1.6.0</version>
|
||||
<artifactId>spark-sql_2.11</artifactId>
|
||||
<version>2.0.2</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
|
||||
@@ -9,10 +9,6 @@ import org.apache.spark.rdd.RDD
|
||||
import org.apache.spark.sql._
|
||||
import org.apache.spark.sql.types._
|
||||
|
||||
import scala.xml.Elem
|
||||
import scala.xml.factory.XMLLoader
|
||||
import scala.xml._
|
||||
|
||||
/*
|
||||
* Format and parse XML data to datasets, ready for further processing using
|
||||
* spark
|
||||
@@ -22,7 +18,7 @@ object XMLParser {
|
||||
/*
|
||||
* Generate array of DataFrames from XML content
|
||||
*/
|
||||
def ParseData() : Map[String, RDD[Row]] = {
|
||||
def ParseData() : Map[String, DataFrame] = {
|
||||
|
||||
// Define XML file locations and a string of attribute tags to retrieve
|
||||
// from each xml element.
|
||||
@@ -34,7 +30,7 @@ object XMLParser {
|
||||
("postHistory", "/data/stackoverflow/PostHistory","Id PostHistoryTypeId PostId RevisionGUID CreationDate UserId UserDisplayName Comment Text CloseReasonId", Array[DataType](IntegerType, IntegerType, IntegerType,IntegerType, DateType, IntegerType, StringType, StringType, StringType, IntegerType)),
|
||||
("postLinks", "data/stackoverflow/PostLinks", "Id CreationDate PostId RelatedPostId PostLinkTypeId", Array[DataType](IntegerType, DateType, IntegerType, IntegerType, IntegerType)),
|
||||
*/
|
||||
("users", "stackoverflow_dataset/users.txt", "Reputation CreationDate DisplayName EmailHash LastAccessDate WebsiteUrl Location Age AboutMe Views UpVotes DownVotes", Array[DataType](IntegerType, DateType, StringType, StringType, DateType, StringType, StringType, IntegerType, StringType, IntegerType, IntegerType, IntegerType))
|
||||
("users", "../stackoverflow_dataset/users.txt", "Reputation CreationDate DisplayName EmailHash LastAccessDate WebsiteUrl Location Age AboutMe Views UpVotes DownVotes", Array[DataType](IntegerType, DateType, StringType, StringType, DateType, StringType, StringType, IntegerType, StringType, IntegerType, IntegerType, IntegerType))
|
||||
/*
|
||||
("votes", "/data/stackoverflow/Votes", "Id PostId VoteTypeId UserId CreationDate", Array[DataType](IntegerType, IntegerType, IntegerType, IntegerType, DateType))
|
||||
*/
|
||||
@@ -46,7 +42,7 @@ object XMLParser {
|
||||
return parsedData
|
||||
}
|
||||
|
||||
private def ParseXMLInfo(xmlInfo: (String, String, Array[DataType])) : RDD[Row] = {
|
||||
private def ParseXMLInfo(xmlInfo: (String, String, Array[DataType])) : DataFrame = {
|
||||
// Get the XML attributes used for generating the table columns
|
||||
var schemaString = xmlInfo._2
|
||||
val schemaType = xmlInfo._3
|
||||
@@ -55,9 +51,9 @@ object XMLParser {
|
||||
// Generate RDD of data from the XML file
|
||||
var rdd = ParseInput(xmlInfo._1, schemaString, schemaType)
|
||||
// Convert RDD to DataFrame for easier processing
|
||||
//var data = Main.sqlContext.createDataFrame(rdd, schema)
|
||||
var data = Main.sqlContext.createDataFrame(rdd, schema)
|
||||
|
||||
return rdd
|
||||
return data
|
||||
|
||||
}
|
||||
|
||||
@@ -106,7 +102,7 @@ object XMLParser {
|
||||
|
||||
private def ParsingFunc(line: String, schemaString: String, schemaType: Array[DataType]) : Row = {
|
||||
// Parse line of XML using Scala's built in XML library
|
||||
val xmlLine = XML.loadString(line)
|
||||
val xmlLine = scala.xml.XML.loadString(line)
|
||||
var schemaPairs = schemaString.split(" ") zip schemaType
|
||||
// Create array of values with element for each attribute in schemaString
|
||||
var lineData = schemaPairs.map { case (fieldName: String, dType: DataType) => castToDType(getXMLAttribute(xmlLine, fieldName), dType) }
|
||||
@@ -128,10 +124,12 @@ object XMLParser {
|
||||
// In this case, return a placeholder value of -1.
|
||||
case e: Exception => return -1
|
||||
}
|
||||
case DateType => return attribute
|
||||
/*// If the string is a date, convert from date string to long.
|
||||
case DateType =>
|
||||
// If the string is a date, convert from date string to long.
|
||||
var format = new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")
|
||||
var longTime = format.parse(attribute).getTime()
|
||||
return longTime
|
||||
/*
|
||||
// Then convert long to int representing days since epoch
|
||||
var longDays : Long = longTime / (1000*60*60*24)
|
||||
return longDays.toInt*/
|
||||
|
||||
Reference in New Issue
Block a user