
Hadoop MapReduce ReduceTask Execution (Part 2): The GetMapEventsThread

2014-06-17 01:44
To fetch map outputs, the reduce task has to ask the TaskTracker which maps have completed, and GetMapEventsThread is the thread responsible for this. While running, it calls getMapCompletionEvents of the TaskUmbilicalProtocol through the TaskTracker proxy to obtain a MapTaskCompletionEventsUpdate, which carries the latest completion statuses of map tasks: SUCCEEDED, FAILED, KILLED, OBSOLETE and TIPFAILED. The thread updates the relevant collections according to each event's status. In short, it keeps polling for MapTask completion information so that the other threads (such as the copiers) can act on it.
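For reference, the five statuses above are the values of TaskCompletionEvent.Status. In the Hadoop 1.x branch this code comes from, the enum looks roughly like the following (paraphrased, with comments added here; check the source for the exact declaration):

// org.apache.hadoop.mapred.TaskCompletionEvent (Hadoop 1.x, paraphrased)
public static enum Status {
  FAILED,     // the attempt failed; do not fetch its output
  KILLED,     // the attempt was killed (e.g. a speculative duplicate)
  SUCCEEDED,  // the attempt finished; its output location is now valid
  OBSOLETE,   // a previously reported output is no longer valid
  TIPFAILED   // every attempt of this task failed; skip its output entirely
}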

public void run() {
  LOG.info(reduceTask.getTaskID() + " Thread started: " + getName());

  do {
    try {
      // Ask the TaskTracker for new completion events and count
      // how many newly finished maps were reported
      int numNewMaps = getMapCompletionEvents();
      if (LOG.isDebugEnabled()) {
        if (numNewMaps > 0) {
          LOG.debug(reduceTask.getTaskID() + ": " +
                    "Got " + numNewMaps + " new map-outputs");
        }
      }
      // Pause before polling again
      Thread.sleep(SLEEP_TIME);
    }
    catch (InterruptedException e) {
      LOG.warn(reduceTask.getTaskID() +
               " GetMapEventsThread returning after an " +
               " interrupted exception");
      return;
    }
    catch (Throwable t) {
      String msg = reduceTask.getTaskID()
                   + " GetMapEventsThread Ignoring exception : "
                   + StringUtils.stringifyException(t);
      reportFatalError(getTaskID(), t, msg);
    }
  } while (!exitGetMapEvents);

  LOG.info("GetMapEventsThread exiting");
}
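The loop keeps polling until exitGetMapEvents is set by the owning ReduceCopier once the shuffle no longer needs new events. The standalone sketch below shows the same poll-until-flagged lifecycle in isolation; the class, method and field names are illustrative, not the actual Hadoop identifiers:

// Standalone sketch of the poll-until-flagged lifecycle used by GetMapEventsThread.
public class EventPollerDemo {
  private volatile boolean exitGetMapEvents = false;   // set by the owner to stop polling
  private static final long SLEEP_TIME = 1000;         // polling interval in ms

  private final Thread poller = new Thread(() -> {
    do {
      try {
        pollOnce();                 // stands in for getMapCompletionEvents()
        Thread.sleep(SLEEP_TIME);
      } catch (InterruptedException e) {
        return;                     // interrupted while sleeping: exit immediately
      }
    } while (!exitGetMapEvents);
  }, "GetMapEventsThread-demo");

  private void pollOnce() {
    System.out.println("polling for new map completion events...");
  }

  public void start() { poller.start(); }

  // Owner-side shutdown: raise the flag, then wait for the loop to notice it.
  public void stop() throws InterruptedException {
    exitGetMapEvents = true;
    poller.join();
  }

  public static void main(String[] args) throws InterruptedException {
    EventPollerDemo demo = new EventPollerDemo();
    demo.start();
    Thread.sleep(3000);   // let it poll a few times
    demo.stop();
  }
}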


The main logic is in the getMapCompletionEvents method:

private int getMapCompletionEvents() throws IOException {

  int numNewMaps = 0;

  // Talk to the TaskTracker to get the latest batch of completion events
  MapTaskCompletionEventsUpdate update =
    umbilical.getMapCompletionEvents(reduceTask.getJobID(),
                                     fromEventId.get(),
                                     MAX_EVENTS_TO_FETCH,
                                     reduceTask.getTaskID(), jvmContext);

  // Extract the array of events
  TaskCompletionEvent events[] = update.getMapTaskCompletionEvents();

  // Check if the reset is required.
  // Since there is no ordering of the task completion events at the
  // reducer, the only option to sync with the new jobtracker is to reset
  // the events index
  if (update.shouldReset()) {
    fromEventId.set(0);
    obsoleteMapIds.clear(); // clear the obsolete map
    mapLocations.clear(); // clear the map locations mapping
  }

  // Update the last seen event ID
  fromEventId.set(fromEventId.get() + events.length);

  // Process the TaskCompletionEvents:
  // 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs.
  // 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteOutputs to stop
  //    fetching from those maps.
  // 3. Remove TIPFAILED maps from neededOutputs since we don't need their
  //    outputs at all.
  for (TaskCompletionEvent event : events) {
    switch (event.getTaskStatus()) {
      case SUCCEEDED:
      {
        // The map attempt succeeded: build a new MapOutputLocation and add it
        // to mapLocations. The main thread's fetchOutputs() later pulls valid
        // locations out of this collection and puts them into scheduledCopies
        // for the copier threads to consume.
        URI u = URI.create(event.getTaskTrackerHttp());
        String host = u.getHost();
        TaskAttemptID taskId = event.getTaskAttemptId();
        URL mapOutputLocation = new URL(event.getTaskTrackerHttp() +
                                "/mapOutput?job=" + taskId.getJobID() +
                                "&map=" + taskId +
                                "&reduce=" + getPartition());
        List<MapOutputLocation> loc = mapLocations.get(host);
        if (loc == null) {
          loc = Collections.synchronizedList
            (new LinkedList<MapOutputLocation>());
          mapLocations.put(host, loc);
        }
        loc.add(new MapOutputLocation(taskId, host, mapOutputLocation));
        numNewMaps ++;
      }
      break;
      case FAILED:
      case KILLED:
      case OBSOLETE:
      {
        // The output cannot be fetched: remember the attempt as obsolete
        obsoleteMapIds.add(event.getTaskAttemptId());
        LOG.info("Ignoring obsolete output of " + event.getTaskStatus() +
                 " map-task: '" + event.getTaskAttemptId() + "'");
      }
      break;
      case TIPFAILED:
      {
        // The whole task (TIP) failed: its output is not needed at all
        copiedMapOutputs.add(event.getTaskAttemptId().getTaskID());
        LOG.info("Ignoring output of failed map TIP: '" +
                 event.getTaskAttemptId() + "'");
      }
      break;
    }
  }
  return numNewMaps;
}
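Note that the SUCCEEDED branch only records where an output lives; nothing is fetched here. As the comments say, the main thread's fetchOutputs() later drains mapLocations into scheduledCopies for the copier threads. The sketch below isolates that producer/consumer handoff with simplified types (String stands in for MapOutputLocation) and hypothetical names; it is not the actual fetchOutputs code:

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

// Simplified sketch of the handoff between the event thread (producer)
// and the scheduling loop (consumer).
public class MapLocationsDemo {
  // host -> outputs known to live on that host (filled by the event thread)
  private final Map<String, List<String>> mapLocations = new HashMap<>();
  // flat work queue handed to the copier threads
  private final List<String> scheduledCopies = new LinkedList<>();

  // Producer side: what the SUCCEEDED case effectively does.
  public synchronized void recordSuccess(String host, String mapOutputUrl) {
    mapLocations
      .computeIfAbsent(host, h -> Collections.synchronizedList(new LinkedList<>()))
      .add(mapOutputUrl);
  }

  // Consumer side: roughly what the scheduling loop does on each pass --
  // take the locations discovered so far and queue them for the copiers.
  public synchronized List<String> scheduleKnownOutputs() {
    List<String> newlyScheduled = new ArrayList<>();
    for (List<String> perHost : mapLocations.values()) {
      newlyScheduled.addAll(perHost);
      perHost.clear();
    }
    scheduledCopies.addAll(newlyScheduled);
    return newlyScheduled;
  }

  public static void main(String[] args) {
    MapLocationsDemo demo = new MapLocationsDemo();
    demo.recordSuccess("host-a", "http://host-a:50060/mapOutput?map=attempt_0_m_000001_0");
    demo.recordSuccess("host-a", "http://host-a:50060/mapOutput?map=attempt_0_m_000002_0");
    demo.recordSuccess("host-b", "http://host-b:50060/mapOutput?map=attempt_0_m_000003_0");
    System.out.println("scheduled: " + demo.scheduleKnownOutputs());
  }
}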


Reading notes:

1. ReduceCopier has a member variable private final Map<String, List<MapOutputLocation>> mapLocations, which records <Host, List of MapOutputLocations on that host>. The key is the host (taken from the TaskTracker's HTTP address) and the value is the list of output locations for all map tasks that completed on that host. This gives the reducer a global view of where every map output lives.