您的位置:首页 > 运维架构

Hadoop源码分析--MapReduce作业(job)提交源码跟踪

2016-05-15 14:01 786 查看
首先,在自己写的MR程序中通过org.apache.hadoop.mapreduce.Job来创建Job。配置好之后通过waitForCompletion()方法来提交Job。Hadoop版本是2.4.1。



进入waitForCompletion()方法,在判断状态state可以提交Job后,执行submit()方法。monitorAndPrintJob()

方法会不断的刷新获取job运行的进度信息,并打印。boolean参数verbose为true表明要打印运行进度,为false就

只是等待job运行结束,不打印运行日志。



进入submit()方法,Submit方法首先是确保当前的Job的状态是处于DEFINE,否则不能提交Job。然后启用新的API,即org.apache.hadoop.mapreduce下的Mapper和Reducer。Connect()方法会产生一个Client实例,用来和ResourceManager通信。其实submit()方法里关键的两处代码,一处是调用connect()方法,另一处是获取一个JobSubmitter类的实例,调用该对象的submitJobInternal方法来提交任务。下面分别看这款两个方法。



首先看connect()方法:

MapReduce作业提交时连接集群是通过Job类的connect()方法实现的,它实际上是构造集群Cluster实例cluster,代码如下:



进入return的Cluster(getConfiguration())构造方法,来到了Cluster类。先来看下Cluster类的成员信息:



Cluster最重要的两个成员变量是客户端通信协议提供者ClientProtocolProvider实例clientProtocolProvider和客户
端通信协议ClientProtocol实例client,而后者是依托前者的create()方法生成的。

Cluster类提供了两个构造函数,如下:



最终会调用initialize()方法完成初始化,代码如下:



上面再说create()方法时已经提到了两种ClientProtocolProvider实现类,后来通过查阅资料得知了更加确切的说法:MapReduce中,ClientProtocolProvider抽象类的实现共有YarnClientProtocolProvider、LocalClientProtocolProvider两种,前者为Yarn模式,而后者为Local模式。



我们先看下看下Local模式,LocalClientProtocolProvider的create()方法,代码如下:



由上可知,MapReduce需要看参数mapreduce.framework.name确定连接模式,但默认是Local模式的。

再来看Yarn模式,看下YarnClientProtocolProvider的create()方法,代码如下:



到了这里,我们就能够知道一个很重要的信息,Cluster中客户端通信协议ClientProtocol实例,要么是Yarn模式下的YARNRunner,要么就是Local模式下的LocalJobRunner。

以Yarn模式来分析MapReduce集群连接,看下YARNRunner的实现,先看下它的成员变量,如下:



其中,最重要的一个变量就是ResourceManager的代理ResourceMgrDelegate类型的resMgrDelegate实例,Yarn模式下整个MapReduce客户端就是由它负责与Yarn集群进行通信,完成诸如作业提交、作业状态查询等过程,通过它获取集群的信息,其内部有一个YarnClient实例YarnClient,负责与Yarn进行通信,还有ApplicationId、ApplicationSubmissionContext等与特定应用程序相关的成员变量。以后有时间还要详细介绍这个对象。另外一个比较重要的变量就是客户端缓存ClientCache实例clientCache。

接下来,我们看下YARNRunner的构造函数,共有三个:



YARNRunner一共提供了三个构造函数,而我们之前说的WordCount作业提交时,其内部调用的是YARNRunner带有一个参数的构造函数,它会先构造ResourceManager代理ResourceMgrDelegate实例,然后再调用两个参数的构造函数,继而构造客户端缓存ClientCache实例,然后再调用三个参数的构造函数,而最终的构造函数只是进行简单的类成员变量赋值,然后通过FileContext的静态getFileContext()方法获取文件山下文FileContext实例defaultFileContext。

connect()方法总结
MapReduce作业提交时连接集群是通过Job的connect()方法实现的,它实际上是构造集群Cluster实例cluster。Cluster为连接MapReduce集群的一种工具,提供了一种获取MapReduce集群信息的方法。在Cluster内部,有一个与集群进行通信的客户端通信协议ClientProtocol实例client,它由ClientProtocolProvider的静态create()方法构造,而Hadoop2.4.1中提供了两种模式的ClientProtocol,分别为Yarn模式的YARNRunner和Local模式的LocalJobRunner,Cluster实际上是由它们负责与集群进行通信的,而Yarn模式下,ClientProtocol实例YARNRunner对象内部有一个ResourceManager代理ResourceMgrDelegate实例resMgrDelegate,Yarn模式下整个MapReduce客户端就是由它负责与Yarn集群进行通信,完成诸如作业提交、作业状态查询等过程,通过它获取集群的信息。

submitJobInternal()方法

再次回到submit()方法,上面已经介绍了connect()方法,下面开始介绍另一个重要的的方法submitJobInternal()。
该方法隶属于JobSubmitter类,顾名思义,该类是MapReduce中作业提交者,而实际上JobSubmitter除了构造方法外,对外提供的唯一一个非private成员变量或方法就是submitJobInternal()方法,它是提交Job的内部方法,实现了提交Job的所有业务逻辑。
首先,我们先看下JobSubmitter的类成员变量,如下:



它一共有四个类成员变量,分别为:
1、文件系统FileSystem实例jtFs:用于操作作业运行需要的各种文件等;
2、客户端通信协议ClientProtocol实例submitClient:用于与集群交互,完成作业提交、作业状态查询等,上文
已经介绍过了。
3、提交作业的主机名submitHostName;
4、提交作业的主机地址submitHostAddress。
接下来,我们再看下JobSubmitter的构造函数,如下:



接下里是最重要的代码——JobSubmitter唯一的对外核心功能方法submitJobInternal(),它被用于提交作业至集群,代码如下:









至此,MapReduce的Job提交的大体过程就分析完毕!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: