
Hadoop 2.7.3 Source Code Analysis: Tracing MapReduce Job Submission

2019-04-14 15:36

1. Submitting the Job

For the MapReduce program skeleton, see
https://www.geek-share.com/detail/2707064106.html

@Override
public int run(String[] args) throws Exception {
  // Read the configuration
  Configuration conf = getConf();
  // Set parameters
  conf.set("fs.defaultFS", "hdfs://192.168.11.81:9000");
  // Custom separator between key and value (tab by default)
  conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
  // A Job represents one MapReduce job; the second constructor argument is the job name
  Job job = Job.getInstance(conf, "MapReduceTemplate");
  job.setJarByClass(MapReduceTemplate.class); // main class
  Path in = new Path(args[0]);  // input path
  Path out = new Path(args[1]); // output path
  FileSystem hdfs = out.getFileSystem(conf);
  if (hdfs.isDirectory(out)) {  // delete the output path if it already exists
    hdfs.delete(out, true);
  }
  FileInputFormat.setInputPaths(job, in);   // file input
  FileOutputFormat.setOutputPath(job, out); // file output
  job.setMapperClass(MapTemplate.class);    // custom Mapper
  job.setReducerClass(ReduceTemplate.class);// custom Reducer
  job.setInputFormatClass(KeyValueTextInputFormat.class); // input format
  job.setOutputFormatClass(TextOutputFormat.class);       // output format
  job.setOutputKeyClass(Text.class);   // class of the job's output keys
  job.setOutputValueClass(Text.class); // class of the job's output values
  return job.waitForCompletion(true) ? 0 : 1; // wait for the job to finish, then exit
}
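
For completeness, a driver for this template might look like the minimal sketch below. It assumes MapReduceTemplate extends Configured and implements Tool, which the getConf() call and the run() signature above imply; the class name comes from the template, the rest is illustrative:

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MapReduceTemplate extends Configured implements Tool {

  // ... the run(String[]) method shown above ...

  public static void main(String[] args) throws Exception {
    // ToolRunner parses generic options (-D key=value, -fs, -jt, ...)
    // into the Configuration before invoking run()
    int exitCode = ToolRunner.run(new MapReduceTemplate(), args);
    System.exit(exitCode);
  }
}

It would typically be launched with something like: hadoop jar template.jar MapReduceTemplate /input /output.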

2. Into the Job.waitForCompletion() method

After checking that the state allows the Job to be submitted, it executes the submit() method. The monitorAndPrintJob() method keeps polling for the job's progress and prints it. When the boolean parameter verbose is true, the running progress is printed; when it is false, the method simply waits for the job to finish without printing progress logs.

/**
 * Submit the job to the cluster and wait for it to finish.
 * @param verbose print the progress to the user
 * @return true if the job succeeded
 * @throws IOException thrown if the communication with the
 *         <code>JobTracker</code> is lost
 */
public boolean waitForCompletion(boolean verbose
                                 ) throws IOException, InterruptedException,
                                          ClassNotFoundException {
  // Submit only when the state is DEFINE
  if (state == JobState.DEFINE) {
    submit(); // the actual submission happens in submit()
  }
  // verbose is the value passed into waitForCompletion(verbose); true means
  // run monitorAndPrintJob(), which monitors the job and prints its status
  if (verbose) {
    monitorAndPrintJob();
  } else {
    // get the completion poll interval from the client.
    int completionPollIntervalMillis =
      Job.getCompletionPollInterval(cluster.getConf());
    while (!isComplete()) {
      try {
        Thread.sleep(completionPollIntervalMillis);
      } catch (InterruptedException ie) {
      }
    }
  }
  // Return a boolean indicating whether the job completed successfully
  return isSuccessful();
}
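
As a side note, the completion poll interval used in the else branch is read from the client configuration by Job.getCompletionPollInterval(). If memory serves, the key in mapred-default.xml is mapreduce.client.completion.pollinterval with a default of 5000 ms (worth verifying against your Hadoop version), so a client wanting faster completion detection could set:

conf.setInt("mapreduce.client.completion.pollinterval", 1000); // poll every second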

3. Into the submit() method

submit() first makes sure the current Job is in the DEFINE state; otherwise the Job cannot be submitted. It then enables the new API, i.e. the Mapper and Reducer under org.apache.hadoop.mapreduce. The connect() method produces a client instance used to communicate with the cluster (the ResourceManager, in YARN mode). The two key pieces of code in submit() are the call to connect() and obtaining a JobSubmitter instance, whose submitJobInternal() method is then called to submit the job.

/**
 * Submit the job to the cluster and return immediately.
 * @throws IOException
 */
public void submit()
       throws IOException, InterruptedException, ClassNotFoundException {
  // Check the job state again
  ensureState(JobState.DEFINE);
  // There are two sets of APIs; use the new one here
  setUseNewAPI();
  connect();
  // Initialization: assigns cluster. The client is the submitter, either a
  // local submitter or a YARN submitter, chosen by the configuration
  final JobSubmitter submitter =
      getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
  status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
    public JobStatus run() throws IOException, InterruptedException,
    ClassNotFoundException {
      return submitter.submitJobInternal(Job.this, cluster); // submit
    }
  });
  state = JobState.RUNNING;
  LOG.info("The url to track the job: " + getTrackingURL());
}

4. First, the connect() method

When a MapReduce job is submitted, the connection to the cluster is made through the Job class's connect() method, which in effect constructs the Cluster instance cluster.

private synchronized void connect()
        throws IOException, InterruptedException, ClassNotFoundException {
  if (cluster == null) { // if cluster is null, construct a Cluster instance
    // Cluster is the tool for connecting to the MapReduce cluster; it
    // provides methods for obtaining the MapReduce cluster remotely
    cluster =
      ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                 public Cluster run()
                        throws IOException, InterruptedException,
                               ClassNotFoundException {
                   return new Cluster(getConfiguration());
                 }
               });
  }
}

5. Into the Cluster(getConfiguration()) constructor being returned

This brings us to the Cluster class. Let's first look at its members.

package org.apache.hadoop.mapreduce;

import ...

/**
 * Provides a way to access information about the map/reduce cluster.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class Cluster {

  @InterfaceStability.Evolving
  public static enum JobTrackerStatus {INITIALIZING, RUNNING};

  private ClientProtocolProvider clientProtocolProvider; // client protocol provider
  private ClientProtocol client;        // client protocol instance
  private UserGroupInformation ugi;     // user/group information
  private Configuration conf;           // configuration
  private FileSystem fs = null;         // file system instance
  private Path sysDir = null;           // system directory
  private Path stagingAreaDir = null;   // staging directory for job resources
  private Path jobHistoryDir = null;    // job history directory
  private static final Log LOG = LogFactory.getLog(Cluster.class); // log

  // Loader for the client protocol providers
  private static ServiceLoader<ClientProtocolProvider> frameworkLoader =
      ServiceLoader.load(ClientProtocolProvider.class);

  static {
    ConfigUtil.loadResources();
  }

  public Cluster(Configuration conf) throws IOException {
    this(null, conf);
  }

  public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
      throws IOException {
    this.conf = conf; // store the configuration
    this.ugi = UserGroupInformation.getCurrentUser(); // get the current user
    initialize(jobTrackAddr, conf); // complete initialization
  }

  ...
}

Cluster's two most important member variables are clientProtocolProvider, the client protocol provider (a ClientProtocolProvider instance), and client, the client protocol (a ClientProtocol instance); the latter is produced by the former's create() method. The Cluster class provides two constructors.

6. Into the initialize() method

private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
    throws IOException {

  synchronized (frameworkLoader) {
    // Take each ClientProtocolProvider in turn and construct a ClientProtocol
    // instance through its create() method
    for (ClientProtocolProvider provider : frameworkLoader) {
      LOG.debug("Trying ClientProtocolProvider : " + provider.getClass().getName());
      ClientProtocol clientProtocol = null;
      try {
        // If no YARN settings are present in the configuration, a LocalJobRunner
        // is built and the MR job runs locally; if YARN is configured, a
        // YARNRunner is built and the MR job runs on the YARN cluster
        if (jobTrackAddr == null) {
          clientProtocol = provider.create(conf);
        } else {
          clientProtocol = provider.create(jobTrackAddr, conf);
        }
        // Set the clientProtocolProvider and client members, then break out of the loop
        if (clientProtocol != null) {
          clientProtocolProvider = provider;
          client = clientProtocol;
          LOG.debug("Picked " + provider.getClass().getName()
              + " as the ClientProtocolProvider");
          break;
        } else {
          LOG.debug("Cannot pick " + provider.getClass().getName()
              + " as the ClientProtocolProvider - returned null protocol");
        }
      } catch (Exception e) {
        LOG.info("Failed to use " + provider.getClass().getName() + " due to error: ", e);
      }
    }
  }

  // If clientProtocolProvider or client is null, throw an IOException
  if (null == clientProtocolProvider || null == client) {
    throw new IOException(
        "Cannot initialize Cluster. Please check your configuration for "
            + MRConfig.FRAMEWORK_NAME
            + " and the correspond server addresses.");
  }
}

7. ClientProtocolProvider implementation: LocalClientProtocolProvider

The create() calls above involve the two ClientProtocolProvider implementations. In MapReduce, the abstract class ClientProtocolProvider has exactly two implementations, YarnClientProtocolProvider and LocalClientProtocolProvider: the former is YARN mode, the latter Local mode. (How the two get discovered is sketched in the note just below.)
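
A short aside on discovery: Cluster's frameworkLoader uses Java's ServiceLoader, which by contract scans the classpath for registration files under META-INF/services named after the service interface. As a sketch (which Hadoop jar carries which entry is from memory, so treat the placement as an assumption), a provider jar ships a file

META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider

listing implementation classes one per line, for example:

org.apache.hadoop.mapred.YarnClientProtocolProvider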

Let's look at Local mode first. The create() method of LocalClientProtocolProvider is as follows:

package org.apache.hadoop.mapred;

import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;

@InterfaceAudience.Private
public class LocalClientProtocolProvider extends ClientProtocolProvider {

  @Override
  public ClientProtocol create(Configuration conf) throws IOException {
    // Two constants: "mapreduce.framework.name" and "local"
    String framework =
        conf.get(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
    // If framework is "local", return a LocalJobRunner and set the number of
    // map tasks to 1; otherwise return null
    if (!MRConfig.LOCAL_FRAMEWORK_NAME.equals(framework)) {
      return null;
    }
    conf.setInt(JobContext.NUM_MAPS, 1);
    return new LocalJobRunner(conf);
  }

  @Override
  public ClientProtocol create(InetSocketAddress addr, Configuration conf) {
    return null; // LocalJobRunner doesn't use a socket
  }

  @Override
  public void close(ClientProtocol clientProtocol) {
    // no clean up required
  }

}

As the code shows, MapReduce consults the parameter mapreduce.framework.name to determine the connection mode, and the default is Local mode.
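
A client that should submit to YARN therefore has to say so explicitly, either in mapred-site.xml or in code. A minimal sketch (the host names are placeholders, not from the original article):

Configuration conf = new Configuration();
// Select YarnClientProtocolProvider instead of the local default
conf.set("mapreduce.framework.name", "yarn");
// Point the client at the cluster (placeholder addresses)
conf.set("fs.defaultFS", "hdfs://namenode:9000");
conf.set("yarn.resourcemanager.address", "resourcemanager:8032");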

8. ClientProtocolProvider implementation: YarnClientProtocolProvider

Now for YARN mode: the create() method of YarnClientProtocolProvider.

package org.apache.hadoop.mapred;

import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;

public class YarnClientProtocolProvider extends ClientProtocolProvider {

  @Override
  public ClientProtocol create(Configuration conf) throws IOException {
    // If mapreduce.framework.name is set to "yarn", construct and return a
    // YARNRunner instance; otherwise return null
    if (MRConfig.YARN_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
      return new YARNRunner(conf);
    }
    return null;
  }

  @Override
  public ClientProtocol create(InetSocketAddress addr, Configuration conf)
      throws IOException {
    return create(conf);
  }

  @Override
  public void close(ClientProtocol clientProtocol) throws IOException {
    if (clientProtocol instanceof YARNRunner) {
      ((YARNRunner)clientProtocol).close();
    }
  }
}

At this point we know something important: the client protocol ClientProtocol instance inside Cluster is either a YARNRunner in YARN mode or a LocalJobRunner in Local mode.

9. YARNRunner

Let's analyze the MapReduce cluster connection in YARN mode by looking at the YARNRunner implementation.
Its most important field is resMgrDelegate, a ResourceMgrDelegate instance that proxies the ResourceManager. In YARN mode, it is what the whole MapReduce client uses to communicate with the YARN cluster, carrying out job submission, job status queries, and so on, and obtaining cluster information. Internally it holds a YarnClient instance responsible for talking to YARN, plus members tied to a specific application such as ApplicationId and ApplicationSubmissionContext. (This object deserves a detailed write-up of its own when time allows.) Another notable field is clientCache, a ClientCache instance serving as the client-side cache.

package org.apache.hadoop.mapred;

import ...

/**
 * This class enables the current JobClient (0.22 hadoop) to run on YARN.
 */
@SuppressWarnings("unchecked")
public class YARNRunner implements ClientProtocol {

  private static final Log LOG = LogFactory.getLog(YARNRunner.class);

  // RecordFactory instance
  private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
  private ResourceMgrDelegate resMgrDelegate;   // ResourceManager proxy instance
  private ClientCache clientCache;              // client cache instance
  private Configuration conf;                   // configuration
  private final FileContext defaultFileContext; // file context instance

  /**
   * Yarn runner incapsulates the client interface of
   * yarn
   * @param conf the configuration object for the client
   */
  // Construct the ResourceManager proxy (a ResourceMgrDelegate instance)
  // first, then call the two-argument YARNRunner constructor
  public YARNRunner(Configuration conf) {
    this(conf, new ResourceMgrDelegate(new YarnConfiguration(conf)));
  }

  /**
   * Similar to {@link #YARNRunner(Configuration)} but allowing injecting
   * {@link ResourceMgrDelegate}. Enables mocking and testing.
   * @param conf the configuration object for the client
   * @param resMgrDelegate the resourcemanager client handle.
   */
  // Construct the client cache (a ClientCache instance) first, then call the
  // three-argument YARNRunner constructor
  public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate) {
    this(conf, resMgrDelegate, new ClientCache(conf, resMgrDelegate));
  }

  /**
   * Similar to {@link YARNRunner#YARNRunner(Configuration, ResourceMgrDelegate)}
   * but allowing injecting {@link ClientCache}. Enable mocking and testing.
   * @param conf the configuration object
   * @param resMgrDelegate the resource manager delegate
   * @param clientCache the client cache object.
   */
  public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,
      ClientCache clientCache) {
    this.conf = conf; // assign the member variables
    try {
      this.resMgrDelegate = resMgrDelegate;
      this.clientCache = clientCache;
      // Obtain the file context instance defaultFileContext
      this.defaultFileContext = FileContext.getFileContext(this.conf);
    } catch (UnsupportedFileSystemException ufe) {
      throw new RuntimeException("Error in instantiating YarnClient", ufe);
    }
  }

  ...
}

YARNRunner provides three constructors in total. When the WordCount job mentioned earlier is submitted, the one-argument constructor is the one called internally: it first constructs the ResourceManager proxy, a ResourceMgrDelegate instance, then calls the two-argument constructor, which constructs the client cache, a ClientCache instance, and calls the three-argument constructor. The final constructor just performs simple member assignments and obtains the file context instance defaultFileContext through FileContext's static getFileContext() method.

10. Summary of the connect() method

When a MapReduce job is submitted, the connection to the cluster goes through Job's connect() method, which constructs the Cluster instance cluster. Cluster is a tool for connecting to the MapReduce cluster and provides ways to obtain information about it. Inside Cluster lives the client protocol, a ClientProtocol instance client that communicates with the cluster; it is constructed by a ClientProtocolProvider's create() method. Hadoop 2.x provides two kinds of ClientProtocol, YARNRunner for YARN mode and LocalJobRunner for Local mode, and it is these that actually handle Cluster's communication with the cluster. In YARN mode, the ClientProtocol instance, a YARNRunner object, internally holds the ResourceManager proxy resMgrDelegate, a ResourceMgrDelegate instance; it is what the whole MapReduce client in YARN mode uses to communicate with the YARN cluster, performing job submission, job status queries, and so on, and obtaining cluster information.
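
To make this concrete, the same machinery can be driven by hand. Below is a minimal sketch, assuming a reachable cluster configured as in the note at the end of section 7 (ClusterProbe, the host names, and the exact getter names are assumptions for illustration; the fields they expose appear in the Cluster code above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Cluster;

public class ClusterProbe {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("mapreduce.framework.name", "yarn"); // picks YarnClientProtocolProvider
    // The constructor runs initialize() above, selecting YARNRunner as client
    Cluster cluster = new Cluster(conf);
    System.out.println("Staging dir: " + cluster.getStagingAreaDir());
    System.out.println("File system: " + cluster.getFileSystem().getUri());
    cluster.close();
  }
}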

11. The submitJobInternal() method

Returning to submit() (section 3): connect() has been covered above, so we now turn to the other important method, submitJobInternal().

This method belongs to the JobSubmitter class which, as its name suggests, is the job submitter in MapReduce. In fact, apart from its constructor, the only non-private member or method JobSubmitter exposes is submitJobInternal(), the internal method for submitting a Job that implements all of the job submission business logic.

JobSubmitter has four class member variables:
1. jtFs, a FileSystem instance used to operate on the various files the job needs to run;
2. submitClient, a ClientProtocol instance used to interact with the cluster for job submission, job status queries, and so on, as introduced above;
3. submitHostName, the name of the host submitting the job;
4. submitHostAddress, the address of the host submitting the job.

@InterfaceAudience.Private
@InterfaceStability.Unstable
class JobSubmitter {
  protected static final Log LOG = LogFactory.getLog(JobSubmitter.class);
  private static final String SHUFFLE_KEYGEN_ALGORITHM = "HmacSHA1";
  private static final int SHUFFLE_KEY_LENGTH = 64;
  private FileSystem jtFs;              // FileSystem instance
  private ClientProtocol submitClient;  // ClientProtocol instance
  private String submitHostName;        // name of the submitting host
  private String submitHostAddress;     // address of the submitting host

  // Constructor
  JobSubmitter(FileSystem submitFs, ClientProtocol submitClient)
      throws IOException {
    this.submitClient = submitClient;
    this.jtFs = submitFs;
  }

  ...
}

Next comes the most important code: submitJobInternal(), JobSubmitter's one externally facing core method, which submits the job to the cluster.

/**
 * Internal method for submitting jobs to the system.
 *
 * <p>The job submission process involves:
 * <ol>
 *   <li>
 *   Checking the input and output specifications of the job.
 *   </li>
 *   <li>
 *   Computing the {@link InputSplit}s for the job.
 *   </li>
 *   <li>
 *   Setup the requisite accounting information for the
 *   {@link DistributedCache} of the job, if necessary.
 *   </li>
 *   <li>
 *   Copying the job's jar and configuration to the map-reduce system
 *   directory on the distributed file-system.
 *   </li>
 *   <li>
 *   Submitting the job to the <code>JobTracker</code> and optionally
 *   monitoring it's status.
 *   </li>
 * </ol></p>
 * @param job the configuration to submit
 * @param cluster the handle to the Cluster
 * @throws ClassNotFoundException
 * @throws InterruptedException
 * @throws IOException
 */
JobStatus submitJobInternal(Job job, Cluster cluster)
    throws ClassNotFoundException, InterruptedException, IOException {

  // validate the jobs output specs
  // Check that the output path is configured and does not yet exist; the
  // correct state is configured and absent. The output path parameter is
  // mapreduce.output.fileoutputformat.outputdir
  checkSpecs(job);

  // Get the configuration
  Configuration conf = job.getConfiguration();
  // Add the application framework path to the distributed cache
  addMRFrameworkToDistributedCache(conf);

  // Obtain, via the static getStagingDir() method, the staging path for the
  // job's runtime resources; when not configured, the default is
  // /tmp/hadoop-yarn/staging/<submitting user>/.staging
  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);

  // configure the command line options correctly on the submitting dfs
  InetAddress ip = InetAddress.getLocalHost(); // IP of the current host
  if (ip != null) {
    // Record the submitting host's IP and name, and set them in conf
    submitHostAddress = ip.getHostAddress();
    submitHostName = ip.getHostName();
    conf.set(MRJobConfig.JOB_SUBMITHOST, submitHostName);
    conf.set(MRJobConfig.JOB_SUBMITHOSTADDR, submitHostAddress);
  }
  JobID jobId = submitClient.getNewJobID(); // generate the job ID, jobId
  job.setJobID(jobId); // set jobId on the job
  // Build the job submission path: jobStagingArea followed by /jobID
  Path submitJobDir = new Path(jobStagingArea, jobId.toString());
  JobStatus status = null;
  try {
    // Set some job parameters
    conf.set(MRJobConfig.USER_NAME,
        UserGroupInformation.getCurrentUser().getShortUserName());
    conf.set("hadoop.http.filter.initializers",
        "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
    LOG.debug("Configuring job " + jobId + " with " + submitJobDir
        + " as the submit dir");
    // get delegation token for the dir
    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
        new Path[] { submitJobDir }, conf);
    // Get secret keys and tokens and store them in the TokenCache
    populateTokenCache(conf, job.getCredentials());

    // generate a secret to authenticate shuffle transfers
    if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
      KeyGenerator keyGen;
      try {
        keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
        keyGen.init(SHUFFLE_KEY_LENGTH);
      } catch (NoSuchAlgorithmException e) {
        throw new IOException("Error generating shuffle secret key", e);
      }
      SecretKey shuffleKey = keyGen.generateKey();
      TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
          job.getCredentials());
    }
    if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
      conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
      LOG.warn("Max job attempts set to 1 since encrypted intermediate" +
          "data spill is enabled");
    }

    // Copy and configure the job's files
    copyAndConfigureFiles(job, submitJobDir);
    // Get the path of the job configuration file
    Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

    // Create the splits for the job
    LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
    // Call writeSplits() to write the split data file job.split and the split
    // metadata file job.splitmetainfo, and compute the number of map tasks
    int maps = writeSplits(job, submitJobDir);
    // Set the number of maps
    conf.setInt(MRJobConfig.NUM_MAPS, maps);
    LOG.info("number of splits:" + maps);

    // write "queue admins of the queue to which job is being submitted"
    // to job file. Get the job queue name from mapreduce.job.queuename,
    // which defaults to "default"
    String queue = conf.get(MRJobConfig.QUEUE_NAME,
        JobConf.DEFAULT_QUEUE_NAME);
    AccessControlList acl = submitClient.getQueueAdmins(queue);
    conf.set(toFullPropertyName(queue,
        QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

    // removing jobtoken referrals before copying the jobconf to HDFS
    // as the tasks don't need this setting, actually they may break
    // because of it if present as the referral will point to a
    // different job.
    TokenCache.cleanUpTokenReferral(conf); // clean up cached token referrals
    // Decide from the configuration whether token IDs need to be tracked
    if (conf.getBoolean(
        MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
        MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
      // Add HDFS tracking ids: get the token IDs from the job and cache them
      // in the trackingIds list
      ArrayList<String> trackingIds = new ArrayList<String>();
      for (Token<? extends TokenIdentifier> t :
          job.getCredentials().getAllTokens()) {
        trackingIds.add(t.decodeIdentifier().getTrackingId());
      }
      conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
          trackingIds.toArray(new String[trackingIds.size()]));
    }

    // Set reservation info if it exists
    ReservationId reservationId = job.getReservationId();
    if (reservationId != null) {
      conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
    }

    // Write the job file to the submit dir
    writeConf(conf, submitJobFile);

    //
    // Now, actually submit the job (using the submit name)
    //
    printTokens(jobId, job.getCredentials());
    // Submit the job through the submitJob() method of the ClientProtocol
    // instance submitClient and obtain the job status instance. From the
    // context above, submitClient is either a YARNRunner or a LocalJobRunner
    status = submitClient.submitJob(
        jobId, submitJobDir.toString(), job.getCredentials());
    if (status != null) { // return the status if non-null, else throw an IOException
      return status;
    } else {
      throw new IOException("Could not launch job");
    }
  } finally {
    // If the job was not launched (status is still null), clean up by deleting
    // the job submission directory submitJobDir through the FileSystem
    // instance jtFs before the IOException propagates
    if (status == null) {
      LOG.info("Cleaning up the staging area " + submitJobDir);
      if (jtFs != null && submitJobDir != null)
        jtFs.delete(submitJobDir, true);

    }
  }
}

This completes the analysis of the overall MapReduce job submission process!
