【hadoop】 3007-hadoop的提交job的源码分析
2015-04-03 22:55
330 查看
1、复习RPC通信框架
rpc 远程过程调用
2、客户端提交job给MR的集群执行过程
2.1、连接ResourceManager,通过执行Job中connect()来完成
第一步:在job中执行this.cluster = new Cluster();执行结果返回job
第二步:在Cluster类中ClientProtocol
client = null; 引用。获取client = new YARNRunner();
第三步:YARNRunner 有ResourceMgrDelegate对象
第四步:通过以上三步job获取了ResourceManager的代理对象,可以通过该代理对象和ResourceManager通信,完成客户端和服务端连接
2.2、连接ResourceManager,通过执行Job中的以下代码完成
final JobSubmitter submitter =
getJobSubmitter( cluster.getFileSystem(), cluster .getClient())
2.3、提交器submitter提交,调用submitter .submitJobInternal(Job.this, cluster)
①向ResourceManager申请jobID 和 Job资源提交路径。代码实现
//申请jobID,通过submitClient 是YARNRunner来生成jobID
JobID jobId = submitClient .getNewJobID();
public JobID getNewJobID() throws IOException, InterruptedException {
  // Ask the ResourceManager (via its delegate) to allocate a fresh job id.
  final JobID allocatedId = resMgrDelegate.getNewJobID();
  return allocatedId;
}
//生成job提交资源路径,例如:/tmp/hadoop-yarn/staging/hadoop/.staging/job_1425021794376_0005,该路径为hdfs文件路径。通过YARNRunner来生成该路径
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
②提交jar包,配置文件到hdfs上
在JobSubmitter类中执行如下代码
// 拷贝jar文件到hdfs上
copyAndConfigureFiles (job , submitJobDir);
// 配置文件写到hdfs上
writeConf( conf, submitJobFile );
// YARNRunner调用submitJob 提交给ResourceManager
status = submitClient.submitJob( jobId, submitJobDir .toString(), job.getCredentials());
// 提交job后,返回结果信息
clientCache .getClient(jobId).getJobStatus( jobId);
// Uploads the job's resources to the HDFS staging directory. This excerpt
// (the author elided parts of the body with "。。。") shows only the jar-upload
// step: the client-side job jar is copied to the submit dir at the given
// replication factor.
private void copyAndConfigureFiles(Job job, Path submitJobDir,
short replication) throws IOException {
Configuration conf = job.getConfiguration();
。。。。。。。。。
// Path of the jar on the client, as configured on the Job.
String jobJar = job.getJar();
Path jobJarPath = new Path(jobJar);
// Copy the jar into the staging dir (JobSubmissionFiles names the target).
copyJar(jobJarPath, JobSubmissionFiles.getJobJar(submitJobDir),
replication);
。。。。。。。。。
}
private void writeConf(Configuration conf, Path jobFile)
    throws IOException {
  // Serialize the job Configuration as XML into the staging directory on the
  // job-tracker filesystem (jtFs), with the standard job-file permissions.
  final FSDataOutputStream confOut = FileSystem.create(jtFs, jobFile,
      new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
  try {
    conf.writeXml(confOut);
  } finally {
    // Always release the HDFS output stream, even if writeXml fails.
    confOut.close();
  }
}
/**
 * Submits the job to the ResourceManager and returns its initial status.
 *
 * Fixes over the excerpt as quoted: the empty catch block silently swallowed
 * every submission failure, and the method declared a JobStatus return type
 * but never returned (would not compile). The status is fetched through the
 * client cache, matching the call shown earlier in this article:
 * clientCache.getClient(jobId).getJobStatus(jobId).
 *
 * @param jobId        id previously obtained from the ResourceManager
 * @param jobSubmitDir HDFS staging directory holding jar/conf/splits
 * @param ts           credentials to ship with the application
 * @return status of the freshly submitted job
 * @throws IOException if submission to the ResourceManager fails
 */
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
    throws IOException, InterruptedException {
  // Construct necessary information to start the MR ApplicationMaster.
  ApplicationSubmissionContext appContext =
      createApplicationSubmissionContext(conf, jobSubmitDir, ts);
  // Submit to ResourceManager.
  try {
    ApplicationId applicationId =
        resMgrDelegate.submitApplication(appContext);
  } catch (Exception e) {
    // Never swallow the failure: preserve the cause for the caller.
    throw new IOException("Failed to submit job " + jobId, e);
  }
  // Return the submitted job's status via the cached client.
  return clientCache.getClient(jobId).getJobStatus(jobId);
}
// Hands the prepared ApplicationSubmissionContext to YARN.
public ApplicationId submitApplication(ApplicationSubmissionContext appContext )
throws YarnException, IOException {
//client is a YarnClientImpl; the steps below execute on the server side — to trace them in detail, use remote debugging
return client.submitApplication( appContext );
}
rpc 远程过程调用
2、客户端提交job给MR的集群执行过程
2.1、连接ResourceManager,通过执行Job中connect()来完成
第一步:在job中执行this.cluster = new Cluster();执行结果返回job
第二步:在Cluster类中ClientProtocol
client = null; 引用。获取client = new YARNRunner();
第三步:YARNRunner 有ResourceMgrDelegate对象
第四步:通过以上三步job获取了ResourceManager的代理对象,可以通过该代理对象和ResourceManager通信,完成客户端和服务端连接
2.2、连接ResourceManager,通过执行Job中的以下代码完成
final JobSubmitter submitter =
getJobSubmitter( cluster.getFileSystem(), cluster .getClient())
2.3、提交器submitter提交,调用submitter .submitJobInternal(Job.this, cluster)
①向ResourceManager申请jobID 和 Job资源提交路径。代码实现
//申请jobID,通过submitClient 是YARNRunner来生成jobID
JobID jobId = submitClient .getNewJobID();
public JobID getNewJobID() throws IOException, InterruptedException {
  // Ask the ResourceManager (via its delegate) to allocate a fresh job id.
  final JobID allocatedId = resMgrDelegate.getNewJobID();
  return allocatedId;
}
//生成job提交资源路径,例如:/tmp/hadoop-yarn/staging/hadoop/.staging/job_1425021794376_0005,该路径为hdfs文件路径。通过YARNRunner来生成该路径
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
②提交jar包,配置文件到hdfs上
在JobSubmitter类中执行如下代码
// 拷贝jar文件到hdfs上
copyAndConfigureFiles (job , submitJobDir);
// 配置文件写到hdfs上
writeConf( conf, submitJobFile );
// YARNRunner调用submitJob 提交给ResourceManager
status = submitClient.submitJob( jobId, submitJobDir .toString(), job.getCredentials());
// 提交job后,返回结果信息
clientCache .getClient(jobId).getJobStatus( jobId);
// Uploads the job's resources to the HDFS staging directory. This excerpt
// (the author elided parts of the body with "。。。") shows only the jar-upload
// step: the client-side job jar is copied to the submit dir at the given
// replication factor.
private void copyAndConfigureFiles(Job job, Path submitJobDir,
short replication) throws IOException {
Configuration conf = job.getConfiguration();
。。。。。。。。。
// Path of the jar on the client, as configured on the Job.
String jobJar = job.getJar();
Path jobJarPath = new Path(jobJar);
// Copy the jar into the staging dir (JobSubmissionFiles names the target).
copyJar(jobJarPath, JobSubmissionFiles.getJobJar(submitJobDir),
replication);
。。。。。。。。。
}
private void writeConf(Configuration conf, Path jobFile)
    throws IOException {
  // Serialize the job Configuration as XML into the staging directory on the
  // job-tracker filesystem (jtFs), with the standard job-file permissions.
  final FSDataOutputStream confOut = FileSystem.create(jtFs, jobFile,
      new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
  try {
    conf.writeXml(confOut);
  } finally {
    // Always release the HDFS output stream, even if writeXml fails.
    confOut.close();
  }
}
/**
 * Submits the job to the ResourceManager and returns its initial status.
 *
 * Fixes over the excerpt as quoted: the empty catch block silently swallowed
 * every submission failure, and the method declared a JobStatus return type
 * but never returned (would not compile). The status is fetched through the
 * client cache, matching the call shown earlier in this article:
 * clientCache.getClient(jobId).getJobStatus(jobId).
 *
 * @param jobId        id previously obtained from the ResourceManager
 * @param jobSubmitDir HDFS staging directory holding jar/conf/splits
 * @param ts           credentials to ship with the application
 * @return status of the freshly submitted job
 * @throws IOException if submission to the ResourceManager fails
 */
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
    throws IOException, InterruptedException {
  // Construct necessary information to start the MR ApplicationMaster.
  ApplicationSubmissionContext appContext =
      createApplicationSubmissionContext(conf, jobSubmitDir, ts);
  // Submit to ResourceManager.
  try {
    ApplicationId applicationId =
        resMgrDelegate.submitApplication(appContext);
  } catch (Exception e) {
    // Never swallow the failure: preserve the cause for the caller.
    throw new IOException("Failed to submit job " + jobId, e);
  }
  // Return the submitted job's status via the cached client.
  return clientCache.getClient(jobId).getJobStatus(jobId);
}
// Hands the prepared ApplicationSubmissionContext to YARN.
public ApplicationId submitApplication(ApplicationSubmissionContext appContext )
throws YarnException, IOException {
//client is a YarnClientImpl; the steps below execute on the server side — to trace them in detail, use remote debugging
return client.submitApplication( appContext );
}
相关文章推荐
- Hadoop源码分析1: 客户端提交JOB
- Hadoop-1.2.1学习之Job创建和提交源码分析
- Hadoop源码分析--MapReduce作业(job)提交源码跟踪
- hadoop中job提交的源码分析
- hadoop 源码分析(一) jobClient 提交到JobTracker
- Hadoop源码分析23:MapReduce的Job提交过程
- Hadoop2.*源码分析之Job任务提交与执行
- Hadoop源码分析(三)--------------job提交过程分析(3)之job的split过程
- hadoop的job提交的源码分析
- Hadoop提交Job Client端源码分析
- hadoop yarn的job提交流程源码分析
- Spark源码分析之Job提交运行总流程概述
- Apache Spark源码分析-- Job的提交与运行
- Hadoop作业提交与执行源码分析
- Hadoop源码分析26 JobTracker主要容器和线程
- Hadoop之JobTracker源码分析
- Hadoop源码分析38 JobClient的线程协作
- hadoop1.x作业提交过程分析(源码分析第二篇) 推荐
- Hadoop源码分析27 JobTracker空载处理心跳
- Hadoop0.21.0源码流程分析(1)-客户端提交作业