您的位置:首页 > 其它

MapReduce执行过程源码分析(一)——Job任务的提交

2014-02-20 10:19 871 查看
为了能使源码的执行过程与Hadoop权威指南(2、3版)中章节Shuffle and Sort的分析相对应,Hadoop的版本为0.20.2。

一般情况下我们通过Job(org.apache.hadoop.mapreduce.Job)的方法waitForCompletion来开始一个Job的执行。

/**
* Submit the job to the cluster and wait for it to finish.
*
* @param verbose
*            print the progress to the user
* @return true if the job succeeded
* @throws IOException
*             thrown if the communication with the <code>JobTracker</code>
*             is lost
*/
public boolean waitForCompletion(boolean verbose) throws IOException,
InterruptedException, ClassNotFoundException {
if (state == JobState.DEFINE) {
submit();
}

if (verbose) {
jobClient.monitorAndPrintJob(conf, info);
} else {
info.waitForCompletion();
}

return isSuccessful();
}


通常设置方法参数verbose为true,这样可以在控制台中看到Job的执行过程信息。

其中Job的具体提交过程是由方法submit完成的,

/**
* Submit the job to the cluster and return immediately.
*
* @throws IOException
*/
public void submit() throws IOException, InterruptedException,
ClassNotFoundException {
ensureState(JobState.DEFINE);

setUseNewAPI();

info = jobClient.submitJobInternal(conf);

state = JobState.RUNNING;
}


而submit方法的执行又依赖于JobClient submitJobInternal来完成,方法submitJobInternal是Job任务提交过程中的重点,在方法中完成的Job任务的初始化准备工作。

/**
* Internal method for submitting jobs to the system.
*
* @param job
*            the configuration to submit
* @return a proxy object for the running job
* @throws FileNotFoundException
* @throws ClassNotFoundException
* @throws InterruptedException
* @throws IOException
*/
public RunningJob submitJobInternal(JobConf job)
throws FileNotFoundException, ClassNotFoundException,
InterruptedException, IOException {
/*
* configure the command line options correctly on the submitting dfs
*/
JobID jobId = jobSubmitClient.getNewJobId();

/*
* 在submitJobDir目录下有三个文件:job.jar、job.split、job.xml
*
* **********************************************************************
*/
Path submitJobDir = new Path(getSystemDir(), jobId.toString());
Path submitJarFile = new Path(submitJobDir, "job.jar");
Path submitSplitFile = new Path(submitJobDir, "job.split");

configureCommandLineOptions(job, submitJobDir, submitJarFile);

Path submitJobFile = new Path(submitJobDir, "job.xml");

/*
* 获取reducer的数目
*
* **********************************************************************
*/
int reduces = job.getNumReduceTasks();

JobContext context = new JobContext(job, jobId);

/*
* Check the output specification
*
* 根据是否使用New API验证OutputFormat
*
* 如输出格式设置(未设置默认为TextOutputFormat)、是否设置输出路径及输出路径是否已经存在
*
* **********************************************************************
*/
if (reduces == 0 ? job.getUseNewMapper() : job.getUseNewReducer()) {
org.apache.hadoop.mapreduce.OutputFormat<?, ?> output = ReflectionUtils
.newInstance(context.getOutputFormatClass(), job);

output.checkOutputSpecs(context);
} else {
job.getOutputFormat().checkOutputSpecs(fs, job);
}

/*
* Create the splits for the job
*
* *******************************************************************
*/
LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));

/*
* 根据输入切片的数目决定map任务的数目
*
* 一个输入切片对应一个map
*
* *******************************************************************
*/
int maps;

if (job.getUseNewMapper()) {
maps = writeNewSplits(context, submitSplitFile);
} else {
maps = writeOldSplits(job, submitSplitFile);
}

job.set("mapred.job.split.file", submitSplitFile.toString());

job.setNumMapTasks(maps);

/*
* Write job file to JobTracker's fs
*
* **********************************************************************
*/
FSDataOutputStream out = FileSystem.create(fs, submitJobFile,
new FsPermission(JOB_FILE_PERMISSION));

try {
job.writeXml(out);
} finally {
out.close();
}

/*
* Now, actually submit the job (using the submit name)
*
* **********************************************************************
*/
JobStatus status = jobSubmitClient.submitJob(jobId);

if (status != null) {
return new NetworkedJob(status);
} else {
throw new IOException("Could not launch job");
}
}


下面对该方法内部的执行流程进行详细分析:

(1)生成Job ID

JobID jobId = jobSubmitClient.getNewJobId();


(2)目录相关及文件

在submitJobDir目录下有三个文件:job.jar、job.split、job.xml,其中

job.jar:Job相关类(资源)的一个Jar包;

job.split:Job的输入文件(可能有多个或可以是其它格式(如HBase HTable))会根据一定的条件进行切片,每一个切片中的“数据”会对应的Job的一个Map任务,即每一个Map仅处理某一个切片中的“数据”;

job.xml:用以保存Job相关的配置数据。

/*
* 在submitJobDir目录下有三个文件:job.jar、job.split、job.xml
*
* **********************************************************************
*/
Path submitJobDir = new Path(getSystemDir(), jobId.toString());
Path submitJarFile = new Path(submitJobDir, "job.jar");
Path submitSplitFile = new Path(submitJobDir, "job.split");

/*
* 根据命令行参数-libjars, -files, -archives对Job进行相应的配置
*/
configureCommandLineOptions(job, submitJobDir, submitJarFile);

Path submitJobFile = new Path(submitJobDir, "job.xml");


其中,configureCommandLineOptions主要是根据用户在命令行环境下提供的参数(-libjars、-files、-archives)进行DistributedCache的设置,并将相应的Jar拷贝至目录submitJobDir中。

注:DistributedCache的相关知识会在后续分析,在此先不进行讨论。

(3)Reducer数目

获取用户所设置的该Job中Reducer的数目。

int reduces = job.getNumReduceTasks();


(4)JobContext

JobContext context = new JobContext(job, jobId);


其实JobContext就是对JobConf与JobID的封装。

(5)Job输出相关校验

/*
* Check the output specification
*
* 根据是否使用New API验证OutputFormat
*
* 如输出格式设置(未设置默认为TextOutputFormat)、是否设置输出路径及输出路径是否已经存在
*
* **********************************************************************
*/
if (reduces == 0 ? job.getUseNewMapper() : job.getUseNewReducer()) {
org.apache.hadoop.mapreduce.OutputFormat<?, ?> output = ReflectionUtils
.newInstance(context.getOutputFormatClass(), job);

output.checkOutputSpecs(context);
} else {
job.getOutputFormat().checkOutputSpecs(fs, job);
}


校验时会根据是否使用新版本的API分为两种情况,默认情况下使用的都是新版本的API,所以此处不考虑旧版本API的情况,所以分析的代码变为

 

org.apache.hadoop.mapreduce.OutputFormat<?, ?> output = ReflectionUtils
.newInstance(context.getOutputFormatClass(), job);

output.checkOutputSpecs(context);


首先,获取输出的具体格式;

context.getOutputFormatClass()


 

/**
* Get the {@link OutputFormat} class for the job.
*
* @return the {@link OutputFormat} class for the job.
*/
@SuppressWarnings("unchecked")
public Class<? extends OutputFormat<?, ?>> getOutputFormatClass()
throws ClassNotFoundException {
return (Class<? extends OutputFormat<?, ?>>) conf.getClass(
OUTPUT_FORMAT_CLASS_ATTR, TextOutputFormat.class);
}


由上述代码可以看出,如果用户并没有明确指定输出格式类型,则默认使用TextOutputFormat。

注:文本是进行数据分析时经常使用的一种格式,因此本文主要使用TextInputFormat、TextOutputFormat进行讲解。

然后,通过反射将输出格式实例化;

org.apache.hadoop.mapreduce.OutputFormat<?, ?> output = ReflectionUtils
.newInstance(context.getOutputFormatClass(), job);


最后,通过输出格式的具体类型进行校验,包括两个部分:是否设置输出目录及输出目录是否已经存在。

TextOutputFormat的checkOutputSpecs继承自它的父类FileOutputFormat。

public void checkOutputSpecs(JobContext job)
throws FileAlreadyExistsException, IOException {
// Ensure that the output directory is set and not already there
Path outDir = getOutputPath(job);

if (outDir == null) {
throw new InvalidJobConfException("Output directory not set.");
}

if (outDir.getFileSystem(job.getConfiguration()).exists(outDir)) {
throw new FileAlreadyExistsException("Output directory " + outDir
+ " already exists");
}
}


(6)生成输入切片(Split),并设置Map的数目;

/*
* Create the splits for the job
*
* *******************************************************************
*/
LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));

/*
* 根据输入切片的数目决定map任务的数目
*
* 一个输入切片对应一个map
*
* *******************************************************************
*/
int maps;

if (job.getUseNewMapper()) {
maps = writeNewSplits(context, submitSplitFile);
} else {
maps = writeOldSplits(job, submitSplitFile);
}

job.set("mapred.job.split.file", submitSplitFile.toString());

job.setNumMapTasks(maps);


这里仅分析新版本API下的writeNewSplits,该方法需要两个参数:JobContext及切片文件的Path。

@SuppressWarnings("unchecked")
private <T extends org.apache.hadoop.mapreduce.InputSplit> int writeNewSplits(
JobContext job, Path submitSplitFile) throws IOException,
InterruptedException, ClassNotFoundException {
JobConf conf = job.getJobConf();

/*
* 创建InputFormat实例
*
* 不同的InputFormat实例获取Split的方式不同
*
* ******************************************************************
*/
org.apache.hadoop.mapreduce.InputFormat<?, ?> input = ReflectionUtils
.newInstance(job.getInputFormatClass(), job.getJobConf());

/*
* 获取输入文件对应的切片记录
*
* ******************************************************************
*/
List<org.apache.hadoop.mapreduce.InputSplit> splits = input
.getSplits(job);

T[] array = (T[]) splits
.toArray(new org.apache.hadoop.mapreduce.InputSplit[splits
.size()]);

/*
* sort the splits into order based on size, so that the biggest go
* first
*
* ******************************************************************
*/
Arrays.sort(array, new NewSplitComparator());

/*
* 写出SplitFile
*
* ******************************************************************
*/

// 打开切片文件输出流,并写出头信息(头、版本号、切片数目)
DataOutputStream out = writeSplitsFileHeader(conf, submitSplitFile,
array.length);

try {
if (array.length != 0) {
DataOutputBuffer buffer = new DataOutputBuffer();

RawSplit rawSplit = new RawSplit();

SerializationFactory factory = new SerializationFactory(conf);

Serializer<T> serializer = factory
.getSerializer((Class<T>) array[0].getClass());

serializer.open(buffer);

for (T split : array) {
rawSplit.setClassName(split.getClass().getName());

buffer.reset();

// 序列化文件名、起始位置、切片长度、主机位置(多个)
serializer.serialize(split);

rawSplit.setDataLength(split.getLength());

rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());

rawSplit.setLocations(split.getLocations());

rawSplit.write(out);
}

serializer.close();
}
} finally {
out.close();
}

return array.length;
}


方法思路:根据指定的输入格式类型(InputFormat)对输入文件进行切片,并将切片信息保存至指定的切片文件中。

注:切片并不是对输入文件进行物理上的切割,而只是一种逻辑上的“分割”,即将输入文件某个片段的起始位置保存下来,后期Map任务运行时根据切片文件就可以将该片段作为输入进行处理。

首先,获取输入格式类型,

job.getInputFormatClass()


 

/**
* Get the {@link InputFormat} class for the job.
*
* @return the {@link InputFormat} class for the job.
*/
@SuppressWarnings("unchecked")
public Class<? extends InputFormat<?, ?>> getInputFormatClass()
throws ClassNotFoundException {
return (Class<? extends InputFormat<?, ?>>) conf.getClass(
INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
}


与输出格式类型相同,如果用户没有特殊指定,默认的输入格式类型为TextInputFormat,然后将此输入格式类型实例化。

org.apache.hadoop.mapreduce.InputFormat<?, ?> input = ReflectionUtils
.newInstance(job.getInputFormatClass(), job.getJobConf());


然后,根据具体的输入格式类型计算切片信息,

/*
* 获取输入文件对应的切片记录
*
* ******************************************************************
*/
List<org.apache.hadoop.mapreduce.InputSplit> splits = input
.getSplits(job);


TextInputFormat的方法getSplits继承自它的父类FileInputFormat。

/**
* Generate the list of files and make them into FileSplits.
*/
public List<InputSplit> getSplits(JobContext job) throws IOException {
/*
* 计算Split的最小值与最大值
*
* ********************************************************************
*/
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);

// generate splits
List<InputSplit> splits = new ArrayList<InputSplit>();

/*
* 逐个处理InputPaths中的文件
*
* *******************************************************************
*/
for (FileStatus file : listStatus(job)) {
Path path = file.getPath();

FileSystem fs = path.getFileSystem(job.getConfiguration());

/*
* 获取特定文件的长度
*
* ******************************************************************
*/
long length = file.getLen();

/*
* 获取特定文件对应的块Block信息
*
* ***************************************************************
*/
BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0,
length);

/*
* 如果文件长度大于0且是可切片的
*
* ***************************************************************
*/
if ((length != 0) && isSplitable(job, path)) {
long blockSize = file.getBlockSize();

/*
* 根据blockSize、minSize、maxSize计算切片大小
*
* Math.max(minSize, Math.min(maxSize, blockSize)
*
* ***********************************************************
*/
long splitSize = computeSplitSize(blockSize, minSize, maxSize);

long bytesRemaining = length;

while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
/*
* 返回的Block Index为此切片开始位置所在Block的Index
*
* **********************************************************
*/
int blkIndex = getBlockIndex(blkLocations, length
- bytesRemaining);

/*
* 一个Block对应一个FileSplit
*
* *******************************************************
*/
splits.add(new FileSplit(path, length - bytesRemaining,
splitSize, blkLocations[blkIndex].getHosts()));

bytesRemaining -= splitSize;
}

if (bytesRemaining != 0) {
/*
* 剩余的文件数据形成一个切片,hosts为此文件最后一个Block的hosts
*
* **********************************************************
*/
splits.add(new FileSplit(path, length - bytesRemaining,
bytesRemaining,
blkLocations[blkLocations.length - 1].getHosts()));
}
} else if (length != 0) {
/*
* 文件长度不为0但不可分割
*
* 不能切片的文件,整体形成一个切片,hosts为此文件第一个Block的hosts
*
* ***********************************************************
*/
splits.add(new FileSplit(path, 0, length, blkLocations[0]
.getHosts()));
} else {
// Create empty hosts array for zero length files
splits.add(new FileSplit(path, 0, length, new String[0]));
}
}

LOG.debug("Total # of splits: " + splits.size());

return splits;
}


getSplits处理流程如下:

① 根据配置参数计算Split所允许的最小值与最大值,为后期确定Split的长度提供参考;

/*
* 计算Split的最小值与最大值
*
* ********************************************************************
*/
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);


② 在内存中创建相应的数据结构,用以保存计算所得的切片信息;

// generate splits
List<InputSplit> splits = new ArrayList<InputSplit>();


③ 循环处理InputPaths所添加的文件,对一个文件各自计算其对应的切片信息;

/*
* 逐个处理InputPaths中的文件
*
* *******************************************************************
*/
for (FileStatus file : listStatus(job)) {
......
}


④ 计算某个文件的切片信息:

a. 获取该文件的长度及对应的Block信息;

Path path = file.getPath();

FileSystem fs = path.getFileSystem(job.getConfiguration());

/*
* 获取特定文件的长度
*
* ******************************************************************
*/
long length = file.getLen();

/*
* 获取特定文件对应的块Block信息
*
* ***************************************************************
*/
BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0,
length);


b. 根据文件长度以及该文件是否可以切片,分为三种情况处理:

其中,文件是否支持Split,是由该文件类型对应的InputFormat决定的,FileInputFormat中的实现如下:

/**
* Is the given filename splitable? Usually, true, but if the file is stream
* compressed, it will not be.
*
* <code>FileInputFormat</code> implementations can override this and return
* <code>false</code> to ensure that individual input files are never
* split-up so that {@link Mapper}s process entire files.
*
* @param context
*            the job context
* @param filename
*            the file name to check
* @return is this file splitable?
*/
protected boolean isSplitable(JobContext context, Path filename) {
return true;
}


TextInputFormat中重写了该方法:

@Override
protected boolean isSplitable(JobContext context, Path file) {
CompressionCodec codec = new CompressionCodecFactory(
context.getConfiguration()).getCodec(file);

return codec == null;
}


由上述代码可见,如果文本文件经过相应的压缩之后,是不支持进行Split的。

第一种情况:文件长度大于0,且文件支持Split;

首先计算一个切片的具体长度,长度的计算方式为:Math.max(minSize, Math.min(maxSize, blockSize) ;

long blockSize = file.getBlockSize();

/*
* 根据blockSize、minSize、maxSize计算切片大小
*
* Math.max(minSize, Math.min(maxSize, blockSize)
*
* ***********************************************************
*/
long splitSize = computeSplitSize(blockSize, minSize, maxSize);


然后,根据splitSize进行切片,思路就是从文件开始处,以splitSize为区间,对文件进行逻辑上的切分;

long bytesRemaining = length;

while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
/*
* 返回的Block Index为此切片开始位置所在Block的Index
*
* **********************************************************
*/
int blkIndex = getBlockIndex(blkLocations, length
- bytesRemaining);

/*
* 一个Block对应一个FileSplit
*
* *******************************************************
*/
splits.add(new FileSplit(path, length - bytesRemaining,
splitSize, blkLocations[blkIndex].getHosts()));

bytesRemaining -= splitSize;
}

if (bytesRemaining != 0) {
/*
* 剩余的文件数据形成一个切片,hosts为此文件最后一个Block的hosts
*
* **********************************************************
*/
splits.add(new FileSplit(path, length - bytesRemaining,
bytesRemaining,
blkLocations[blkLocations.length - 1].getHosts()));
}


为了不产生过小的切片,要求尚未进行切片的文件部分长度(bytesRemaining)大于切片长度(splitSize)的SPLIT_SLOP(1.1)倍,然后将文件的剩余部分直接作为一个切片。

在上述代码中的切片信息中,还保存着切片对应的Block信息,注意切片并不一定会与Block完全吻合(即切片在文件中的起止处与该Block在文件中的起止处一致),所谓的对应的Block,是指该切片的起始处正在落在该Block的区间内;之所以要保存切片对应的Block信息,是为后期Map任务的“本地计算”调度运行作准备的。

第二种情况:文件长度大于0,但该文件不支持切片;

/*
* 文件长度不为0但不可分割
*
* 不能切片的文件,整体形成一个切片,hosts为此文件第一个Block的hosts
*
* ***********************************************************
*/
splits.add(new FileSplit(path, 0, length, blkLocations[0]
.getHosts()));


因为该文件不支持切片,直接将整个文件作为一个切片就可以了。

第三种情况:文件长度为0;

// Create empty hosts array for zero length files
splits.add(new FileSplit(path, 0, length, new String[0]));


此时直接创建一个空的切片即可。

到此,所有输入文件的切片信息就全部产生完毕了。

⑤ 对产生的切片进行排序处理,排序的依据是切片的大小,切片越大,在切片集合中的位置应该更靠前,这样可以使大的切片在调度时,优先得到处理。

T[] array = (T[]) splits
.toArray(new org.apache.hadoop.mapreduce.InputSplit[splits
.size()]);

/*
* sort the splits into order based on size, so that the biggest go
* first
*
* ******************************************************************
*/
Arrays.sort(array, new NewSplitComparator());


⑥ 存储切片信息至相应的切片文件中,调度任务时使用切片文件中的信息进行调度,具体的存储过程不影响整个处理流程的理解,在此不对它进行分析。

至此,writeNewSplits方法结果,该方法还回返回切片的总数目,即对应着Job的Map任务数目。

(7)将Job的相关信息写入job.xml;

/*
* Write job file to JobTracker's fs
*
* **********************************************************************
*/
FSDataOutputStream out = FileSystem.create(fs, submitJobFile,
new FsPermission(JOB_FILE_PERMISSION));

try {
job.writeXml(out);
} finally {
out.close();
}


(8)完成Job任务的实际提交;

/*
* Now, actually submit the job (using the submit name)
*
* **********************************************************************
*/
JobStatus status = jobSubmitClient.submitJob(jobId);

if (status != null) {
return new NetworkedJob(status);
} else {
throw new IOException("Could not launch job");
}


到此,Job任务的提交过程分析完毕。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: