您的位置:首页 > 其它

MapReduce的MapTask任务的运行源码级分析

2014-06-18 00:07 357 查看
  TaskTracker任务初始化及启动task源码级分析 这篇文章中分析了任务的启动,每个task都会使用一个进程占用一个JVM来执行,org.apache.hadoop.mapred.Child方法是具体的JVM启动类,其main方法中的taskFinal.run(job, umbilical)会启动具体的Task。

  Task分为两种类型:MapTask和ReduceTask,很明显,前者对应于Map任务,后者对应于Reduce任务。且MapTask分为4种:Job-setup Task、Job-cleanup Task、Task-cleanup Task和 Map Task。Job-setup Task、Job-cleanup Task分别是作业运行时启动的第一个任务和最后一个任务,主要工作分别是进行一些作业初始化和收尾工作,比如创建和删除作业临时输出目录;Task-cleanup Task则是任务失败或者被杀死后,用于清理已写入临时目录中数据的任务;最后一种Map Task则是处理数据并将结果存到本地磁盘上。

  本节先看MapTask,Child类调用run()方法,此类任务的run()方法代码如下:  

@Override
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException, ClassNotFoundException, InterruptedException {
//负责与TaskTracker的通信,通过该对象可以获得必要的对象
this.umbilical = umbilical;

// start thread that will handle communication with parent
// 启动Reporter线程,用来和TaskTracker交互目前运行的状态
TaskReporter reporter = new TaskReporter(getProgress(), umbilical,
jvmContext);
reporter.startCommunicationThread();
boolean useNewApi = job.getUseNewMapper();
/*用来初始化任务,主要是进行一些和任务输出相关的设置,比如创 建commiter,设置工作目录等*/
initialize(job, getJobID(), reporter, useNewApi);

// check if it is a cleanupJobTask
/*以下4个if语句均是根据任务类型的不同进行相应的操作,这些方 法均是Task类的方法,所以与任务是MapTask还是ReduceTask无关*/
if (jobCleanup) {
runJobCleanupTask(umbilical, reporter);
return;
}
if (jobSetup) {
//主要是创建工作目录的FileSystem对象
runJobSetupTask(umbilical, reporter);
return;
}
if (taskCleanup) {
//设置任务目前所处的阶段为结束阶段,并且删除工作目录
runTaskCleanupTask(umbilical, reporter);
return;
}
//如果不是上述四种类型,则真正运行任务
if (useNewApi) {
runNewMapper(job, splitMetaInfo, umbilical, reporter);
} else {
runOldMapper(job, splitMetaInfo, umbilical, reporter);
}
done(umbilical, reporter);//等待JobTracker的commit命令
}


  (1)参数TaskUmbilicalProtocol,这个协议用于Child和TaskTracker之间的通信。Child通过此协议,查看TaskTracker是否存在,取得任务,报告任务的进度,状态,出错信息,Commit文件到HDFS,并取得map结果给reduce;TaskTracker接收任务并监控任务的进度。

  (2)TaskReporter类是是Task类的内部私有类。Task.TaskReporter用于向TaskTracker提交计数器报告和状态报告,它实现了计数器报告Reporter和状态报告StatusReporter。为了不影响主线程的工作,TaskReporter有一个独立的线程,该线程通过TaskUmbilicalProtocol接口,向TaskTracker报告Task执行情况。startCommunicationThread()方法会启动线程。

  (3)useNewApi = job.getUseNewMapper()获取这个Task使用的新的API还是旧的API。mapreduce job提交流程源码级分析(一)(原创)这篇文章有讲在Job提交的时候就设置了使用新的API(包括新的Mapper和新的Reducer)。

  (4)initialize(job, getJobID(), reporter, useNewApi)该方法在父类Task中。这个方法会将Task的状态设置为RUNNING,表示正在运行;然后如果是新API会获取对应的OutputFormatClass默认是TextOutputFormat.class,新API会获取mapreduce.FileOutputCommitter旧API会获取mapred.FileOutputCommitter;再获取在MapReduce程序中通过FileOutputFormat.setOutputPath设置的输出目录,如果这个输出目录不为null且是新的API会执行else语句FileOutputFormat.setWorkOutputPath(conf, outputPath)(这个是旧版mapred.FileOutputFormat)设置工作目录,比如hdfs://IP:8020/user/XXX,IP指的是namenode,XXX指的是用户名;然后构造一个资源计算器ResourceCalculatorPlugin对象,来获取内存、CPU等资源信息。

  (5)如果jobCleanup==true(是在TaskInProgress类中设置的)表明这个task是清理Job的。直接运行runJobCleanupTask(umbilical, reporter)方法,这个方法是清理Job,包括步骤状态设置,更新状态到TaskTracker,调用org.apache.hadoop.mapreduce.OutputCommitter的相关方法,删除目录,通过done,通知TaskTracker任务完成等待commit命令。

  (6)如果jobSetup==true(是在TaskInProgress类中设置的)表明要初始化Job,直接运行runJobSetupTask(umbilical, reporter)为建立Job做准备,执行状态设置,然后调用org.apache.hadoop.mapreduce.OutputCommitter的setupJob,最后通过done,通知TaskTracker任务完成等待commit命令。

  (7)如果是taskCleanup==true(是在TaskInProgress类中设置的)表明是清理task的任务,直接运行runTaskCleanupTask(umbilical, reporter),清理Task任务,和上面(5)中runJobCleanupTask类似。

  (8)接下来才是执行Mapper的步骤,如果不是上面的5,6,7三种,如果是启用新的API(实际上是启用的,我们也只分析新API),就执行runNewMapper(job, splitMetaInfo, umbilical, reporter)方法。

  (9)done(umbilical, reporter)这个方法也被上面的5,6,7调用了,这个方法用于做结束任务的一些清理工作:更新计数器updateCounters();如果任务需要提交,设置Taks状态为COMMIT_PENDING,并利用TaskUmbilicalProtocol,汇报Task完成,等待提交,然后调用commit提交任务;设置任务结束标志位;结束Reporter通信线程;发送最后一次统计报告(通过sendLastUpdate方法);利用TaskUmbilicalProtocol报告结束状态(通过sendDone方法)。

  下面我们来看(8)中的runNewMapper(job, splitMetaInfo, umbilical, reporter)方法方法,这个方法将会构造一系列的对象来辅助执行Mapper。其代码如下:

View Code
   先计算写入文件的大小;然后获取写到本地(非HDFS)文件的文件名,会有一个编号,例如output/spill2.out;然后构造一个输出流;然后使用快排对缓冲区kvbuffe中区间[bufstart,bufend)内的数据进行排序,先按分区编号partition进行排序,然后按照key进行排序。这样经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序。

  会构建一个IFile.Writer对象将输出流传进去,输出到指定的文件当中,这个对象支持行级的压缩。如果用户设置了Combiner(实际上是一个reducer),则写入文件之前会对每个分区中的数据进行一次聚集操作,通过combinerRunner.combine(kvIter, combineCollector)实现,因为使用了新版的API,所以combinerRunner会是NewCombinerRunner,它的combine方法会执行reducer.run方法,只不过输出和正常的reducer不一样而已,这里最终会调用IFile.Writer的append方法实现本地文件的写入。

  还有将分区数据的元信息写到内存索引数据结构SpillRecord中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果内存中索引大于1MB,则写到文件output/spill2.out.index中。
  runNewMapper方法的最后会有输出流的关闭:output.close(mapperContext),其实就是NewOutputCollector.close(mapperContext)该方法会执行MapOutputBuffer.flush()操作会将剩余的数据也通过sortAndSpill()方法写入本地文件,并在最后调用mergeParts()方法合并所有spill文件。代码如下:  

private void mergeParts() throws IOException, InterruptedException,
ClassNotFoundException {
// get the approximate size of the final output/index files
long finalOutFileSize = 0;
long finalIndexFileSize = 0;
final Path[] filename = new Path[numSpills];
final TaskAttemptID mapId = getTaskID();

for(int i = 0; i < numSpills; i++) {
filename[i] = mapOutputFile.getSpillFile(i);    //通过spill文件的编号获取到指定的spill文件路径
finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
}
//合并输出有俩文件一个是output/file.out,一个是output/file.out.index
if (numSpills == 1) { //the spill is the final output
rfs.rename(filename[0],
new Path(filename[0].getParent(), "file.out"));
if (indexCacheList.size() == 0) {
rfs.rename(mapOutputFile.getSpillIndexFile(0),
new Path(filename[0].getParent(),"file.out.index"));
} else {    //写入文件
indexCacheList.get(0).writeToFile(
new Path(filename[0].getParent(),"file.out.index"), job);
}
return;
}

// read in paged indices
for (int i = indexCacheList.size(); i < numSpills; ++i) {
Path indexFileName = mapOutputFile.getSpillIndexFile(i);
indexCacheList.add(new SpillRecord(indexFileName, job, null));
}

//make correction in the length to include the sequence file header
//lengths for each partition
finalOutFileSize += partitions * APPROX_HEADER_LENGTH;
finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
Path finalOutputFile =
mapOutputFile.getOutputFileForWrite(finalOutFileSize);   //output/file.out
Path finalIndexFile =
mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);    //output/file.out.index

//The output stream for the final single output file
FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);

if (numSpills == 0) {
//create dummy(假的,假设) files
IndexRecord rec = new IndexRecord();
SpillRecord sr = new SpillRecord(partitions);
try {
for (int i = 0; i < partitions; i++) {
long segmentStart = finalOut.getPos();
Writer<K, V> writer =
new Writer<K, V>(job, finalOut, keyClass, valClass, codec, null);
writer.close();
rec.startOffset = segmentStart;
rec.rawLength = writer.getRawLength();
rec.partLength = writer.getCompressedLength();
sr.putIndex(rec, i);
}
sr.writeToFile(finalIndexFile, job);
} finally {
finalOut.close();
}
return;
}
{
IndexRecord rec = new IndexRecord();
final SpillRecord spillRec = new SpillRecord(partitions);
//finalOut最终输出文件。循环分区获得所有spill文件的该分区数据,合并写入finalOut
for (int parts = 0; parts < partitions; parts++) {
//create the segments to be merged
List<Segment<K,V>> segmentList =
new ArrayList<Segment<K, V>>(numSpills);
for(int i = 0; i < numSpills; i++) {
IndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);

Segment<K,V> s =
new Segment<K,V>(job, rfs, filename[i], indexRecord.startOffset,
indexRecord.partLength, codec, true);
segmentList.add(i, s);

if (LOG.isDebugEnabled()) {
LOG.debug("MapId=" + mapId + " Reducer=" + parts +
"Spill =" + i + "(" + indexRecord.startOffset + "," +
indexRecord.rawLength + ", " + indexRecord.partLength + ")");
}
}

//merge
@SuppressWarnings("unchecked")
RawKeyValueIterator kvIter = Merger.merge(job, rfs,
keyClass, valClass, codec,
segmentList, job.getInt("io.sort.factor", 100),//做merge操作时同时操作的stream数上限
new Path(mapId.toString()),
job.getOutputKeyComparator(), reporter,
null, spilledRecordsCounter);

//write merged output to disk
long segmentStart = finalOut.getPos();
Writer<K, V> writer =
new Writer<K, V>(job, finalOut, keyClass, valClass, codec,
spilledRecordsCounter);
// minSpillsForCombine 在MapOutputBuffer构造函数内被初始化,
// numSpills 为mapTask已经溢写到磁盘spill文件数量
if (combinerRunner == null || numSpills < minSpillsForCombine) {
Merger.writeFile(kvIter, writer, reporter, job);
} else {
combineCollector.setWriter(writer);
//其实写入数据的还是这里的writer类的append方法,这的输出是output/file.out文件,是合并后的文件
combinerRunner.combine(kvIter, combineCollector);
}

//close
writer.close();

// record offsets
rec.startOffset = segmentStart;
rec.rawLength = writer.getRawLength();
rec.partLength = writer.getCompressedLength();
spillRec.putIndex(rec, parts);
}
spillRec.writeToFile(finalIndexFile, job);    //写入索引文件
finalOut.close();        //合并后的输出文件
for(int i = 0; i < numSpills; i++) {
rfs.delete(filename[i],true);
}
}
}


View Code
  该方法会将所有临时文件合并成一个大文件保存到output/file.out中,同时生成相应的索引文件output/file.out.index。 在进行文件合并的过程中,Map Task以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式:每轮合并io.sort.factor,默认是100,个文件,并将产生的文 件重新加入待合并列表中,对文件排序后,重复上述过程,直到只有一个文件。只生产一个文件可以避免同时打开大量的文件和同时读取大量的小文件产生的随机读 取带来的开销。最后会删除所有的spill文件。

  另外需要注意的是,mergeParts()中也有combiner的操作,但是需要满足一定的条件:1、用户设置了combiner;2、spill文件的数量超过了minSpillsForCombine的值,对应配置项"min.num.spills.for.combine",可自行设置,默认是3。这俩必须同时具备才会在此启动combiner的本地聚集操作。所以在Map阶段有可能combiner会执行两次,所以有可能你的combiner执行两次之后输出数据不符合预期了。

  

  这样Map阶段的任务就算完成了。主要是读取数据然后写入内存缓冲区,缓存区满足条件就会快排后并设置partition后,spill到本地文件和索引文件;如果有combiner,spill之前也会做一次聚集操作,待数据跑完会通过归并合并所有spill文件和索引文件,如果有combiner,合并之前在满足条件后会做一次综合的聚集操作。map阶段的结果都会存储在本地中(如果有reducer的话),非HDFS。

  参考:1、董西成,《hadoop技术内幕---深入理解MapReduce架构设计与实现原理》
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: