您的位置:首页 > 其它

MapReduce V1:Job提交流程之JobClient端分析

2015-10-09 00:00 134 查看
我们基于Hadoop 1.2.1源码分析MapReduce V1的处理流程。
MapReduce V1实现中,主要存在3个主要的分布式进程(角色):JobClient、JobTracker和TaskTracker,我们主要是以这三个角色的实际处理活动为主线,并结合源码,分析实际处理流程。下图是《Hadoop权威指南》一书给出的MapReduce V1处理Job的抽象流程图:


如上图,我们展开阴影部分的处理逻辑,详细分析Job提交在JobClient端的具体流程。
在编写好MapReduce程序以后,需要将Job提交给JobTracker,那么我们就需要了解在提交Job的过程中,在JobClient端都做了哪些工作,或者说执行了哪些处理。在JobClient端提交Job的处理流程,如下图所示:

上图所描述的Job的提交流程,说明如下所示:

在MR程序中创建一个Job实例,设置Job状态

创建一个JobClient实例,准备将创建的Job实例提交到JobTracker

在创建JobClient的过程中,首先必须保证建立到JobTracker的RPC连接

基于JobSubmissionProtocol协议远程调用JobTracker获取一个新的Job ID

根据MR程序中配置的Job,在HDFS上创建Job相关目录,并将配置的tmpfiles、tmpjars、tmparchives,以及Job对应jar文件等资源复制到HDFS

根据Job配置的InputFormat,计算该Job输入的Split信息和元数据(SplitMetaInfo)信息,以及计算出map和reduce的个数,最后将这些信息连通Job配置写入到HDFS(保证JobTracker能够读取)

通过JobClient基于JobSubmissionProtocol协议方法submitJob提交Job到JobTracker

MR程序创建Job
下面的MR程序示例代码,已经很熟悉了:
01
public
static
void
main(String[] args)
throws
Exception {
02
Configuration conf =
new
Configuration();
03
String[] otherArgs =
new
GenericOptionsParser(conf, args).getRemainingArgs();
04
if
(otherArgs.length !=
2
) {
05
System.err.println(
"Usage:wordcount <in> <out>"
);
06
System.exit(
2
);
07
}
08
Job job =
new
Job(conf,
"word count"
);
09
job.setJarByClass(WordCount.
class
);
10
job.setMapperClass(TokenizerMapper.
class
);
11
job.setCombinerClass(IntSumReducer.
class
);
12
job.setReducerClass(IntSumReducer.
class
);
13
job.setOutputKeyClass(Text.
class
);
14
job.setOutputValueClass(IntWritable.
class
);
15
FileInputFormat.addInputPath(job,
new
Path(otherArgs[
0
]));
16
FileOutputFormat.setOutputPath(job,
new
Path(otherArgs[
1
]));
17
System.exit(job.waitForCompletion(
true
) ?
0
:
1
);
18
}
在MR程序中,首先创建一个Job,并进行配置,然后通过调用Job的waitForCompletion方法将Job提交到MapReduce集群。这个过程中,Job存在两种状态:Job.JobState.DEFINE和Job.JobState.RUNNING,创建一个Job后,该Job的状态为Job.JobState.DEFINE,Job内部通过JobClient基于org.apache.hadoop.mapred.JobSubmissionProtocol协议提交给JobTracker,然后该Job的状态变为Job.JobState.RUNNING。
Job提交目录submitJobDir
通过如下代码可以看到,Job提交目录是如何创建的:
1
JobConf jobCopy = job;
2
Path jobStagingArea = JobSubmissionFiles.getStagingDir(JobClient.
this
, jobCopy);
// 获取到StagingArea目录
3
JobID jobId = jobSubmitClient.getNewJobId();
4
Path submitJobDir =
new
Path(jobStagingArea, jobId.toString());
获取StagingArea目录,JobClient需要通过JobSubmissionProtocol协议的远程方法getStagingAreaDir从JobTracker端获取到,我们看一下JobTracker端的getStagingAreaDirInternal方法,如下所示:
1
private
String getStagingAreaDirInternal(String user)
throws
IOException {
2
final
Path stagingRootDir =
new
Path(conf.get(
"mapreduce.jobtracker.staging.root.dir"
,
"/tmp/hadoop/mapred/staging"
));
3
final
FileSystem fs = stagingRootDir.getFileSystem(conf);
4
return
fs.makeQualified(
new
Path(stagingRootDir, user+
"/.staging"
)).toString();
5
}
最终获取到的StagingArea目录为${mapreduce.jobtracker.staging.root.dir}/${user}/.staging/,例如,如果使用默认的mapreduce.jobtracker.staging.root.dir值,用户为shirdrn,则StagingArea目录/tmp/hadoop/mapred/staging/shirdrn/.staging/。通过Path submitJobDir =new Path(jobStagingArea, jobId.toString());可以得到submitJobDir,假如一个job的ID为job_200912121733_0002,则submitJobDir的值为/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/
拷贝资源文件
在配置Job的时候,可以指定tmpfiles、tmpjars、tmparchives,JobClient会将对应的资源文件拷贝到指定的目录中,对应目录如下代码所示:
1
Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir);
2
Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir);
3
Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir);
4
...
5
Path submitJarFile = JobSubmissionFiles.getJobJar(submitJobDir);
6
job.setJar(submitJarFile.toString());
7
fs.copyFromLocalFile(originalJarFile, submitJarFile);
上面已经知道Job提交目录,可以分别得到对应的资源所在目录:

tmpfiles目录:/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/files

tmpjars目录:/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/libjars

tmparchives目录:/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/archives

Job Jar文件:/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/job.jar

然后,就可以将对应的资源文件拷贝到对应的目录中。
计算并存储Split数据
根据Job配置中设置的InputFormat,计算该Job的数据数据文件是如何进行分片的,代码如下所示:
1
Configuration conf =job.getConfiguration();
2
InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
3
List<InputSplit> splits = input.getSplits(job);
实际上就是调用InputFormat的getSplits方法,如果不适用Hadoop自带的FileInputFormat的默认getSplits方法实现,可以自定义实现,重写该默认实现逻辑来定义数据数据文件分片的规则。
计算出输入文件的分片信息,然后需要将这些分片数据写入到HDFS供JobTracker查询初始化MapTask,写入分片数据的实现代码:
1
T[] array = (T[]) splits.toArray(
new
InputSplit[splits.size()]);
2
// sort the splits into order based on size, so that the biggest
3
// go first
4
Arrays.sort(array,
new
SplitComparator());
// 根据InputSplit的长度做了一个逆序排序
5
JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array);
// 将split及其元数据信息写入HDFS
接着调用JobSplitWriter.createSplitFiles方法存储Split信息,并创建元数据信息,并保存元数据信息。存储Split信息,代码实现如下所示:
01
SerializationFactory factory =
new
SerializationFactory(conf);
02
int
i =
0
;
03
long
offset = out.getPos();
04
for
(T split:array) {
05
long
prevCount = out.getPos();
06
Text.writeString(out, split.getClass().getName());
07
Serializer<T> serializer = factory.getSerializer((Class<T>) split.getClass());
08
serializer.open(out);
09
serializer.serialize(split);
// 将split序列化写入到HDFS文件中
10
long
currCount = out.getPos();
11
String[] locations = split.getLocations();
12
final
int
max_loc = conf.getInt(MAX_SPLIT_LOCATIONS,
10
);
13
if
(locations.length > max_loc) {
14
LOG.warn(
"Max block location exceeded for split:"
+ split +
" splitsize:"
+ locations.length +
" maxsize:"
+ max_loc);
15
locations = Arrays.copyOf(locations, max_loc);
16
}
17
info[i++] =
new
JobSplit.SplitMetaInfo(locations, offset, split.getLength());
// 创建SplitMetaInfo实例
18
offset += currCount - prevCount;
19
}
我们先看一下FileSplit包含的分片内容,如下所示:
1
private
Path file;
2
private
long
start;
3
private
long
length;
4
private
String[] hosts;
在序列化保存FileSplit到HDFS,可以通过查看FileSplit的write方法,如下所示:
1
@Override
2
public
void
write(DataOutput out)
throws
IOException {
3
Text.writeString(out, file.toString());
4
out.writeLong(start);
5
out.writeLong(length);
6
}
需要注意的是,这里面并没有将FileSplit的hosts信息保存,而是存储到了SplitMetaInfo中new JobSplit.SplitMetaInfo(locations, offset, split.getLength())。
下面是保存SplitMetaInfo信息的实现:
01
private
static
void
writeJobSplitMetaInfo(FileSystem fs, Path filename,
02
FsPermission p,
int
splitMetaInfoVersion,
03
JobSplit.SplitMetaInfo[] allSplitMetaInfo)
throws
IOException {
04
// write the splits meta-info to a file for the job tracker
05
FSDataOutputStream out = FileSystem.create(fs, filename, p);
06
out.write(JobSplit.META_SPLIT_FILE_HEADER);
// 写入META头信息:META_SPLIT_FILE_HEADER = "META-SPL".getBytes("UTF-8");
07
WritableUtils.writeVInt(out, splitMetaInfoVersion);
// META版本信息:1
08
WritableUtils.writeVInt(out, allSplitMetaInfo.length);
// META对象的数量:每个InputSplit对应一个SplitMetaInfo
09
for
(JobSplit.SplitMetaInfo splitMetaInfo :allSplitMetaInfo) {
10
splitMetaInfo.write(out);
// 每个都进行存储
11
}
12
out.close();
13
}
看一下SplitMetaInfo存储时包含的数据信息:
1
public
void
write(DataOutput out)
throws
IOException {
2
WritableUtils.writeVInt(out, locations.length);
// location个数
3
for
(
int
i =
0
; i < locations.length; i++) {
4
Text.writeString(out, locations[i]);
// 写入每一个location位置信息
5
}
6
WritableUtils.writeVLong(out, startOffset);
// 偏移量
7
WritableUtils.writeVLong(out, inputDataLength);
// 数据长度
8
}
最后,我们看一下这些数据保存的目录和文件情况。前面已经知道Job提交目录,下面看split存储的文件是如何构建的:
1
FSDataOutputStream out = createFile(fs, JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf);
2
SplitMetaInfo[] info = writeNewSplits(conf, splits, out);
那么split保存的文件为:/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/job.split。
同样,split元数据信息文件构建如下所示:
1
writeJobSplitMetaInfo(fs,JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir),
2
new
FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion, info);
split元数据信息文件为:/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/job.splitmetainfo。
保存Job配置数据
在提交Job到JobTracker之前,还需要保存Job的配置信息,这些配置数据根据用户在MR程序中配置,覆盖默认的配置值,最后保存到XML文件(job.xml)到HDFS,供JobTracker查询。如下代码,创建submitJobFile文件并写入job配置数据:
01
...
02
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
03
jobCopy.set(
"mapreduce.job.dir"
, submitJobDir.toString());
04
...
05
// Write job file to JobTracker's fs
06
FSDataOutputStream out = FileSystem.create(fs, submitJobFile,
new
FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
07
...
08
try
{
09
jobCopy.writeXml(out);
10
}
finally
{
11
out.close();
12
}
前面已经知道Job提交目录,我们很容易就能得到job.xml文件的存储路径:/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/job.xml。
最后,所有的数据都已经准备完成,JobClient就可以基于JobSubmissionProtocol协议方法submitJob,提交Job到JobTracker运行。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: