Advanced Hadoop Programming (4) --- Composite MapReduce Jobs
2014-09-25 15:33
In practice, many complex tasks cannot be completed in a single MapReduce pass; they have to be split into several simpler MapReduce sub-tasks. Common composition patterns are:
(1) Iterative computation tasks
(2) Sequentially composed MapReduce jobs
(3) Composite MapReduce jobs with dependencies
(4) Chained execution of MapReduce pre-processing and post-processing steps
Iterative computation tasks:
A typical example is the PageRank algorithm: the same MapReduce job is run repeatedly, each iteration reading the previous iteration's output, until the result converges.
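The driver of an iterative job is essentially a plain loop that resubmits the same job until a convergence test passes. The sketch below shows that control flow in plain Java, with a power-iteration step standing in for the per-iteration MapReduce pass; the 3-page toy graph, damping factor 0.85, and the names IterativeDriver/pageRank/iterate are illustrative assumptions, not from the original text.

```java
import java.util.Arrays;

public class IterativeDriver {
    // Column-stochastic link matrix of an assumed toy graph:
    // page 0 links to 1 and 2, page 1 links to 2, page 2 links to 0.
    static final double[][] LINKS = {
        {0.0, 0.0, 1.0},
        {0.5, 0.0, 0.0},
        {0.5, 1.0, 0.0}
    };
    static final double DAMPING = 0.85;

    // One "iteration job": in a real cluster this would be a full MapReduce
    // pass reading the previous ranks from HDFS and writing new ones back.
    static double[] iterate(double[] ranks) {
        int n = ranks.length;
        double[] next = new double[n];
        for (int i = 0; i < n; i++) {
            double sum = 0.0;
            for (int j = 0; j < n; j++) {
                sum += LINKS[i][j] * ranks[j];
            }
            next[i] = (1 - DAMPING) / n + DAMPING * sum;
        }
        return next;
    }

    // The driver loop: rerun the job until the ranks stop changing.
    static double[] pageRank(int n, double tol) {
        double[] ranks = new double[n];
        Arrays.fill(ranks, 1.0 / n);
        double delta = Double.MAX_VALUE;
        while (delta > tol) {
            double[] next = iterate(ranks);
            delta = 0.0;
            for (int i = 0; i < n; i++) {
                delta += Math.abs(next[i] - ranks[i]);
            }
            ranks = next;
        }
        return ranks;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(pageRank(3, 1e-9)));
    }
}
```

In a real Hadoop driver, iterate() would be replaced by configuring and submitting a Job whose input path is the previous iteration's output path, and the convergence delta would typically be accumulated in a counter.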
Sequential composition:
Multiple MapReduce sub-tasks run one after another:
mapreduce1 -> mapreduce2 -> mapreduce3 ...
Note that task 1's output directory serves as task 2's input directory, task 2's output directory serves as task 3's input directory, and so on.
Configuration conf = new Configuration();
Job job1 = new Job(conf, "job1");
job1.setJarByClass(Job1Driver.class);     // Job1Driver is a placeholder driver class
FileInputFormat.addInputPath(job1, inputpath1);
FileOutputFormat.setOutputPath(job1, outputpath1);
job1.waitForCompletion(true);             // block until job1 finishes before submitting job2
Configuration conf2 = new Configuration();
Job job2 = new Job(conf2, "job2");
job2.setJarByClass(Job2Driver.class);
FileInputFormat.addInputPath(job2, outputpath1);   // job1's output is job2's input
FileOutputFormat.setOutputPath(job2, outputpath2);
job2.waitForCompletion(true);
Configuration conf3 = new Configuration();
Job job3 = new Job(conf3, "job3");
job3.setJarByClass(Job3Driver.class);
FileInputFormat.addInputPath(job3, outputpath2);   // job2's output is job3's input
FileOutputFormat.setOutputPath(job3, outputpath3);
job3.waitForCompletion(true);
(3) Composite jobs with dependencies:
Besides each sub-task's own configuration, the dependencies between the jobs must be maintained as well: a job is registered as depending on another with addDependingJob(), every job is added to a JobControl instance, and JobControl's run() method then drives execution, starting each job only after all of the jobs it depends on have succeeded.
Configuration conf = new Configuration();
ControlledJob cjob1 = new ControlledJob(conf);   // ControlledJob wraps a Job for JobControl
cjob1.setJob(new Job(conf, "job1"));
Configuration conf2 = new Configuration();
ControlledJob cjob2 = new ControlledJob(conf2);
cjob2.setJob(new Job(conf2, "job2"));
Configuration conf3 = new Configuration();
ControlledJob cjob3 = new ControlledJob(conf3);
cjob3.setJob(new Job(conf3, "job3"));
// job3 starts only after both job1 and job2 have succeeded
cjob3.addDependingJob(cjob1);
cjob3.addDependingJob(cjob2);
JobControl jc = new JobControl("job123");
jc.addJob(cjob1);
jc.addJob(cjob2);
jc.addJob(cjob3);
// run() polls job states in a loop, so start it on its own thread
// and stop it once every job has finished
new Thread(jc).start();
while (!jc.allFinished()) {
    Thread.sleep(500);
}
jc.stop();
(4) Chained execution of MapReduce pre-processing and post-processing steps
Hadoop provides a chained mapper and a chained reducer. ChainMapper allows several mapper classes to run in sequence within a single map task, while ChainReducer allows further mapper classes to run after the reducer, completing any post-processing. The resulting job has the overall shape [MAP+ / REDUCE MAP*]: one or more mappers, one reducer, then zero or more mappers.
Configuration conf = new Configuration();
Job job1 = new Job(conf, "job1");
job1.setJarByClass(ChainDriver.class);   // ChainDriver is a placeholder driver class
// the job's output types must match the output of the last mapper in the chain (Map4)
job1.setOutputKeyClass(LongWritable.class);
job1.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job1, new Path(args[0]));
FileOutputFormat.setOutputPath(job1, new Path(args[1]));
/**
 * addMapper(job, mapper class, map input key class, map input value class,
 *           map output key class, map output value class, mapper-local configuration)
 * ChainMapper sets the job's mapper class itself, so no setMapperClass call is needed.
 */
Configuration map1conf = new Configuration(false);
ChainMapper.addMapper(job1, Map1.class, LongWritable.class, Text.class,
        Text.class, Text.class, map1conf);
// each mapper's input types must match the previous mapper's output types
Configuration map2conf = new Configuration(false);
ChainMapper.addMapper(job1, Map2.class, Text.class, Text.class,
        LongWritable.class, Text.class, map2conf);
/**
 * setReducer(job, reducer class, reduce input key class, reduce input value class,
 *            reduce output key class, reduce output value class, reducer-local configuration)
 */
Configuration reduceconf = new Configuration(false);
ChainReducer.setReducer(job1, Reduce.class, LongWritable.class, Text.class,
        Text.class, Text.class, reduceconf);
// mappers added through ChainReducer run after the reducer, as post-processing steps
Configuration map3conf = new Configuration(false);
ChainReducer.addMapper(job1, Map3.class, Text.class, Text.class,
        LongWritable.class, Text.class, map3conf);
Configuration map4conf = new Configuration(false);
ChainReducer.addMapper(job1, Map4.class, LongWritable.class, Text.class,
        LongWritable.class, Text.class, map4conf);
job1.waitForCompletion(true);