
Hadoop提交Job Client端源码分析

2015-12-17 15:03
In an earlier post I analyzed how Hadoop runs a jar (http://blog.csdn.net/a822631129/article/details/50310903), following the flow up to the point where the user's MapReduce program starts executing. This post analyzes how the Hadoop client side of a MapReduce program submits a job.

The analysis mainly involves five Java source files:

In hadoop-mapreduce-client-core, package org.apache.hadoop.mapreduce:

Job.java, JobSubmitter.java

In hadoop-mapreduce-client-jobclient, package org.apache.hadoop.mapred:

YARNRunner.java, ResourceMgrDelegate.java

In hadoop-yarn-project, package org.apache.hadoop.yarn.client.api.impl:
YarnClientImpl.java

1. The Hadoop WordCount program:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {

    public static class WordCountMap extends
            Mapper<LongWritable, Text, Text, IntWritable> {

        private final IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer token = new StringTokenizer(line);
            while (token.hasMoreTokens()) {
                word.set(token.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class WordCountReduce extends
            Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf);
        job.setJarByClass(WordCount.class);
        job.setJobName("wordcount");

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(WordCountMap.class);
        job.setReducerClass(WordCountReduce.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}

2. The program submits the job through the waitForCompletion() function in Job
/**
 * Submit the job to the cluster and wait for it to finish.
 * @param verbose print the progress to the user
 * @return true if the job succeeded
 * @throws IOException thrown if the communication with the
 * <code>JobTracker</code> is lost
 */
public boolean waitForCompletion(boolean verbose
                                 ) throws IOException, InterruptedException,
                                          ClassNotFoundException {
  if (state == JobState.DEFINE) {
    submit();
  }
  if (verbose) {
    monitorAndPrintJob();
  } else {
    // get the completion poll interval from the client.
    int completionPollIntervalMillis =
      Job.getCompletionPollInterval(cluster.getConf());
    while (!isComplete()) {
      try {
        Thread.sleep(completionPollIntervalMillis);
      } catch (InterruptedException ie) {
      }
    }
  }
  return isSuccessful();
}
In the check state == JobState.DEFINE, the state field has been initialized to JobState.DEFINE, so submit() is called to submit the job; submit() is analyzed in detail in the next step.

If verbose is true, monitorAndPrintJob() monitors the running job and prints its progress (not analyzed in detail here). If verbose is false, the method itself enters a loop that polls at a fixed interval to check whether the submitted job has completed; once it has, the loop exits and isSuccessful() returns the final job status. The poll interval comes from the client configuration and can be tuned, as sketched below.
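A minimal sketch of tuning that interval, assuming the Hadoop 2.x client where Job.getCompletionPollInterval() reads the property mapreduce.client.completion.pollinterval (default 5000 ms); verify the property name against your version:

// Minimal sketch, assuming Hadoop 2.x: shorten the interval that
// waitForCompletion(false) sleeps between isComplete() checks.
// The property name below is what Job.getCompletionPollInterval() reads in 2.x.
Configuration conf = new Configuration();
conf.setInt("mapreduce.client.completion.pollinterval", 1000); // poll once per second
Job job = Job.getInstance(conf, "wordcount");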
3. The submit() function called from waitForCompletion()

/**
 * Submit the job to the cluster and return immediately.
 * @throws IOException
 */
public void submit() throws IOException, InterruptedException, ClassNotFoundException {
  ensureState(JobState.DEFINE);
  setUseNewAPI();
  connect();
  final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
  status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
    public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException {
      return submitter.submitJobInternal(Job.this, cluster);
    }
  });
  state = JobState.RUNNING;
  LOG.info("The url to track the job: " + getTrackingURL());
}
ensureState(JobState.DEFINE) validates the current job state;

setUseNewAPI() configures which API settings are used (mapred.input.format.class, mapred.partitioner.class, mapred.output.format.class, and so on); by default the new Hadoop 2 (mapreduce) API is used;

connect() obtains the required client protocol (ClientProtocol) and connection information and stores it in a Cluster object (a sketch of connect() follows this list);

submitJobInternal() of the JobSubmitter class is then called, analyzed in detail in the next step;

finally, state is set to RUNNING.
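For reference, connect() essentially builds the Cluster handle lazily under the submitting user's UGI. The sketch below is a paraphrase of the Hadoop 2.x Job.connect(), not a verbatim quote, so details may differ in your version:

// Paraphrased sketch of Job.connect() (Hadoop 2.x): lazily create the Cluster,
// which in turn selects a ClientProtocol provider (YARNRunner or LocalJobRunner)
// based on the mapreduce.framework.name setting.
private synchronized void connect()
    throws IOException, InterruptedException, ClassNotFoundException {
  if (cluster == null) {
    cluster = ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
      public Cluster run()
          throws IOException, InterruptedException, ClassNotFoundException {
        return new Cluster(getConfiguration());
      }
    });
  }
}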

4. The submitJobInternal() function in the JobSubmitter class

/**
 * Internal method for submitting jobs to the system.
 *
 * <p>The job submission process involves:
 * <ol>
 * <li>
 * Checking the input and output specifications of the job.
 * </li>
 * <li>
 * Computing the {@link InputSplit}s for the job.
 * </li>
 * <li>
 * Setup the requisite accounting information for the
 * {@link DistributedCache} of the job, if necessary.
 * </li>
 * <li>
 * Copying the job's jar and configuration to the map-reduce system
 * directory on the distributed file-system.
 * </li>
 * <li>
 * Submitting the job to the <code>JobTracker</code> and optionally
 * monitoring it's status.
 * </li>
 * </ol></p>
 * @param job the configuration to submit
 * @param cluster the handle to the Cluster
 * @throws ClassNotFoundException
 * @throws InterruptedException
 * @throws IOException
 */
JobStatus submitJobInternal(Job job, Cluster cluster)
    throws ClassNotFoundException, InterruptedException, IOException {

  //validate the jobs output specs
  checkSpecs(job);

  Configuration conf = job.getConfiguration();
  addMRFrameworkToDistributedCache(conf);

  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
  //configure the command line options correctly on the submitting dfs
  InetAddress ip = InetAddress.getLocalHost();
  if (ip != null) {
    submitHostAddress = ip.getHostAddress();
    submitHostName = ip.getHostName();
    conf.set(MRJobConfig.JOB_SUBMITHOST, submitHostName);
    conf.set(MRJobConfig.JOB_SUBMITHOSTADDR, submitHostAddress);
  }
  JobID jobId = submitClient.getNewJobID();
  job.setJobID(jobId);
  Path submitJobDir = new Path(jobStagingArea, jobId.toString());
  JobStatus status = null;
  try {
    conf.set(MRJobConfig.USER_NAME,
        UserGroupInformation.getCurrentUser().getShortUserName());
    conf.set("hadoop.http.filter.initializers",
        "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
    LOG.debug("Configuring job " + jobId + " with " + submitJobDir
        + " as the submit dir");
    // get delegation token for the dir
    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
        new Path[] { submitJobDir }, conf);

    populateTokenCache(conf, job.getCredentials());

    // generate a secret to authenticate shuffle transfers
    if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
      KeyGenerator keyGen;
      try {
        keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
        keyGen.init(SHUFFLE_KEY_LENGTH);
      } catch (NoSuchAlgorithmException e) {
        throw new IOException("Error generating shuffle secret key", e);
      }
      SecretKey shuffleKey = keyGen.generateKey();
      TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
          job.getCredentials());
    }

    copyAndConfigureFiles(job, submitJobDir);
    Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

    // Create the splits for the job
    LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
    int maps = writeSplits(job, submitJobDir);
    conf.setInt(MRJobConfig.NUM_MAPS, maps);
    LOG.info("number of splits:" + maps);

    // write "queue admins of the queue to which job is being submitted"
    // to job file.
    String queue = conf.get(MRJobConfig.QUEUE_NAME,
        JobConf.DEFAULT_QUEUE_NAME);
    AccessControlList acl = submitClient.getQueueAdmins(queue);
    conf.set(toFullPropertyName(queue,
        QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

    // removing jobtoken referrals before copying the jobconf to HDFS
    // as the tasks don't need this setting, actually they may break
    // because of it if present as the referral will point to a
    // different job.
    TokenCache.cleanUpTokenReferral(conf);

    if (conf.getBoolean(
        MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
        MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
      // Add HDFS tracking ids
      ArrayList<String> trackingIds = new ArrayList<String>();
      for (Token<? extends TokenIdentifier> t :
          job.getCredentials().getAllTokens()) {
        trackingIds.add(t.decodeIdentifier().getTrackingId());
      }
      conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
          trackingIds.toArray(new String[trackingIds.size()]));
    }

    // Write job file to submit dir
    writeConf(conf, submitJobFile);

    //
    // Now, actually submit the job (using the submit name)
    //
    printTokens(jobId, job.getCredentials());
    status = submitClient.submitJob(
        jobId, submitJobDir.toString(), job.getCredentials());
    if (status != null) {
      return status;
    } else {
      throw new IOException("Could not launch job");
    }
  } finally {
    if (status == null) {
      LOG.info("Cleaning up the staging area " + submitJobDir);
      if (jtFs != null && submitJobDir != null)
        jtFs.delete(submitJobDir, true);
    }
  }
}
This method validates the output specification, reads the configuration and the submitting host's address, obtains the jobId, determines the job submit directory, and sets various parameters.

It then generates a secret key used to authenticate shuffle transfers,

copies the required files, libjars, archives, and the job jar (the WordCount jar) to the submit directory,

computes the input splits for the job and records the number of splits, which determines the number of map tasks (a sketch of the split-writing step follows this list),

sets the queue the job is submitted to,

writes the job configuration to the job submit directory,

and calls the submitJob() function of the YARNRunner class with the JobID, the job submit directory, and the Credentials to actually submit the job.

It waits for that call to return the job status; in the finally block, if no status came back, the staging directory is cleaned up.
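The writeSplits() call above is where the number of map tasks is decided. The following sketch paraphrases the new-API path (writeNewSplits) as found in the Hadoop 2.x JobSubmitter; treat the helper names as approximations against your exact version:

// Paraphrased sketch of the new-API split computation in JobSubmitter (Hadoop 2.x):
// the InputFormat configured on the job computes the splits, the split files are
// written to the submit directory, and the split count becomes the number of maps.
@SuppressWarnings("unchecked")
private <T extends InputSplit> int writeNewSplits(JobContext job, Path jobSubmitDir)
    throws IOException, InterruptedException, ClassNotFoundException {
  Configuration conf = job.getConfiguration();
  InputFormat<?, ?> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

  List<InputSplit> splits = input.getSplits(job);
  T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

  // sort the splits so that the biggest ones are scheduled first
  Arrays.sort(array, new SplitComparator());
  JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
      jobSubmitDir.getFileSystem(conf), array);
  return array.length;
}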

5. The submitJob() function in the YARNRunner class

public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
    throws IOException, InterruptedException {

  addHistoryToken(ts);

  // Construct necessary information to start the MR AM
  ApplicationSubmissionContext appContext =
      createApplicationSubmissionContext(conf, jobSubmitDir, ts);

  // Submit to ResourceManager
  try {
    ApplicationId applicationId =
        resMgrDelegate.submitApplication(appContext);

    ApplicationReport appMaster = resMgrDelegate
        .getApplicationReport(applicationId);
    String diagnostics =
        (appMaster == null ?
            "application report is null" : appMaster.getDiagnostics());
    if (appMaster == null
        || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
        || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
      throw new IOException("Failed to run job : " +
          diagnostics);
    }
    return clientCache.getClient(jobId).getJobStatus(jobId);
  } catch (YarnException e) {
    throw new IOException(e);
  }
}
This builds the application submission context, which includes the memory and CPU that MRAppMaster needs, the job jar, the job configuration, the input splits, and the command used to launch MRAppMaster (a sketch of that context follows below).

It then calls ResourceMgrDelegate's submitApplication() method with that context to submit the application to the ResourceManager; the call returns the already generated ApplicationId (the ApplicationId was in fact generated when the JobID was obtained).

Finally the current status of the job is returned.
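As a minimal sketch, assuming standard Hadoop 2.x / YARN client APIs rather than quoting createApplicationSubmissionContext() itself, the context carries roughly the following information; localResources, environment, amCommands, securityTokens and acls are placeholders for structures the real method builds earlier:

// Minimal sketch (not the actual YARNRunner source) of the kind of fields
// createApplicationSubmissionContext() fills in before submission.
ApplicationSubmissionContext appContext =
    Records.newRecord(ApplicationSubmissionContext.class);
appContext.setApplicationId(resMgrDelegate.getApplicationId());
appContext.setApplicationName(conf.get(MRJobConfig.JOB_NAME, "N/A"));
appContext.setQueue(conf.get(JobContext.QUEUE_NAME, "default"));

// Resources requested for the MRAppMaster container (memory in MB, vcores).
Resource capability = Records.newRecord(Resource.class);
capability.setMemory(conf.getInt(MRJobConfig.MR_AM_VMEM_MB,
    MRJobConfig.DEFAULT_MR_AM_VMEM_MB));
capability.setVirtualCores(conf.getInt(MRJobConfig.MR_AM_CPU_VCORES,
    MRJobConfig.DEFAULT_MR_AM_CPU_VCORES));
appContext.setResource(capability);

// The AM container spec: local resources (job.jar, job.xml, split files),
// environment variables, the java command line that starts MRAppMaster,
// security tokens and ACLs. These maps/lists are placeholders here.
ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
    localResources, environment, amCommands, null, securityTokens, acls);
appContext.setAMContainerSpec(amContainer);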

6. The submitApplication() function in the ResourceMgrDelegate class:

public ApplicationId submitApplication(ApplicationSubmissionContext appContext)
    throws YarnException, IOException {
  return client.submitApplication(appContext);
}

client.submitApplication(appContext): the client object here is a YarnClient, so we follow the call into the submitApplication() method of its implementation, YarnClientImpl.

The submitApplication() function in YarnClientImpl:

It checks that the ApplicationId is present in the submission context (throwing an exception if it is not),

wraps the application submission context in a SubmitApplicationRequest,

adds security credentials (the timeline delegation token) when security and the timeline service are both enabled,

calls rmClient.submitApplication(), which uses Hadoop RPC to invoke the submitApplication() method of the ClientRMService class on the ResourceManager side,

and then polls the application state periodically: once the state is no longer NEW or NEW_SAVING the application has been submitted successfully; if it does not leave those states within the configured time limit, a timeout error is thrown. If the application report cannot be found at all (for example after an RM failover or restart), the application is re-submitted with another RPC call.

public ApplicationId submitApplication(ApplicationSubmissionContext appContext)
    throws YarnException, IOException {
  ApplicationId applicationId = appContext.getApplicationId();
  if (applicationId == null) {
    throw new ApplicationIdNotProvidedException(
        "ApplicationId is not provided in ApplicationSubmissionContext");
  }

  SubmitApplicationRequest request =
      Records.newRecord(SubmitApplicationRequest.class);
  request.setApplicationSubmissionContext(appContext);

  // Automatically add the timeline DT into the CLC
  // Only when the security and the timeline service are both enabled
  if (isSecurityEnabled() && timelineServiceEnabled) {
    addTimelineDelegationToken(appContext.getAMContainerSpec());
  }

  //TODO: YARN-1763:Handle RM failovers during the submitApplication call.
  rmClient.submitApplication(request);

  int pollCount = 0;
  long startTime = System.currentTimeMillis();

  while (true) {
    try {
      YarnApplicationState state =
          getApplicationReport(applicationId).getYarnApplicationState();
      if (!state.equals(YarnApplicationState.NEW) &&
          !state.equals(YarnApplicationState.NEW_SAVING)) {
        LOG.info("Submitted application " + applicationId);
        break;
      }

      long elapsedMillis = System.currentTimeMillis() - startTime;
      if (enforceAsyncAPITimeout() &&
          elapsedMillis >= asyncApiPollTimeoutMillis) {
        throw new YarnException("Timed out while waiting for application " +
            applicationId + " to be submitted successfully");
      }

      // Notify the client through the log every 10 poll, in case the client
      // is blocked here too long.
      if (++pollCount % 10 == 0) {
        LOG.info("Application submission is not finished, " +
            "submitted application " + applicationId +
            " is still in " + state);
      }
      try {
        Thread.sleep(submitPollIntervalMillis);
      } catch (InterruptedException ie) {
        LOG.error("Interrupted while waiting for application "
            + applicationId
            + " to be successfully submitted.");
      }
    } catch (ApplicationNotFoundException ex) {
      // FailOver or RM restart happens before RMStateStore saves
      // ApplicationState
      LOG.info("Re-submit application " + applicationId + "with the " +
          "same ApplicationSubmissionContext");
      rmClient.submitApplication(request);
    }
  }

  return applicationId;
}

7. At this point the job has been submitted to the ResourceManager, and the client-side work of submitting a job is complete. The server side is considerably more complex and will be analyzed in a later post.