
HDPCD-Java Review Notes (14) - Labs

2017-10-16 10:56
Source: Java lab booklet



Lab: Performing a Map-Side Join

Your MapReduce job performs a join of a stock's dividend amount with the stock's closing price on the day the dividend was paid. The small dividends file is shipped to every mapper through the distributed cache and loaded into a HashMap in setup(); each map() call then probes that map with the symbol and date of a price record, so the whole join runs map-side with no reduce phase.

NYSE_dividends_A.csv

NYSE,AIT,2009-11-12,0.15
NYSE,AIT,2009-08-12,0.15
NYSE,AIT,2009-05-13,0.15
NYSE,AIT,2009-02-11,0.15

NYSE_daily_prices_A.csv

NYSE,ASP,1991-12-23,15.13,15.25,15.13,15.25,11200,3.25
NYSE,ASP,1991-12-20,15.13,15.13,15.13,15.13,24800,3.23
NYSE,AIT,2010-02-08,21.81,21.81,21.28,21.31,103800,21.31
NYSE,AIT,2010-02-05,21.70,21.92,21.39,21.81,141000,21.81
NYSE,AIT,2010-02-04,21.71,21.88,21.49,21.59,203400,21.59
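In both files the first three columns are exchange, stock symbol, and date. The dividends file's fourth column is the dividend amount (words[3] in the code below); the daily-prices file continues with open, high, low, close, volume, and adjusted close, so the closing price is words[6]. A matched pair therefore produces an output line of the form AIT,2009-11-12,0.15,21.50 (symbol, date, dividend, close; those numbers are illustrative, since the sample rows above do not happen to overlap).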

// File: mapjoin/MapSideJoin.java
package mapjoin;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MapSideJoin extends Configured implements Tool {

    private static final String STOCK_SYMBOL = "stockSymbol";

    public static class MapSideJoinMapper extends Mapper<LongWritable, Text, Stock, StockPrices> {

        private Map<Stock, Double> stocks = new HashMap<Stock, Double>();
        private String stockSymbol;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            stockSymbol = context.getConfiguration().get(STOCK_SYMBOL);
            // The dividends file was localized by the distributed cache, so it
            // can be opened by its link name in the task's working directory.
            BufferedReader reader = new BufferedReader(new FileReader("NYSE_dividends_A.csv"));
            String currentLine;
            String[] words;
            while ((currentLine = reader.readLine()) != null) {
                words = StringUtils.split(currentLine, '\\', ',');
                if (words[1].equals(stockSymbol)) {
                    stocks.put(new Stock(words[1], words[2]), Double.parseDouble(words[3]));
                }
            }
            reader.close();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] words = StringUtils.split(value.toString(), '\\', ',');
            if (words[1].equals(stockSymbol)) {
                Stock stock = new Stock(words[1], words[2]);
                // The join: emit a price record only when a dividend was paid
                // for this stock on this date.
                if (stocks.containsKey(stock)) {
                    StockPrices stockPrices = new StockPrices(stocks.get(stock), Double.parseDouble(words[6]));
                    context.write(stock, stockPrices);
                }
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), "MapSideJoinJob");
        job.setJarByClass(getClass());
        // Distribute the dividends file to every mapper as a local resource.
        job.addCacheFile(new URI("dividends/NYSE_dividends_A.csv"));
        Configuration conf = job.getConfiguration();
        conf.set(STOCK_SYMBOL, args[0]);
        // Make the output a comma-delimited file. (The constant really is
        // spelled SEPERATOR in Hadoop 2.x.)
        conf.set(TextOutputFormat.SEPERATOR, ",");

        Path out = new Path("joinoutput");
        out.getFileSystem(conf).delete(out, true);
        FileInputFormat.setInputPaths(job, new Path("stocks"));
        FileOutputFormat.setOutputPath(job, out);

        job.setMapperClass(MapSideJoinMapper.class);
        job.setInputFormatClass(TextInputFormat.class);

        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapOutputKeyClass(Stock.class);
        job.setMapOutputValueClass(StockPrices.class);

        // Map-only job: the join happens entirely in the mappers.
        job.setNumReduceTasks(0);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) {
        int result = 0;
        try {
            result = ToolRunner.run(new Configuration(), new MapSideJoin(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.exit(result);
    }
}
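The driver and mapper above depend on two custom Writables, defined next: Stock, the map output key holding a symbol and date, and StockPrices, the value holding the joined dividend and closing price.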
// File: mapjoin/Stock.java
package mapjoin;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class Stock implements WritableComparable<Stock> {

    private String symbol;
    private String date;

    public Stock(String symbol, String date) {
        this.symbol = symbol;
        this.date = date;
    }

    public Stock() {}

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof Stock) {
            Stock other = (Stock) obj;
            if (symbol.equals(other.symbol) && date.equals(other.date)) {
                return true;
            }
        }
        return false;
    }

    @Override
    public int hashCode() {
        return (symbol + date).hashCode();
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        symbol = in.readUTF();
        date = in.readUTF();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(symbol);
        out.writeUTF(date);
    }

    @Override
    public int compareTo(Stock arg0) {
        int response = this.symbol.compareTo(arg0.symbol);
        if (response == 0) {
            response = this.date.compareTo(arg0.date);
        }
        return response;
    }

    public String getSymbol() {
        return symbol;
    }

    public void setSymbol(String symbol) {
        this.symbol = symbol;
    }

    public String getDate() {
        return date;
    }

    public void setDate(String date) {
        this.date = date;
    }

    @Override
    public String toString() {
        return symbol + "," + date;
    }
}
// File: mapjoin/StockPrices.java
package mapjoin;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class StockPrices implements Writable {

    private double dividend;
    private double closingPrice;

    public StockPrices() {}

    public StockPrices(double dividend, double closingPrice) {
        this.dividend = dividend;
        this.closingPrice = closingPrice;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        dividend = in.readDouble();
        closingPrice = in.readDouble();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeDouble(dividend);
        out.writeDouble(closingPrice);
    }

    public double getDividend() {
        return dividend;
    }

    public void setDividend(double dividend) {
        this.dividend = dividend;
    }

    public double getClosingPrice() {
        return closingPrice;
    }

    public void setClosingPrice(double closingPrice) {
        this.closingPrice = closingPrice;
    }

    @Override
    public String toString() {
        return dividend + "," + closingPrice;
    }
}
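To run the job, assuming the compiled classes are packaged into a jar named mapjoin.jar (a hypothetical name) and the stocks and dividends directories already exist in HDFS under the user's home directory:

hadoop jar mapjoin.jar mapjoin.MapSideJoin AIT
hdfs dfs -cat joinoutput/part-m-00000

Because the job runs with zero reducers, the results land in part-m-* files; each line is the Stock key and the StockPrices value joined by the comma separator configured in run().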


Lab: Using a Bloom Filter

Output a stock's closing prices, but only on the dates on which a dividend was announced for that stock.

This application consists of two MapReduce jobs: the first job creates the Bloom filter and saves it in a file named filters/dividendfilter. The second job inputs the stock prices along with their corresponding dividend records, and outputs only those stock prices that have a dividend granted on the same date (which is about once every three months for most stocks).
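Before the full solution, here is a minimal standalone sketch of the org.apache.hadoop.util.bloom.BloomFilter API the jobs rely on. The class name BloomFilterSketch and the sample keys are invented for illustration; it assumes hadoop-common is on the classpath.

import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;

public class BloomFilterSketch {
    public static void main(String[] args) {
        // 1000-bit vector, 20 hash functions, MurmurHash (same parameters as the lab).
        BloomFilter filter = new BloomFilter(1000, 20, Hash.MURMUR_HASH);

        // Train the filter on a key.
        filter.add(new Key("AIT,2009-11-12".getBytes()));

        // Test membership: never a false negative, occasionally a false positive.
        System.out.println(filter.membershipTest(new Key("AIT,2009-11-12".getBytes()))); // true
        System.out.println(filter.membershipTest(new Key("AIT,2010-02-08".getBytes()))); // very likely false
    }
}

Because a Bloom filter can return false positives but never false negatives, the second job still joins the surviving stock records against the real dividend records in its reducer.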

// File: bloom/StockDividendFilter.java
package bloom;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StockDividendFilter extends Configured implements Tool {

    private static final String FILTER_FILE = "filters/dividendfilter";

    // Counts stock records that pass the Bloom filter but turn out to have
    // no matching dividend record in the reducer.
    private enum BloomCounters {
        FALSE_POSITIVES;
    }

    // Tags identifying which dataset a tagged key came from; DIVIDENDS
    // (ordinal 0) sorts ahead of STOCKS (ordinal 1) within a reduce group.
    private enum JoinData {
        DIVIDENDS, STOCKS;
    }

    public static class BloomMapper extends Mapper<LongWritable, Text, NullWritable, BloomFilter> {

        private String stockSymbol;
        private NullWritable outputKey = NullWritable.get();
        private BloomFilter outputValue;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            stockSymbol = context.getConfiguration().get("stockSymbol");
            outputValue = new BloomFilter(1000, 20, Hash.MURMUR_HASH);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] words = StringUtils.split(value.toString(), '\\', ',');
            if (words[1].equals(stockSymbol)) {
                // Train the filter on every symbol/date that paid a dividend.
                Stock stock = new Stock(words[1], words[2]);
                Key stockKey = new Key(stock.toString().getBytes());
                outputValue.add(stockKey);
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Emit this mapper's partial filter once, after all input is seen.
            context.write(outputKey, outputValue);
        }
    }

    public static class BloomReducer extends Reducer<NullWritable, BloomFilter, NullWritable, NullWritable> {

        private BloomFilter allValues;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            allValues = new BloomFilter(1000, 20, Hash.MURMUR_HASH);
        }

        @Override
        protected void reduce(NullWritable key, Iterable<BloomFilter> values, Context context)
                throws IOException, InterruptedException {
            // OR together the partial filters from all the mappers.
            for (BloomFilter bloomFilter : values) {
                allValues.or(bloomFilter);
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Write the combined filter to HDFS so the second job can load it.
            Configuration conf = context.getConfiguration();
            Path path = new Path(FILTER_FILE);
            FSDataOutputStream out = path.getFileSystem(conf).create(path);
            allValues.write(out);
            out.close();
        }
    }

    public static class StockFilterMapper extends Mapper<LongWritable, Text, StockTaggedKey, DoubleWritable> {

        private BloomFilter dividends;
        private String stockSymbol;
        private DoubleWritable outputValue = new DoubleWritable();
        private Stock outputKey = new Stock();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Path filterFile = new Path(FILTER_FILE);
            stockSymbol = context.getConfiguration().get("stockSymbol");

            // Initialize the dividends field from the filter file in HDFS.
            dividends = new BloomFilter(1000, 20, Hash.MURMUR_HASH);
            FileSystem fs = FileSystem.get(context.getConfiguration());
            FSDataInputStream in = fs.open(filterFile);
            dividends.readFields(in);
            in.close();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] words = StringUtils.split(value.toString(), '\\', ',');
            if (words[1].equals(stockSymbol)) {
                outputKey.setSymbol(words[1]);
                outputKey.setDate(words[2]);
                // Instantiate a Bloom Key using outputKey, then check for
                // membership in the Bloom filter.
                Key stockKey = new Key(outputKey.toString().getBytes());
                if (dividends.membershipTest(stockKey)) {
                    outputValue.set(Double.parseDouble(words[6]));
                    context.write(new StockTaggedKey(JoinData.STOCKS.ordinal(), outputKey), outputValue);
                }
            }
        }
    }

    public static class DividendMapper extends Mapper<LongWritable, Text, StockTaggedKey, DoubleWritable> {

        private String stockSymbol;
        private DoubleWritable outputValue = new DoubleWritable();
        private Stock outputKey = new Stock();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            stockSymbol = context.getConfiguration().get("stockSymbol");
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] words = StringUtils.split(value.toString(), '\\', ',');
            if (words[1].equals(stockSymbol)) {
                outputKey.setSymbol(words[1]);
                outputKey.setDate(words[2]);
                outputValue.set(Double.parseDouble(words[3]));
                context.write(new StockTaggedKey(JoinData.DIVIDENDS.ordinal(), outputKey), outputValue);
            }
        }
    }

    public static class StockFilterReducer extends Reducer<StockTaggedKey, DoubleWritable, Text, DoubleWritable> {

        private static final Logger LOG = LoggerFactory.getLogger(StockFilterReducer.class);
        private Text outputKey = new Text();

        @Override
        protected void reduce(StockTaggedKey key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            DoubleWritable dividend = null;

            for (DoubleWritable value : values) {
                // The dividend record (if any) should appear first. Only output
                // the stock data if there's a matching dividend record. False
                // positives from the Bloom filter could have caused some extra
                // stock records to be sent to the reducer.
                if (key.getTag() == JoinData.DIVIDENDS.ordinal()) {
                    // Copy the dividend so that the framework doesn't overwrite
                    // it the next time through the loop.
                    dividend = new DoubleWritable(value.get());
                } else if (dividend != null) {
                    outputKey.set(key.getKey().toString());
                    context.write(outputKey, value);
                }
            }

            if (dividend == null) {
                LOG.warn("False positive detected for stock: {}", key.getKey().toString());
                context.getCounter(BloomCounters.FALSE_POSITIVES).increment(1);
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job1 = Job.getInstance(getConf(), "CreateBloomFilter");
        job1.setJarByClass(getClass());
        Configuration conf = job1.getConfiguration();
        conf.set("stockSymbol", args[0]);

        FileInputFormat.setInputPaths(job1, new Path("dividends"));

        job1.setMapperClass(BloomMapper.class);
        job1.setReducerClass(BloomReducer.class);
        job1.setInputFormatClass(TextInputFormat.class);
        job1.setOutputFormatClass(NullOutputFormat.class);
        job1.setMapOutputKeyClass(NullWritable.class);
        job1.setMapOutputValueClass(BloomFilter.class);
        job1.setOutputKeyClass(NullWritable.class);
        job1.setOutputValueClass(NullWritable.class);
        // A single reducer, so all partial filters are combined into one file.
        job1.setNumReduceTasks(1);

        boolean job1success = job1.waitForCompletion(true);
        if (!job1success) {
            System.out.println("The CreateBloomFilter job failed!");
            return -1;
        }

        Job job2 = Job.getInstance(conf, "FilterStocksJob");
        job2.setJarByClass(getClass());
        conf = job2.getConfiguration();

        Path out = new Path("bloomoutput");
        out.getFileSystem(conf).delete(out, true);
        FileOutputFormat.setOutputPath(job2, out);

        // Input paths are registered through MultipleInputs, which binds a
        // different mapper to each dataset.
        Path stocks = new Path("stocks");
        Path dividends = new Path("dividends");
        MultipleInputs.addInputPath(job2, stocks, TextInputFormat.class, StockFilterMapper.class);
        MultipleInputs.addInputPath(job2, dividends, TextInputFormat.class, DividendMapper.class);
        job2.setReducerClass(StockFilterReducer.class);

        job2.setOutputFormatClass(TextOutputFormat.class);
        job2.setMapOutputKeyClass(StockTaggedKey.class);
        job2.setMapOutputValueClass(DoubleWritable.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(DoubleWritable.class);

        // Partition and group on the natural key (Stock) only, so tagged
        // records for the same stock/date meet in one reduce() call.
        job2.setPartitionerClass(TaggedKeyHashPartitioner.class);
        job2.setGroupingComparatorClass(StockTaggedKeyGroupingComparator.class);

        boolean job2success = job2.waitForCompletion(true);
        if (!job2success) {
            System.out.println("The FilterStocksJob failed!");
            return -1;
        }
        // Return 0 on success; returning 1 here (as the original did) would
        // make main() exit with a failure code.
        return 0;
    }

    public static void main(String[] args) {
        int result = 0;
        try {
            result = ToolRunner.run(new Configuration(), new StockDividendFilter(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.exit(result);
    }
}
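The remaining classes implement the tag-based reduce-side join used by the second job. Each map output wraps a Stock in a StockTaggedKey whose integer tag records the source dataset; the partitioner and grouping comparator look only at the Stock, so the dividend and price records for the same symbol and date all reach one reduce() call, while the full key comparison sorts the dividend record (tag 0) ahead of the price records (tag 1) within that group.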
// File: bloom/Stock.java
package bloom;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class Stock implements WritableComparable<Stock> {

    private String symbol;
    private String date;
    private static final String COMMA = ",";

    public Stock() {}

    public Stock(String symbol, String date) {
        this.symbol = symbol;
        this.date = date;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof Stock) {
            Stock other = (Stock) obj;
            if (symbol.equals(other.symbol) && date.equals(other.date)) {
                return true;
            }
        }
        return false;
    }

    @Override
    public int hashCode() {
        return (symbol + date).hashCode();
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        symbol = in.readUTF();
        date = in.readUTF();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(symbol);
        out.writeUTF(date);
    }

    @Override
    public int compareTo(Stock arg0) {
        int response = this.symbol.compareTo(arg0.symbol);
        if (response == 0) {
            response = this.date.compareTo(arg0.date);
        }
        return response;
    }

    public String getSymbol() {
        return symbol;
    }

    public void setSymbol(String symbol) {
        this.symbol = symbol;
    }

    public String getDate() {
        return date;
    }

    public void setDate(String date) {
        this.date = date;
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(symbol).append(COMMA).append(date);
        return sb.toString();
    }
}
// File: bloom/TaggedKey.java
package bloom;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

// A composite key pairing a natural key with an integer tag that identifies
// the source dataset. Comparison orders by the natural key first and the tag
// second, which is what sorts dividend records ahead of stock records.
public abstract class TaggedKey<K extends WritableComparable<K>>
        implements WritableComparable<TaggedKey<K>> {

    private int tag = 0;

    public TaggedKey() {
    }

    public TaggedKey(int tag) {
        this.tag = tag;
    }

    public int getTag() {
        return tag;
    }

    public abstract K getKey();

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(tag);
        getKey().write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        tag = in.readInt();
        getKey().readFields(in);
    }

    @Override
    public int compareTo(TaggedKey<K> o) {
        int result = getKey().compareTo(o.getKey());
        return result != 0 ? result : (tag - o.tag);
    }

    @Override
    public String toString() {
        return String.format("%s: %s\n", tag, getKey());
    }
}
// File: bloom/TaggedKeyGroupingComparator.java
package bloom;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Groups reduce input by the natural key only, ignoring the tag, so records
// from both datasets arrive in the same reduce() call.
public abstract class TaggedKeyGroupingComparator extends WritableComparator {

    public TaggedKeyGroupingComparator(Class<? extends WritableComparable<?>> keyClass) {
        super(keyClass, true);
    }

    @SuppressWarnings({ "rawtypes", "unchecked" })
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        TaggedKey lhs = (TaggedKey) a;
        TaggedKey rhs = (TaggedKey) b;
        return lhs.getKey().compareTo(rhs.getKey());
    }
}
// File: bloom/TaggedKeyHashPartitioner.java
package bloom;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

// Partitions on the natural key only, so all tagged records for the same
// stock/date go to the same reducer regardless of their tag.
public class TaggedKeyHashPartitioner<K extends WritableComparable<K>, V>
        extends Partitioner<TaggedKey<K>, V> {

    private HashPartitioner<K, V> partitioner = new HashPartitioner<K, V>();

    @Override
    public int getPartition(TaggedKey<K> key, V value, int numPartitions) {
        return partitioner.getPartition(key.getKey(), value, numPartitions);
    }
}
// File: bloom/StockTaggedKey.java
package bloom;

public class StockTaggedKey extends TaggedKey<Stock> {

    protected Stock key;

    public StockTaggedKey() {
        key = new Stock();
    }

    public StockTaggedKey(int tag, Stock key) {
        super(tag);
        this.key = key;
    }

    @Override
    public Stock getKey() {
        return key;
    }
}
// File: bloom/StockTaggedKeyGroupingComparator.java
package bloom;

public class StockTaggedKeyGroupingComparator extends TaggedKeyGroupingComparator {

    public StockTaggedKeyGroupingComparator() {
        super(StockTaggedKey.class);
    }
}
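As with the first lab, the driver takes the stock symbol as its only argument. Assuming the classes are packaged into a jar named bloom.jar (a hypothetical name):

hadoop jar bloom.jar bloom.StockDividendFilter AIT
hdfs dfs -cat bloomoutput/part-r-00000

The first job writes the serialized filter to filters/dividendfilter; the second job's output lists each matching stock/date with its closing price, and the FALSE_POSITIVES counter in the job report shows how many stock records slipped through the Bloom filter without a matching dividend.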