Hadoop MapReduce ReduceTask Execution (Part 2): The GetMapEventsThread
2014-06-17 01:44
To fetch map outputs, a reduce task needs to talk to the TaskTracker to learn which maps have completed; this is the job of the GetMapEventsThread. While running, the thread calls getMapCompletionEvents of the TaskUmbilicalProtocol through the TaskTracker proxy and receives a MapTaskCompletionEventsUpdate, which carries the latest map-task completion statuses: SUCCEEDED, FAILED, KILLED, OBSOLETE, and TIPFAILED. The thread updates the relevant collections according to each event's status. In short, it continuously gathers MapTask completion information so that other threads (such as the copiers) can act on it.
```java
public void run() {
  LOG.info(reduceTask.getTaskID() + " Thread started: " + getName());
  do {
    try {
      // Number of newly completed map tasks
      int numNewMaps = getMapCompletionEvents();
      if (LOG.isDebugEnabled()) {
        if (numNewMaps > 0) {
          LOG.debug(reduceTask.getTaskID() + ": " +
                    "Got " + numNewMaps + " new map-outputs");
        }
      }
      // Pause before polling again
      Thread.sleep(SLEEP_TIME);
    } catch (InterruptedException e) {
      LOG.warn(reduceTask.getTaskID() +
               " GetMapEventsThread returning after an " +
               " interrupted exception");
      return;
    } catch (Throwable t) {
      String msg = reduceTask.getTaskID() +
                   " GetMapEventsThread Ignoring exception : " +
                   StringUtils.stringifyException(t);
      reportFatalError(getTaskID(), t, msg);
    }
  } while (!exitGetMapEvents);
  LOG.info("GetMapEventsThread exiting");
}
```
The main logic lives in the getMapCompletionEvents method:
```java
private int getMapCompletionEvents() throws IOException {
  int numNewMaps = 0;
  // Ask the TaskTracker (via the umbilical protocol) for new completion events
  MapTaskCompletionEventsUpdate update =
    umbilical.getMapCompletionEvents(reduceTask.getJobID(),
                                     fromEventId.get(),
                                     MAX_EVENTS_TO_FETCH,
                                     reduceTask.getTaskID(), jvmContext);
  // Extract the event array
  TaskCompletionEvent events[] = update.getMapTaskCompletionEvents();

  // Check if the reset is required.
  // Since there is no ordering of the task completion events at the
  // reducer, the only option to sync with the new jobtracker is to reset
  // the events index
  if (update.shouldReset()) {
    fromEventId.set(0);
    obsoleteMapIds.clear(); // clear the obsolete map
    mapLocations.clear();   // clear the map locations mapping
  }

  // Update the last seen event ID
  fromEventId.set(fromEventId.get() + events.length);

  // Process the TaskCompletionEvents:
  // 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs.
  // 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteOutputs to stop
  //    fetching from those maps.
  // 3. Remove TIPFAILED maps from neededOutputs since we don't need their
  //    outputs at all.
  for (TaskCompletionEvent event : events) {
    switch (event.getTaskStatus()) {
      case SUCCEEDED:
      {
        // On success, build a new MapOutputLocation and add it to mapLocations.
        // The main thread's fetchOutputs() later moves valid locations from
        // this map into the scheduledCopies list consumed by the copier threads.
        URI u = URI.create(event.getTaskTrackerHttp());
        String host = u.getHost();
        TaskAttemptID taskId = event.getTaskAttemptId();
        URL mapOutputLocation = new URL(event.getTaskTrackerHttp() +
                                "/mapOutput?job=" + taskId.getJobID() +
                                "&map=" + taskId +
                                "&reduce=" + getPartition());
        List<MapOutputLocation> loc = mapLocations.get(host);
        if (loc == null) {
          loc = Collections.synchronizedList
            (new LinkedList<MapOutputLocation>());
          mapLocations.put(host, loc);
        }
        loc.add(new MapOutputLocation(taskId, host, mapOutputLocation));
        numNewMaps ++;
      }
      break;
      case FAILED:
      case KILLED:
      case OBSOLETE:
      {
        // The output cannot be fetched; record the attempt as obsolete
        obsoleteMapIds.add(event.getTaskAttemptId());
        LOG.info("Ignoring obsolete output of " + event.getTaskStatus() +
                 " map-task: '" + event.getTaskAttemptId() + "'");
      }
      break;
      case TIPFAILED:
      {
        // The whole task-in-progress failed; its output is not needed at all
        copiedMapOutputs.add(event.getTaskAttemptId().getTaskID());
        LOG.info("Ignoring output of failed map TIP: '" +
                 event.getTaskAttemptId() + "'");
      }
      break;
    }
  }
  return numNewMaps;
}
```
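The switch in the loop above routes each event status to one of three collections. As a minimal standalone sketch (not Hadoop code; statuses and collection names are plain strings here rather than the Hadoop types), that routing looks like:

```java
// Sketch of the status routing performed by GetMapEventsThread:
// SUCCEEDED feeds the copiers, FAILED/KILLED/OBSOLETE are blacklisted,
// TIPFAILED is written off entirely.
public class EventRouting {
    static String route(String status) {
        switch (status) {
            case "SUCCEEDED":
                return "mapLocations";     // new location for the copier threads
            case "FAILED":
            case "KILLED":
            case "OBSOLETE":
                return "obsoleteMapIds";   // stop fetching from this attempt
            case "TIPFAILED":
                return "copiedMapOutputs"; // output will never be needed
            default:
                throw new IllegalArgumentException("unknown status: " + status);
        }
    }

    public static void main(String[] args) {
        for (String s : new String[]{"SUCCEEDED", "KILLED", "TIPFAILED"}) {
            System.out.println(s + " -> " + route(s));
        }
    }
}
```

Note that FAILED, KILLED, and OBSOLETE all land in the same bucket: from the reducer's point of view they mean the same thing, "do not try to fetch this attempt's output."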
Reading notes:
1. ReduceCopier has a member field private final Map<String, List<MapOutputLocation>> mapLocations, which records <Host, List of MapOutputLocations from this Host>. The key is the host name parsed from the TaskTracker's HTTP URL, and the value is the list of output locations for all map tasks completed on that host. Together they give the reducer a global view of where every map output lives.
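The grouping pattern behind mapLocations can be sketched on its own. In this simplified version the host and task IDs are plain strings (the attempt IDs below are made up for illustration), whereas the real code stores MapOutputLocation objects; the null-check-then-insert logic and the synchronized list are the same:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

// Sketch of the mapLocations grouping: completed map outputs are bucketed
// by the host that produced them, so the fetch scheduler can work per host.
public class HostGrouping {
    static void add(Map<String, List<String>> locations,
                    String host, String taskId) {
        List<String> loc = locations.get(host);
        if (loc == null) {
            // Synchronized, because copier threads read the list concurrently
            loc = Collections.synchronizedList(new LinkedList<String>());
            locations.put(host, loc);
        }
        loc.add(taskId);
    }

    public static void main(String[] args) {
        Map<String, List<String>> locations = new HashMap<>();
        add(locations, "host-a", "attempt_x_0001_m_000000_0"); // hypothetical IDs
        add(locations, "host-a", "attempt_x_0001_m_000001_0");
        add(locations, "host-b", "attempt_x_0001_m_000002_0");
        System.out.println("host-a has " + locations.get("host-a").size()
                           + " outputs"); // two outputs grouped under host-a
    }
}
```

Grouping by host is what lets fetchOutputs spread copier threads across TaskTrackers instead of hammering a single node for several outputs at once.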