您的位置:首页 > 编程语言

hadoop任务运行源代码深究

2014-03-24 18:08 162 查看
1、TaskTracker.TaskInProgress.launchTask():创建并启动TaskRunner,对于MapTask,创建的是MapTaskRunner,对于ReduceTask,创建的是ReduceTaskRunner

2、调用TaskRunner.start():设置child进程的classpath、启动命令、标准输出、错误输出、启动参数、日志大小限制、工作目录等

3、调用TaskRunner.launchJvmAndWait(),实际调用JvmManager.launchJvm()

4、根据任务类型JvmManager.launchJvm()调用会分别调用JvmManager.JvmManagerForType#mapJvmManager.reapJvm()和JvmManager.JvmManagerForType#reduceJvmManager.reapJvm()

5、JvmManagerForType#mapJvmManager.reapJvm()会判断是否要启动一个新jvm还是重用JVM,针对设置了重用策略的尝试重用,否则就启动新JVM,则就会调用JvmManager.JvmManagerForType.spawnNewJvm()

6、JvmManager.JvmManagerForType.spawnNewJvm()会new一个JvmRunner(该类是Thead类)对象,设置JVM运行环境,设置该线程为守护线程,然后启动线程jvmRunner.start()。

7、JvmRunner.run()调用runChild(env),然后根据调用Tasktracker.getTaskController().launchTask()启动任务

8、TaskController主要有两个实现,一个是DefaultTaskController,一个是LinuxTaskController,一般使用LinuxTaskController

9、LinuxTaskController.launchTask()首先会拿到JVM启动命令,然后把jvm启动命令写到指定的缓存目录(根据本地作业运行分配目录、用户、jobId、attemptId和conf生成),文件名是taskjvm.sh,最后设置启动参数,启动命令,日志目录等,通过调用shell脚本启动进程名称为Child的JVM进程。

10、Child.main():启动日志同步守护线程,创建工作目录,创建分布式缓存目录,初始化JVM 监控,然后通过UGI.doAs()调用Task.run()运行任务。

Task分为MapTask和ReduceTask,以下讲解MapTask

org.apache.hadoop.mapred.MapTask.run()

(1)、runNewMapper

org.apache.hadoop.mapreduce.Mapper.run(mapperContext)

Mapper.run()里面就会调用自定义的map类的map函数

(2)、runOldMapper

org.apache.hadoop.mapred.MapRunner.run(in, new OldOutputCollector(collector, conf), reporter)

MapRunner.run()里面就会调用自定义的map类的map函数

注意:在MapTask里面判断是新api还是老api运行任务

参考:
http://blog.csdn.net/riverm/article/details/6826200
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息