HDPCD-Java-复习笔记(22)- lab
2017-10-21 10:24
423 查看
Java lab booklet
Lab: Defining an Oozie Workflow
job.properties
oozie.wf.application.path=hdfs://namenode:8020/user/root/bloom
#Hadoop yarn.resourcemanager.address
resourceManager=resourcemanager:8050
#Hadoop fs.default.name
nameNode=hdfs://namenode:8020/
#Hadoop mapred.queue.name
queueName=default
stockSymbol=AIT
Set the OOZIE_URL environment variable.
export OOZIE_URL=http://hiveserver:11000/oozie
Oozie provides a validation tool to verify that the workflow.xml file is well-formed and valid.
oozie validate workflow.xml
You are going to upload this Oozie workflow into HDFS into a new folder named bloom. Any JAR files of an Oozie workflow need to go
in a directory named lib
# hadoop fs -mkdir bloom
# hadoop fs -mkdir bloom/lib
# hadoop fs -put oozie.jar bloom/lib/
# hadoop fs -put workflow.xml bloom/
# hadoop fs -mkdir bloom/dividends
# hadoop fs -put ~/java/labs/data/stock_dividends/NYSE_dividends_A.csv bloom/dividends
# hadoop fs -mkdir bloom/stocks
# hadoop fs -put ~/java/labs/data/stock_prices/NYSE_daily_prices_A.csv bloom/stocks
# hadoop fs -ls -R bloom
# oozie job -config job.properties -run
Browse http://hiveserver:11000/oozie.
Lab: Defining an Oozie Workflow
package bloom; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.bloom.BloomFilter; import org.apache.hadoop.util.bloom.Key; import org.apache.hadoop.util.hash.Hash; public class StockDividendFilter extends Configured implements Tool { private static final String FILTER_FILE = "bloom/dividendfilter"; public static class BloomMapper extends Mapper<LongWritable, Text, IntWritable, BloomFilter> { private final IntWritable ONE = new IntWritable(1); Stock stock = new Stock(); private final String COMMA = ","; private BloomFilter outputValue; private String stockSymbol; @Override protected void setup(Context context) throws IOException, InterruptedException { stockSymbol = context.getConfiguration().get("stockSymbol"); outputValue = new BloomFilter(10000, 2, Hash.MURMUR_HASH); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String [] words = value.toString().split(COMMA); String currentSymbol = words[1]; if(stockSymbol.equals(currentSymbol)) { 
stock.setSymbol(currentSymbol); stock.setDate(words[2]); outputValue.add(new Key(stock.toString().getBytes())); } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { context.write(ONE, outputValue); } } public static class BloomReducer extends Reducer<IntWritable, BloomFilter, NullWritable, NullWritable> { private BloomFilter allValues; @Override protected void setup(Context context) throws IOException, InterruptedException { allValues = new BloomFilter(10000, 2, Hash.MURMUR_HASH); } @Override protected void reduce(IntWritable key, Iterable<BloomFilter> values, Context context) throws IOException, InterruptedException { while(values.iterator().hasNext()) { BloomFilter current = values.iterator().next(); allValues.or(current); } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { Configuration conf = context.getConfiguration(); Path path = new Path(FILTER_FILE); FSDataOutputStream out = path.getFileSystem(conf).create(path); allValues.write(out); out.close(); } } public static class StockFilterMapper extends Mapper<LongWritable, Text, Stock, DoubleWritable> { private BloomFilter dividends; private Stock outputKey = new Stock(); private DoubleWritable outputValue = new DoubleWritable(); private String stockSymbol; private final String COMMA = ","; @Override protected void setup(Context context) throws IOException, InterruptedException { stockSymbol = context.getConfiguration().get("stockSymbol"); Configuration conf = context.getConfiguration(); Path path = new Path(FILTER_FILE); FSDataInputStream in = path.getFileSystem(conf).open(path); dividends = new BloomFilter(10000,2,Hash.MURMUR_HASH); dividends.readFields(in); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String [] words = value.toString().split(COMMA); String currentSymbol = words[1]; if(currentSymbol.equals(stockSymbol)) { 
outputKey.setSymbol(currentSymbol); outputKey.setDate(words[2]); Key stockKey = new Key(outputKey.toString().getBytes()); if(dividends.membershipTest(stockKey)) { outputValue.set(Double.parseDouble(words[6])); context.write(outputKey, outputValue); } } } } public static class StockFilterReducer extends Reducer<Stock, DoubleWritable, Text, DoubleWritable> { private String stockSymbol = ""; private Text outputKey = new Text(); @Override protected void setup(Context context) throws IOException, InterruptedException { stockSymbol = context.getConfiguration().get("stockSymbol"); } @Override protected void reduce(Stock key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException { //Check for a false positive if(!stockSymbol.equals(key.getSymbol())) { System.out.println("False positive: " + key.getSymbol()); } else { while(values.iterator().hasNext()) { DoubleWritable closingPrice = values.iterator().next(); outputKey.set(key.toString()); context.write(outputKey, closingPrice); } } } } @Override public int run(String[] args) throws Exception { Job job1 = Job.getInstance(getConf(), "CreateBloomFilter"); job1.setJarByClass(getClass()); Configuration conf = job1.getConfiguration(); conf.set("stockSymbol", args[0]); FileInputFormat.setInputPaths(job1, new Path("dividends")); job1.setMapperClass(BloomMapper.class); job1.setReducerClass(BloomReducer.class); job1.setInputFormatClass(TextInputFormat.class); job1.setOutputFormatClass(NullOutputFormat.class); job1.setMapOutputKeyClass(IntWritable.class); job1.setMapOutputValueClass(BloomFilter.class); job1.setOutputKeyClass(NullWritable.class); job1.setOutputValueClass(NullWritable.class); job1.setNumReduceTasks(1); boolean job1success = job1.waitForCompletion(true); if(!job1success) { System.out.println("The CreateBloomFilter job failed!"); return -1; } Job job2 = Job.getInstance(conf, "FilterStocksJob"); job2.setJarByClass(getClass()); conf = job2.getConfiguration(); Path out = new 
Path("bloomoutput"); out.getFileSystem(conf).delete(out,true); FileInputFormat.setInputPaths(job2, new Path("stocks")); FileOutputFormat.setOutputPath(job2, out); job2.setMapperClass(StockFilterMapper.class); job2.setReducerClass(StockFilterReducer.class); job2.setInputFormatClass(TextInputFormat.class); job2.setOutputFormatClass(TextOutputFormat.class); job2.setMapOutputKeyClass(Stock.class); job2.setMapOutputValueClass(DoubleWritable.class); job2.setOutputKeyClass(Text.class); job2.setOutputValueClass(DoubleWritable.class); boolean job2success = job2.waitForCompletion(true); if(!job2success) { System.out.println("The FilterStocksJob failed!"); return -1; } return 1; } public static void main(String[] args) { int result = 0; try { result = ToolRunner.run(new Configuration(), new StockDividendFilter(), args); } catch (Exception e) { e.printStackTrace(); } System.exit(result); } }
package bloom;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Composite MapReduce key holding a stock symbol and a trade date.
 *
 * <p>Serialized as two UTF strings (symbol then date); ordered by symbol first,
 * then date, consistent with {@link #equals(Object)}. {@link #toString()} renders
 * the pair as {@code symbol,date}, which is also the byte form fed to the
 * workflow's Bloom filter.
 */
public class Stock implements WritableComparable<Stock> {

    private String symbol;
    private String date;

    private static final String COMMA = ",";

    @Override
    public boolean equals(Object obj) {
        // Guard clause instead of nested ifs; same result for every input.
        if (!(obj instanceof Stock)) {
            return false;
        }
        Stock that = (Stock) obj;
        return symbol.equals(that.symbol) && date.equals(that.date);
    }

    @Override
    public int hashCode() {
        // Must stay consistent with equals(): both fields participate.
        return (symbol + date).hashCode();
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Read order mirrors write(): symbol first, then date.
        symbol = in.readUTF();
        date = in.readUTF();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(symbol);
        out.writeUTF(date);
    }

    @Override
    public int compareTo(Stock other) {
        // Primary order: symbol; tie-break on date.
        int bySymbol = this.symbol.compareTo(other.symbol);
        return bySymbol != 0 ? bySymbol : this.date.compareTo(other.date);
    }

    public String getSymbol() {
        return symbol;
    }

    public void setSymbol(String symbol) {
        this.symbol = symbol;
    }

    public String getDate() {
        return date;
    }

    public void setDate(String date) {
        this.date = date;
    }

    @Override
    public String toString() {
        // Plain concatenation produces the identical "symbol,date" string.
        return symbol + COMMA + date;
    }
}
workflow.xml
<workflow-app xmlns="uri:oozie:workflow:0.2" name="dividendstockfilter-workflow">
    <start to="build-bloomfilter" />

    <!-- Job 1: build the Bloom filter of (symbol,date) dividend keys.
         NOTE(review): stray trailing whitespace inside the class-name <value>
         elements (an artifact of the original formatting) was removed; Hadoop
         does not trim these values, so "TextInputFormat " would not resolve. -->
    <action name="build-bloomfilter">
        <map-reduce>
            <job-tracker>${resourceManager}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/user/${wf:user()}/bloom/temp" />
            </prepare>
            <configuration>
                <property>
                    <name>mapreduce.job.queuename</name>
                    <value>${queueName}</value>
                </property>
                <!-- The driver classes use the new (org.apache.hadoop.mapreduce) API. -->
                <property>
                    <name>mapred.mapper.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapred.reducer.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapreduce.job.map.class</name>
                    <value>bloom.StockDividendFilter$BloomMapper</value>
                </property>
                <property>
                    <name>mapreduce.job.reduce.class</name>
                    <value>bloom.StockDividendFilter$BloomReducer</value>
                </property>
                <property>
                    <name>mapreduce.job.inputformat.class</name>
                    <value>org.apache.hadoop.mapreduce.lib.input.TextInputFormat</value>
                </property>
                <property>
                    <name>mapreduce.job.outputformat.class</name>
                    <value>org.apache.hadoop.mapreduce.lib.output.NullOutputFormat</value>
                </property>
                <property>
                    <name>mapreduce.map.output.key.class</name>
                    <value>org.apache.hadoop.io.IntWritable</value>
                </property>
                <property>
                    <name>mapreduce.map.output.value.class</name>
                    <value>org.apache.hadoop.util.bloom.BloomFilter</value>
                </property>
                <property>
                    <name>mapreduce.job.output.key.class</name>
                    <value>org.apache.hadoop.io.NullWritable</value>
                </property>
                <property>
                    <name>mapreduce.job.output.value.class</name>
                    <value>org.apache.hadoop.io.NullWritable</value>
                </property>
                <!-- Single reducer so one complete filter file is written. -->
                <property>
                    <name>mapreduce.job.reduces</name>
                    <value>1</value>
                </property>
                <property>
                    <name>mapreduce.input.fileinputformat.inputdir</name>
                    <value>${nameNode}/user/${wf:user()}/bloom/dividends</value>
                </property>
                <property>
                    <name>mapreduce.output.fileoutputformat.outputdir</name>
                    <value>${nameNode}/user/${wf:user()}/bloom/temp</value>
                </property>
                <property>
                    <name>stockSymbol</name>
                    <value>${stockSymbol}</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="filter-stocks" />
        <error to="fail" />
    </action>

    <!-- Job 2: filter the stock prices through the persisted Bloom filter. -->
    <action name="filter-stocks">
        <map-reduce>
            <job-tracker>${resourceManager}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/user/${wf:user()}/bloom/bloomoutput" />
            </prepare>
            <configuration>
                <property>
                    <name>mapreduce.job.queuename</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>mapred.mapper.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapred.reducer.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapreduce.job.map.class</name>
                    <value>bloom.StockDividendFilter$StockFilterMapper</value>
                </property>
                <property>
                    <name>mapreduce.job.reduce.class</name>
                    <value>bloom.StockDividendFilter$StockFilterReducer</value>
                </property>
                <property>
                    <name>mapreduce.job.inputformat.class</name>
                    <value>org.apache.hadoop.mapreduce.lib.input.TextInputFormat</value>
                </property>
                <property>
                    <name>mapreduce.job.outputformat.class</name>
                    <value>org.apache.hadoop.mapreduce.lib.output.TextOutputFormat</value>
                </property>
                <property>
                    <name>mapreduce.job.output.key.class</name>
                    <value>bloom.Stock</value>
                </property>
                <property>
                    <name>mapreduce.job.output.value.class</name>
                    <value>org.apache.hadoop.io.DoubleWritable</value>
                </property>
                <property>
                    <name>mapreduce.job.reduces</name>
                    <value>1</value>
                </property>
                <property>
                    <name>mapreduce.output.fileoutputformat.outputdir</name>
                    <value>${nameNode}/user/${wf:user()}/bloom/bloomoutput</value>
                </property>
                <property>
                    <name>mapreduce.input.fileinputformat.inputdir</name>
                    <value>${nameNode}/user/${wf:user()}/bloom/stocks</value>
                </property>
                <property>
                    <name>stockSymbol</name>
                    <value>${stockSymbol}</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="end" />
        <error to="fail" />
    </action>

    <kill name="fail">
        <message>Job failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end" />
</workflow-app>
job.properties
oozie.wf.application.path=hdfs://namenode:8020/user/root/bloom
#Hadoop yarn.resourcemanager.address
resourceManager=resourcemanager:8050
#Hadoop fs.default.name
nameNode=hdfs://namenode:8020/
#Hadoop mapred.queue.name
queueName=default
stockSymbol=AIT
Set the OOZIE_URL environment variable.
export OOZIE_URL=http://hiveserver:11000/oozie
Oozie provides a validation tool to verify that the workflow.xml file is well-formed and valid.
oozie validate workflow.xml
You are going to upload this Oozie workflow into HDFS into a new folder named bloom. Any JAR files of an Oozie workflow need to go
in a directory named lib
# hadoop fs -mkdir bloom
# hadoop fs -mkdir bloom/lib
# hadoop fs -put oozie.jar bloom/lib/
# hadoop fs -put workflow.xml bloom/
# hadoop fs -mkdir bloom/dividends
# hadoop fs -put ~/java/labs/data/stock_dividends/NYSE_dividends_A.csv bloom/dividends
# hadoop fs -mkdir bloom/stocks
# hadoop fs -put ~/java/labs/data/stock_prices/NYSE_daily_prices_A.csv bloom/stocks
# hadoop fs -ls -R bloom
# oozie job -config job.properties -run
Browse http://hiveserver:11000/oozie.
相关文章推荐
- HDPCD-Java-复习笔记(10)-lab
- HDPCD-Java-复习笔记(9)-lab
- HDPCD-Java-复习笔记(21)- lab
- HDPCD-Java-复习笔记(13)- lab
- HDPCD-Java-复习笔记(14)- lab
- HDPCD-Java-复习笔记(18) - lab
- HDPCD-Java-复习笔记(23)- lab
- HDPCD-Java-复习笔记(3)-lab
- HDPCD-Java-复习笔记(7)- lab
- HDPCD-Java-复习笔记(17) - lab
- HDPCD-Java-复习笔记(8)- lab
- Java基础笔记的复习与整理--集合框…
- Java面向对象复习笔记
- Java线程复习笔记
- JAVA基础笔记(复习)- 继承
- Java复习笔记[6] —— 文件I/O
- 复习 Core Java Chap14 多线程 笔记
- Java复习笔记2【next()和nextLine()】
- Java面试复习笔记
- java复习笔记4:封装,继承,多态