您的位置:首页 > 运维架构

Hadoop0.21.0源码流程分析(3)-Task节点管理启动任务

2011-07-11 15:55 489 查看
源码流程分析3-Task节点管理启动任务
1. 代码执行流程
1) TaskTracker的启动的时候会加载所有信息,包括利用RPC获得JobTracker 的RPC变量定义为jobClient;TaskTracker.run()方法会去循环向JobTracker心跳,在里面主要调用TaskTracker. offerService()方法;
2) offerService()方法首先调用JobTracker.transmitHeartBeat()方法,去执行jobClient.heartbeat()(也就是JobTracker.heartbeat()),返回心跳信息HeartbeatResponse类,该类包含了主节点分配给该任务节点需要执行的map和reduce任务。每个maptask或reducetask对应一个独自的LaunchTaskAction类实例。
3) 然后,offerService()方法再调用TaskTracker的addToTaskQueue ()方法,循环将LaunchTaskAction中的maptask或reducetask添加到map与reduce维护列表中mapLauncher和reduceLauncher中,这两个对象都是TaskLauncher的实例。
4) addToTaskQueue ()方法在调用的时候会调用到TaskLauncher. addToTaskQueue(),该方法体内会调用TaskTracker.registerTask();该方法会根据每个taskaction(LaunchTaskAction对象)生成一个TaskInProgress的实例,TaskInProgress与DN中的TaskInProgress不同,这里的TaskInProgress是TaskTracker的内部类,它是每个任务运行的主体。
5) TaskLauncher类中的run()方法会循环查找任务去执行。一旦有空闲的slot,就调用TaskTracker.startNewTask()方法开启一个新的任务。startNewTask()方法中首先会去执行关键方法TaskTracker.localizeJob(TaskInProgress)初始化所需要的信息,创建工作目录和调用hdfs下载执行的jar包和配置信息xml。完成向监控队列的添加后,startNewTask()方法会去调用launchTaskForJob(),然后launchTaskForJob()调用TaskTracker$TaskInProgress.launchTask()开始执行Task。
6) launchTask()方法中,首先通过localizeTask方法根据当前的任务创建工作目录,并把所需要的数据信息下载到本地。然后,如果是Map任务的话就是MapTask.createRunner()方法会去创建MapTaskRunner,如果是Reduce的话就是ReduceTask.createRunner()方法去创建ReduceTaskRunner。最后开启TaskRunner线程。
7) TaskRunner是继承Thread的类,在它的run函数主体执行的过程比较复杂,首先是初始化启动java子进程的一系列环境变量,包括设定工作目录workDir,设置CLASSPATH环境变量等(需要将TaskTracker的环境变量以及job jar的路径合并起来),然后装载job jar包。然后,调用launchJvmAndWait()方法,该方法再调用jvmManager.launchJvm()方法。
8) launchJvm()是执行的主体,函数体内会判断是map任务还是reduce任务,并把它加载到管理队列里面。从JvmManager中我们看到runChild()方法的调用,方法中会去调用DefaultTaskController中的launchTaskJVM()方法,DefaultTaskController.launchTaskJVM()会根据已经获取的环境变量和jvm运行堆大小设定等参数,利用ProcessBuilder自动生成shell脚本,并运行来设定task节点的变量信息。
9) 在装载过的信息中有个类是Child,它自动去调用TaskTracker.getTask()方法去取出当前的task任务,然后调用task的run()方法,在这里同样会用到hdfs从中取出输入数据,然后根据判断是旧api还是新api去执行对应的方法。具体在下一节解释。

2. TaskTracker
Task的执行实际是由TaskTracker发起的,TaskTracker会定期(缺省为3秒钟,参见MRConstants类中定义的HEARTBEAT_INTERVAL变量)与JobTracker进行一次通信,报告自己Task的执行状态,接收JobTracker的指令等,TaskTracker里面会通过循环的方式查找。

3. JobTracker.heartbeat()
这个方法主要是TaskTracker端远程调用时用到的方法,其主要作用就是分派具体任务,并将该任务分发到TaskTracker端:
其关键代码为:
1 HeartbeatResponse response = new HeartbeatResponse(newResponseId,null);
2 List<TaskTrackerAction> actions = newArrayList<TaskTrackerAction>();
……
3 if (tasks == null) {
4 tasks =taskScheduler.assignTasks(taskTrackers.get(trackerName));
5 }
6 if (tasks != null) {
7 for (Task task : tasks) {
8 expireLaunchingTasks.addNewTask(task.getTaskID());
……
9 actions.add(newLaunchTaskAction(task));
10 }
11}
这里的调度中主要会根据我们的任务的输入文件dfs中的文件存放节点来分配,数据在哪个节点上任务就分配到哪个节点上的TaskTracker中。
简单来说这里会根据我们的上面提到的JobInProgress和TaskInPorgress调度生成具体的MapTask和ReuceTask实例,他们均继承自抽象类Task该实例会放入到LaunchTaskAction中,最后获得的任务列表会被返回到TaskTracker端。

4. TaskTracker$TaskLauncher
TaskLauncher是个继承了Thread线程的TaskTracker的内部类,在这里面会维护一个TaskInProgress的链表:
private List<TaskInProgress> tasksToLaunch;
该列表中的每个TaskInProgress 实例对应一个TaskUnit任务。
该类中的run方法才是主体关键之处,他会循环判断是否tasksToLaunch中有新任务要做,有就去从该列表中拿出来然后去调用TaskTracker.startNewTask(TaskInprogress);去开启一个新任务。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: