
Hadoop Source Code Analysis 23: The MapReduce Job Submission Process

2014-05-28 08:48

The command is:

hadoop_debug jar /opt/hadoop-1.0.0/hadoop-examples-1.0.0.jar wordcount /user/admin/in/yellow.txt /user/admin/out/555

First, org.apache.hadoop.util.RunJar.main is invoked:

public static void main(String[] args) throws Throwable {

  // Open the jar: /opt/hadoop-1.0.0/hadoop-examples-1.0.0.jar
  JarFile jarFile = new JarFile(fileName);

  // The manifest under META-INF names the main class: org/apache/hadoop/examples/ExampleDriver
  Manifest manifest = jarFile.getManifest();
  if (manifest != null) {
    mainClassName = manifest.getMainAttributes().getValue("Main-Class");
  }

  // Create the local temporary directory: /tmp/hadoop-admin
  File tmpDir = new File(new Configuration().get("hadoop.tmp.dir"));
  tmpDir.mkdirs();

  // Create the local working directory: /tmp/hadoop-admin/hadoop-unjar4705742737164408087
  final File workDir = File.createTempFile("hadoop-unjar", "", tmpDir);
  workDir.delete();
  workDir.mkdirs();

  // Delete /tmp/hadoop-admin/hadoop-unjar4705742737164408087 when the JVM exits
  Runtime.getRuntime().addShutdownHook(new Thread() {
    public void run() {
      try {
        FileUtil.fullyDelete(workDir);
      } catch (IOException e) {
      }
    }
  });

  // Unpack the jar into /tmp/hadoop-admin/hadoop-unjar4705742737164408087
  unJar(file, workDir);

  // Add the working directory, the jar itself, <workDir>/classes/ and
  // every file under <workDir>/lib to the classpath
  classPath.add(new File(workDir + "/").toURL());
  classPath.add(file.toURL());
  classPath.add(new File(workDir, "classes/").toURL());
  File[] libs = new File(workDir, "lib").listFiles();
  if (libs != null) {
    for (int i = 0; i < libs.length; i++) {
      classPath.add(libs[i].toURL());
    }
  }

  // Invoke the main method of the main class
  main.invoke(null, new Object[] { newArgs });
}
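The manifest lookup at the start of RunJar.main can be tried in isolation. The following is a minimal sketch using only java.util.jar, not Hadoop code: the class name ManifestMainClass and the helper writeDemoJar are hypothetical, and the demo jar is a throwaway file that contains nothing but a manifest. The conversion of slashes to dots is added here for illustration, since the manifest in this article stores the class name with slashes.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.jar.Attributes;
import java.util.jar.JarFile;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;

public class ManifestMainClass {

    /** Returns the Main-Class attribute of a jar in dotted form, or null if absent. */
    static String readMainClass(File jar) throws IOException {
        try (JarFile jarFile = new JarFile(jar)) {
            Manifest manifest = jarFile.getManifest();
            if (manifest == null) {
                return null;
            }
            String name = manifest.getMainAttributes().getValue("Main-Class");
            return name == null ? null : name.replace('/', '.');
        }
    }

    /** Hypothetical helper: writes a jar whose only content is a manifest naming mainClass. */
    static File writeDemoJar(String mainClass) throws IOException {
        Manifest manifest = new Manifest();
        // Manifest-Version must be set, otherwise the main attributes are not written out.
        manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0");
        manifest.getMainAttributes().put(Attributes.Name.MAIN_CLASS, mainClass);
        File jar = File.createTempFile("demo", ".jar");
        jar.deleteOnExit();
        try (JarOutputStream out = new JarOutputStream(new FileOutputStream(jar), manifest)) {
            // No entries besides the manifest are needed for this demo.
        }
        return jar;
    }

    public static void main(String[] args) throws IOException {
        File jar = writeDemoJar("org/apache/hadoop/examples/ExampleDriver");
        System.out.println(readMainClass(jar));
    }
}
```

Pointing readMainClass at a real jar such as hadoop-examples-1.0.0.jar should report the driver class named in the comment above.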

Set the job properties (the comment on each line is the configuration key it sets):

job.setJarByClass(WordCount.class);             // mapred.jar
job.setMapperClass(WordCountMap.class);         // mapreduce.map.class
job.setReducerClass(WordCountReduce.class);     // mapreduce.reduce.class
job.setCombinerClass(WordCountReduce.class);    // mapreduce.combine.class
job.setMapOutputKeyClass(Text.class);           // mapred.mapoutput.key.class
job.setMapOutputValueClass(IntWritable.class);  // mapred.mapoutput.value.class
job.setOutputKeyClass(Text.class);              // mapred.output.key.class
job.setOutputValueClass(IntWritable.class);     // mapred.output.value.class
job.setJobName("WordCount");                    // mapred.job.name
FileInputFormat.addInputPath(job, input);       // mapred.input.dir
FileOutputFormat.setOutputPath(job, output);    // mapred.output.dir

Then job.submit() is called:

public void submit() throws IOException, InterruptedException,
    ClassNotFoundException {
  ......
  // Connect to the JobTracker and submit the job
  connect();
  info = jobClient.submitJobInternal(conf);
  ......
}

Connect to the JobTracker:

private void connect() throws IOException, InterruptedException {
  ......
  jobClient = new JobClient((JobConf) getConfiguration());
  ......
}

where:

public JobClient(JobConf conf) throws IOException {
  ......
  init(conf);
}

public void init(JobConf conf) throws IOException {
  ......
  this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);
}

private static JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
    Configuration conf) throws IOException {
  return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
      JobSubmissionProtocol.versionID, addr,
      UserGroupInformation.getCurrentUser(), conf,
      NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
}

At this point the client holds an RPC proxy that implements JobSubmissionProtocol, i.e. a proxy for the JobTracker.
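The idea of an object that implements a protocol interface while forwarding every call elsewhere can be illustrated with the JDK's dynamic proxies. This is only a sketch: SubmissionProtocol is a hypothetical two-method stand-in for JobSubmissionProtocol, and RecordingInvoker returns canned values taken from this article instead of contacting a server.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

public class ProxySketch {

    /** Hypothetical stand-in for JobSubmissionProtocol; only two methods are modelled. */
    interface SubmissionProtocol {
        String getNewJobId();
        String getStagingAreaDir();
    }

    /** Records each call and returns a canned answer instead of talking to a server. */
    static class RecordingInvoker implements InvocationHandler {
        final List<String> calls = new ArrayList<>();

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) {
            calls.add(method.getName());
            // A real RPC client would serialize the method name and arguments here,
            // send them to the remote address, and deserialize the reply.
            switch (method.getName()) {
                case "getNewJobId":
                    return "job_201404010621_0004";
                case "getStagingAreaDir":
                    return "hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/Admin/.staging";
                default:
                    throw new UnsupportedOperationException(method.getName());
            }
        }
    }

    /** Builds an object that implements SubmissionProtocol by delegating to the handler. */
    static SubmissionProtocol createProxy(InvocationHandler handler) {
        return (SubmissionProtocol) Proxy.newProxyInstance(
                SubmissionProtocol.class.getClassLoader(),
                new Class<?>[] { SubmissionProtocol.class },
                handler);
    }

    public static void main(String[] args) {
        RecordingInvoker invoker = new RecordingInvoker();
        SubmissionProtocol client = createProxy(invoker);
        System.out.println(client.getNewJobId());
        System.out.println(invoker.calls);
    }
}
```

The caller only sees the interface, which is why the rest of the submission code can call methods such as getNewJobId() as if the JobTracker were a local object.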

Obtain the job staging area:

Path jobStagingArea = JobSubmissionFiles.getStagingDir(JobClient.this, jobCopy);

RPC request: JobSubmissionProtocol.getStagingAreaDir()

Returns: hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/Admin/.staging

RPC request: ClientProtocol.getFileInfo(/tmp/hadoop-admin/mapred/staging/Admin/.staging)

Returns: org.apache.hadoop.hdfs.protocol.HdfsFileStatus@5521691b, i.e. the directory exists

RPC request: ClientProtocol.getFileInfo(/tmp/hadoop-admin/mapred/staging/Admin/.staging)

Returns: org.apache.hadoop.hdfs.protocol.HdfsFileStatus@726c554, used to check permissions

Obtain a new job ID:

JobID jobId = jobSubmitClient.getNewJobId();

RPC request: JobSubmissionProtocol.getNewJobId()

Returns: job_201404010621_0004

Build the submit job directory:

Path submitJobDir = new Path(jobStagingArea, jobId.toString());

hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004

Copy the jar to HDFS:

copyAndConfigureFiles(jobCopy, submitJobDir);

RPC request: ClientProtocol.getFileInfo(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004)

Returns: null

RPC request: ClientProtocol.mkdirs(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004, rwxr-xr-x)

Returns: true

RPC request: ClientProtocol.setPermission(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004, rwx------)

Returns: null

RPC request: ClientProtocol.getFileInfo(/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.jar)

Returns: null, i.e. the file does not exist

RPC request: ClientProtocol.create(/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.jar, rwxr-xr-x, DFSClient_-1317833261, true, true, 3, 67108864)

Returns: an output stream

RPC request: ClientProtocol.addBlock(/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.jar, DFSClient_-1317833261, null)

Returns: org.apache.hadoop.hdfs.protocol.LocatedBlock@1a9b701

Block: blk_6689254996395759186_2720

BlockToken: Ident: , Pass: , Kind: , Service:

DataNode: [10.1.1.103:50010, 10.1.1.102:50010]

RPC request: ClientProtocol.complete(/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.jar, DFSClient_-1317833261)

Returns: true

RPC request: ClientProtocol.setReplication(/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.jar, 10)

Returns: true

RPC request: ClientProtocol.setPermission(/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.jar, rw-r--r--)

Returns: null

RPC request: ClientProtocol.renewLease(DFSClient_-1317833261)

Returns: null

From now on, a daemon thread keeps sending renewLease requests.

At this point the local file /opt/hadoop-1.0.0/hadoop-examples-1.0.0.jar has been copied to /tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.jar on HDFS.

Number of reduces:

int reduces = jobCopy.getNumReduceTasks();

The number of reduces is 2.

Check the output directory:

RPC request: ClientProtocol.getFileInfo(/user/admin/out/555)

Returns: null, i.e. it does not exist

Obtain the input split information:

int maps = writeSplits(context, submitJobDir);

where (writeSplits delegates to writeNewSplits when the new API is in use):

private <T extends InputSplit> int writeNewSplits(JobContext job, Path jobSubmitDir)
    throws IOException, InterruptedException, ClassNotFoundException {
  Configuration conf = job.getConfiguration();
  InputFormat<?, ?> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

  List<InputSplit> splits = input.getSplits(job);
  T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

  // sort the splits into order based on size, so that the biggest
  // go first
  Arrays.sort(array, new SplitComparator());
  JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
      jobSubmitDir.getFileSystem(conf), array);
  return array.length;
}

where:

public List<InputSplit> getSplits(JobContext job) throws IOException {
  ...........
}

RPC request: ClientProtocol.getFileInfo(/user/admin/in/yellow.txt)

Returns: path="hdfs://server1:9000/user/admin/in/yellow.txt", length=201000000, isdir=false, block_replication=3, blocksize=67108864, permission=rw-r--r--, owner=Admin, group=supergroup

RPC request: ClientProtocol.getBlockLocations(/user/admin/in/yellow.txt, 0, 201000000)

Returns: 3 BlockLocation objects

offset={0}, length={67108864}, hosts={server3,server2}, names={[10.1.1.102:50010, 10.1.1.103:50010]}, topologyPaths={[/default-rack/10.1.1.103:50010, /default-rack/10.1.1.102:50010]}

offset={67108864}, length={67108864}, hosts={server3,server2}, names={[10.1.1.102:50010, 10.1.1.103:50010]}, topologyPaths={[/default-rack/10.1.1.103:50010, /default-rack/10.1.1.102:50010]}

offset={134217728}, length={66782272}, hosts={server3,server2}, names={[10.1.1.102:50010, 10.1.1.103:50010]}, topologyPaths={[/default-rack/10.1.1.103:50010, /default-rack/10.1.1.102:50010]}

The final split information consists of 3 FileSplit objects:

FileSplit: file={hdfs://server1:9000/user/admin/in/yellow.txt}, hosts={[server3, server2]}, length={67108864}, start={0}

FileSplit: file={hdfs://server1:9000/user/admin/in/yellow.txt}, hosts={[server3, server2]}, length={67108864}, start={67108864}

FileSplit: file={hdfs://server1:9000/user/admin/in/yellow.txt}, hosts={[server3, server2]}, length={66782272}, start={134217728}

The number of maps is therefore 3:

jobCopy.setNumMapTasks(maps);
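The three start/length pairs above can be reproduced with a small calculation. This is a sketch of the usual FileInputFormat splitting rule as I understand it, not a copy of the Hadoop source: the minimum split size of 1, the maximum of Long.MAX_VALUE and the 1.1 slop factor are assumptions about this job's configuration, and the class SplitSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitSketch {

    // Assumed slop factor: a tail of up to 10% over the split size is not split off.
    static final double SPLIT_SLOP = 1.1;

    /** Clamps the block size between the configured minimum and maximum split sizes. */
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    /** Returns {start, length} pairs covering a file of the given length. */
    static List<long[]> split(long fileLength, long splitSize) {
        List<long[]> splits = new ArrayList<>();
        long remaining = fileLength;
        while ((double) remaining / splitSize > SPLIT_SLOP) {
            splits.add(new long[] { fileLength - remaining, splitSize });
            remaining -= splitSize;
        }
        if (remaining != 0) {
            // Whatever is left becomes the final, possibly shorter, split.
            splits.add(new long[] { fileLength - remaining, remaining });
        }
        return splits;
    }

    public static void main(String[] args) {
        // File length and block size are taken from the getFileInfo reply above.
        long splitSize = computeSplitSize(67108864L, 1L, Long.MAX_VALUE);
        for (long[] s : split(201000000L, splitSize)) {
            System.out.println("start=" + s[0] + ", length=" + s[1]);
        }
    }
}
```

With a 201000000-byte file and 67108864-byte blocks, the loop emits two full-size splits and the final branch emits the 66782272-byte remainder, which agrees with the FileSplit list.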

Create the split file:

JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array);

RPC request: ClientProtocol.create(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.split, rwxr-xr-x, DFSClient_-1317833261, true, true, 3, 67108864)

Returns: an output stream

RPC request: ClientProtocol.setPermission(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.split, rw-r--r--)

Returns: null

RPC request: ClientProtocol.setReplication(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.split, 10)

Returns: true

RPC request: ClientProtocol.addBlock(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.split, DFSClient_-1317833261, null)

Returns: a LocatedBlock object:

Block: blockid=-921399365952861077, generationStamp=2714, numBytes=0

BlockTokenIdentifier: Ident: , Pass: , Kind: , Service:

DatanodeInfo[]: [10.1.1.103:50010, 10.1.1.102:50010]

offset: 0

RPC request: ClientProtocol.complete(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.split, DFSClient_-1317833261)

Returns: true

The SplitMetaInfo entries written are:

[data-size : 67108864 start-offset : 7 locations : server3 server2]

[data-size : 67108864 start-offset : 116 locations : server2 server3]

[data-size : 66782272 start-offset : 225 locations : server2 server3]
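The start offsets 7, 116 and 225 are byte positions inside job.split. The sketch below shows one way to arrive at them. The layout is an assumption inferred from these numbers, not taken from this article: a header made of a 3-byte "SPL" marker plus a 4-byte version, then per split the split class name and the file path (each stored as a one-byte length followed by UTF-8 bytes) and two 8-byte longs for start and length. The class SplitOffsetSketch and its methods are hypothetical.

```java
import java.nio.charset.StandardCharsets;

public class SplitOffsetSketch {

    /** Size of a string stored as a one-byte length prefix followed by its UTF-8 bytes. */
    static int prefixedStringSize(String s) {
        int n = s.getBytes(StandardCharsets.UTF_8).length;
        if (n > 127) {
            throw new IllegalArgumentException("length prefix would need more than one byte");
        }
        return 1 + n;
    }

    /** Assumed record layout: split class name, file path, start (8 bytes), length (8 bytes). */
    static int recordSize(String splitClass, String path) {
        return prefixedStringSize(splitClass) + prefixedStringSize(path) + 8 + 8;
    }

    /** Offset of each record when equally sized records follow a fixed-size header. */
    static long[] startOffsets(int headerSize, int recordSize, int count) {
        long[] offsets = new long[count];
        for (int i = 0; i < count; i++) {
            offsets[i] = headerSize + (long) i * recordSize;
        }
        return offsets;
    }

    public static void main(String[] args) {
        int header = "SPL".length() + 4; // assumed: 3-byte marker plus 4-byte version
        int record = recordSize(
                "org.apache.hadoop.mapreduce.lib.input.FileSplit",
                "hdfs://server1:9000/user/admin/in/yellow.txt");
        for (long offset : startOffsets(header, record, 3)) {
            System.out.println(offset);
        }
        System.out.println("total=" + (header + 3L * record));
    }
}
```

Under these assumptions each record is 109 bytes, so the offsets come out as 7, 116 and 225, and the total of 334 bytes equals the size of job.split in the directory listing.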

RPC request: ClientProtocol.create(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.splitmetainfo, rwxr-xr-x, DFSClient_-1317833261, true, true, 3, 67108864)

Returns: an output stream

RPC request: ClientProtocol.setPermission(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.splitmetainfo, rw-r--r--)

Returns: null

RPC request: ClientProtocol.addBlock(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.splitmetainfo, DFSClient_-1317833261, null)

Returns: a LocatedBlock object:

Block: blockid=789965327875207186, generationStamp=2715, numBytes=0

BlockTokenIdentifier: Ident: , Pass: , Kind: , Service:

DatanodeInfo[]: [10.1.1.103:50010, 10.1.1.102:50010]

offset: 0

RPC request: ClientProtocol.complete(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.splitmetainfo, DFSClient_-1317833261)

Returns: true

Set up access control:

RPC request: JobSubmissionProtocol.getQueueAdmins(default)

Returns: All users are allowed

Write the job file to the JobTracker's file system:

RPC request: ClientProtocol.create(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.xml, rwxr-xr-x, DFSClient_-1317833261, true, true, 3, 67108864)

Returns: an output stream

RPC request: ClientProtocol.setPermission(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.xml, rw-r--r--)

Returns: null

RPC request: ClientProtocol.addBlock(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.xml, DFSClient_-1317833261, null)

Returns: a LocatedBlock object:

Block: blockid=-7725157033540829125, generationStamp=2716, numBytes=0

BlockTokenIdentifier: Ident: , Pass: , Kind: , Service:

DatanodeInfo[]: [10.1.1.103:50010, 10.1.1.102:50010]

offset: 0

RPC request: ClientProtocol.complete(/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.xml, DFSClient_-1317833261)

Returns: true

The file job.xml now exists under "/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/" and contains all of the job's configuration.

The HDFS directory "/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/" now contains these files:

-rw-r--r-- 10 admin supergroup 142465 2014-04-08 00:20 job.jar
-rw-r--r-- 10 admin supergroup    334 2014-04-08 00:45 job.split
-rw-r--r--  3 admin supergroup     80 2014-04-08 00:50 job.splitmetainfo
-rw-r--r--  3 admin supergroup  20416 2014-04-08 00:55 job.xml

job.jar is the jar to run, job.split holds the FileSplit objects, job.splitmetainfo holds the SplitMetaInfo objects, and job.xml is the job's configuration file.


Submit the job:

status = jobSubmitClient.submitJob(jobId, submitJobDir.toString(), jobCopy.getCredentials());

RPC request: JobSubmissionProtocol.submitJob(job_201404010621_0004, hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004, org.apache.hadoop.security.Credentials@70677770)

Returns: JobStatus: setProgress=0, mapProgress=0, reduceProgress=0, cleanProgress=0, runstate=4, priority=NORMAL,..

RPC request: JobSubmissionProtocol.getJobProfile(job_201404010621_0004)

Returns: JobProfile: jobFile=hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/Admin/.staging/job_201404010621_0004/job.xml, jobID=job_201404010621_0004, name=WordCount, queue=default, url=http://server1:50030/jobdetails.jsp?jobid=job_201404010621_0004, user=Admin

Combining JobStatus and JobProfile gives:

Job: job_201404010621_0004

file: hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404010621_0004/job.xml

tracking URL: http://server1:50030/jobdetails.jsp?jobid=job_201404010621_0004

map() completion: 0.0

reduce() completion: 0.0

Monitor the job status:

jobClient.monitorAndPrintJob(conf, info);

RPC request: JobSubmissionProtocol.getJobStatus(job_201404010621_0004)

Returns: setProgress=1, mapProgress=1, reduceProgress=0.22222224, cleanProgress=1, runstate=1, priority=NORMAL

RPC request: JobSubmissionProtocol.getJobStatus(job_201404010621_0004)

Returns: setProgress=1, mapProgress=1, reduceProgress=1, cleanProgress=1, runstate=2, priority=NORMAL,

That is, map 100%, reduce 100%.

While the job is running, the JobSubmissionProtocol.getJobStatus(job_201404010621_0004) request is sent repeatedly.
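The runstate values in these replies (4 right after submission, then 1, then 2) are numeric job states. The sketch below decodes them using the constants I recall from the Hadoop 1.x JobStatus class (RUNNING=1, SUCCEEDED=2, FAILED=3, PREP=4, KILLED=5); treat that mapping as an assumption to check against your Hadoop version. The class RunStateSketch and the progressLine format are hypothetical and only imitate the kind of line a monitoring loop would print.

```java
public class RunStateSketch {

    /** Maps a numeric runstate to a readable name (assumed Hadoop 1.x constants). */
    static String describe(int runState) {
        switch (runState) {
            case 1: return "RUNNING";
            case 2: return "SUCCEEDED";
            case 3: return "FAILED";
            case 4: return "PREP";
            case 5: return "KILLED";
            default: return "UNKNOWN";
        }
    }

    /** A polling loop can stop once the job has reached a terminal state. */
    static boolean isComplete(int runState) {
        return runState == 2 || runState == 3 || runState == 5;
    }

    /** Formats map and reduce progress (0.0 to 1.0) as whole percentages. */
    static String progressLine(float mapProgress, float reduceProgress) {
        return "map " + Math.round(mapProgress * 100) + "% reduce "
                + Math.round(reduceProgress * 100) + "%";
    }

    public static void main(String[] args) {
        // The three status replies shown in this article, in order.
        int[] states = { 4, 1, 2 };
        float[][] progress = { { 0f, 0f }, { 1f, 0.22222224f }, { 1f, 1f } };
        for (int i = 0; i < states.length; i++) {
            System.out.println(describe(states[i]) + ": "
                    + progressLine(progress[i][0], progress[i][1])
                    + (isComplete(states[i]) ? " (done)" : ""));
        }
    }
}
```

Read this way, the job is in PREP when submitJob returns, RUNNING while the reduces are at 22%, and SUCCEEDED in the last reply.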

RPC request: JobSubmissionProtocol.getTaskCompletionEvents(job_201404010621_0004, 0, 10)

Returns:
[Task Id : attempt_201404010621_0004_m_000004_0, Status : SUCCEEDED,
Task Id : attempt_201404010621_0004_m_000002_0, Status : SUCCEEDED,
Task Id : attempt_201404010621_0004_m_000000_0, Status : SUCCEEDED,
Task Id : attempt_201404010621_0004_m_000001_0, Status : SUCCEEDED,
Task Id : attempt_201404010621_0004_m_000000_1, Status : KILLED,
Task Id : attempt_201404010621_0004_r_000000_0, Status : SUCCEEDED,
Task Id : attempt_201404010621_0004_r_000001_0, Status : SUCCEEDED,
Task Id : attempt_201404010621_0004_m_000003_0, Status : SUCCEEDED]

RPC request: JobSubmissionProtocol.getJobCounters(job_201404010621_0004)

Returns: OW[class=class org.apache.hadoop.mapred.Counters,value=Counters: 29

Job Counters

Launched reduce tasks=2

SLOTS_MILLIS_MAPS=293879

Total time spent by all reduces waiting after reserving slots(ms)=0

Total time spent by all maps waiting after reserving slots(ms)=0

Launched map tasks=4

Data-local map tasks=4

SLOTS_MILLIS_REDUCES=74342

File Output Format Counters

Bytes Written=933

FileSystemCounters

FILE_BYTES_READ=316152

HDFS_BYTES_READ=201008521

FILE_BYTES_WRITTEN=370366

HDFS_BYTES_WRITTEN=933

File Input Format Counters

Bytes Read=201008194

Map-Reduce Framework

Map output materialized bytes=2574

Map input records=15600000

Reduce shuffle bytes=2574

Spilled Records=23025

Map output bytes=356000000

Total committed heap usage (bytes)=378023936

CPU time spent (ms)=158350

Combine input records=41011850

SPLIT_RAW_BYTES=327

Reduce input records=225

Reduce input groups=75

Combine output records=12075

Physical memory (bytes) snapshot=650371072

Reduce output records=75

Virtual memory (bytes) snapshot=5300277248

Map output records=41000000]