Hadoop 中的 MapReduce链接作业之预处理和后处理阶段的链接
2013-06-17 17:25
405 查看
package com.test; import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.KeyValueTextInputFormat; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TextOutputFormat; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.lib.ChainMapper; import org.apache.hadoop.mapred.lib.ChainReducer; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /** * MapReduce链接作业之预处理和后处理阶段的链接 * * @author Administrator * */ public class MyJobLink extends Configured implements Tool { public static class Reduce extends MapReduceBase implements Reducer<LongWritable, Text, Text, Text> { public void reduce(LongWritable key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { /** * nothing */ output.collect(new Text("1"), new Text("1")); } } public static class Map1 extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> { public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { /** * nothing */ output.collect(value, new Text(key.toString())); } } public static class Map2 extends MapReduceBase implements Mapper<Text, Text, LongWritable, Text> { public void map(Text key, Text value, OutputCollector<LongWritable, Text> output, Reporter reporter) throws IOException { /** * 
nothing */ output.collect(new LongWritable(Long.valueOf(value.toString())), key); } } public static class Map3 extends MapReduceBase implements Mapper<Text, Text, LongWritable, Text> { public void map(Text key, Text value, OutputCollector<LongWritable, Text> output, Reporter reporter) throws IOException { /** * nothing */ output.collect(new LongWritable(Long.valueOf("1")), key); } } public static class Map4 extends MapReduceBase implements Mapper<LongWritable, Text, LongWritable, Text> { public void map(LongWritable key, Text value, OutputCollector<LongWritable, Text> output, Reporter reporter) throws IOException { /** * nothing */ output.collect(new LongWritable(Long.valueOf("1")), new Text("1")); } } @Override public int run(String[] args) throws Exception { Configuration conf = getConf(); JobConf job = new JobConf(conf); job.setJobName("ChainJob"); job.setInputFormat(TextInputFormat.class); job.setOutputFormat(TextOutputFormat.class); Path in = new Path(args[0]); Path out = new Path(args[1]); FileInputFormat.setInputPaths(job, in); FileOutputFormat.setOutputPath(job, out); /** * 在作业中添加Map1阶段 * 使用ChainMapper.addMapper()添加位于Reduce之前的所有步骤 * * ChainMapper.addMapper(JobConf job, * Class<? extends Mapper<LongWritable, Text, Text, Text>> klass, * Class<? extends LongWritable> inputKeyClass, * Class<? extends Text> inputValueClass, * Class<? extends Text> outputKeyClass, * Class<? extends Text> outputValueClass, * boolean byValue, * JobConf mapperConf) * 该方法有8个参数,第一个和最后一个分别为全局和本地的JobConf对象. * 第二个参数(klass)是Mapper类,负责数据处理. * 余下4个参数inputKeyClass,inputValueClass, outputKeyClass, outputValueClass * 是这个Mapper类中的输入/输出类的类型 * * 稍微解释一下byValue这个参数.在标准的Mapper模型中, * 键/值对的输出在序列化之后写入磁盘(键和值实现为Writable使得他们能够被复制和序列化), * 等待被洗牌到一个可能完全不同的节点上.形式上认为这个过程采用的是值传递(passed by value) * 发送的是键值对的副本. * 在目前的情况下我们可以将一个Mapper与另一个相链接,在相同的JVM线程中一起执行. * 因此,键/值对的发送有可能采用引用传递(passed by reference), * 初始Mapper的输出放到内存中,后续的Mapper直接引用相同的内存位置. * 当Mapper1调用OutputCollector.collect(K k,V v)时,对象k和v直接传递给Map2的map()方法. 
* mapper之间可能有大量的数据需要传递,避免去复制这些数据可以让性能得以提高. * 但是,这样会违背Hadoop中MapReduceApi的一个更为微妙的"约定",即对OutputCollector.collect(K k,V v) * 的调用一定不会改变k和v的内容. * Map1调用OutputCollector.collect(K k,V v)之后,可以继续使用对象k和v,并完全相信他们的值会保持不变. * 但如果我们将这些对象通过引用传递给Map2,接下来Map2可能会改变他们,这就违反了API的"约定". * 如果你确信Map1的map()方法在调用OutputCollector.collect(K k,V v)之后不再使用k和v的内容, * 或者Map2并不改变k和v的在其上的输入值,你可以通过设定byValue为false来获得一定的性能提升. * 如果你对Mapper的内部代码不太了解,安全起见最好设byValue为true,依旧采用值传递模式, * 确保mapper会按预期的方式工作. * */ JobConf map1Conf = new JobConf(false); ChainMapper.addMapper(job, Map1.class, LongWritable.class, Text.class, Text.class, Text.class, true, map1Conf); /** * 在作业中添加Map2阶段 * 使用ChainMapper.addMapper()添加位于Reduce之前的所有步骤 */ JobConf map2Conf = new JobConf(false); ChainMapper.addMapper(job, Map2.class, Text.class, Text.class, LongWritable.class, Text.class, true, map2Conf); /** * 在作业中添加Reduce阶段 * 使用静态的ChainReducer.setReducer()方法设置reducer */ JobConf reduceConf = new JobConf(false); ChainReducer.setReducer(job, Reduce.class, LongWritable.class, Text.class, Text.class, Text.class, true, reduceConf); /** * 在作业中添加Map3阶段 * 使用ChainReducer.addMapper()添加reducer后续的步骤 */ JobConf map3Conf = new JobConf(false); ChainReducer.addMapper(job, Map3.class, Text.class, Text.class, LongWritable.class, Text.class, true, map3Conf); /** * 在作业中添加Map4阶段 * 使用ChainReducer.addMapper()添加reducer后续的步骤 */ JobConf map4Conf = new JobConf(false); ChainReducer.addMapper(job, Map4.class, LongWritable.class, Text.class, LongWritable.class, Text.class, true, map4Conf); JobClient.runJob(job); return 0; } public static void main(String[] args) throws Exception { final String inputPath = "/home/dev/hadooptest/mapin/cite"; final String outputPath = "/home/dev/hadooptest/mapin/cite/out"; String[] paths = { inputPath, outputPath }; /** * Driver中的main函数->ToolRunner中的run函数->Too接口中的run函数-> * Driver中覆盖函数处理参数->Driver中核心函数启动job(合并为一个方法,重写了接口Tool的run方法) */ int res = ToolRunner.run(new Configuration(), new MyJobLink(), paths); System.exit(res); } }
相关文章推荐
- Hadoop 中的 MapReduce链接作业之预处理和后处理阶段的链接
- Hadoop 实战之MapReduce链接作业之预处理
- Hadoop 实战之MapReduce链接作业之预处理
- MapReduce链接作业之预处理
- 预处理和后处理阶段的链接
- hadoop作业调优参数整理及原理(整个mapreduce运行流程都讲的清楚,一步一步优化)
- hadoop之MapReduce作业的生命周期
- Intellij IDEA远程向hadoop集群提交mapreduce作业
- hadoop MapReduce - 从作业、任务(task)、管理员角度调优
- Hadoop-2.7.3源码分析:MapReduce作业提交源码跟踪
- hadoop运行mapreduce作业无法连接0.0.0.0/0.0.0.0:10020
- 从Hadoop框架与MapReduce模式中谈海量数据处理(含淘宝技术架构)
- 海量数据处理之从Hadoop框架与MapReduce模式中谈海量数据处理(淘宝技术架构)
- 从Hadoop框架与MapReduce模式中谈海量数据处理
- MapReduce多个作业协调处理
- hadoop之mapreduce编程实例(系统日志初步清洗过滤处理)
- Hadoop实战之链接MapReduce Job
- Hadoop 高级程序设计(四)---组合式的MapReduce作业
- 一个MapReduce作业的从开始到结束--第6章Hadoop以Jar包的方式执行MapReduce任务
- Hadoop-Mapreduce map—>reduce阶段图解