您的位置:首页 > 运维架构

Hadoop 里MapReduce里 实现多个job任务 包含(迭代式、依赖式、链式)

2017-11-03 10:29 309 查看
一、迭代式,所谓的迭代式,下一个执行的Job任务以上一个Job的输出作为输入,最终得到想要的结果。 这里我只写关键的代码了
[java] view plain copy Job job = new Job(new Configuration(),“test”);    JobConf jobConf=(JobConf) job.getConfiguration();  jobConf.setJobName("hadoopJoinTask");    //设置job输入路径  FileInputFormat.setInputPaths(inputPath);  //设置job输出的路径  FileOutputFormat.setOutputPath(jobConf, outPath);    Job job2 = new Job(new Configuration(),“test2”);    JobConf jobConf2=(JobConf) job2.getConfiguration();  jobConf2.setJobName("hadoopJoinTask");    //设置job2输入路径  job的输出路径  FileInputFormat.setInputPaths(outPath);  //设置job2输出的路径  FileOutputFormat.setOutputPath(jobConf2, outPath2);  
二、依赖式,工作中经常遇到这样的情况,比如job3需要等job1、job2、、、等执行完才能执行,因此job3是依赖于其他的job完成才能执行
[java] view plain copy //hadoop2  查看hadoop源码 JobControl 发现有ControlledJob,  ControlledJob里有依赖方法  addDependingJob            Job job = new Job(new Configuration(),"job1");          Job job2 = new Job(new Configuration(),"job2");            ControlledJob controlledJob=new ControlledJob(job.getConfiguration());            //设置job          controlledJob.setJob(job);            ControlledJob controlledJob2=new ControlledJob(job2.getConfiguration());          controlledJob2.setJob(job2);          //这里就是设置job依赖的重要方法了,依赖于<span style="font-size: 9pt; font-family: Menlo;">controlledJob</span><span style="font-size: 9pt; font-family: Menlo;">  </span>          controlledJob.addDependingJob(controlledJob);            JobControl jc=new JobControl("jc");          jc.addJob(controlledJob);          jc.addJob(controlledJob2);          //由于JobControl实现了Runnable 接口,而Runnable接口只有运行方法,没有结束方法,因此需要一个线程来辅助            Thread jcThread = new Thread(jc);          jcThread.start();          while(true){              //当job池里所有的job完成后,执行 下一步操作              if(jc.allFinished()){                  System.out.println(jc.getSuccessfulJobList());                  jc.stop();                  }              //获取执行失败的job列表              if(jc.getFailedJobList().size() > 0){                  System.out.println(jc.getFailedJobList());                  jc.stop();                }          }  三、链式
[java] view plain copy Configuration conf = new Configuration();  Job job = new Job(conf);  job.setJobName("ChianJOb");  // 在ChainMapper里面添加Map1  Configuration map1conf = new Configuration(false);  ChainMapper.addMapper(job, Map1.class, LongWritable.class, Text.class,          Text.class, Text.class, true, map1conf);  // 在ChainReduce中加入Reducer,Map2;  Configuration reduceConf = new Configuration(false);  ChainReducer.setReducer(job, Reduce.class, LongWritable.class,          Text.class, Text.class, Text.class, true, map1conf);  Configuration map2Conf = new Configuration();  ChainReducer.addMapper(job, Map2.class, LongWritable.class, Text.class,          Text.class, Text.class, true, <span style="font-family: Menlo; font-size: 9pt;">map2Conf</span><span style="font-size: 9pt; font-family: Menlo;">);</span>  

                                            
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: