MapReduce job在JobTracker初始化源码级分析
2014-05-22 10:24
381 查看
mapreduce job提交流程源码级分析(三)中已经说明用户最终调用JobTracker.submitJob方法来向JobTracker提交作业。而这个方法的核心提交方法是JobTracker.addJob(JobID jobId, JobInProgress job)方法,这个addJob方法会把Job提交到调度器(默认是JobQueueTaskScheduler)的监听器JobQueueJobInProgressListener和EagerTaskInitializationListener(本文只讨论默认调度器)中,使用方法jobAdded(JobInProgress job),JobQueueJobInProgressListener任务是监控各个JobInProcess生命周期中的变化;EagerTaskInitializationListener是发现有新Job后对其初始化的。
一、JobQueueJobInProgressListener.jobAdded(JobInProgress job)方法。就一句代码jobQueue.put(new JobSchedulingInfo(job.getStatus()), job),先构建一个JobSchedulingInfo对象,然后和JobInProgress对应起来放入jobQueue中。JobSchedulingInfo类维护这调度这个job必备的一些信息,比如优先级(默认是NORMAL)、JobID以及开始时间startTime。
二、EagerTaskInitializationListener.jobAdded(JobInProgress job)方法。
上面方法中resortInitQueue()方法主要是对jobInitQueue中JobInProcess进行排序,先按照优先级排序,相同的再按开始时间。EagerTaskInitializationListener.start()在调度器初始化时JobQueueTaskScheduler.start()就调用了,所以先于jobAdded方法调用。EagerTaskInitializationListener.start()代码如下:
start()方法会启动一个线程:JobInitManager。
JobInitManager线程的run方法是一个死循环始终监控jobInitQueue是否为空,不为空的话就取出0位置的JobInProgress,在InitJob线程中初始化:TaskTrackerManager.initJob(job)对应JobTracker的initJob方法。这里为什么会另起线程来初始化Job呢?原因很简单,就是可能jobInitQueue中同时会有很多JobInProgress,一个一个的初始化会比较慢,所以采用多线程的方式初始化。来看initJob方法的代码:
首先是获取初始化前的状态prevStatus;然后是job.initTasks()初始化;在获取初始化的后的状态newStatus;
job.initTasks()方法代码比较多,主要的工作是检查之后获取输入数据的分片信息TaskSplitMetaInfo[] splits = createSplits(jobId)这是去读的上传到HDFS中的文件job.splitmetainfo和job.split,要确保numMapTasks == splits.length,然后构建numMapTasks个TaskInProgress作为MapTask,
一、JobQueueJobInProgressListener.jobAdded(JobInProgress job)方法。就一句代码jobQueue.put(new JobSchedulingInfo(job.getStatus()), job),先构建一个JobSchedulingInfo对象,然后和JobInProgress对应起来放入jobQueue中。JobSchedulingInfo类维护这调度这个job必备的一些信息,比如优先级(默认是NORMAL)、JobID以及开始时间startTime。
二、EagerTaskInitializationListener.jobAdded(JobInProgress job)方法。
/** * We add the JIP to the jobInitQueue, which is processed * asynchronously to handle split-computation and build up * the right TaskTracker/Block mapping. */ @Override public void jobAdded(JobInProgress job) { synchronized (jobInitQueue) { jobInitQueue.add(job); //添加进List<JobInProgress> jobInitQueue resortInitQueue(); jobInitQueue.notifyAll(); //唤醒阻塞的进程 } }
上面方法中resortInitQueue()方法主要是对jobInitQueue中JobInProcess进行排序,先按照优先级排序,相同的再按开始时间。EagerTaskInitializationListener.start()在调度器初始化时JobQueueTaskScheduler.start()就调用了,所以先于jobAdded方法调用。EagerTaskInitializationListener.start()代码如下:
public void start() throws IOException { this.jobInitManagerThread = new Thread(jobInitManager, "jobInitManager"); jobInitManagerThread.setDaemon(true); this.jobInitManagerThread.start(); }
start()方法会启动一个线程:JobInitManager。
///////////////////////////////////////////////////////////////// // Used to init new jobs that have just been created ///////////////////////////////////////////////////////////////// class JobInitManager implements Runnable { public void run() { JobInProgress job = null; while (true) { try { synchronized (jobInitQueue) { while (jobInitQueue.isEmpty()) { jobInitQueue.wait(); } job = jobInitQueue.remove(0); } threadPool.execute(new InitJob(job)); } catch (InterruptedException t) { LOG.info("JobInitManagerThread interrupted."); break; } } LOG.info("Shutting down thread pool"); threadPool.shutdownNow(); } } class InitJob implements Runnable { private JobInProgress job; public InitJob(JobInProgress job) { this.job = job; } public void run() { ttm.initJob(job);//对应JobTracker的对应方法 } }
JobInitManager线程的run方法是一个死循环始终监控jobInitQueue是否为空,不为空的话就取出0位置的JobInProgress,在InitJob线程中初始化:TaskTrackerManager.initJob(job)对应JobTracker的initJob方法。这里为什么会另起线程来初始化Job呢?原因很简单,就是可能jobInitQueue中同时会有很多JobInProgress,一个一个的初始化会比较慢,所以采用多线程的方式初始化。来看initJob方法的代码:
public void initJob(JobInProgress job) { if (null == job) { LOG.info("Init on null job is not valid"); return; } try { JobStatus prevStatus = (JobStatus)job.getStatus().clone(); LOG.info("Initializing " + job.getJobID()); job.initTasks(); //调用该实例的initTasks方 法,对job进行初始化 // Inform the listeners if the job state has changed // Note : that the job will be in PREP state. JobStatus newStatus = (JobStatus)job.getStatus().clone(); if (prevStatus.getRunState() != newStatus.getRunState()) { JobStatusChangeEvent event = new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus, newStatus); synchronized (JobTracker.this) { updateJobInProgressListeners(event); } } } catch (KillInterruptedException kie) { // If job was killed during initialization, job state will be KILLED LOG.error("Job initialization interrupted:\n" + StringUtils.stringifyException(kie)); killJob(job); } catch (Throwable t) { String failureInfo = "Job initialization failed:\n" + StringUtils.stringifyException(t); // If the job initialization is failed, job state will be FAILED LOG.error(failureInfo); job.getStatus().setFailureInfo(failureInfo); failJob(job); } }
首先是获取初始化前的状态prevStatus;然后是job.initTasks()初始化;在获取初始化的后的状态newStatus;
job.initTasks()方法代码比较多,主要的工作是检查之后获取输入数据的分片信息TaskSplitMetaInfo[] splits = createSplits(jobId)这是去读的上传到HDFS中的文件job.splitmetainfo和job.split,要确保numMapTasks == splits.length,然后构建numMapTasks个TaskInProgress作为MapTask,
相关文章推荐
- mapreduce源码分析之JobTracker
- hadoop之MapReduce框架JobTracker端心跳机制分析(源码分析第七篇) 推荐
- mapreduce源码分析作业提交、初始化、分配、计算过程之初始化篇
- MapReduce执行过程源码分析(一)——Job任务的提交
- MapReduce Job本地提交过程源码跟踪及分析
- MapReduce(四): JobTracker的初始化
- Hadoop1.2.1源码解析系列:JobTracker(一)——JobTracker初始化
- mapreduce提交job源码分析
- MapReduce源码分析之JobSplitWriter
- Hadoop源码分析--MapReduce作业(job)提交源码跟踪
- JobTracker之辅助线程和对象映射模型分析(源码分析第五篇)
- Hadoop源码分析28 JobTracker 处理JobClient请求
- 图说MapReduce源码--JobTracker.getSetupAndCleanupTasks 任务选择顺序
- 通过jobtracker、tasktraker的log分析mapreduce的过程
- Hadoop源码分析26 JobTracker主要容器和线程
- Hadoop源码分析27 JobTracker空载处理心跳
- MapReduce源码分析之JobSplitWriter
- MapReduce源码解读系列之——作业如何提交到JobTracker
- Hadoop源码分析24 JobTracker启动和心跳处理流程
- Hadoop源码分析23:MapReduce的Job提交过程