
Hadoop Source Code Analysis: How the JobId Is Obtained

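2017-01-23 23:24

The previous article covered how the client submits a job. That flow calls the submitJobInternal function of the JobSubmitter class, which is where the JobId is obtained.

Part of the source code is shown below: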

JobStatus submitJobInternal(Job job, Cluster cluster)
    throws ClassNotFoundException, InterruptedException, IOException {

  //validate the jobs output specs
  checkSpecs(job);

  Configuration conf = job.getConfiguration();
  addMRFrameworkToDistributedCache(conf);

  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
  //configure the command line options correctly on the submitting dfs
  InetAddress ip = InetAddress.getLocalHost();
  if (ip != null) {
    submitHostAddress = ip.getHostAddress();
    submitHostName = ip.getHostName();
    conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
    conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
  }
  JobID jobId = submitClient.getNewJobID(); // obtain a new JobID; this requires an RPC call
  job.setJobID(jobId);
  Path submitJobDir = new Path(jobStagingArea, jobId.toString());
  JobStatus status = null;
  try {
    conf.set(MRJobConfig.USER_NAME,
        UserGroupInformation.getCurrentUser().getShortUserName());
    conf.set("hadoop.http.filter.initializers",
        "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
    LOG.debug("Configuring job " + jobId + " with " + submitJobDir
        + " as the submit dir");
    // get delegation token for the dir
    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
        new Path[] { submitJobDir }, conf);

    populateTokenCache(conf, job.getCredentials());

    // generate a secret to authenticate shuffle transfers
    if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
      KeyGenerator keyGen;
      try {
        keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
        keyGen.init(SHUFFLE_KEY_LENGTH);
      } catch (NoSuchAlgorithmException e) {
        throw new IOException("Error generating shuffle secret key", e);
      }
      SecretKey shuffleKey = keyGen.generateKey();
      TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
          job.getCredentials());
    }

    copyAndConfigureFiles(job, submitJobDir);
    Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

    // Create the splits for the job
    LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
    int maps = writeSplits(job, submitJobDir);
    conf.setInt(MRJobConfig.NUM_MAPS, maps);
    LOG.info("number of splits:" + maps);

    // write "queue admins of the queue to which job is being submitted"
    // to job file.
    String queue = conf.get(MRJobConfig.QUEUE_NAME,
        JobConf.DEFAULT_QUEUE_NAME);
    AccessControlList acl = submitClient.getQueueAdmins(queue);
    conf.set(toFullPropertyName(queue,
        QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

    // removing jobtoken referrals before copying the jobconf to HDFS
    // as the tasks don't need this setting, actually they may break
    // because of it if present as the referral will point to a
    // different job.
    TokenCache.cleanUpTokenReferral(conf);

    if (conf.getBoolean(
        MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
        MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
      // Add HDFS tracking ids
      ArrayList<String> trackingIds = new ArrayList<String>();
      for (Token<? extends TokenIdentifier> t :
          job.getCredentials().getAllTokens()) {
        trackingIds.add(t.decodeIdentifier().getTrackingId());
      }
      conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
          trackingIds.toArray(new String[trackingIds.size()]));
    }

    // Write job file to submit dir
    writeConf(conf, submitJobFile);

    //
    // Now, actually submit the job (using the submit name)
    //
    printTokens(jobId, job.getCredentials());
    status = submitClient.submitJob(
        jobId, submitJobDir.toString(), job.getCredentials());
    if (status != null) {
      return status;
    } else {
      throw new IOException("Could not launch job");
    }
  } finally {
    if (status == null) {
      LOG.info("Cleaning up the staging area " + submitJobDir);
      if (jtFs != null && submitJobDir != null)
        jtFs.delete(submitJobDir, true);

    }
  }
}


The line that obtains the JobID is JobID jobId = submitClient.getNewJobID(); Here submitClient is a YARNRunner, and its getNewJobID() delegates to its internal resMgrDelegate.getNewJobID():

public JobID getNewJobID() throws IOException, InterruptedException {
  return resMgrDelegate.getNewJobID();
}


This calls into the ResourceMgrDelegate class:

public JobID getNewJobID() throws IOException, InterruptedException {
  try {
    this.application = client.createApplication().getApplicationSubmissionContext();
    this.applicationId = this.application.getApplicationId();
    return TypeConverter.fromYarn(applicationId);
  } catch (YarnException e) {
    throw new IOException(e);
  }
}
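
To make the conversion in the last step of this method more concrete, here is a minimal, self-contained sketch of how a YARN application ID relates to a MapReduce job ID. It does not use any Hadoop classes: JobIdSketch and its methods are hypothetical names, and the code assumes the usual textual forms application_<clusterTimestamp>_<sequence> and job_<clusterTimestamp>_<sequence>, with the sequence number zero-padded to at least four digits.

```java
// Simplified model of the ApplicationId -> JobID mapping. This is NOT Hadoop code;
// the class name, method names and the exact string formats are assumptions.
final class JobIdSketch {

    private JobIdSketch() {}

    /** Builds the assumed textual form of an application ID. */
    static String toApplicationIdString(long clusterTimestamp, int sequence) {
        return String.format("application_%d_%04d", clusterTimestamp, sequence);
    }

    /** Builds the assumed textual form of a job ID from the same two numbers. */
    static String toJobIdString(long clusterTimestamp, int sequence) {
        return String.format("job_%d_%04d", clusterTimestamp, sequence);
    }

    /** Parses an application ID string and re-emits it as a job ID string. */
    static String fromApplicationIdString(String applicationId) {
        String[] parts = applicationId.split("_");
        if (parts.length != 3 || !parts[0].equals("application")) {
            throw new IllegalArgumentException("Not an application ID: " + applicationId);
        }
        long clusterTimestamp = Long.parseLong(parts[1]);
        int sequence = Integer.parseInt(parts[2]);
        return toJobIdString(clusterTimestamp, sequence);
    }

    public static void main(String[] args) {
        String appId = toApplicationIdString(1485184000000L, 7);
        System.out.println(appId + " -> " + fromApplicationIdString(appId));
    }
}
```

The point of the sketch is that the job ID carries no information of its own: under these assumptions it is the application ID's two numeric components with a different prefix.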


The client member of ResourceMgrDelegate is a YarnClientImpl.

createApplication() in the YarnClientImpl class looks like this:

public YarnClientApplication createApplication()
    throws YarnException, IOException {
  ApplicationSubmissionContext context = Records.newRecord
      (ApplicationSubmissionContext.class);
  GetNewApplicationResponse newApp = getNewApplication();
  ApplicationId appId = newApp.getApplicationId();
  context.setApplicationId(appId);
  return new YarnClientApplication(newApp, context);
}


Here, getNewApplication is implemented as follows:

private GetNewApplicationResponse getNewApplication()
    throws YarnException, IOException {
  GetNewApplicationRequest request =
      Records.newRecord(GetNewApplicationRequest.class);
  return rmClient.getNewApplication(request);
}


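The newApp object returned by this function contains the ApplicationId allocated by the RM.

The context object in createApplication() is implemented by ApplicationSubmissionContextPBImpl,

so context.setApplicationId(appId) calls the following method of the ApplicationSubmissionContextPBImpl class: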

public void setApplicationId(ApplicationId applicationId) {
  maybeInitBuilder();
  if (applicationId == null)
    builder.clearApplicationId();
  this.applicationId = applicationId;
}

Once that returns, control is back at return new YarnClientApplication(newApp, context);

This returns a YarnClientApplication object, whose constructor assigns the context object to appSubmissionContext:

public YarnClientApplication(GetNewApplicationResponse newAppResponse,
    ApplicationSubmissionContext appContext) {
  this.newAppResponse = newAppResponse;
  this.appSubmissionContext = appContext;
}


Now back to this.application = client.createApplication().getApplicationSubmissionContext(); The getApplicationSubmissionContext call

returns the context object initialized in the previous step.

public ApplicationSubmissionContext getApplicationSubmissionContext() {
  return appSubmissionContext;
}


At this point, this.application is the context object, of type ApplicationSubmissionContext.

Execution then reaches this.applicationId = this.application.getApplicationId();

ApplicationSubmissionContext is implemented by ApplicationSubmissionContextPBImpl,

so this calls the getApplicationId method of the ApplicationSubmissionContextPBImpl class:

public ApplicationId getApplicationId() {
  ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
  if (this.applicationId != null) {
    return applicationId;
  } // Else via proto
  if (!p.hasApplicationId()) {
    return null;
  }
  applicationId = convertFromProtoFormat(p.getApplicationId());
  return applicationId;
}
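
The getter above follows a cache-then-fall-back pattern: a locally cached object wins, and only if none is present is the value decoded from the underlying protobuf message and remembered for later calls. The stand-alone sketch below illustrates that pattern only. LazyIdHolder, serializedId and decodeCount are hypothetical names, and a plain string stands in for the protobuf message, so this is an illustration under those assumptions rather than the Hadoop implementation.

```java
// Illustration of the "cached field first, serialized form second" getter pattern.
// All names are hypothetical; a String stands in for the protobuf message.
final class LazyIdHolder {

    private final String serializedId; // stand-in for the proto field; null means "not set"
    private String cachedId;           // stand-in for the locally cached applicationId
    private int decodeCount;           // counts how often the fallback path ran

    LazyIdHolder(String serializedId) {
        this.serializedId = serializedId;
    }

    /** Mirrors the setter in the text: the local field is simply overwritten. */
    void setId(String id) {
        this.cachedId = id;
    }

    String getId() {
        if (cachedId != null) {
            return cachedId;             // fast path: value already cached
        }
        if (serializedId == null) {
            return null;                 // nothing cached and nothing serialized
        }
        decodeCount++;
        cachedId = serializedId.trim();  // stand-in for the proto-to-object conversion
        return cachedId;
    }

    int decodeCount() {
        return decodeCount;
    }

    public static void main(String[] args) {
        LazyIdHolder holder = new LazyIdHolder(" application_1_0001 ");
        System.out.println(holder.getId());
        System.out.println(holder.getId());
        System.out.println("decoded " + holder.decodeCount() + " time(s)");
    }
}
```

In the walkthrough, setApplicationId has already stored the ID in the local field during createApplication(), so the later getApplicationId() call takes the fast path.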


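This finally yields the applicationId, which getNewJobID converts to a JobID with TypeConverter.fromYarn and returns.

Summary:

The JobID is obtained roughly as follows:

1. The job-submitting client, YARNRunner, calls its getNewJobID method, which internally calls getNewJobID of ResourceMgrDelegate.

2. ResourceMgrDelegate calls the createApplication method of its internal member client (actually a YarnClientImpl) to create a YarnClientApplication.

3. The YarnClientApplication is created as follows:
3.1 Construct an ApplicationSubmissionContext object, context.

3.2 Call getNewApplication, which constructs a GetNewApplicationRequest object, request.

3.3 Call rmClient.getNewApplication(request) to obtain a GetNewApplicationResponse object, newApp, which contains the ApplicationId allocated by the ResourceManager.

4. Call setApplicationId on context to set the ApplicationId, then set the application member of ResourceMgrDelegate to context.

5. Set the applicationId member of ResourceMgrDelegate to the ApplicationId of context, and convert it to a JobID with TypeConverter.fromYarn.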
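
The steps above can be sketched end to end with simplified stand-ins for the classes involved. Nothing below is Hadoop code: JobIdFlowSketch and every nested class are hypothetical, the RPC to the ResourceManager is replaced by a local method call, and the way the fake ResourceManager allocates IDs (a fixed timestamp plus an incrementing counter) is an assumption made only so the example runs.

```java
// End-to-end model of the call chain in the summary. All classes are hypothetical
// stand-ins; the real ones live in Hadoop and talk to the ResourceManager over RPC.
final class JobIdFlowSketch {

    private JobIdFlowSketch() {}

    /** Stand-in for ApplicationId. */
    static final class AppId {
        final long clusterTimestamp;
        final int sequence;

        AppId(long clusterTimestamp, int sequence) {
            this.clusterTimestamp = clusterTimestamp;
            this.sequence = sequence;
        }
    }

    /** Stand-in for the ResourceManager side of getNewApplication (steps 3.2 and 3.3). */
    static final class FakeResourceManager {
        private final long clusterTimestamp;
        private int counter; // assumption: IDs are numbered sequentially

        FakeResourceManager(long clusterTimestamp) {
            this.clusterTimestamp = clusterTimestamp;
        }

        synchronized AppId getNewApplication() {
            return new AppId(clusterTimestamp, ++counter);
        }
    }

    /** Stand-in for ApplicationSubmissionContext. */
    static final class SubmissionContext {
        private AppId applicationId;

        void setApplicationId(AppId id) { this.applicationId = id; }

        AppId getApplicationId() { return applicationId; }
    }

    /** Stand-in for YarnClientImpl. */
    static final class FakeYarnClient {
        private final FakeResourceManager rm;

        FakeYarnClient(FakeResourceManager rm) { this.rm = rm; }

        SubmissionContext createApplication() {
            SubmissionContext context = new SubmissionContext(); // step 3.1
            AppId appId = rm.getNewApplication();                // steps 3.2 and 3.3
            context.setApplicationId(appId);                     // step 4, first half
            return context;
        }
    }

    /** Stand-in for ResourceMgrDelegate. */
    static final class Delegate {
        private final FakeYarnClient client;
        private SubmissionContext application;
        private AppId applicationId;

        Delegate(FakeYarnClient client) { this.client = client; }

        String getNewJobID() {
            application = client.createApplication();        // step 2 and step 4, second half
            applicationId = application.getApplicationId();  // step 5
            // Stand-in for the final conversion to a job ID (assumed string format).
            return String.format("job_%d_%04d",
                    applicationId.clusterTimestamp, applicationId.sequence);
        }
    }

    /** Wires the stand-ins together the way the walkthrough describes. */
    static Delegate newDelegate(long clusterTimestamp) {
        return new Delegate(new FakeYarnClient(new FakeResourceManager(clusterTimestamp)));
    }

    public static void main(String[] args) {
        Delegate delegate = newDelegate(1485184000000L); // step 1 would call into this delegate
        System.out.println(delegate.getNewJobID());
        System.out.println(delegate.getNewJobID());
    }
}
```

Each call to getNewJobID in this model asks the fake ResourceManager for a fresh ID, which mirrors the observation in the article that obtaining a JobID requires a round trip to the ResourceManager.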