
Hadoop MapReduce: Job Initialization

2013-11-02
  After a job has been successfully submitted on the server side, the JobInitManager initializes it. This thread is an inner class of EagerTaskInitializationListener and is started as soon as the JobTracker (JT) comes up. It keeps watching the jobInitQueue; as soon as a newly submitted job appears, it wraps the job in an initialization task and hands it to a thread pool. This design prevents job initialization from becoming a bottleneck when many jobs are submitted concurrently. Later, when the concrete tasks are built, map tasks are placed on data-local nodes whenever possible to reduce network traffic.
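For context, the producer side of this handoff is the listener itself: when the JobTracker is told about a new submission, the listener's jobAdded() callback appends the job to jobInitQueue and wakes the waiting JobInitManager. A simplified sketch from memory of the 1.x source (the real method also re-sorts the queue by priority and start time via resortInitQueue); the consumer loop follows right after:

@Override
public void jobAdded(JobInProgress job) {
  synchronized (jobInitQueue) {
    jobInitQueue.add(job);
    resortInitQueue();        // keep the queue ordered by priority and start time
    jobInitQueue.notifyAll(); // wake the JobInitManager waiting on the queue
  }
}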

class JobInitManager implements Runnable {

  public void run() {
    JobInProgress job = null;
    while (true) {
      try {
        // Wait on jobInitQueue; as soon as a submitted job shows up, take it out
        synchronized (jobInitQueue) {
          while (jobInitQueue.isEmpty()) {
            jobInitQueue.wait();
          }
          job = jobInitQueue.remove(0);
        }
        // Wrap the job in an InitJob task and hand it to the thread pool for initialization
        threadPool.execute(new InitJob(job));
      } catch (InterruptedException t) {
        LOG.info("JobInitManagerThread interrupted.");
        break;
      }
    }
    LOG.info("Shutting down thread pool");
    threadPool.shutdownNow();
  }
}

Next, let's look at the job initialization task itself:

class InitJob implements Runnable {

  private JobInProgress job;

  public InitJob(JobInProgress job) {
    this.job = job;
  }

  public void run() {
    // The JobTracker implements the TaskTrackerManager interface; ttm.initJob() initializes this job
    ttm.initJob(job);
  }
}

The JobTracker's job initialization is shown below; internally it calls into the JobInProgress created at submission time to initialize the tasks.

public void initJob(JobInProgress job) {
  // When could the job be null here?
  if (null == job) {
    LOG.info("Init on null job is not valid");
    return;
  }

  try {
    // Snapshot the job status before initialization
    JobStatus prevStatus = (JobStatus)job.getStatus().clone();
    LOG.info("Initializing " + job.getJobID());
    job.initTasks();  // start initializing the job's tasks
    // Inform the listeners if the job state has changed
    // Note : that the job will be in PREP state.
    JobStatus newStatus = (JobStatus)job.getStatus().clone();
    // Compare the old and new status; if the run state changed, notify the listeners
    if (prevStatus.getRunState() != newStatus.getRunState()) {
      JobStatusChangeEvent event =
        new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus,
            newStatus);
      synchronized (JobTracker.this) {
        updateJobInProgressListeners(event);
      }
    }
  } catch (KillInterruptedException kie) {
    // The job was killed while it was still being initialized
    LOG.error("Job initialization interrupted:\n" +
        StringUtils.stringifyException(kie));
    killJob(job);
  } catch (Throwable t) {
    String failureInfo =
      "Job initialization failed:\n" + StringUtils.stringifyException(t);
    // If the job initialization is failed, job state will be FAILED
    LOG.error(failureInfo);
    job.getStatus().setFailureInfo(failureInfo);
    failJob(job);
  }
}

Most of the work of job initialization is done in JobInProgress.initTasks(), shown below, which creates the various kinds of tasks: map, reduce, setup and cleanup.

public synchronized void initTasks()
    throws IOException, KillInterruptedException, UnknownHostException {
  // Return immediately if the tasks are already initialized or the job is already complete
  if (tasksInited || isComplete()) {
    return;
  }
  // Return immediately if the job already received a kill signal or init has already started
  synchronized (jobInitKillStatus) {
    if (jobInitKillStatus.killed || jobInitKillStatus.initStarted) {
      return;
    }
    jobInitKillStatus.initStarted = true;
  }
  // Log the start of initialization
  LOG.info("Initializing " + jobId);
  final long startTimeFinal = this.startTime;
  // log job info as the user running the job
  try {
    userUGI.doAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws Exception {
        JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile,
            startTimeFinal, hasRestarted());
        return null;
      }
    });
  } catch (InterruptedException ie) {
    throw new IOException(ie);
  }

  // log the job priority
  setPriority(this.priority);

  //
  // generate security keys needed by Tasks
  //
  generateAndStoreTokens();

  // Read the split meta info that the map tasks will be built from
  TaskSplitMetaInfo[] splits = createSplits(jobId);
  if (numMapTasks != splits.length) {
    throw new IOException("Number of maps in JobConf doesn't match number of " +
        "recieved splits for job " + jobId + "! " +
        "numMapTasks=" + numMapTasks + ", #splits=" + splits.length);
  }
  numMapTasks = splits.length;

  // Verify the hostnames recorded in each split
  for (TaskSplitMetaInfo split : splits) {
    NetUtils.verifyHostnames(split.getLocations());
  }
  // Update the related metrics
  jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks);
  jobtracker.getInstrumentation().addWaitingReduces(getJobID(), numReduceTasks);
  this.queueMetrics.addWaitingMaps(getJobID(), numMapTasks);
  this.queueMetrics.addWaitingReduces(getJobID(), numReduceTasks);
  // Build the map tasks, one TaskInProgress per split
  maps = new TaskInProgress[numMapTasks];
  for (int i = 0; i < numMapTasks; ++i) {
    inputLength += splits[i].getInputDataLength();
    maps[i] = new TaskInProgress(jobId, jobFile,
        splits[i],
        jobtracker, conf, this, i, numSlotsPerMap);
  }
  LOG.info("Input size for job " + jobId + " = " + inputLength
      + ". Number of splits = " + splits.length);

  // Create the cache of non-running, data-local map tasks
  localityWaitFactor =
    conf.getFloat(LOCALITY_WAIT_FACTOR, DEFAULT_LOCALITY_WAIT_FACTOR);
  if (numMapTasks > 0) {
    nonRunningMapCache = createCache(splits, maxLevel);
  }

  // set the launch time
  this.launchTime = jobtracker.getClock().getTime();

  // Build the reduce tasks
  this.reduces = new TaskInProgress[numReduceTasks];
  for (int i = 0; i < numReduceTasks; i++) {
    reduces[i] = new TaskInProgress(jobId, jobFile,
        numMapTasks, i,
        jobtracker, conf, this, numSlotsPerReduce);
    nonRunningReduces.add(reduces[i]);
  }

  // How many maps must complete before reduces may be scheduled
  completedMapsForReduceSlowstart =
    (int)Math.ceil(
        (conf.getFloat("mapred.reduce.slowstart.completed.maps",
                       DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART) *
         numMapTasks));

  // ... use the same for estimating the total output of all maps
  resourceEstimator.setThreshhold(completedMapsForReduceSlowstart);

  // Two cleanup tasks: one map cleanup and one reduce cleanup
  cleanup = new TaskInProgress[2];

  // cleanup map tip. This map doesn't use any splits. Just assign an empty
  // split.
  TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT;
  cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
      jobtracker, conf, this, numMapTasks, 1);
  cleanup[0].setJobCleanupTask();

  // cleanup reduce tip.
  cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
      numReduceTasks, jobtracker, conf, this, 1);
  cleanup[1].setJobCleanupTask();

  // Two setup tasks: one map setup and one reduce setup
  setup = new TaskInProgress[2];

  // setup map tip. This map doesn't use any split. Just assign an empty
  // split.
  setup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
      jobtracker, conf, this, numMapTasks + 1, 1);
  setup[0].setJobSetupTask();

  // setup reduce tip.
  setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
      numReduceTasks + 1, jobtracker, conf, this, 1);
  setup[1].setJobSetupTask();
  // If a kill arrived during init, throw so the caller can clean up
  synchronized (jobInitKillStatus) {
    jobInitKillStatus.initDone = true;

    // set this before the throw to make sure cleanup works properly
    tasksInited = true;

    if (jobInitKillStatus.killed) {
      throw new KillInterruptedException("Job " + jobId + " killed in init");
    }
  }
  // Record the init event in the job history
  JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime,
      numMapTasks, numReduceTasks);

  // Log the number of map and reduce tasks
  LOG.info("Job " + jobId + " initialized successfully with " + numMapTasks
      + " map tasks and " + numReduceTasks + " reduce tasks.");
}
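As a quick sanity check on the slow-start computation above: with the usual default of 0.05 for mapred.reduce.slowstart.completed.maps and a hypothetical job of 200 maps, reduces become eligible only after ceil(0.05 * 200) = 10 maps have finished. A minimal stand-alone illustration (the numbers are made up for the example):

// Stand-alone illustration of the slow-start threshold arithmetic.
public class SlowstartExample {
  public static void main(String[] args) {
    int numMapTasks = 200;    // hypothetical job size
    float slowstart = 0.05f;  // mapred.reduce.slowstart.completed.maps
    int completedMapsForReduceSlowstart =
        (int) Math.ceil(slowstart * numMapTasks);
    // Prints 10: reduces may be scheduled once 10 of the 200 maps complete
    System.out.println(completedMapsForReduceSlowstart);
  }
}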

From the code above we can see that, right after they are created, the map TIPs are placed in a cache. So what does this cache look like? Let's walk through the process, using the map tasks as an example:

private Map<Node, List<TaskInProgress>> createCache(
    TaskSplitMetaInfo[] splits, int maxLevel)
    throws UnknownHostException {
  // The cache is simply a Map from topology nodes to lists of TIPs
  Map<Node, List<TaskInProgress>> cache =
    new IdentityHashMap<Node, List<TaskInProgress>>(maxLevel);
  // For HDFS input, maps are placed as close as possible to the nodes holding the split's blocks;
  // for other input formats (e.g. an HBase region or a relational database) there is no such notion.
  Set<String> uniqueHosts = new TreeSet<String>();
  for (int i = 0; i < splits.length; i++) {
    String[] splitLocations = splits[i].getLocations();
    if (splitLocations == null || splitLocations.length == 0) {
      nonLocalMaps.add(maps[i]);
      continue;
    }
    // Resolve each host that holds this split
    for (String host : splitLocations) {
      // Resolve the node, including its rack, e.g. /default-rack/PC-20130917RGUY
      Node node = jobtracker.resolveAndAddToTopology(host);
      // Collect every host that holds input for this job's maps
      uniqueHosts.add(host);
      LOG.info("tip:" + maps[i].getTIPId() + " has split on node:" + node);
      // Add the map to the cache at every topology level: node -> list of maps
      for (int j = 0; j < maxLevel; j++) {
        List<TaskInProgress> hostMaps = cache.get(node);
        if (hostMaps == null) {
          hostMaps = new ArrayList<TaskInProgress>();
          cache.put(node, hostMaps);
          hostMaps.add(maps[i]);  // the map task is cached against the node holding its split
        }
        //check whether the hostMaps already contains an entry for a TIP
        //This will be true for nodes that are racks and multiple nodes in
        //the rack contain the input for a tip. Note that if it already
        //exists in the hostMaps, it must be the last element there since
        //we process one TIP at a time sequentially in the split-size order
        if (hostMaps.get(hostMaps.size() - 1) != maps[i]) {
          hostMaps.add(maps[i]);
        }
        node = node.getParent();
      }
    }
  }

  // Compute the fraction of the cluster that holds local data for this job:
  // e.g. if the job's splits live on 10 hosts and the cluster has 100 nodes,
  // the factor is 10/100 = 0.1
  if (localityWaitFactor == DEFAULT_LOCALITY_WAIT_FACTOR) {
    int jobNodes = uniqueHosts.size();
    int clusterNodes = jobtracker.getNumberOfUniqueHosts();

    if (clusterNodes > 0) {
      localityWaitFactor =
        Math.min((float)jobNodes/clusterNodes, localityWaitFactor);
    }
    LOG.info(jobId + " LOCALITY_WAIT_FACTOR=" + localityWaitFactor);
  }
  // Return the cache
  return cache;
}
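To make the resulting shape of the cache concrete, here is a small self-contained sketch using hypothetical Node/Tip stand-ins (not the real Hadoop classes): with maxLevel = 2, a TIP whose split lives on host /r1/h1 is registered under both the host node and its parent rack node, and the last-element check keeps the rack-level list free of duplicates.

import java.util.*;

public class LocalityCacheSketch {
  static class Node {
    final String name; final Node parent;
    Node(String name, Node parent) { this.name = name; this.parent = parent; }
  }
  static class Tip { final String id; Tip(String id) { this.id = id; } }

  public static void main(String[] args) {
    Node rack = new Node("/r1", null);
    Node host = new Node("/r1/h1", rack);
    Tip map0 = new Tip("task_0001_m_000000");

    Map<Node, List<Tip>> cache = new IdentityHashMap<Node, List<Tip>>();
    Node node = host;
    for (int level = 0; level < 2 && node != null; level++) {  // maxLevel = 2
      List<Tip> tips = cache.get(node);
      if (tips == null) { tips = new ArrayList<Tip>(); cache.put(node, tips); }
      if (tips.isEmpty() || tips.get(tips.size() - 1) != map0) {
        tips.add(map0);  // avoid duplicate entries at the rack level
      }
      node = node.parent;
    }
    // cache now holds: /r1/h1 -> [map0], /r1 -> [map0]
    for (Map.Entry<Node, List<Tip>> e : cache.entrySet()) {
      System.out.println(e.getKey().name + " -> " + e.getValue().size() + " tip(s)");
    }
  }
}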

During job initialization, the tasks end up stored in the following structures (a sketch of how a scheduler might consume them follows this list):

List<TaskInProgress> nonLocalMaps                   non-running map tasks with no locality information (their splits report no locations)

Map<Node, List<TaskInProgress>> nonRunningMapCache  non-running map tasks with data locality, keyed by topology node

Set<TaskInProgress> nonRunningReduces               non-running reduce tasks; no locality is computed, they are added directly

TaskInProgress cleanup[] = new TaskInProgress[0];   holds the cleanup tasks

TaskInProgress setup[] = new TaskInProgress[0];     holds the setup tasks
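To see why the map tasks are split across nonRunningMapCache and nonLocalMaps, here is a hypothetical, simplified selection routine (not the actual JobInProgress scheduling code): starting from the requesting TaskTracker's node, it walks up the topology preferring node-local, then rack-local maps, and falls back to the non-local list only when nothing local is found.

// Hypothetical sketch; pickMapFor and its parameters are illustrative only.
TaskInProgress pickMapFor(Node trackerNode, int maxLevel,
                          Map<Node, List<TaskInProgress>> nonRunningMapCache,
                          List<TaskInProgress> nonLocalMaps) {
  Node node = trackerNode;
  for (int level = 0; level < maxLevel && node != null; level++) {
    List<TaskInProgress> candidates = nonRunningMapCache.get(node);
    if (candidates != null && !candidates.isEmpty()) {
      return candidates.remove(candidates.size() - 1);  // node-local first, then rack-local
    }
    node = node.getParent();
  }
  // No local work for this tracker: hand out a map with no locality preference
  return nonLocalMaps.isEmpty() ? null : nonLocalMaps.remove(0);
}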