一个MapReuce作业的从开始到结束--第7章 MapReduce的执行过程
2013-11-29 15:42
281 查看
1. Job类
在新版的API中,MapReduce的执行有Job类管理。Job类的代码在mapred/org/apache/hadoop/mapreduce/Job.java在wordCount的例子中,main函数执行一下的步骤:
Job job = new Job(conf,"word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job,new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job,new Path(otherArgs[1])); System.exit(job.waitForCompletion(true)? 0 : 1); |
FileInputFormat.addInputPath(job,new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job,new Path(otherArgs[1]));
其实也是设置conf,看到源代码就知道了。
Job的waitForCompletion函数负责执行MapReduce作业,它的执行流程如下:
[1].调用Job类的submit函数,向JobTracker提交作业,提交后,会返回一个RunningJob类的实例info。
[2].调用JobClient类的实例jobClient的monitorAndPrintJob函数,根据info监控和打印MapReduce作业的执行中的各种信息。
[3].返回MapReduce作业执行是否成功。
Job类的submit函数提交作业的流程如下:
[1].调用Job类的connect函数,连接到JobTracker节点。
[2].调用JobClient类的实例jobClient的submitJobInternal函数,提交MapReduce作业。
2.Job类的connect函数的执行
它只执行一条语句,如下:ugi.doAs(newPrivilegedExceptionAction<Object>() { public Object run()throws IOException { jobClient = newJobClient((JobConf) getConfiguration()); return null; } }); |
ugi =UserGroupInformation.getCurrentUser(),这里涉及到Hadoop的安全机制,暂且不用管。
当ugi执行doAs函数的时候,执行Subject.doAs(subject,action),也就是运行run函数,运行run函数时,jobClient会进行初始化,然后连接JobTracker。JobClient初始化的时候,会执行init函数,创建RPC远程过程调用rpcJobSubmitClient,然后再创建它的代理类jobSubmitClient。
3.JobClient类的submitJobInternal函数的执行
JobClient类submitJobInternal函数,向JobTracker节点提交MapReduce作业。在这个函数里,提交作业的关键语句是:status =jobSubmitClient.submitJob(jobId, submitJobDir.toString(),jobCopy.getCredentials())
jobId是MapReduce的ID号,它是从JobTracker节点获取的,语句如下:
JobID jobId =jobSubmitClient.getNewJobId()
submitJobDir是JobTracker所在的文件系统的一个目录,如果Hadoop运行在集群状态下,submitJobDir就是HDFS上的一个目录,这个目录对应提交的作业。
getCredentials(),Credentials类提供读写高安全性的键值函数。
jobSubmitClient是在init函数里创建的远程过程调用RPC的代理类,调用它的submitJob函数,就是远程调用JobTracker节点的submitJob函数。
提交作业的本质就是,从JobTracker上获取jobId,然后在文件系统上创建一个特定的目录,把所有跟任务相关的数据都写到这个目录,然后再通知JobTracker去执行作业。
如果Hadoop运行在伪分布模式,那么JobTracker是localhost,也就本地主机,这时候,Hadoop调用LocalRunner执行MapReduce作业。
如果如果Hadoop运行在集群模式,那么JobTracker是远程主机,Hadoop调用JobTracker节点以集群的方式执行MapReduce作业。
4.JobTracker节点执行MapReduce作业
JobTracker的submit函数被调用后,会先对作业进行多个步骤的处理,如检查参数,创建job等,然后,将job加入到执行队列中,关键语句是:status = addJob(jobId, job)
将让listener也添加作业:
for (JobInProgressListenerlistener : jobInProgressListeners) {
listener.jobAdded(job);
}
jobs记录了提交的作业,它的定义如下:
Map<JobID, JobInProgress>jobs = Collections.synchronizedMap(new TreeMap<JobID,JobInProgress>())
JobID是提交的作业的标示符,用它区分作业。JobInProgress是作业的相关信息。
对作业各种操作,都是通过jobs来的,比如要杀掉一个作业,是根据JobID在jobs里找到它对应的JobInProgress,然后kill掉。如果要获取一个作业的任务执行情况,是根据JobID在jobs里找到它对应的JobInProgress,然后根据JobInProgress得到TaskInProgress。
4.1JobTracker节点的运行
JobTracker节点从main函数开始运行,main函数先调用startTracker函数,然后再调用offerService函数。在offserService函数,创建RPC服务端interTrackerServer,然后启动taskScheduler,启动expireTrackersThread,启动retireJobsThread,调用interTrackerServer的join()函数,进入服务状态。
JobTracker类的内部类RetireJobs,清理已经完成的旧的作业,它实现了Runnable接口,在一个线程里执行,它在执行的时候,从jobs里取出所有的作业,然后进行判断和清理。Java里实现多线程有两种方式,一种是继承Thread类,重写run函数,另一种是实现Runnable接口,实现run函数,然后把这个类在一个线程里执行。有一片文档写的很好,把Thread和Runnable解释的很清楚,http://developer.51cto.com/art/201203/321042.htm。
JobTracker类的内部类RecoverManager,重启之后,恢复已经执行和正在执行的作业。GetSetupAndCleanupTasks获取清理掉的任务和已设置的任务。
JobTrackeer的initJob函数,初始化作业。job执行JobInProcess的initTasks函数,在initTasks函数,作业会被拆成若干个Map任务,若干个Reduce任务。JobTracker的heartbeat函数,这个函数由TaskTracker调用,而且是以RPC的方式调用。有很多很多的的操作,在心跳里发给TaskTracker。
4.2JobTracker节点处理作业
当TaskNode节点启动之后,会循环调用JobTracker节点的心跳函数,也就是heatbeat函数。当heartbeat函数被调用后,会返回一个可序列化的HeartbeatResponse对象,步骤如下:
[1].创建一个新的HeartbeatResponse对象response。
[2].通常情况下,获取要发给TaskTracker执行的任务:getSetupAndCleanupTasks,在这个函数里,是从jobs中取得job,然后按照一定的次序从job中获取要执行的Task。
[3].把任务存放到LanchTaskAction中,然后存储到actions里。然后再给actions添加其他各种任务,如清理需要Kill的任务,需要存储结果的任务,计算下一次心跳间隔等等。
[4].返回response给TaskTracker节点。
5.TaskTracker节点执行MapReduce作业
TaskTracker类的代码在文件mapred/org/apache/hadoop/mapred/TaskTracker.java里。5.1TaskTracker节点的运行
TaskTracker节点启动时,步骤如下:[1]. 运行main函数。
[2].在main函数,创建TaskTracker对象,然后执行它的run函数,进入无限循环。
[3]. 在run函数,执行offerService函数。
[4].在offerService函数,执行一个无限循环,在循环里,定期向JobTracker发送心跳,也就是执行函数:
HeartbeatResponseheartbeatResponse = transmitHeartBeat(now);
然后,从 heartbeatResponse取出要执行的任务:
TaskTrackerAction[]actions = heartbeatResponse.getActions();
通常情况下,会将actions加入到任务队列:
addToTaskQueue((LaunchTaskAction)action);
实际上,如果是Map任务,就加入到mapLauncher,如果是Reduce任务,加入到reduceLauncher。
5.2TaskTracker处理作业
mapLauncher和reduceLauncher是线程类TaskLauncher的实例,当TaskTracker在运行的时候,它们负责取出Map任务和Reduce任务执行。从taskLaucher线程类的run函数可知,处理的流程是这样的:
tip =tasksToLaunch.remove(0);
startNewTask(tip);
startNewTask函数,执行任务:
RunningJob rjob =localizeJob(tip); launchTaskForJob(tip,new JobConf(rjob.getJobConf()), rjob) |
localizeTask(task); setTaskRunner(task.createRunner(TaskTracker.this,this, rjob)); this.runner.start(); |
TaskRunner是线程类,从它的run函数可以知道任务执行是,TaskRunner创建环境变量,参数,启动命令,输出流等等,然后由JvmManager对象启动一个JVM执行任务:
launchJvmAndWait(setupCmds,vargs, stdout, stderr, logSize, workDir)
也就是在JvmManager里,执行launchJvm函数,然后,执行:
setRunningTaskForJvm(jvmRunner.jvmId,t)
然后,在JvmManager里执行内部类JvmRunner的runChild函数。
然后,在JvmManager里调用DefaultTaskController的launchTask函数执行,主要是两行代码:
shExec = newShellCommandExecutor(new String[]{ "bash",commandFile}, currentWorkDirectory); shExec.execute(); |
在脚本,执行的是org.apache.hadoop.mapred.Child类。
然后,假如是Map任务的话,会执行到MapTask的run函数,再执行runNewMapper函数,终于终于,在这里开始执行Map任务了,这个Map任务就是我们在jar包里的Mapper类,于是,经过各种实例化之后,读数据等等,执行语句就是:
mapperContext =contextConstructor.newInstance( mapper,job, getTaskID(), input,output, committer, reporter,split); input.initialize(split,mapperContext); mapper.run(mapperContext); input.close(); output.close(mapperContext); |
相关文章推荐
- 一个MapReuce作业的从开始到结束--第6章Hadoop以Jar包的方式执行MapReduce任务
- 一个MapReuce作业的从开始到结束--第2章 启动Hadoop
- 一个MapReuce作业的从开始到结束--第3章 NameNode节点启动分析
- 一个MapReuce作业的从开始到结束--第4章 DataNode节点启动分析
- 一个MapReuce作业的从开始到结束--第5章 把文件复制到HDFS的流程
- 一个MapReuce作业的从开始到结束--第1章 NameNode节点的格式化
- SQL作业--让数据库定期执行一个存储过程
- 《linux内核分析》作业一:反汇编一个C语言程序并分析汇编代码执行过程
- 证明一个操作的一致性读过程是以自己开始执行的时间为准回滚的不是以该操作所在的事务开始的时间为准回滚的
- MapReduce作业执行过程
- 【Hadoop】MapReduce笔记(一):MapReduce作业运行过程、任务执行
- WPF Blend 一个动画结束后另一个动画开始执行(一个一个执行)
- 通过一个可执行文件被执行的过程理解进程的深刻性
- 从MaxTemperature程序来看Mapreduce 的执行过程
- 自定义MapReduce的InputFormat,提取指定开始与结束限定符间的内容
- delphi执行一个外部程序,当外部程序结束后,delphi程序立即响应
- MapReduce作业执行流程
- 一个结束,又是一个开始
- C#.NET执行Oracle DBLink关于“ORA-02041: 客户数据库未开始一个事务处理”
- hadoop2提交到Yarn: Mapreduce执行过程分析