Implementing multiple jobs in Hadoop MapReduce (iterative, dependent, and chained)
1. Iterative jobs. In the iterative pattern, the next job takes the previous job's output as its input, and the chain runs until the desired final result is produced. Only the key code is shown here:

```java
Job job = new Job(new Configuration(), "test");
JobConf jobConf = (JobConf) job.getConfiguration();
jobConf.setJobName("hadoopJoinTask");
// input path of the first job
FileInputFormat.setInputPaths(jobConf, inputPath);
// output path of the first job
FileOutputFormat.setOutputPath(jobConf, outPath);

Job job2 = new Job(new Configuration(), "test2");
JobConf jobConf2 = (JobConf) job2.getConfiguration();
jobConf2.setJobName("hadoopJoinTask");
// input path of job2 = output path of the first job
FileInputFormat.setInputPaths(jobConf2, outPath);
// output path of job2
FileOutputFormat.setOutputPath(jobConf2, outPath2);
```
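The fragment above only wires the paths together; nothing in it makes job2 wait for the first job to finish. A minimal way to get the "output of one job feeds the next" behavior is to submit the jobs one after another and block on each, as in the sketch below. The path arguments are placeholders, and each job's mapper/reducer setup is elided:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class IterativeDriver {
    public static void main(String[] args) throws Exception {
        Path inputPath = new Path(args[0]); // original input
        Path midPath = new Path(args[1]);   // output of step 1 = input of step 2
        Path outPath = new Path(args[2]);   // final output

        Job job = Job.getInstance(new Configuration(), "step1");
        // ... setJarByClass / setMapperClass / setReducerClass / output types ...
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, midPath);

        // Block until step 1 finishes; give up if it fails.
        if (!job.waitForCompletion(true)) {
            System.exit(1);
        }

        Job job2 = Job.getInstance(new Configuration(), "step2");
        // ... per-job setup as above ...
        FileInputFormat.setInputPaths(job2, midPath); // step 2 reads step 1's output
        FileOutputFormat.setOutputPath(job2, outPath);
        System.exit(job2.waitForCompletion(true) ? 0 : 1);
    }
}
```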
2. Dependent jobs. A situation that comes up often in practice: job3 must wait for job1, job2, and so on to finish before it can run, i.e. job3 depends on the completion of the other jobs. In Hadoop 2, a look at the JobControl source turns up ControlledJob, which provides the dependency method addDependingJob:

```java
Job job = new Job(new Configuration(), "job1");
Job job2 = new Job(new Configuration(), "job2");

ControlledJob controlledJob = new ControlledJob(job.getConfiguration());
controlledJob.setJob(job);

ControlledJob controlledJob2 = new ControlledJob(job2.getConfiguration());
controlledJob2.setJob(job2);

// The key call for wiring up the dependency: controlledJob2 depends on
// controlledJob, so job2 will not start until job1 has finished
controlledJob2.addDependingJob(controlledJob);

JobControl jc = new JobControl("jc");
jc.addJob(controlledJob);
jc.addJob(controlledJob2);

// JobControl implements Runnable, and its run() method loops until stop()
// is called, so it has to be driven from a helper thread
Thread jcThread = new Thread(jc);
jcThread.start();
while (true) {
    // once every job in the pool has finished, move on
    if (jc.allFinished()) {
        System.out.println(jc.getSuccessfulJobList());
        jc.stop();
        break;
    }
    // stop early if any job failed
    if (jc.getFailedJobList().size() > 0) {
        System.out.println(jc.getFailedJobList());
        jc.stop();
        break;
    }
    try {
        Thread.sleep(500); // poll instead of busy-spinning
    } catch (InterruptedException e) {
        break;
    }
}
```

3. Chained jobs. ChainMapper and ChainReducer let a single MapReduce job run several mappers and one reducer in sequence, in the pattern [MAP+ / REDUCE MAP*]:

```java
Configuration conf = new Configuration();
Job job = new Job(conf);
job.setJobName("ChainJob");

// add Map1 to the ChainMapper
Configuration map1Conf = new Configuration(false);
ChainMapper.addMapper(job, Map1.class, LongWritable.class, Text.class,
        Text.class, Text.class, map1Conf);

// set the Reducer via ChainReducer; its input types must match Map1's output
Configuration reduceConf = new Configuration(false);
ChainReducer.setReducer(job, Reduce.class, Text.class, Text.class,
        Text.class, Text.class, reduceConf);

// append Map2 after the reducer; its input types must match the reducer's output
// (the mapreduce.lib.chain API takes no byValue flag; that flag belongs to
// the old mapred API)
Configuration map2Conf = new Configuration(false);
ChainReducer.addMapper(job, Map2.class, Text.class, Text.class,
        Text.class, Text.class, map2Conf);
```
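For completeness, here is a self-contained sketch of what such a chained job could look like end to end. The Map1, Reduce, and Map2 implementations below are hypothetical stand-ins (a lower-casing mapper, a deduplicating reducer, and a trimming mapper); the post does not show the real classes, so treat this as an illustration of the wiring only:

```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ChainJobDriver {

    // Hypothetical first mapper: lower-cases each line and emits (line, "")
    public static class Map1 extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(new Text(value.toString().toLowerCase()), new Text(""));
        }
    }

    // Hypothetical reducer: deduplicates lines
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, new Text(""));
        }
    }

    // Hypothetical second mapper, run after the reducer: trims whitespace
    public static class Map2 extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(new Text(key.toString().trim()), value);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "ChainJob");
        job.setJarByClass(ChainJobDriver.class);

        ChainMapper.addMapper(job, Map1.class,
                LongWritable.class, Text.class, Text.class, Text.class,
                new Configuration(false));
        ChainReducer.setReducer(job, Reduce.class,
                Text.class, Text.class, Text.class, Text.class,
                new Configuration(false));
        ChainReducer.addMapper(job, Map2.class,
                Text.class, Text.class, Text.class, Text.class,
                new Configuration(false));

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```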