Merge branch 'sam-dev'

This commit is contained in:
Sam Perry
2016-12-14 14:42:46 +00:00
7 changed files with 337 additions and 0 deletions
+2
View File
@@ -35,3 +35,5 @@ bin/
external/
CMakeFiles/
mlib.scala
+1
View File
@@ -0,0 +1 @@
*.scala
+80
View File
@@ -0,0 +1,80 @@
Building and Running the Project
================================
These instructions will work using the files at this commit:
git checkout 800b1f59edaa20a9b65f32a815605307e1102baa
First, you need to download the small sample of the stack overflow data that
can be found here:
https://drive.google.com/open?id=0B0uip08Km2LPVTFTRFhrdHF2WW8
Put it in a directory at the project's root called ./stackoverflow_dataset
Next, the following programs need to be installed on your system (homebrew was
used for easy installation on OSX)
Spark:
brew install apache-spark
Scala:
brew install scala
Maven:
brew install maven
To build and run the project locally you need to set versions in the pom.xml
file to match those of the programs installed on your system.
the following lines need to be updated in the pom.xml file:
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version> <<<<
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId> <<<<
<version>2.0.2</version> <<<<
</dependency>
</dependencies>
running:
spark-shell
should give you output that will tell you your versions similar to this:
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.0.2
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_92)
Type in expressions to have them evaluated.
Type :help for more information.
Having edited this pom.xml file, run the following from the root of the
project to compile:
mvn clean package
This should run successfully (and will probably download and install a whole bunch of
stuff the first time you run it...)
To run the compiled application:
cd target
spark-submit --class ClusterSOData.Main --master local KMeans-0.0.1.jar
That should run without errors, producing an output folder. Check that
something has been generated by running:
cat output/part-00000
+97
View File
@@ -0,0 +1,97 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
Cloudera, Inc. licenses this file to you under the Apache License,
Version 2.0 (the "License"). You may not use this file except in
compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
CONDITIONS OF ANY KIND, either express or implied. See the License for
the specific language governing permissions and limitations under the
License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>uk.ac.qmul.spark</groupId>
<artifactId>KMeans</artifactId>
<version>0.0.1</version>
<packaging>jar</packaging>
<name>"Spark KMeans Clustering"</name>
<repositories>
<repository>
<id>scala-tools.org</id>
<name>Scala-tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</repository>
<repository>
<id>maven-hadoop</id>
<name>Hadoop Releases</name>
<url>https://repository.cloudera.com/content/repositories/releases/</url>
</repository>
<repository>
<id>cloudera-repos</id>
<name>Cloudera Repos</name>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>scala-tools.org</id>
<name>Scala-tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</pluginRepository>
</pluginRepositories>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.10.5</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>1.6.0</version>
</dependency>
</dependencies>
</project>
+19
View File
@@ -0,0 +1,19 @@
package ClusterSOData
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark._
import org.apache.spark.sql._
object KMeans {
/**
* Run KMeans clustering on an input RDD vector
*/
def train(
//data: DataSet
)
{
//Normalise data using Euclidean normalisation
}
}
+31
View File
@@ -0,0 +1,31 @@
package ClusterSOData
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.types._
/*
* Run KMeans clustering on the StackOverflow dataset
*/
object Main {
// Initialize spark and SQL to allow for processing of structured data in a
// spark cluster
val sc = new SparkContext(new SparkConf().setAppName("Spark KMeans Clustering"))
val sqlContext= new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
// Main function for task execution
def main(args: Array[String]) {
// Retrieve data from StackOverflow dataset XMLs. Format into DataFrames
// for easy access to data elements.
val df = DataParser.ParseData()
// get the users XML file
val users = df("users")
users.show()
}
}
+107
View File
@@ -0,0 +1,107 @@
package ClusterSOData
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.types._
/*
* Format and parse XML data to datasets, ready for further processing using
* spark
*/
object DataParser {
/*
* Generate array of DataFrames from XML content
*/
def ParseData() : Map[String, DataFrame] = {
// Define XML file locations and a string of attribute tags to retrieve
// from each xml element.
val xmlInfos = Array(
("badges", "../stackoverflow_dataset/badges.txt", "Id UserId Name Date"),
("comments", "../stackoverflow_dataset/comments.txt", "Id PostId Score Text CreationDate UserId"),
("posts", "../stackoverflow_dataset/posts.txt", "Id PostTypeId ParentID AcceptedAnswerId CreationDate Score ViewCount Body OwnerUserId LastEditorUserId LastEditorDisplayName LastEditDate LastActivityDate CommunityOwnedDate ClosedDate Title Tags AnswerCount CommentCount FavoriteCount"),
("postHistory", "../stackoverflow_dataset/postHistory.txt","Id PostHistoryTypeId PostId RevisionGUID CreationDate UserId UserDisplayName Comment Text CloseReasonId"),
("postLinks", "../stackoverflow_dataset/postLinks.txt", "Id CreationDate PostId RelatedPostId PostLinkTypeId"),
("users", "../stackoverflow_dataset/users.txt", "Reputation CreationDate DisplayName EmailHash LastAccessDate WebsiteUrl Location Age AboutMe Views UpVotes DownVotes"),
("votes", "../stackoverflow_dataset/votes.txt", "Id PostId VoteTypeId UserId CreationDate")
)
// Store each file's DataFrame in an array of DataFrames.
val parsedData = xmlInfos.map(x => (x._1, ParseXMLInfo((x._2, x._3)))).toMap
return parsedData
}
private def ParseXMLInfo(xmlInfo: (String, String)) : DataFrame = {
// Get the XML attributes used for generating the table columns
var schemaString = xmlInfo._2
// Generate schema using XML attribute string
var schema = GenerateSchemaFromString(schemaString)
// Generate RDD of data from the XML file
var rdd = ParseInput(xmlInfo._1, schemaString)
// Convert RDD to DataFrame for easier processing
var data = Main.sqlContext.createDataFrame(rdd, schema)
return data
}
/*
* Generate a schema based on the string of XML attributes
*/
private def GenerateSchemaFromString(schemaString: String) : StructType = {
val fields = schemaString.split(" ")
.map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
return schema
}
/*
* Create RDD from XML file
*
* inputFilepath: Filepath to XML file
* schemaString: Space seperated attribute values
*/
private def ParseInput(inputFilepath: String, schemaString: String) : RDD[Row] = {
// Create spark text file object
val inputFile = Main.sc.textFile(inputFilepath)
// Map the input file data to an RDD
val Data = inputFile.map(line => ParsingFunc(line, schemaString))
return Data
}
/*
* Retrieve XML attributes from a String
*
* line: XML file line
* schemaString: Space seperated attribute values
*/
private def ParsingFunc(line: String, schemaString: String) : Row = {
// Parse line of XML using Scala's built in XML library
val xmlLine = scala.xml.XML.loadString(line)
// Create array of values with element for each attribute in schemaString
var lineData = schemaString.split(" ").map(fieldName => getXMLAttribute(xmlLine, fieldName))
return Row.fromSeq(lineData)
}
/*
* Handle NullPointerError raised when an attribute doesn't exist
*
* Return an empty string if the attribute doesn't exist
*/
private def getXMLAttribute(xmlLine: scala.xml.Elem, attribute: String) : String = {
try {
return xmlLine.attributes(attribute).text
} catch {
case npe: NullPointerException => return ""
}
}
}