
HDPCD-Java Review Notes (22) - Lab

Java lab booklet

Lab: Defining an Oozie Workflow




StockDividendFilter.java

package bloom;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;

public class StockDividendFilter extends Configured implements Tool {
    private static final String FILTER_FILE = "bloom/dividendfilter";

    public static class BloomMapper extends Mapper<LongWritable, Text, IntWritable, BloomFilter> {
        private final IntWritable ONE = new IntWritable(1);
        private Stock stock = new Stock();
        private final String COMMA = ",";
        private BloomFilter outputValue;
        private String stockSymbol;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            stockSymbol = context.getConfiguration().get("stockSymbol");
            // 10,000-bit vector, 2 hash functions, murmur hash
            outputValue = new BloomFilter(10000, 2, Hash.MURMUR_HASH);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] words = value.toString().split(COMMA);
            String currentSymbol = words[1];
            if (stockSymbol.equals(currentSymbol)) {
                stock.setSymbol(currentSymbol);
                stock.setDate(words[2]);
                outputValue.add(new Key(stock.toString().getBytes()));
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Emit this mapper's entire filter once, under a single key
            context.write(ONE, outputValue);
        }
    }

    public static class BloomReducer extends Reducer<IntWritable, BloomFilter, NullWritable, NullWritable> {
        private BloomFilter allValues;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            allValues = new BloomFilter(10000, 2, Hash.MURMUR_HASH);
        }

        @Override
        protected void reduce(IntWritable key, Iterable<BloomFilter> values, Context context)
                throws IOException, InterruptedException {
            // OR the per-mapper filters together into a single filter
            for (BloomFilter current : values) {
                allValues.or(current);
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Persist the combined filter to HDFS for the second job to read
            Configuration conf = context.getConfiguration();
            Path path = new Path(FILTER_FILE);
            FSDataOutputStream out = path.getFileSystem(conf).create(path);
            allValues.write(out);
            out.close();
        }
    }

    public static class StockFilterMapper extends Mapper<LongWritable, Text, Stock, DoubleWritable> {
        private BloomFilter dividends;
        private Stock outputKey = new Stock();
        private DoubleWritable outputValue = new DoubleWritable();
        private String stockSymbol;
        private final String COMMA = ",";

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            stockSymbol = context.getConfiguration().get("stockSymbol");

            // Load the Bloom filter written by the first job
            Configuration conf = context.getConfiguration();
            Path path = new Path(FILTER_FILE);
            FSDataInputStream in = path.getFileSystem(conf).open(path);
            dividends = new BloomFilter(10000, 2, Hash.MURMUR_HASH);
            dividends.readFields(in);
            in.close();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] words = value.toString().split(COMMA);
            String currentSymbol = words[1];
            if (currentSymbol.equals(stockSymbol)) {
                outputKey.setSymbol(currentSymbol);
                outputKey.setDate(words[2]);
                Key stockKey = new Key(outputKey.toString().getBytes());
                // Only emit prices whose symbol/date may have paid a dividend
                if (dividends.membershipTest(stockKey)) {
                    outputValue.set(Double.parseDouble(words[6]));
                    context.write(outputKey, outputValue);
                }
            }
        }
    }

    public static class StockFilterReducer extends Reducer<Stock, DoubleWritable, Text, DoubleWritable> {
        private String stockSymbol = "";
        private Text outputKey = new Text();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            stockSymbol = context.getConfiguration().get("stockSymbol");
        }

        @Override
        protected void reduce(Stock key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            // Check for a false positive from the Bloom filter
            if (!stockSymbol.equals(key.getSymbol())) {
                System.out.println("False positive: " + key.getSymbol());
            } else {
                for (DoubleWritable closingPrice : values) {
                    outputKey.set(key.toString());
                    context.write(outputKey, closingPrice);
                }
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        // Job 1: build the Bloom filter from the dividend records
        Job job1 = Job.getInstance(getConf(), "CreateBloomFilter");
        job1.setJarByClass(getClass());
        Configuration conf = job1.getConfiguration();
        conf.set("stockSymbol", args[0]);

        FileInputFormat.setInputPaths(job1, new Path("dividends"));

        job1.setMapperClass(BloomMapper.class);
        job1.setReducerClass(BloomReducer.class);
        job1.setInputFormatClass(TextInputFormat.class);
        job1.setOutputFormatClass(NullOutputFormat.class);
        job1.setMapOutputKeyClass(IntWritable.class);
        job1.setMapOutputValueClass(BloomFilter.class);
        job1.setOutputKeyClass(NullWritable.class);
        job1.setOutputValueClass(NullWritable.class);
        // A single reducer so all partial filters end up in one file
        job1.setNumReduceTasks(1);

        boolean job1success = job1.waitForCompletion(true);
        if (!job1success) {
            System.out.println("The CreateBloomFilter job failed!");
            return -1;
        }

        // Job 2: filter the daily stock prices through the Bloom filter
        Job job2 = Job.getInstance(conf, "FilterStocksJob");
        job2.setJarByClass(getClass());
        conf = job2.getConfiguration();

        Path out = new Path("bloomoutput");
        out.getFileSystem(conf).delete(out, true);
        FileInputFormat.setInputPaths(job2, new Path("stocks"));
        FileOutputFormat.setOutputPath(job2, out);

        job2.setMapperClass(StockFilterMapper.class);
        job2.setReducerClass(StockFilterReducer.class);
        job2.setInputFormatClass(TextInputFormat.class);
        job2.setOutputFormatClass(TextOutputFormat.class);
        job2.setMapOutputKeyClass(Stock.class);
        job2.setMapOutputValueClass(DoubleWritable.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(DoubleWritable.class);

        boolean job2success = job2.waitForCompletion(true);
        if (!job2success) {
            System.out.println("The FilterStocksJob failed!");
            return -1;
        }
        // Exit code 0 signals success to ToolRunner callers
        return 0;
    }

    public static void main(String[] args) {
        int result = -1;
        try {
            result = ToolRunner.run(new Configuration(), new StockDividendFilter(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.exit(result);
    }
}
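
The false-positive check in StockFilterReducer exists because a Bloom filter can report membership for a key that was never added, although it never misses a key that was added. A minimal standalone sketch of this behavior, using the same Hadoop BloomFilter class and the same parameters as the jobs above (the class name BloomFilterDemo and the sample keys are made up for illustration):

package bloom;

import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;

public class BloomFilterDemo {
    public static void main(String[] args) {
        // Same parameters as the jobs: 10,000-bit vector, 2 hash functions
        BloomFilter filter = new BloomFilter(10000, 2, Hash.MURMUR_HASH);
        filter.add(new Key("AIT,2009-01-14".getBytes()));

        // A key that was added always tests true (no false negatives)
        System.out.println(filter.membershipTest(new Key("AIT,2009-01-14".getBytes())));

        // A key that was never added usually tests false, but can
        // occasionally test true; that is the false positive the
        // StockFilterReducer guards against
        System.out.println(filter.membershipTest(new Key("XYZ,1999-12-31".getBytes())));
    }
}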
Stock.java

package bloom;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class Stock implements WritableComparable<Stock> {

    private String symbol;
    private String date;
    private static final String COMMA = ",";

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof Stock) {
            Stock other = (Stock) obj;
            if (symbol.equals(other.symbol) && date.equals(other.date)) {
                return true;
            }
        }
        return false;
    }

    @Override
    public int hashCode() {
        return (symbol + date).hashCode();
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        symbol = in.readUTF();
        date = in.readUTF();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(symbol);
        out.writeUTF(date);
    }

    @Override
    public int compareTo(Stock arg0) {
        // Sort by symbol first, then by date
        int response = this.symbol.compareTo(arg0.symbol);
        if (response == 0) {
            response = this.date.compareTo(arg0.date);
        }
        return response;
    }

    public String getSymbol() {
        return symbol;
    }

    public void setSymbol(String symbol) {
        this.symbol = symbol;
    }

    public String getDate() {
        return date;
    }

    public void setDate(String date) {
        this.date = date;
    }

    @Override
    public String toString() {
        // Same "symbol,date" form used when adding keys to the Bloom filter
        StringBuilder sb = new StringBuilder();
        sb.append(symbol).append(COMMA).append(date);
        return sb.toString();
    }
}
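
Because Stock is the map output key of the second job, Hadoop serializes it with write() and rebuilds it with readFields() between the map and reduce phases, so the two methods must mirror each other exactly. A quick standalone round-trip check (the class name StockRoundTripCheck and the sample values are made up for illustration):

package bloom;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class StockRoundTripCheck {
    public static void main(String[] args) throws IOException {
        Stock original = new Stock();
        original.setSymbol("AIT");
        original.setDate("2009-01-14");

        // Serialize with write(), then deserialize with readFields()
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        Stock copy = new Stock();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        // equals() compares symbol and date, so this should print true
        System.out.println(original.equals(copy));
    }
}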
workflow.xml

<workflow-app xmlns="uri:oozie:workflow:0.2" name="dividendstockfilter-workflow">
    <start to="build-bloomfilter" />
    <action name="build-bloomfilter">
        <map-reduce>
            <job-tracker>${resourceManager}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/user/${wf:user()}/bloom/temp" />
            </prepare>
            <configuration>
                <property>
                    <name>mapreduce.job.queuename</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>mapred.mapper.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapred.reducer.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapreduce.job.map.class</name>
                    <value>bloom.StockDividendFilter$BloomMapper</value>
                </property>
                <property>
                    <name>mapreduce.job.reduce.class</name>
                    <value>bloom.StockDividendFilter$BloomReducer</value>
                </property>
                <property>
                    <name>mapreduce.job.inputformat.class</name>
                    <value>org.apache.hadoop.mapreduce.lib.input.TextInputFormat</value>
                </property>
                <property>
                    <name>mapreduce.job.outputformat.class</name>
                    <value>org.apache.hadoop.mapreduce.lib.output.NullOutputFormat</value>
                </property>
                <property>
                    <name>mapreduce.map.output.key.class</name>
                    <value>org.apache.hadoop.io.IntWritable</value>
                </property>
                <property>
                    <name>mapreduce.map.output.value.class</name>
                    <value>org.apache.hadoop.util.bloom.BloomFilter</value>
                </property>
                <property>
                    <name>mapreduce.job.output.key.class</name>
                    <value>org.apache.hadoop.io.NullWritable</value>
                </property>
                <property>
                    <name>mapreduce.job.output.value.class</name>
                    <value>org.apache.hadoop.io.NullWritable</value>
                </property>
                <property>
                    <name>mapreduce.job.reduces</name>
                    <value>1</value>
                </property>
                <property>
                    <name>mapreduce.input.fileinputformat.inputdir</name>
                    <value>${nameNode}/user/${wf:user()}/bloom/dividends</value>
                </property>
                <property>
                    <name>mapreduce.output.fileoutputformat.outputdir</name>
                    <value>${nameNode}/user/${wf:user()}/bloom/temp</value>
                </property>
                <property>
                    <name>stockSymbol</name>
                    <value>${stockSymbol}</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="filter-stocks" />
        <error to="fail" />
    </action>
    <action name="filter-stocks">
        <map-reduce>
            <job-tracker>${resourceManager}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/user/${wf:user()}/bloom/bloomoutput" />
            </prepare>
            <configuration>
                <property>
                    <name>mapreduce.job.queuename</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>mapred.mapper.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapred.reducer.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapreduce.job.map.class</name>
                    <value>bloom.StockDividendFilter$StockFilterMapper</value>
                </property>
                <property>
                    <name>mapreduce.job.reduce.class</name>
                    <value>bloom.StockDividendFilter$StockFilterReducer</value>
                </property>
                <property>
                    <name>mapreduce.job.inputformat.class</name>
                    <value>org.apache.hadoop.mapreduce.lib.input.TextInputFormat</value>
                </property>
                <property>
                    <name>mapreduce.job.outputformat.class</name>
                    <value>org.apache.hadoop.mapreduce.lib.output.TextOutputFormat</value>
                </property>
                <property>
                    <name>mapreduce.job.output.key.class</name>
                    <value>bloom.Stock</value>
                </property>
                <property>
                    <name>mapreduce.job.output.value.class</name>
                    <value>org.apache.hadoop.io.DoubleWritable</value>
                </property>
                <property>
                    <name>mapreduce.job.reduces</name>
                    <value>1</value>
                </property>
                <property>
                    <name>mapreduce.output.fileoutputformat.outputdir</name>
                    <value>${nameNode}/user/${wf:user()}/bloom/bloomoutput</value>
                </property>
                <property>
                    <name>mapreduce.input.fileinputformat.inputdir</name>
                    <value>${nameNode}/user/${wf:user()}/bloom/stocks</value>
                </property>
                <property>
                    <name>stockSymbol</name>
                    <value>${stockSymbol}</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="end" />
        <error to="fail" />
    </action>
    <kill name="fail">
        <message>Job failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end" />
</workflow-app>


job.properties

oozie.wf.application.path=hdfs://namenode:8020/user/root/bloom

#Hadoop yarn.resourcemanager.address 
resourceManager=resourcemanager:8050

#Hadoop fs.default.name
nameNode=hdfs://namenode:8020/

#Hadoop mapred.queue.name
queueName=default

stockSymbol=AIT

Set the OOZIE_URL environment variable so the oozie command-line tool knows which Oozie server to contact:

export OOZIE_URL=http://hiveserver:11000/oozie

Oozie provides a validation tool to verify that the workflow.xml file is well-formed and valid. 

oozie validate workflow.xml

You are going to upload this Oozie workflow into HDFS, into a new folder named bloom. Any JAR files used by an Oozie workflow need to go
in a subdirectory named lib:

# hadoop fs -mkdir bloom
# hadoop fs -mkdir bloom/lib
# hadoop fs -put oozie.jar bloom/lib/

# hadoop fs -put workflow.xml bloom/

# hadoop fs -mkdir bloom/dividends
# hadoop fs -put ~/java/labs/data/stock_dividends/NYSE_dividends_A.csv bloom/dividends

# hadoop fs -mkdir bloom/stocks
# hadoop fs -put ~/java/labs/data/stock_prices/NYSE_daily_prices_A.csv bloom/stocks
# hadoop fs -ls -R bloom

# oozie job -config job.properties -run
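
The same submission can also be done from Java through Oozie's client API. A minimal sketch, assuming the oozie-client library is on the classpath; it reuses the values from job.properties above (the class name SubmitWorkflow is made up for illustration):

package bloom;

import java.util.Properties;

import org.apache.oozie.client.OozieClient;

public class SubmitWorkflow {
    public static void main(String[] args) throws Exception {
        // Same endpoint as the OOZIE_URL environment variable above
        OozieClient client = new OozieClient("http://hiveserver:11000/oozie");

        // Mirror the entries in job.properties
        Properties conf = client.createConfiguration();
        conf.setProperty(OozieClient.APP_PATH, "hdfs://namenode:8020/user/root/bloom");
        conf.setProperty("resourceManager", "resourcemanager:8050");
        conf.setProperty("nameNode", "hdfs://namenode:8020/");
        conf.setProperty("queueName", "default");
        conf.setProperty("stockSymbol", "AIT");

        // run() submits and starts the workflow, returning its job ID
        String jobId = client.run(conf);
        System.out.println("Workflow job submitted: " + jobId);
    }
}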

Browse http://hiveserver:11000/oozie to monitor the workflow in the Oozie web console.
Tags: hdp oozie