
[hadoop] 3007 - Source code analysis of how Hadoop submits a job

2015-04-03 22:55
1. Review: the RPC communication framework

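RPC (remote procedure call): the client invokes a method as if it were local, but the method is actually executed in another process, usually on another machine. The Hadoop client talks to the ResourceManager this way.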
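A Hadoop RPC client calls methods on a protocol interface through a proxy object, and the server executes the call. The idea can be sketched in-process with the JDK's java.lang.reflect.Proxy. This is a swapped-in illustration, not Hadoop's RPC code. The JobIdProtocol and JobIdServer names are hypothetical, and the "network" is just a list of forwarded method names.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical protocol interface; plays the role of a Hadoop protocol such as ClientProtocol.
interface JobIdProtocol {
    String getNewJobID();
}

// Hypothetical "server" implementation; with real RPC it would live in another process.
class JobIdServer implements JobIdProtocol {
    private int next = 1;

    @Override
    public String getNewJobID() {
        // Same shape as the job IDs in this article: job_<timestamp>_<4-digit sequence>
        return String.format("job_%d_%04d", 1425021794376L, next++);
    }
}

class RpcProxySketch {
    // Names of the calls that went through the proxy (stands in for the network).
    static final List<String> wireLog = new ArrayList<>();

    // Builds a client-side proxy that forwards every call to the server object.
    // Real Hadoop RPC would serialize the call and send it over a socket instead.
    static JobIdProtocol getProxy(JobIdProtocol server) {
        InvocationHandler handler = (Object proxy, Method method, Object[] args) -> {
            wireLog.add(method.getName());      // "send" the method name
            return method.invoke(server, args); // "execute" it on the server side
        };
        return (JobIdProtocol) Proxy.newProxyInstance(
                JobIdProtocol.class.getClassLoader(),
                new Class<?>[] {JobIdProtocol.class},
                handler);
    }

    public static void main(String[] args) {
        JobIdProtocol client = getProxy(new JobIdServer());
        System.out.println(client.getNewJobID()); // job_1425021794376_0001
        System.out.println(wireLog);              // [getNewJobID]
    }
}
```

The caller only ever sees the JobIdProtocol interface. This is the same property that lets Job code treat the ResourceManager proxy like a local object.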
2. How the client submits a job to the MapReduce cluster for execution
2.1 Connecting to the ResourceManager, done by Job.connect()
Step 1: Job executes this.cluster = new Cluster();, and the resulting Cluster object is kept by the job.



Step 2: The Cluster class declares the reference ClientProtocol client = null; and obtains the actual client through client = new YARNRunner();
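In the Hadoop 2.x source, Cluster does not construct YARNRunner directly. It asks each ClientProtocolProvider (discovered through java.util.ServiceLoader) to create a client, and the YARN provider returns a YARNRunner when mapreduce.framework.name is set to yarn. Below is a simplified sketch of that selection loop. All *Sketch types are hypothetical, a plain Map stands in for Configuration, and the exception type is a simplification.

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for ClientProtocol.
interface ClientProtocolSketch {
    String runnerName();
}

// Hypothetical stand-in for ClientProtocolProvider: returns a client for the
// configured framework, or null if this provider does not handle it.
interface ClientProtocolProviderSketch {
    ClientProtocolSketch create(Map<String, String> conf);
}

class LocalProviderSketch implements ClientProtocolProviderSketch {
    public ClientProtocolSketch create(Map<String, String> conf) {
        if (!"local".equals(conf.get("mapreduce.framework.name"))) return null;
        return () -> "LocalJobRunner";
    }
}

class YarnProviderSketch implements ClientProtocolProviderSketch {
    public ClientProtocolSketch create(Map<String, String> conf) {
        if (!"yarn".equals(conf.get("mapreduce.framework.name"))) return null;
        return () -> "YARNRunner";
    }
}

class ClusterSketch {
    private ClientProtocolSketch client = null; // mirrors "ClientProtocol client = null;"

    ClusterSketch(Map<String, String> conf, List<ClientProtocolProviderSketch> providers) {
        // Ask each provider in turn; the first non-null client wins.
        for (ClientProtocolProviderSketch p : providers) {
            ClientProtocolSketch c = p.create(conf);
            if (c != null) {
                client = c;
                break;
            }
        }
        if (client == null) {
            throw new IllegalStateException("no provider for mapreduce.framework.name");
        }
    }

    ClientProtocolSketch getClient() {
        return client;
    }

    public static void main(String[] args) {
        List<ClientProtocolProviderSketch> providers =
                List.of(new LocalProviderSketch(), new YarnProviderSketch());
        ClusterSketch cluster =
                new ClusterSketch(Map.of("mapreduce.framework.name", "yarn"), providers);
        System.out.println(cluster.getClient().runnerName()); // YARNRunner
    }
}
```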





Step 3: YARNRunner holds a ResourceMgrDelegate object (resMgrDelegate) and forwards its ResourceManager requests to it.




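Step 4: After the three steps above, the job holds a proxy object for the ResourceManager. Through this proxy it communicates with the ResourceManager, which completes the connection between client and server.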
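The four steps form a delegation chain. Job holds a Cluster, the Cluster holds a YARNRunner, the YARNRunner holds a ResourceMgrDelegate, and the delegate finally talks to the ResourceManager proxy. The sketch below uses hypothetical *Stub classes that only record which layer handles a getNewJobID call, in order. They are not the Hadoop classes.

```java
import java.util.ArrayList;
import java.util.List;

// Shared trace so the delegation order can be observed.
class ChainTrace {
    static final List<String> calls = new ArrayList<>();
}

// Stands in for the RPC proxy to the ResourceManager (hypothetical).
class RmProxyStub {
    String getNewApplication() {
        ChainTrace.calls.add("RM proxy");
        return "application_1425021794376_0005";
    }
}

// Stands in for ResourceMgrDelegate.
class ResourceMgrDelegateStub {
    private final RmProxyStub rm = new RmProxyStub();

    String getNewJobID() {
        ChainTrace.calls.add("ResourceMgrDelegate");
        // Simplification: application ID and job ID share timestamp and sequence
        // number; Hadoop converts ID objects rather than strings.
        return rm.getNewApplication().replace("application_", "job_");
    }
}

// Stands in for YARNRunner (the Cluster's ClientProtocol).
class YarnRunnerStub {
    private final ResourceMgrDelegateStub resMgrDelegate = new ResourceMgrDelegateStub();

    String getNewJobID() {
        ChainTrace.calls.add("YARNRunner");
        return resMgrDelegate.getNewJobID();
    }
}

// Stands in for Cluster.
class ClusterStub {
    private final YarnRunnerStub client = new YarnRunnerStub();

    YarnRunnerStub getClient() {
        return client;
    }
}

// Stands in for Job: connect() creates the Cluster only once and keeps it.
class JobStub {
    private ClusterStub cluster;

    void connect() {
        if (cluster == null) {
            cluster = new ClusterStub();
        }
    }

    String requestJobId() {
        connect();
        return cluster.getClient().getNewJobID();
    }

    public static void main(String[] args) {
        System.out.println(new JobStub().requestJobId()); // job_1425021794376_0005
        System.out.println(ChainTrace.calls); // [YARNRunner, ResourceMgrDelegate, RM proxy]
    }
}
```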



2.2 Creating the job submitter from the connected cluster, done by the following code in Job:
final JobSubmitter submitter =
    getJobSubmitter(cluster.getFileSystem(), cluster.getClient());



2.3 The submitter submits the job by calling submitter.submitJobInternal(Job.this, cluster)
① Request a job ID and a job resource submission (staging) path from the ResourceManager. Code:
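// Request a job ID; submitClient is the YARNRunner, which obtains the ID from the ResourceManager
JobID jobId = submitClient.getNewJobID();

// YARNRunner.getNewJobID() simply delegates to its ResourceMgrDelegate
public JobID getNewJobID() throws IOException, InterruptedException {
    return resMgrDelegate.getNewJobID();
}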

// Build the job's resource submission (staging) directory, e.g.
// /tmp/hadoop-yarn/staging/hadoop/.staging/job_1425021794376_0005
// This is an HDFS path; the staging area is obtained through YARNRunner.
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
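The submit directory is the staging area with the job ID appended as a child directory. The minimal sketch below uses plain strings instead of Hadoop's Path and JobID classes. The helper names are hypothetical, and the four-digit zero padding of the sequence number is an assumption that matches the example path above.

```java
// Hypothetical helpers: job ID = job_<cluster timestamp>_<zero-padded sequence>,
// submit directory = <staging area>/<job ID>.
class StagingPathSketch {
    static String jobId(long clusterTimestamp, int sequence) {
        return String.format("job_%d_%04d", clusterTimestamp, sequence);
    }

    static String submitJobDir(String stagingArea, String jobId) {
        // Like new Path(jobStagingArea, jobId.toString()): join with exactly one '/'.
        return stagingArea.endsWith("/") ? stagingArea + jobId : stagingArea + "/" + jobId;
    }

    public static void main(String[] args) {
        String id = jobId(1425021794376L, 5);
        // prints /tmp/hadoop-yarn/staging/hadoop/.staging/job_1425021794376_0005
        System.out.println(submitJobDir("/tmp/hadoop-yarn/staging/hadoop/.staging", id));
    }
}
```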

② Upload the jar and the configuration file to HDFS

The following code runs in the JobSubmitter class:
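// Copy the jar file to HDFS (this two-argument call delegates to the
// three-argument overload shown below, which also takes a replication factor)
copyAndConfigureFiles(job, submitJobDir);
// Write the job configuration file (<submitJobDir>/job.xml) to HDFS
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
writeConf(conf, submitJobFile);
// Submit the job to the ResourceManager through YARNRunner.submitJob
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
// After submission, YARNRunner.submitJob returns the job status, obtained via:
// clientCache.getClient(jobId).getJobStatus(jobId);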
// JobSubmitter.copyAndConfigureFiles (three-argument version)
private void copyAndConfigureFiles(Job job, Path submitJobDir,
        short replication) throws IOException {
    Configuration conf = job.getConfiguration();
    // ... (code elided)
    String jobJar = job.getJar();
    Path jobJarPath = new Path(jobJar);
    copyJar(jobJarPath, JobSubmissionFiles.getJobJar(submitJobDir), replication);
    // ... (code elided)
}
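The jar is stored under a fixed name inside the job's submit directory, because JobSubmissionFiles.getJobJar(submitJobDir) resolves to <submitJobDir>/job.jar. The sketch below reproduces that layout on the local disk with java.nio.file instead of HDFS. The JarUploadSketch name is hypothetical, and the replication factor has no local equivalent, so it is left out.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Local-filesystem stand-in for the jar upload (HDFS and replication not modelled).
class JarUploadSketch {
    static Path copyJar(Path localJar, Path submitJobDir) throws IOException {
        Files.createDirectories(submitJobDir);          // make sure the job directory exists
        Path target = submitJobDir.resolve("job.jar");  // fixed name inside the job directory
        return Files.copy(localJar, target, StandardCopyOption.REPLACE_EXISTING);
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempDirectory("staging");
        // "wordcount.jar" is a hypothetical user jar with dummy content.
        Path jar = Files.write(tmp.resolve("wordcount.jar"), new byte[] {1, 2, 3});
        Path copied = copyJar(jar, tmp.resolve("job_1425021794376_0005"));
        System.out.println(copied.getFileName()); // job.jar
    }
}
```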

// JobSubmitter.writeConf
private void writeConf(Configuration conf, Path jobFile) throws IOException {
    // Write job file to fs
    FSDataOutputStream out =
        FileSystem.create(jtFs, jobFile, new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
    try {
        conf.writeXml(out);
    } finally {
        out.close();
    }
}
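writeConf serializes the whole job configuration into job.xml, using the familiar <configuration> root with one <property> element (holding <name> and <value>) per setting. The sketch below is a hand-rolled version of that layout for a plain Map, written to the local disk rather than HDFS. It is not Configuration.writeXml, which may emit additional elements, and the 0644 file permission is not modelled. ConfXmlSketch is a hypothetical name.

```java
import java.io.IOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.TreeMap;

// Hand-rolled sketch of a Hadoop-style configuration XML file.
class ConfXmlSketch {
    // Escape the characters that would break the XML ('&' first).
    private static String esc(String s) {
        return s.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;");
    }

    static void writeConf(Map<String, String> conf, Path jobFile) throws IOException {
        // try-with-resources closes the writer even if a write fails,
        // like the try/finally around out.close() in writeConf.
        try (Writer out = Files.newBufferedWriter(jobFile, StandardCharsets.UTF_8)) {
            out.write("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<configuration>\n");
            // TreeMap gives a stable, sorted property order.
            for (Map.Entry<String, String> e : new TreeMap<>(conf).entrySet()) {
                out.write("  <property><name>" + esc(e.getKey()) + "</name><value>"
                        + esc(e.getValue()) + "</value></property>\n");
            }
            out.write("</configuration>\n");
        }
    }

    public static void main(String[] args) throws IOException {
        Path jobFile = Files.createTempFile("job", ".xml");
        writeConf(Map.of("mapreduce.framework.name", "yarn"), jobFile);
        System.out.print(new String(Files.readAllBytes(jobFile), StandardCharsets.UTF_8));
    }
}
```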

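// YARNRunner.submitJob
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
        throws IOException, InterruptedException {
    // Construct necessary information to start the MR AM
    ApplicationSubmissionContext appContext =
        createApplicationSubmissionContext(conf, jobSubmitDir, ts);
    // Submit to ResourceManager
    try {
        ApplicationId applicationId =
            resMgrDelegate.submitApplication(appContext);
        // ... (checks on the application report elided)
        return clientCache.getClient(jobId).getJobStatus(jobId);
    } catch (YarnException e) {
        // Do not swallow the failure: surface it to the caller
        throw new IOException(e);
    }
}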

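// ResourceMgrDelegate.submitApplication
public ApplicationId submitApplication(ApplicationSubmissionContext appContext)
        throws YarnException, IOException {
    // client is a YarnClientImpl. From here on the request is handled on the server
    // side (the ResourceManager); stepping through that part requires remote debugging.
    return client.submitApplication(appContext);
}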
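The stages of submitJobInternal covered above run in this order: request a job ID, derive the submit directory, upload job.jar and job.xml, then hand the directory to YARNRunner.submitJob. The sketch below follows the order in which this article lists them. It uses a hypothetical SubmitClientSketch interface, an in-memory map standing in for HDFS, and a recording fake client instead of a ResourceManager. The real method also handles credentials, input splits and more.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, narrowed-down client interface; the real ClientProtocol has many more methods.
interface SubmitClientSketch {
    String getNewJobID();
    String getStagingAreaDir();
    String submitJob(String jobId, String jobSubmitDir);
}

class SubmitterSketch {
    // In-memory stand-in for HDFS: path -> description of what was written.
    final Map<String, String> fs = new LinkedHashMap<>();
    private final SubmitClientSketch submitClient;

    SubmitterSketch(SubmitClientSketch submitClient) {
        this.submitClient = submitClient;
    }

    String submitJobInternal(String localJar) {
        String jobId = submitClient.getNewJobID();                        // 1. job ID from the RM
        String submitJobDir = submitClient.getStagingAreaDir() + "/" + jobId;
        fs.put(submitJobDir + "/job.jar", "copy of " + localJar);          // 2. upload the jar
        fs.put(submitJobDir + "/job.xml", "job configuration");            //    and write the conf
        return submitClient.submitJob(jobId, submitJobDir);               // 3. hand off to YARNRunner
    }

    public static void main(String[] args) {
        RecordingClient client = new RecordingClient();
        SubmitterSketch submitter = new SubmitterSketch(client);
        System.out.println(submitter.submitJobInternal("wordcount.jar")); // RUNNING
        System.out.println(client.calls);
        System.out.println(submitter.fs.keySet());
    }
}

// Fake client that records calls instead of talking to a ResourceManager.
class RecordingClient implements SubmitClientSketch {
    final List<String> calls = new ArrayList<>();

    public String getNewJobID() {
        calls.add("getNewJobID");
        return "job_1425021794376_0005";
    }

    public String getStagingAreaDir() {
        calls.add("getStagingAreaDir");
        return "/tmp/hadoop-yarn/staging/hadoop/.staging";
    }

    public String submitJob(String jobId, String jobSubmitDir) {
        calls.add("submitJob " + jobId);
        return "RUNNING"; // hypothetical status string
    }
}
```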