diff --git a/.gitignore b/.gitignore index 2934343..4dd33cb 100644 --- a/.gitignore +++ b/.gitignore @@ -35,3 +35,5 @@ bin/ external/ CMakeFiles/ + +mlib.scala diff --git a/.gittrack b/.gittrack index e69de29..afcdcd1 100644 --- a/.gittrack +++ b/.gittrack @@ -0,0 +1 @@ +*.scala diff --git a/Notes.mkdn b/Notes.mkdn new file mode 100644 index 0000000..54cfe00 --- /dev/null +++ b/Notes.mkdn @@ -0,0 +1,80 @@ +Building and Running the Project +================================ + +These instructions will work using the files at this commit: + + git checkout 800b1f59edaa20a9b65f32a815605307e1102baa + +First, you need to download the small sample of the stack overflow data that +can be found here: + +https://drive.google.com/open?id=0B0uip08Km2LPVTFTRFhrdHF2WW8 + +Put it in a directory at the project's root called ./stackoverflow_dataset + +Next, the following programs need to be installed on your system (homebrew was +used for easy installation on OSX) + +Spark: + + brew install apache-spark + +Scala: + + brew install scala + +Maven: + + brew install maven + +To build and run the project locally you need to set versions in the pom.xml +file to match those of the programs installed on your system. +the following lines need to be updated in the pom.xml file: + + + + org.scala-lang + scala-library + 2.11.8 <<<< + + + org.apache.spark + spark-core_2.11 <<<< + 2.0.2 <<<< + + + +running: + + spark-shell + +should give you output that will tell you your versions similar to this: + + Welcome to + ____ __ + / __/__ ___ _____/ /__ + _\ \/ _ \/ _ `/ __/ '_/ + /___/ .__/\_,_/_/ /_/\_\ version 2.0.2 + /_/ + + Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_92) + Type in expressions to have them evaluated. + Type :help for more information. + +Having edited this pom.xml file, run the following from the root of the +project to compile: + + mvn clean package + +This should run successfully (and will probably download and install a whole bunch of +stuff the first time you run it...) + +To run the compiled application: + + cd target + spark-submit --class ClusterSOData.Main --master local KMeans-0.0.1.jar + +That should run without errors, producing an output folder. Check that +something has been generated by running: + + cat output/part-00000 diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..913ec6d --- /dev/null +++ b/pom.xml @@ -0,0 +1,97 @@ + + + + 4.0.0 + uk.ac.qmul.spark + KMeans + 0.0.1 + jar + "Spark KMeans Clustering" + + + + scala-tools.org + Scala-tools Maven2 Repository + http://scala-tools.org/repo-releases + + + maven-hadoop + Hadoop Releases + https://repository.cloudera.com/content/repositories/releases/ + + + cloudera-repos + Cloudera Repos + https://repository.cloudera.com/artifactory/cloudera-repos/ + + + + + + scala-tools.org + Scala-tools Maven2 Repository + http://scala-tools.org/repo-releases + + + + + UTF-8 + UTF-8 + + + + + + org.scala-tools + maven-scala-plugin + 2.15.2 + + + + compile + + + + + + maven-compiler-plugin + 3.1 + + 1.6 + 1.6 + + + + + + + + org.scala-lang + scala-library + 2.10.5 + + + org.apache.spark + spark-core_2.10 + 1.6.0 + + + org.apache.spark + spark-sql_2.10 + 1.6.0 + + + diff --git a/src/main/scala/KMeans.scala b/src/main/scala/KMeans.scala new file mode 100644 index 0000000..a603e73 --- /dev/null +++ b/src/main/scala/KMeans.scala @@ -0,0 +1,19 @@ +package ClusterSOData + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark._ +import org.apache.spark.sql._ + +object KMeans { + /** + * Run KMeans clustering on an input RDD vector + */ + def train( + //data: DataSet + ) + { + //Normalise data using Euclidean normalisation + + } +} diff --git a/src/main/scala/Main.scala b/src/main/scala/Main.scala new file mode 100644 index 0000000..8915c3e --- /dev/null +++ b/src/main/scala/Main.scala @@ -0,0 +1,31 @@ +package ClusterSOData +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark._ +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.SQLContext._ +import org.apache.spark.rdd.RDD +import org.apache.spark.sql._ +import org.apache.spark.sql.types._ + +/* + * Run KMeans clustering on the StackOverflow dataset + */ +object Main { + // Initialize spark and SQL to allow for processing of structured data in a + // spark cluster + val sc = new SparkContext(new SparkConf().setAppName("Spark KMeans Clustering")) + val sqlContext= new org.apache.spark.sql.SQLContext(sc) + import sqlContext.implicits._ + + // Main function for task execution + def main(args: Array[String]) { + // Retrieve data from StackOverflow dataset XMLs. Format into DataFrames + // for easy access to data elements. + val df = DataParser.ParseData() + + // get the users XML file + val users = df("users") + users.show() + } +} diff --git a/src/main/scala/XMLParser.scala b/src/main/scala/XMLParser.scala new file mode 100644 index 0000000..11c23ab --- /dev/null +++ b/src/main/scala/XMLParser.scala @@ -0,0 +1,107 @@ +package ClusterSOData + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark._ +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.SQLContext._ +import org.apache.spark.rdd.RDD +import org.apache.spark.sql._ +import org.apache.spark.sql.types._ + +/* + * Format and parse XML data to datasets, ready for further processing using + * spark + */ +object DataParser { + + /* + * Generate array of DataFrames from XML content + */ + def ParseData() : Map[String, DataFrame] = { + + // Define XML file locations and a string of attribute tags to retrieve + // from each xml element. + val xmlInfos = Array( + ("badges", "../stackoverflow_dataset/badges.txt", "Id UserId Name Date"), + ("comments", "../stackoverflow_dataset/comments.txt", "Id PostId Score Text CreationDate UserId"), + ("posts", "../stackoverflow_dataset/posts.txt", "Id PostTypeId ParentID AcceptedAnswerId CreationDate Score ViewCount Body OwnerUserId LastEditorUserId LastEditorDisplayName LastEditDate LastActivityDate CommunityOwnedDate ClosedDate Title Tags AnswerCount CommentCount FavoriteCount"), + ("postHistory", "../stackoverflow_dataset/postHistory.txt","Id PostHistoryTypeId PostId RevisionGUID CreationDate UserId UserDisplayName Comment Text CloseReasonId"), + ("postLinks", "../stackoverflow_dataset/postLinks.txt", "Id CreationDate PostId RelatedPostId PostLinkTypeId"), + ("users", "../stackoverflow_dataset/users.txt", "Reputation CreationDate DisplayName EmailHash LastAccessDate WebsiteUrl Location Age AboutMe Views UpVotes DownVotes"), + ("votes", "../stackoverflow_dataset/votes.txt", "Id PostId VoteTypeId UserId CreationDate") + ) + + // Store each file's DataFrame in an array of DataFrames. + val parsedData = xmlInfos.map(x => (x._1, ParseXMLInfo((x._2, x._3)))).toMap + + return parsedData + } + + private def ParseXMLInfo(xmlInfo: (String, String)) : DataFrame = { + // Get the XML attributes used for generating the table columns + var schemaString = xmlInfo._2 + // Generate schema using XML attribute string + var schema = GenerateSchemaFromString(schemaString) + // Generate RDD of data from the XML file + var rdd = ParseInput(xmlInfo._1, schemaString) + // Convert RDD to DataFrame for easier processing + var data = Main.sqlContext.createDataFrame(rdd, schema) + + return data + + } + + /* + * Generate a schema based on the string of XML attributes + */ + private def GenerateSchemaFromString(schemaString: String) : StructType = { + val fields = schemaString.split(" ") + .map(fieldName => StructField(fieldName, StringType, nullable = true)) + val schema = StructType(fields) + return schema + } + + /* + * Create RDD from XML file + * + * inputFilepath: Filepath to XML file + * schemaString: Space seperated attribute values + */ + private def ParseInput(inputFilepath: String, schemaString: String) : RDD[Row] = { + // Create spark text file object + val inputFile = Main.sc.textFile(inputFilepath) + + // Map the input file data to an RDD + val Data = inputFile.map(line => ParsingFunc(line, schemaString)) + return Data + } + + /* + * Retrieve XML attributes from a String + * + * line: XML file line + * schemaString: Space seperated attribute values + */ + private def ParsingFunc(line: String, schemaString: String) : Row = { + // Parse line of XML using Scala's built in XML library + val xmlLine = scala.xml.XML.loadString(line) + // Create array of values with element for each attribute in schemaString + var lineData = schemaString.split(" ").map(fieldName => getXMLAttribute(xmlLine, fieldName)) + + return Row.fromSeq(lineData) + } + + /* + * Handle NullPointerError raised when an attribute doesn't exist + * + * Return an empty string if the attribute doesn't exist + */ + private def getXMLAttribute(xmlLine: scala.xml.Elem, attribute: String) : String = { + try { + return xmlLine.attributes(attribute).text + } catch { + case npe: NullPointerException => return "" + } + } +}