Initial commit

This commit is contained in:
Sam Perry
2016-11-01 09:50:02 +00:00
commit 889d5343be
9 changed files with 169769 additions and 0 deletions
+84729
View File
File diff suppressed because it is too large Load Diff
+49
View File
@@ -0,0 +1,49 @@
<project name="Stocks" default="dist" basedir=".">
<description>
Sample MapReduce project build file
</description>
<!-- set global properties for this build -->
<property name="src" location="src" />
<property name="build" location="classes" />
<property name="dist" location="dist" />
<!--<property name="hadoop.version" value="2.0.0-mr1-cdh4.1.2" /> -->
<property name="hadoop.base.path" value="/usr/lib/hadoop/client/" />
<path id="classpath">
<fileset dir="${hadoop.base.path}">
<include name="**/*.jar" />
</fileset>
</path>
<target name="init">
<!-- Create the time stamp -->
<tstamp />
<!-- Create the build directory structure used by compile -->
<mkdir dir="${build}" />
</target>
<target name="compile" depends="init" description="compile the source ">
<!-- Compile the java code from ${src} into ${build} -->
<property name="myclasspath" refid="classpath"/>
<echo message="Classpath = ${myclasspath}"/>
<javac srcdir="${src}" debug="true" destdir="${build}" target="1.7">
<classpath refid="classpath"/>
</javac>
</target>
<target name="dist" depends="compile" description="generate the distribution">
<!-- Create the distribution directory -->
<mkdir dir="${dist}" />
<!-- Put everything in ${build} into a jar file -->
<jar jarfile="${dist}/Stocks.jar" basedir="${build}" />
</target>
<target name="clean" description="clean up">
<!-- Delete the ${build} and ${dist} directory trees -->
<delete dir="${build}" />
<delete dir="${dist}" />
</target>
</project>
+1
View File
@@ -0,0 +1 @@
./hadoop-debug jar dist/Stocks.jar Stocks input out
Executable
+10
View File
@@ -0,0 +1,10 @@
#!/bin/bash
PORT_NUMBER=9009
export HADOOP_CLIENT_OPTS='-agentlib:jdwp=transport=dt_socket,address=localhost:$PORT_NUMBER,server=y,suspend=y'
echo Running: hadoop-local "$@" \(with jdb debugging enabled on port number: $PORT_NUMBER\)
echo Run \`jdb -sourcepath ./src/ -attach localhost:$PORT_NUMBER\` from the main project director to access the debugger.
hadoop-local "$@"
# Note:
# The the javac entry in the ant build.xml file needs to be updated to include the debug flag as shown below:
# <javac srcdir="${src}" debug="true" destdir="${build}" target="1.7">
+84729
View File
File diff suppressed because it is too large Load Diff
+33
View File
@@ -0,0 +1,33 @@
package bdp.stock;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Reducer;
public class CompanyMinMaxReducer extends Reducer<Text, DailyStock, Text, Text> {
private Text result = new Text();
public void reduce(Text key, Iterable<DailyStock> values, Context context)
throws IOException, InterruptedException {
double maxVal = Double.NEGATIVE_INFINITY;
double minVal = Double.POSITIVE_INFINITY;
for (DailyStock value : values) {
if (value.getHigh().get() > maxVal)
maxVal = value.getHigh().get();
if (value.getLow().get() < minVal)
minVal = value.getLow().get();
}
String resultString = "MIN: " + minVal + " MAX: " + maxVal;
result.set(resultString);
context.write(key, result);
}
}
+17
View File
@@ -0,0 +1,17 @@
package bdp.stock;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// Extend mapper class to use custom mapping function.
public class DailyMaxMapper extends Mapper<Object, DailyStock, Text, DailyStock> {
// Map function for mapping key value pairs
public void map(Object key, DailyStock value, Context context) throws IOException, InterruptedException {
context.write(value.getCompany(), value);
}
}
+159
View File
@@ -0,0 +1,159 @@
package bdp.stock;
import java.io.*;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import org.apache.hadoop.io.*;
public class DailyStock implements WritableComparable<DailyStock> {
private Text company;
private LongWritable day;
private DoubleWritable opening;
private DoubleWritable close;
private DoubleWritable high;
private DoubleWritable low;
private IntWritable volume;
private DoubleWritable adjClose;
public DailyStock() {
set(new Text(), new LongWritable(), new DoubleWritable(),
new DoubleWritable(), new DoubleWritable(),
new DoubleWritable(), new IntWritable(), new DoubleWritable());
}
public DailyStock(String company, Calendar day, double opening,
double close, double high, double low, double adjClose, int volume) {
set(new Text(company), new LongWritable(day.getTimeInMillis()),
new DoubleWritable(opening), new DoubleWritable(close),
new DoubleWritable(high), new DoubleWritable(low),
new IntWritable(volume), new DoubleWritable(adjClose));
}
public DailyStock(Text company, LongWritable day, DoubleWritable opening,
DoubleWritable close, DoubleWritable high, DoubleWritable low,
IntWritable volume, DoubleWritable adjClose) {
set(company, day, opening, close, high, low, volume, adjClose);
}
public void set(Text company, LongWritable day, DoubleWritable opening,
DoubleWritable close, DoubleWritable high, DoubleWritable low,
IntWritable volume, DoubleWritable adjClose) {
this.company = company;
this.day = day;
this.opening = opening;
this.close = close;
this.high = high;
this.low = low;
this.volume = volume;
this.adjClose = adjClose;
}
public Text getCompany() {
return company;
}
public LongWritable getDay() {
return day;
}
public DoubleWritable getOpening() {
return opening;
}
public DoubleWritable getClose() {
return close;
}
public DoubleWritable getHigh() {
return high;
}
public DoubleWritable getLow() {
return low;
}
public IntWritable getVolume() {
return volume;
}
public DoubleWritable getAdjClose() {
return adjClose;
}
@Override
public void write(DataOutput out) throws IOException {
company.write(out);
day.write(out);
opening.write(out);
close.write(out);
high.write(out);
low.write(out);
volume.write(out);
adjClose.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
company.readFields(in);
day.readFields(in);
opening.readFields(in);
close.readFields(in);
high.readFields(in);
low.readFields(in);
volume.readFields(in);
adjClose.readFields(in);
}
@Override
public String toString() {
return "["+ company + "," + new SimpleDateFormat("yyyy-MM-dd").format(new Date(day.get())) + "]";
}
@Override
public int compareTo(DailyStock st) {
int cmp = company.compareTo(st.getCompany());
if (cmp != 0) {
return cmp;
}
return day.compareTo(st.getDay());
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result
+ ((adjClose == null) ? 0 : adjClose.hashCode());
result = prime * result + ((close == null) ? 0 : close.hashCode());
result = prime * result + ((company == null) ? 0 : company.hashCode());
result = prime * result + ((day == null) ? 0 : day.hashCode());
result = prime * result + ((high == null) ? 0 : high.hashCode());
result = prime * result + ((low == null) ? 0 : low.hashCode());
result = prime * result + ((opening == null) ? 0 : opening.hashCode());
result = prime * result + ((volume == null) ? 0 : volume.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof DailyStock) {
DailyStock st = (DailyStock) obj;
return company.equals(st.getCompany()) && day.equals(st.getDay());
}
return false;
}
}
+42
View File
@@ -0,0 +1,42 @@
package bdp.stock;
import java.util.Arrays;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MinMaxCompany {
public static void runJob(String[] input, String output) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance();
job.setJarByClass(MinMaxCompany.class);
job.setReducerClass(CompanyMinMaxReducer.class);
job.setMapperClass(DailyMaxMapper.class);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(DailyStock.class);
Path outputPath = new Path(output);
FileInputFormat.setInputPaths(job, StringUtils.join(input, ","));
FileOutputFormat.setOutputPath(job, outputPath);
outputPath.getFileSystem(conf).delete(outputPath, true);
job.waitForCompletion(true);
}
public static void main(String[] args) throws Exception {
runJob(Arrays.copyOfRange(args, 0, args.length - 1), args[args.length - 1]);
}
}