Hadoop 学习研究(八): 多Job任务和hadoop中的全局变量
2017-06-10 10:21
507 查看
MapReduce里 实现多个job任务包含(迭代式、依赖式、链式):一、迭代式,所谓的迭代式,下一个执行的Job任务以上一个Job的输出作为输入,最终得到想要的结果。[java]viJob job = new Job(new Configuration(),“test”); JobConf jobConf=(JobConf) job.getConfiguration(); jobConf.setJobName("hadoopJoinTask"); <15d23span class="comment" style="margin:0px;padding:0px;border:none;color:rgb(0,130,0);background-color:inherit;">//设置job输入路径 FileInputFormat.setInputPaths(inputPath); //设置job输出的路径 FileOutputFormat.setOutputPath(jobConf, outPath); Job job2 = new Job(new Configuration(),“test2”); JobConf jobConf2=(JobConf) job2.getConfiguration(); jobConf2.setJobName("hadoopJoinTask"); //设置job2输入路径 job的输出路径 FileInputFormat.setInputPaths(outPath); //设置job2输出的路径 FileOutputFormat.setOutputPath(jobConf2, outPath2);
二、依赖式,工作中经常遇到这样的情况,比如job3需要等job1、job2、、、等执行完才能执行,因此job3是依赖于其他的job完成才能执行。
(推荐使用JobControl来进行多job控制,使用submit提交多job任务到集群执行时,在waitfor阶段容易出现丢失连接的情况,无法从集群获知作业执行情况)[java] view plain copy //hadoop2 查看hadoop源码 JobControl 发现有ControlledJob, ControlledJob里有依赖方法 addDependingJob Job job = new Job(new Configuration(),"job1"); Job job2 = new Job(new Configuration(),"job2"); ControlledJob controlledJob=new ControlledJob(job.getConfiguration()); //设置job controlledJob.setJob(job); ControlledJob controlledJob2=new ControlledJob(job2.getConfiguration()); controlledJob2.setJob(job2); //这里就是设置job依赖的重要方法了,依赖于<span style="font-size: 9pt; font-family: Menlo;">controlledJob</span><span style="font-size: 9pt; font-family: Menlo;"> </span> controlledJob.addDependingJob(controlledJob); JobControl jc=new JobControl("jc"); jc.addJob(controlledJob); jc.addJob(controlledJob2); //由于JobControl实现了Runnable 接口,而Runnable接口只有运行方法,没有结束方法,因此需要一个线程来辅助 Thread jcThread = new Thread(jc); jcThread.start(); while(true){ //当job池里所有的job完成后,执行 下一步操作 if(jc.allFinished()){ System.out.println(jc.getSuccessfulJobList()); jc.stop(); } //获取执行失败的job列表 if(jc.getFailedJobList().size() > 0){ System.out.println(jc.getFailedJobList()); jc.stop(); } } 三、链式[java] view plain copy Configuration conf = new Configuration(); Job job = new Job(conf); job.setJobName("ChianJOb"); // 在ChainMapper里面添加Map1 Configuration map1conf = new Configuration(false); ChainMapper.addMapper(job, Map1.class, LongWritable.class, Text.class, Text.class, Text.class, true, map1conf); // 在ChainReduce中加入Reducer,Map2; Configuration reduceConf = new Configuration(false); ChainReducer.setReducer(job, Reduce.class, LongWritable.class, Text.class, Text.class, Text.class, true, map1conf); Configuration map2Conf = new Configuration(); ChainReducer.addMapper(job, Map2.class, LongWritable.class, Text.class, Text.class, Text.class, true, <span style="font-family: Menlo; font-size: 9pt;">map2Conf</span><span style="font-size: 9pt; font-family: Menlo;">);</span>
在MapReduce计算框架中实现全局变量:由于不同的task任务共享的只是jar文件的初始版本,且每个task在获取到yarn分配的资源后(container形式)分别运行在独立的JVM中,所以不同的task对Mapper(Reducer)实现类中成员变量以及主类中的成员变量的修改相互没有影响。设置全局变量主要有三种方法:一、配置Job属性 mapreduce的执行过程中task可以读取Job的属性。基于这个特性,程序可以在Job的配置代码中即main函数中利用Congfiguraion类的set函数将一些简单的数据结构封装到作业配置中;task任务启动的过程中(比如setup函数)通过Configuration类的get函数读取即可。这种方法的优点是简单,资源消耗小;缺点则是只能共享一些小型的数据量,对大型的数据结构比较乏力。 下面的代码是在作业配置代码段中进行的全局变量的配置:int nAge = 25;Configuration conf = new Configuration(); conf.set("nAge", nAge);//主要是这一行Job job = new Job(conf, "JobName");下面的代码段是在setup函数中完成的对全局变量的读取:protected void setup(Context context) throws IOException, InterruptedException {try { Configuration conf = context.getConfiguration(); int nAge = conf.get("nAge");} catch (Exception e) { e.printStackTrace(); } }二、使用DistributedCache分布式缓存在job驱动设置的时候添加:
//分布式缓存要存储的文件路径String cachePath[] = {"hdfs://10.105.32.57:8020/user/ad-data/tag/tag-set.csv","hdfs://10.105.32.57:8020/user/ad-data/tag/TagedUrl.csv"};//向分布式缓存中添加文件job.addCacheFile(new Path(cachePath[0]).toUri());job.addCacheFile(new Path(cachePath[1]).toUri());然后可以在Map,Reduce的setup阶段将缓存的文件读取出来,保存在内存当中。
/** 重写Mapper的setup方法,获取分布式缓存中的文件*/@Overrideprotected void setup(Mapper<LongWritable, Text, Text, Text>.Context context)throws IOException, InterruptedException {// TODO Auto-generated method stubsuper.setup(context);URI[] cacheFile = context.getCacheFiles();Path tagSetPath = new Path(cacheFile[0]);Path tagedUrlPath = new Path(cacheFile[1]);文件操作(如把内容读到set或map中);}@Overridepublic void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {在map()中使用读取出的数据;}三、利用指定的HDFS文件进行全局变量的存储 这种方法主要使用了java的API,预先定义数据的存储规则,通过读写HDFS中指定的文件按照预先定义的规则访问数据即可实现全局访问。HDFS中Java的API将在后面的文章中介绍。因为将数据存储在文件中,理论上可以存储足够所有应用可能的全局变量且能读能写还比较直观,缺点是需要使用IO,会占用系统资源,增加作业完成的资源消耗。
相关文章推荐
- hadoop学习笔记(5)-运行任务(Job)小结:第三方jar包、hadoop jar命令
- Hadoop 学习笔记 (十) MapReduce实现排序 全局变量
- Hadoop学习笔记八之 combine 以及常用命令行 和全局变量
- Hadoop学习笔记(老版本,YARN之前),MapReduce任务Namenode DataNode Jobtracker Tasktracker之间的关系
- Hadoop学习笔记:MapReduce任务Namenode DataNode Jobtracker Tasktracker之间的关系
- Hadoop学习笔记,MapReduce任务Namenode DataNode Jobtracker Tasktracker之间的关系
- Hadoop学习之路(十五)MapReduce的多Job串联和全局计数器
- hadoop学习笔记(5)-运行任务(Job)小结:第三方jar包、hadoop jar命令
- Python学习笔记之全局变量
- 多线程学习之一:线程对共享全局变量的访问
- Hadoop的“全局变量”
- 通过未初始化全局变量,研究BSS段和COMMON段的不同
- nginx 源码学习笔记(十四)—— 全局变量ngx_cycle
- PHP任务学习2:认清变量的作用范围
- uC/OS-II学习笔记-定义全局变量 .
- Android学习札记49:在Android中使用Application保存全局变量
- Hadoop 全局变量与数据传递
- uC/OS-II学习笔记-定义全局变量
- nginx 源码学习笔记(十四)—— 全局变量ngx_cycle