
Advanced Hadoop Programming (4) --- Composite MapReduce Jobs

2014-09-25 15:33
In real applications, many complex tasks cannot be completed in a single MapReduce pass; they must be split into several simpler MapReduce subtasks. Four common patterns are:

(1) Iterative computation tasks.

(2) Sequentially composed MapReduce jobs.

(3) Composite MapReduce jobs with dependencies.

(4) Chained execution of MapReduce pre- and post-processing steps.

Iterative computation tasks:

The classic example is the PageRank algorithm: each MapReduce pass refines the rank values, and its output becomes the input of the next pass until the values converge.
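A minimal driver sketch of the iterative pattern follows. PageRankDriver, PageRankMapper, PageRankReducer, and the data/iterN paths are hypothetical names, and the convergence test is simplified to a fixed iteration cap; the point is that each round's output directory is fed back in as the next round's input.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PageRankDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path input = new Path("data/iter0");   // hypothetical initial input directory
        int maxIterations = 10;                // fixed cap; a real job may test convergence instead
        for (int i = 1; i <= maxIterations; i++) {
            Job job = new Job(conf, "pagerank-iter" + i);
            job.setJarByClass(PageRankDriver.class);
            job.setMapperClass(PageRankMapper.class);    // hypothetical mapper
            job.setReducerClass(PageRankReducer.class);  // hypothetical reducer
            Path output = new Path("data/iter" + i);
            FileInputFormat.addInputPath(job, input);
            FileOutputFormat.setOutputPath(job, output);
            if (!job.waitForCompletion(true)) {
                System.exit(1);                // abort the loop if a round fails
            }
            input = output;                    // this round's output feeds the next round
        }
    }
}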

Sequential composition:

Multiple MapReduce subtasks are executed one after another:

mapreduce1 -> mapreduce2 -> mapreduce3 ...

Note that the output directory of job 1 serves as the input directory of job 2, the output directory of job 2 serves as the input directory of job 3, and so on.

Configuration conf1 = new Configuration();
Job job1 = new Job(conf1, "job1");
job1.setJarByClass(Driver.class);   // Driver stands in for the class containing main()
FileInputFormat.addInputPath(job1, inputpath1);
FileOutputFormat.setOutputPath(job1, outputpath1);
job1.waitForCompletion(true);       // block until job1 finishes

Configuration conf2 = new Configuration();
Job job2 = new Job(conf2, "job2");
job2.setJarByClass(Driver.class);
FileInputFormat.addInputPath(job2, outputpath1);   // job1's output is job2's input
FileOutputFormat.setOutputPath(job2, outputpath2);
job2.waitForCompletion(true);

Configuration conf3 = new Configuration();
Job job3 = new Job(conf3, "job3");
job3.setJarByClass(Driver.class);
FileInputFormat.addInputPath(job3, outputpath2);   // job2's output is job3's input
FileOutputFormat.setOutputPath(job3, outputpath3);
job3.waitForCompletion(true);
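The waitForCompletion(true) calls are what make the composition sequential: each job is submitted only after the previous one has finished and its output directory exists. In real code you would also check the boolean return value and abort the chain when a job fails.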
(3) Composite jobs with dependencies:
Besides holding each subtask's configuration, Hadoop can also track the dependencies between subtasks: each Job is wrapped in a ControlledJob, dependencies are declared with addDependingJob(), every job is added to a JobControl, and the whole group is executed through JobControl's run() method.

Configuration conf1 = new Configuration();
Job job1 = new Job(conf1, "job1");
ControlledJob cjob1 = new ControlledJob(job1, null);   // no dependencies

Configuration conf2 = new Configuration();
Job job2 = new Job(conf2, "job2");
ControlledJob cjob2 = new ControlledJob(job2, null);

Configuration conf3 = new Configuration();
Job job3 = new Job(conf3, "job3");
ControlledJob cjob3 = new ControlledJob(job3, null);

// job3 starts only after both job1 and job2 have succeeded
cjob3.addDependingJob(cjob1);
cjob3.addDependingJob(cjob2);

JobControl jc = new JobControl("job123");
jc.addJob(cjob1);
jc.addJob(cjob2);
jc.addJob(cjob3);

// run() keeps polling until stop() is called, so run it in its own thread
Thread t = new Thread(jc);
t.start();
while (!jc.allFinished()) {
    Thread.sleep(500);
}
jc.stop();

(4) Chained execution of MapReduce pre- and post-processing steps
Hadoop provides chained mappers and chained reducers. ChainMapper allows several Mapper classes to run in sequence inside a single map task, and ChainReducer allows additional Mapper classes to run after the Reducer to complete the post-processing. The resulting job has the shape [MAP+ / REDUCE MAP*]: one or more mappers, a reducer, then zero or more mappers.

Configuration conf = new Configuration();
Job job1 = new Job(conf, "job1");
job1.setJarByClass(Driver.class);   // Driver stands in for the class containing main()
// no setMapperClass/setReducerClass here: the ChainMapper/ChainReducer
// calls below configure the job's mapper, reducer, and output classes
FileInputFormat.addInputPath(job1, new Path(args[0]));
FileOutputFormat.setOutputPath(job1, new Path(args[1]));

Configuration map1conf = new Configuration(false);
/**
 * addMapper(job, mapper class, input key class, input value class,
 *           output key class, output value class, mapper configuration);
 * the older mapred API additionally takes a boolean byValue argument
 * before the configuration object.
 */
ChainMapper.addMapper(job1, Map1.class, LongWritable.class, Text.class,
        Text.class, Text.class, map1conf);

Configuration map2conf = new Configuration(false);
// the previous mapper's output types become this mapper's input types
ChainMapper.addMapper(job1, Map2.class, Text.class, Text.class,
        LongWritable.class, Text.class, map2conf);

Configuration reduceconf = new Configuration(false);
/**
 * setReducer(job, reducer class, input key class, input value class,
 *            output key class, output value class, reducer configuration)
 */
ChainReducer.setReducer(job1, Reduce.class, LongWritable.class, Text.class,
        Text.class, Text.class, reduceconf);

// post-processing mappers that run after the reducer are added via ChainReducer
Configuration map3conf = new Configuration(false);
ChainReducer.addMapper(job1, Map3.class, Text.class, Text.class,
        LongWritable.class, Text.class, map3conf);

Configuration map4conf = new Configuration(false);
ChainReducer.addMapper(job1, Map4.class, LongWritable.class, Text.class,
        LongWritable.class, Text.class, map4conf);

job1.waitForCompletion(true);
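For illustration, here is a minimal sketch of what one stage of the chain could look like. Map1 and its line-splitting logic are hypothetical, chosen only to match the LongWritable/Text -> Text/Text signature declared above.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// First stage of the chain: consumes the (offset, line) records produced by
// TextInputFormat and emits (Text, Text) pairs for the next mapper in the chain.
public class Map1 extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // placeholder logic: split each line into a key field and the rest
        String[] parts = value.toString().split("\t", 2);
        if (parts.length == 2) {
            context.write(new Text(parts[0]), new Text(parts[1]));
        }
    }
}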
Tags: Hadoop, composite MapReduce jobs