
How the JobTracker initializes submitted jobs

2015-12-11 11:42

The jobAdded(JobInProgress job) method of the EagerTaskInitializationListener class adds a submitted job to the queue of jobs waiting to be initialized. The code is as follows:

public void jobAdded(JobInProgress job) {
  synchronized (jobInitQueue) {
    jobInitQueue.add(job);
    resortInitQueue();  // re-sort the queue in ascending order of start time
    jobInitQueue.notifyAll();
  }
}
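resortInitQueue() is not shown in this listing. In the Hadoop 1.x source it orders the queue by job priority first and only then by start time, so the comment above is a slight simplification. Below is a sketch of that ordering that follows the shape of the real method but is not guaranteed to match it line for line:

// Approximate sketch of resortInitQueue(): higher-priority jobs first,
// earlier start time breaking ties (assumes java.util.Comparator/Collections).
private void resortInitQueue() {
  Comparator<JobInProgress> comp = new Comparator<JobInProgress>() {
    public int compare(JobInProgress j1, JobInProgress j2) {
      int res = j1.getPriority().compareTo(j2.getPriority());
      if (res == 0) {
        res = Long.compare(j1.getStartTime(), j2.getStartTime());
      }
      return res;
    }
  };
  synchronized (jobInitQueue) {
    Collections.sort(jobInitQueue, comp);
  }
}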

The jobs sitting in jobInitQueue are the ones waiting to be initialized. A dedicated JobInitManager thread, started in the listener's start() method, drains this queue and hands each job to a thread pool for initialization. The code is as follows:

public void start() throws IOException {  // the start() method of EagerTaskInitializationListener
  this.jobInitManagerThread = new Thread(jobInitManager, "jobInitManager");
  jobInitManagerThread.setDaemon(true);  // run the manager as a daemon thread
  this.jobInitManagerThread.start();
}
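Note that start() only launches the manager thread; the threadPool used below is created elsewhere in the listener. In the Hadoop 1.x source it is a fixed-size pool sized by a configuration property, roughly as sketched here (property name and default taken from that source, so treat them as version-dependent):

// Sketch of how the listener's thread pool is created (Hadoop 1.x):
int numThreads = conf.getInt("mapred.jobinit.threads", 4);  // DEFAULT_NUM_THREADS
ExecutorService threadPool = Executors.newFixedThreadPool(numThreads);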

The JobInitManager class itself looks like this:

class JobInitManager implements Runnable {
  public void run() {
    JobInProgress job = null;
    while (true) {
      try {
        synchronized (jobInitQueue) {
          while (jobInitQueue.isEmpty()) {
            jobInitQueue.wait();
          }
          job = jobInitQueue.remove(0);  // always take the job at the head of the queue
        }
        threadPool.execute(new InitJob(job));
      } catch (InterruptedException t) {
        LOG.info("JobInitManagerThread interrupted.");
        break;
      }
    }
    LOG.info("Shutting down thread pool");
    threadPool.shutdownNow();
  }
}
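This is a standard guarded-wait producer/consumer pattern: jobAdded() produces under the queue lock and calls notifyAll(), while JobInitManager consumes by waiting while the queue is empty and then removing the head. The following self-contained toy program (not Hadoop code, just an illustration of the same pattern) can be run on its own:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Toy reproduction of the jobAdded()/JobInitManager pattern: a producer adds
// items under a lock and notifies; a daemon manager thread waits for work and
// hands each item to a thread pool, mirroring threadPool.execute(new InitJob(job)).
public class InitQueueDemo {
  private final List<String> queue = new ArrayList<String>();
  private final ExecutorService pool = Executors.newFixedThreadPool(2);

  // producer side, analogous to jobAdded()
  public void add(String job) {
    synchronized (queue) {
      queue.add(job);
      queue.notifyAll();
    }
  }

  // consumer side, analogous to JobInitManager.run()
  public void startManager() {
    Thread manager = new Thread(new Runnable() {
      public void run() {
        while (true) {
          final String job;
          try {
            synchronized (queue) {
              while (queue.isEmpty()) {
                queue.wait();
              }
              job = queue.remove(0);
            }
          } catch (InterruptedException e) {
            break;
          }
          pool.execute(new Runnable() {
            public void run() {
              System.out.println("initializing " + job);
            }
          });
        }
      }
    }, "jobInitManager-demo");
    manager.setDaemon(true);
    manager.start();
  }

  public void shutdown() {
    pool.shutdown();
  }

  public static void main(String[] args) throws InterruptedException {
    InitQueueDemo demo = new InitQueueDemo();
    demo.startManager();
    demo.add("job_0001");
    demo.add("job_0002");
    Thread.sleep(200);  // let the pool print before shutting down
    demo.shutdown();
  }
}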

The InitJob task that is submitted to the thread pool is defined as follows:

class InitJob implements Runnable {
  private JobInProgress job;

  public InitJob(JobInProgress job) {
    this.job = job;
  }

  public void run() {
    ttm.initJob(job);  // initJob() is declared by the TaskTrackerManager interface and implemented by JobTracker
  }
}
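Here ttm is the TaskTrackerManager reference held by the listener (set via setTaskTrackerManager()). JobTracker implements this interface, which is why the call lands in JobTracker.initJob() shown next. A trimmed-down view of the relationship, omitting most of the real interface:

// Simplified sketch; the actual TaskTrackerManager interface declares more methods.
interface TaskTrackerManager {
  void initJob(JobInProgress job);
  void failJob(JobInProgress job);
  // ... listener registration, heartbeat-related methods, etc.
}

// JobTracker implements TaskTrackerManager, so ttm.initJob(job) dispatches to it.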

The initJob() method in JobTracker:

public void initJob(JobInProgress job) {
  if (null == job) {
    LOG.info("Init on null job is not valid");
    return;
  }
  try {
    JobStatus prevStatus = (JobStatus)job.getStatus().clone();
    LOG.info("Initializing " + job.getJobID());
    job.initTasks();  // call JobInProgress.initTasks() to initialize all of the job's tasks
    // Inform the listeners if the job state has changed
    // Note : that the job will be in PREP state.
    JobStatus newStatus = (JobStatus)job.getStatus().clone();
    if (prevStatus.getRunState() != newStatus.getRunState()) {
      JobStatusChangeEvent event =
        new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus,
                                 newStatus);
      synchronized (JobTracker.this) {
        updateJobInProgressListeners(event);
      }
    }
  } catch (KillInterruptedException kie) {
    // If job was killed during initialization, job state will be KILLED
    LOG.error("Job initialization interrupted:\n" +
              StringUtils.stringifyException(kie));
    killJob(job);
  } catch (Throwable t) {
    String failureInfo =
      "Job initialization failed:\n" + StringUtils.stringifyException(t);
    // If the job initialization fails, job state will be FAILED
    LOG.error(failureInfo);
    job.getStatus().setFailureInfo(failureInfo);
    failJob(job);
  }
}

The key code of the initTasks() method in JobInProgress is as follows:

public synchronized void initTasks()
    throws IOException, KillInterruptedException, UnknownHostException {
  // ... (initial state checks omitted; the job's split meta info is read here)
  TaskSplitMetaInfo[] splits = createSplits(jobId);
  numMapTasks = splits.length;

  // Sanity check the locations so we don't create/initialize unnecessary tasks
  for (TaskSplitMetaInfo split : splits) {
    NetUtils.verifyHostnames(split.getLocations());
  }

  jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks);
  jobtracker.getInstrumentation().addWaitingReduces(getJobID(), numReduceTasks);
  this.queueMetrics.addWaitingMaps(getJobID(), numMapTasks);
  this.queueMetrics.addWaitingReduces(getJobID(), numReduceTasks);

  maps = new TaskInProgress[numMapTasks];
  for (int i = 0; i < numMapTasks; ++i) {
    inputLength += splits[i].getInputDataLength();
    maps[i] = new TaskInProgress(jobId, jobFile,
                                 splits[i],
                                 jobtracker, conf, this, i, numSlotsPerMap);
  }
  LOG.info("Input size for job " + jobId + " = " + inputLength
           + ". Number of splits = " + splits.length);

  // Set localityWaitFactor before creating cache
  localityWaitFactor =
      conf.getFloat(LOCALITY_WAIT_FACTOR, DEFAULT_LOCALITY_WAIT_FACTOR);
  if (numMapTasks > 0) {
    nonRunningMapCache = createCache(splits, maxLevel);
  }

  // set the launch time
  this.launchTime = jobtracker.getClock().getTime();

  //
  // Create reduce tasks
  //
  this.reduces = new TaskInProgress[numReduceTasks];
  for (int i = 0; i < numReduceTasks; i++) {
    reduces[i] = new TaskInProgress(jobId, jobFile,
                                    numMapTasks, i,
                                    jobtracker, conf, this, numSlotsPerReduce);
    nonRunningReduces.add(reduces[i]);
  }

  // Calculate the minimum number of maps to be complete before
  // we should start scheduling reduces
  completedMapsForReduceSlowstart =
      (int)Math.ceil(
          (conf.getFloat("mapred.reduce.slowstart.completed.maps",
                         DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART) *
           numMapTasks));
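  // Illustrative numbers (not from the source): with numMapTasks = 100 and the
  // default slowstart fraction of 0.05, reduces are not scheduled until
  // ceil(0.05 * 100) = 5 map tasks have completed.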

  // ... use the same for estimating the total output of all maps
  resourceEstimator.setThreshhold(completedMapsForReduceSlowstart);

  // create two cleanup tips, one map and one reduce.
  cleanup = new TaskInProgress[2];

  // cleanup map tip. This map doesn't use any splits. Just assign an empty
  // split.
  TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT;
  cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
                                  jobtracker, conf, this, numMapTasks, 1);
  cleanup[0].setJobCleanupTask();

  // cleanup reduce tip.
  cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
                                  numReduceTasks, jobtracker, conf, this, 1);
  cleanup[1].setJobCleanupTask();

  // create two setup tips, one map and one reduce.
  setup = new TaskInProgress[2];

  // setup map tip. This map doesn't use any split. Just assign an empty
  // split.
  setup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
                                jobtracker, conf, this, numMapTasks + 1, 1);
  setup[0].setJobSetupTask();

  // setup reduce tip.
  setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
                                numReduceTasks + 1, jobtracker, conf, this, 1);
  setup[1].setJobSetupTask();
}
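For completeness, the listener itself is registered by the task scheduler, which is what makes jobAdded() fire when a job is submitted in the first place. In the default Hadoop 1.x JobQueueTaskScheduler the wiring looks roughly like this (a sketch based on that source; details may differ between versions):

// Rough sketch of JobQueueTaskScheduler.start() showing how the
// EagerTaskInitializationListener is hooked into the JobTracker.
public synchronized void start() throws IOException {
  super.start();
  taskTrackerManager.addJobInProgressListener(jobQueueJobInProgressListener);
  eagerTaskInitializationListener.setTaskTrackerManager(taskTrackerManager);
  eagerTaskInitializationListener.start();
  taskTrackerManager.addJobInProgressListener(eagerTaskInitializationListener);
}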