您的位置:首页 > 运维架构

随手记点-mapreduce5

2016-03-23 00:21 471 查看

1.Reduce Task阶段包括几个阶段?各个阶段又负责怎样的工作?

答:Reduce Task包括了copy phase,sort phase和reduce phase。copy阶段将所有分散在其他机器上的map处理过的数据收集过来;sort阶段将这么多的输入文件组织成KV对;reducer阶段负责处理数据。

下面是reducer运行的代码:

//在类初始化的时候,
setPhase(TaskStatus.Phase.SHUFFLE);

//之后运行的run函数
public void run(...){
//copyPhase
copyPhase.complete();// copy is already complete//sortPhase
sortPhase.complete();// sort is complete
runNewReducer(...);
}
void runNewReducer(... ) {
//初始化输入、reducer、输出对象
reducer.run(reducerContext);
}


2.reduce的输入又是从何而来?

答:简单而言,reduce的输入来源于map的输出。这样说其实过于简单了,忽略了许多具体实现的细节。说的详细点,reduce的数据输入是通过驻在reducer节点上的HTTP server通过HTTP来从不同的spill文件中获取自己的map输出结果,在把获取的map结果放在内存或本节点下的磁盘下。reducer的这个阶段也就是Copy Phase,下面会进行详细的分析。

3.Copy Phase阶段是怎样工作的?

答:首先判断是不是在local执行模式下,如果不是local执行的模式,则需要使用ReduceCopier从其他机器上将map处理完的数据复制过来。fetchOutputs方法是复制数据的主要实现。

//ReduceTask中的run方法部分代码
boolean isLocal = "local".equals(job.get("mapred.job.tracker", "local"));
if (!isLocal) {
reduceCopier = new ReduceCopier(umbilical, job, reporter);
if (!reduceCopier.fetchOutputs()) {
if(reduceCopier.mergeThrowable instanceof FSError) {
throw (FSError)reduceCopier.mergeThrowable;
}
throw new IOException("Task: " + getTaskID() +" - The reduce copier failed", reduceCopier.mergeThrowable);
}
}


在copy阶段,Reduce任务通过HTTP向各个Map任务拖取它所需要的数据。每个节点都会启动一个常驻的HTTP server,其中一项服务就是响应Reduce拖取Map数据。当有MapOutput的HTTP请求过来的时候,HTTP server就读取相应的Map输出文件中对应这个Reduce部分的数据通过网络流输出给Reduce。Reduce任务拖取某个Map对应的数据,如果在内存中能放得下这次数据的话就直接把数据写到内存中。Reduce要向每个Map去拖取数据,在内存中每个Map对应一块数据,当内存中存储的Map数据占用空间达到一定程度的时候,开始启动内存中merge,把内存中的数据merge输出到磁盘上一个文件中。如果在内存中不能放得下这个Map的数据的话,直接把Map数据写到磁盘上,在本地目录创建一个文件,从HTTP流中读取数据然后写到磁盘,使用的缓存区大小是64K。拖一个Map数据过来就会创建一个文件,当文件数量达到一定阈值时,开始启动磁盘文件merge,把这些文件合并输出到一个文件。有些Map的数据较小是可以放在内存中的,有些Map的数据较大需要放在磁盘上,这样最后Reduce任务拖过来的数据有些放在内存中了有些放在磁盘上,最后会对这些来一个全局合并。

4.Sort Phase阶段是怎样工作的?

答:因为从map端拿到的数据都是文件,需要在sort阶段将这些文件中的数据封装成kv。下面是具体的实现:

//ReduceTask中的run方法
final FileSystem rfs = FileSystem.getLocal(job).getRaw();
RawKeyValueIterator rIter = isLocal
? Merger.merge(job, rfs, job.getMapOutputKeyClass(),
job.getMapOutputValueClass(), codec, getMapFiles(rfs, true),
!conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100),
new Path(getTaskID().toString()), job.getOutputKeyComparator(),
reporter, spilledRecordsCounter, null)
: reduceCopier.createKVIterator(job, rfs, reporter);

// free up the data structures
mapOutputFilesOnDisk.clear();
sortPhase.complete();


5.Reduce Phase阶段是怎样工作的?

答:利用setup()的方法做了一些配置,默认是空;再不断的读取下一个(K,List),并交给reduce一个一个KV来处理。默认的reduce方法就是什么都不做,输入和输出时一样的;getCurrentKey()和getCurrentValue()是对输入的封装;write方法是对输出对象的封装。下面显示的reduce阶段的核心代码:

public void run(Context context) throws IOException, InterruptedException {
setup(context);
while (context.nextKey()) {
reduce(context.getCurrentKey(), context.getValues(), context);
}
cleanup(context);
}
//默认的reduce方法
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
) throws IOException, InterruptedException {
for(VALUEIN value: values) {
context.write((KEYOUT) key, (VALUEOUT) value);
}
}


6.reduce也只能处理一个键值对,那么框架中是如何不断的使用reduce来处理多个键值对的?

答:从上面的代码可以看出有一个while(context.nextKey())的循环来不断的读取下一个(k, List),并不断地交给reduce一个kv一个kv来处理。

7.copy phase具体由几个方面构成?各个方面的作用是什么?

答:copy phase主要包括了fetchOutputs,GetMapEventThread,MapOutputCopier,InMemFSMergeThread,LocalFSMerge。其中



fetchOutputs

由mainThread控制,主要负责管理整个获取数据的流程;

在开始时启动线程,完成后变释放资源;

将已获取数据的记录删除,并同时添加新的完成了的MapTask。

GetMapEventThread

负责从Task Tracke中不断获取MapTask的完成情况,并将已完成的MapTask放到自己的记录中。

MapOutputCopier

负责从已完成的MapTask中获取数据,大文件保存在硬盘中,小文件保存在内存中。

InMemFSMergeThread

负责在缓冲区达到一定阈值时,将内
4000
存中的数据写入硬盘以减少内存的占用。

LocalFSMerger

负责在硬盘上的merge过程,负责在文件较多的情况下将硬盘中的文件进行merge以减少文件 的个数。

8.sort phase需考虑几个方面的影响?又是怎么做到排序和封成KV对的?

答:因为在copy phase过程中会根据mapout文件的大小将文件分别放在内存和硬盘上。在这两种不同的介质上显然不是有序的,那么在sort phase就必须解决同时处理不同的介质并且有序的问题。所以sort phase采用了MergeQueue进行边排序变输出,此时仍需满足磁盘上的文件个数小于io.sort.factors设定的10,超过10个则进行merge操作。并且内存中的数据年小于最大可用内存的mapred.job.reduce.input.buffer.percent。由于有些文件在硬盘上,利用RawKVIteratorReader将硬盘中的文件封装成KV形式才能让reduce来读取。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  hadoop mapreduce