hadoop2提交到Yarn: Mapreduce执行过程reduce分析3
2017-03-15 22:02
627 查看
原文
问题导读:
1.Reduce类主要有哪三个步骤?
2.Reduce的Copy都包含什么过程?
3.Sort主要做了哪些工作?
4.4 Reduce类4.4.1 Reduce介绍
[align=left]整完了Map,接下来就是Reduce了。YarnChild.main()—>ReduceTask.run()。ReduceTask.run方法开始和MapTask类似,包括initialize()初始化,根据情况看是否调用runJobCleanupTask(),runTaskCleanupTask()等。之后进入正式的工作,主要有这么三个步骤:Copy、Sort、Reduce。[/align]
4.4.2 Copy
Copy就是从执行各个Map任务的节点获取map的输出文件。这是由ReduceTask.ReduceCopier 类来负责。ReduceCopier对象负责将Map函数的输出拷贝至Reduce所在机器。如果大小超过一定阈值就写到磁盘,否则放入内存,在远程拷贝数据的同时,Reduce
Task启动了两个后台线程对内存和磁盘上的文件进行合并,防止内存使用过多和磁盘文件过多。
[align=left]Step1:[/align]
[align=left] 首先在ReduceTask的run方法中,通过如下配置来mapreduce.job.reduce.shuffle.consumer.plugin.class装配shuffle的plugin。默认的实现是Shuffle类:[/align]
Class<? extends ShuffleConsumerPlugin> clazz = job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
复制代码
[align=left]Step2:[/align]
[align=left] 初始化上述的plugin后,执行其run方法,得到RawKeyValueIterator的实例。[/align]
[align=left]run方法的执行步骤如下:[/align]
[align=left]Step2.1:[/align]
[align=left] 量化Reduce的事件数目:[/align]
int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH, MAX_RPC_OUTSTANDING_EVENTS / jobConf.getNumReduceTasks());
int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);
复制代码
[align=left]Step2.2:[/align]
[align=left]生成map的完成状态获取线程,并启动此线程:[/align]
final EventFetcher<K,V> eventFetcher = new EventFetcher<K,V>(reduceId, umbilical, scheduler, this, maxEventsToFetch);
eventFetcher.start();
复制代码
获取已经完成的Map信息,如Map的host、mapId等放入ShuffleSchedulerImpl中的Set<MapHost>中便于下面进行数据的拷贝传输。
URI u = getBaseURI(reduceId, event.getTaskTrackerHttp());
addKnownMapOutput(u.getHost() + ":" + u.getPort(),
u.toString(),
event.getTaskAttemptId());
maxMapRuntime = Math.max(maxMapRuntime, event.getTaskRunTime());
复制代码
[align=left]Step2.3:[/align]
[align=left] 在Shuffle类中启动初始化Fetcher线程组,并启动:[/align]
boolean isLocal = localMapFiles != null;
final int numFetchers = isLocal ? 1 :
jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
if (isLocal) {
fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,
merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
localMapFiles);
fetchers[0].start();
} else {
for (int i=0; i < numFetchers; ++i) {
fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger,
reporter, metrics, this,
reduceTask.getShuffleSecret());
fetchers[i].start();
}
}
复制代码
线程的run方法就是进行数据的远程拷贝:
try {
// If merge is on, block
merger.waitForResource();
// Get a host to shuffle from
host = scheduler.getHost();
metrics.threadBusy();
// Shuffle
copyFromHost(host);
} finally {
if (host != null) {
scheduler.freeHost(host);
metrics.threadFree();
}
}
复制代码
[align=left]Step2.4:[/align]
[align=left]来看下这个copyFromHost方法。主要是就是使用HttpURLConnection,实现远程数据的传输。[/align]
[align=left]建立连接之后,从接收到的Stream流中读取数据。每次读取一个map文件。[/align]
TaskAttemptID[] failedTasks = null;
while (!remaining.isEmpty() && failedTasks == null) {
failedTasks = copyMapOutput(host, input, remaining);
}
复制代码
[align=left]上面的copyMapOutput方法中,每次读取一个mapid,根据MergeManagerImpl中的reserve函数,检查map的输出是否超过了mapreduce.reduce.memory.totalbytes配置的大小,此配置的默认值[/align]
[align=left]是当前Runtime的maxMemory*mapreduce.reduce.shuffle.input.buffer.percent配置的值,Buffer.percent的默认值为0.90。[/align]
[align=left]如果mapoutput超过了此配置的大小时,生成一个OnDiskMapOutput实例。在接下来的操作中,map的输出写入到local临时文件中。[/align]
[align=left]如果没有超过此大小,生成一个InMemoryMapOutput实例。在接下来操作中,直接把map输出写入到内存。[/align]
[align=left]最后,执行ShuffleScheduler.copySucceeded完成文件的copy,调用mapout.commit函数,更新状态或者触发merge操作。[/align]
[align=left]Step2.5:[/align]
[align=left] 等待上面所有的拷贝完成之后,关闭相关的线程。[/align]
eventFetcher.shutDown();
// Stop the map-output fetcher threads
for (Fetcher<K,V> fetcher : fetchers) {
fetcher.shutDown();
}
// stop the scheduler
scheduler.close();
copyPhase.complete(); // copy is already complete
taskStatus.setPhase(TaskStatus.Phase.SORT);
reduceTask.statusUpdate(umbilical);
复制代码
[align=left]Step2.6:[/align]
[align=left]执行最终的merge操作,由Shuffle中的MergeManager完成:[/align]
public RawKeyValueIterator close() throws Throwable {
// Wait for on-going merges to complete
if (memToMemMerger != null) {
memToMemMerger.close();
}
inMemoryMerger.close();
onDiskMerger.close();
List<InMemoryMapOutput<K, V>> memory =
new ArrayList<InMemoryMapOutput<K, V>>(inMemoryMergedMapOutputs);
inMemoryMergedMapOutputs.clear();
memory.addAll(inMemoryMapOutputs);
inMemoryMapOutputs.clear();
List<CompressAwarePath> disk = new ArrayList<CompressAwarePath>(onDiskMapOutputs);
onDiskMapOutputs.clear();
return finalMerge(jobConf, rfs, memory, disk);
}
复制代码
[align=left]Step3:[/align]
[align=left]释放资源。[/align]
mapOutputFilesOnDisk.clear();
复制代码
[align=left] Copy完毕。[/align]
4.4.3 Sort
[align=left] Sort(其实相当于合并)就相当于排序工作的一个延续,它会在所有的文件都拷贝完毕后进行。使用工具类Merger归并所有的文件。经过此过程后,会产生一个合并了所有(所有并不准确)Map任务输出文件的新文件,而那些从其他各个服务器搞过来的 Map任务输出文件会删除。根据hadoop是否分布式来决定调用哪种排序方式。[/align]
[align=left] 在上面的4.3.2节中的Step2.4结束之后就会触发此操作。[/align]
4.4.4 Reduce
[align=left] 经过上面的步骤之后,回到ReduceTask中的run方法继续往下执行,调用runNewReducer。创建reducer:[/align]
org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
(org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
复制代码
并执行其run方法,此run方法就是我们的org.apache.hadoop.mapreduce.Reducer中的run方法。
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKey()) {
reduce(context.getCurrentKey(), context.getValues(), context);
// If a back up store is used, reset it
Iterator<VALUEIN> iter = context.getValues().iterator();
if(iter instanceof ReduceContext.ValueIterator) {
((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
}
}
} finally {
cleanup(context);
}
}
}
复制代码
while的循环条件是ReduceContext.nextKey()为真,这个方法就在ReduceContext中实现的,这个方法的目的就是处理下一个唯一的key,因为reduce方法的输入数据是分组的,所以每次都会处理一个key及这个key对应的所有value,又因为已经将所有的Map
Task的输出拷贝过来而且做了排序,所以key相同的KV对都是挨着的。
[align=left] nextKey方法中,又会调用nextKeyValue方法来尝试去获取下一个key值,并且如果没数据了就会返回false,如果还有数据就返回true。防止获取重复的数据就在这里做的处理。[/align]
[align=left]接下来就是调用用户自定义的reduce方法了。[/align]
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
复制代码
问题导读:
1.Reduce类主要有哪三个步骤?
2.Reduce的Copy都包含什么过程?
3.Sort主要做了哪些工作?
4.4 Reduce类4.4.1 Reduce介绍
[align=left]整完了Map,接下来就是Reduce了。YarnChild.main()—>ReduceTask.run()。ReduceTask.run方法开始和MapTask类似,包括initialize()初始化,根据情况看是否调用runJobCleanupTask(),runTaskCleanupTask()等。之后进入正式的工作,主要有这么三个步骤:Copy、Sort、Reduce。[/align]
4.4.2 Copy
Copy就是从执行各个Map任务的节点获取map的输出文件。这是由ReduceTask.ReduceCopier 类来负责。ReduceCopier对象负责将Map函数的输出拷贝至Reduce所在机器。如果大小超过一定阈值就写到磁盘,否则放入内存,在远程拷贝数据的同时,Reduce
Task启动了两个后台线程对内存和磁盘上的文件进行合并,防止内存使用过多和磁盘文件过多。
[align=left]Step1:[/align]
[align=left] 首先在ReduceTask的run方法中,通过如下配置来mapreduce.job.reduce.shuffle.consumer.plugin.class装配shuffle的plugin。默认的实现是Shuffle类:[/align]
Class<? extends ShuffleConsumerPlugin> clazz = job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
复制代码
[align=left]Step2:[/align]
[align=left] 初始化上述的plugin后,执行其run方法,得到RawKeyValueIterator的实例。[/align]
[align=left]run方法的执行步骤如下:[/align]
[align=left]Step2.1:[/align]
[align=left] 量化Reduce的事件数目:[/align]
int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH, MAX_RPC_OUTSTANDING_EVENTS / jobConf.getNumReduceTasks());
int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);
复制代码
[align=left]Step2.2:[/align]
[align=left]生成map的完成状态获取线程,并启动此线程:[/align]
final EventFetcher<K,V> eventFetcher = new EventFetcher<K,V>(reduceId, umbilical, scheduler, this, maxEventsToFetch);
eventFetcher.start();
复制代码
获取已经完成的Map信息,如Map的host、mapId等放入ShuffleSchedulerImpl中的Set<MapHost>中便于下面进行数据的拷贝传输。
URI u = getBaseURI(reduceId, event.getTaskTrackerHttp());
addKnownMapOutput(u.getHost() + ":" + u.getPort(),
u.toString(),
event.getTaskAttemptId());
maxMapRuntime = Math.max(maxMapRuntime, event.getTaskRunTime());
复制代码
[align=left]Step2.3:[/align]
[align=left] 在Shuffle类中启动初始化Fetcher线程组,并启动:[/align]
boolean isLocal = localMapFiles != null;
final int numFetchers = isLocal ? 1 :
jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
if (isLocal) {
fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,
merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
localMapFiles);
fetchers[0].start();
} else {
for (int i=0; i < numFetchers; ++i) {
fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger,
reporter, metrics, this,
reduceTask.getShuffleSecret());
fetchers[i].start();
}
}
复制代码
线程的run方法就是进行数据的远程拷贝:
try {
// If merge is on, block
merger.waitForResource();
// Get a host to shuffle from
host = scheduler.getHost();
metrics.threadBusy();
// Shuffle
copyFromHost(host);
} finally {
if (host != null) {
scheduler.freeHost(host);
metrics.threadFree();
}
}
复制代码
[align=left]Step2.4:[/align]
[align=left]来看下这个copyFromHost方法。主要是就是使用HttpURLConnection,实现远程数据的传输。[/align]
[align=left]建立连接之后,从接收到的Stream流中读取数据。每次读取一个map文件。[/align]
TaskAttemptID[] failedTasks = null;
while (!remaining.isEmpty() && failedTasks == null) {
failedTasks = copyMapOutput(host, input, remaining);
}
复制代码
[align=left]上面的copyMapOutput方法中,每次读取一个mapid,根据MergeManagerImpl中的reserve函数,检查map的输出是否超过了mapreduce.reduce.memory.totalbytes配置的大小,此配置的默认值[/align]
[align=left]是当前Runtime的maxMemory*mapreduce.reduce.shuffle.input.buffer.percent配置的值,Buffer.percent的默认值为0.90。[/align]
[align=left]如果mapoutput超过了此配置的大小时,生成一个OnDiskMapOutput实例。在接下来的操作中,map的输出写入到local临时文件中。[/align]
[align=left]如果没有超过此大小,生成一个InMemoryMapOutput实例。在接下来操作中,直接把map输出写入到内存。[/align]
[align=left]最后,执行ShuffleScheduler.copySucceeded完成文件的copy,调用mapout.commit函数,更新状态或者触发merge操作。[/align]
[align=left]Step2.5:[/align]
[align=left] 等待上面所有的拷贝完成之后,关闭相关的线程。[/align]
eventFetcher.shutDown();
// Stop the map-output fetcher threads
for (Fetcher<K,V> fetcher : fetchers) {
fetcher.shutDown();
}
// stop the scheduler
scheduler.close();
copyPhase.complete(); // copy is already complete
taskStatus.setPhase(TaskStatus.Phase.SORT);
reduceTask.statusUpdate(umbilical);
复制代码
[align=left]Step2.6:[/align]
[align=left]执行最终的merge操作,由Shuffle中的MergeManager完成:[/align]
public RawKeyValueIterator close() throws Throwable {
// Wait for on-going merges to complete
if (memToMemMerger != null) {
memToMemMerger.close();
}
inMemoryMerger.close();
onDiskMerger.close();
List<InMemoryMapOutput<K, V>> memory =
new ArrayList<InMemoryMapOutput<K, V>>(inMemoryMergedMapOutputs);
inMemoryMergedMapOutputs.clear();
memory.addAll(inMemoryMapOutputs);
inMemoryMapOutputs.clear();
List<CompressAwarePath> disk = new ArrayList<CompressAwarePath>(onDiskMapOutputs);
onDiskMapOutputs.clear();
return finalMerge(jobConf, rfs, memory, disk);
}
复制代码
[align=left]Step3:[/align]
[align=left]释放资源。[/align]
mapOutputFilesOnDisk.clear();
复制代码
[align=left] Copy完毕。[/align]
4.4.3 Sort
[align=left] Sort(其实相当于合并)就相当于排序工作的一个延续,它会在所有的文件都拷贝完毕后进行。使用工具类Merger归并所有的文件。经过此过程后,会产生一个合并了所有(所有并不准确)Map任务输出文件的新文件,而那些从其他各个服务器搞过来的 Map任务输出文件会删除。根据hadoop是否分布式来决定调用哪种排序方式。[/align]
[align=left] 在上面的4.3.2节中的Step2.4结束之后就会触发此操作。[/align]
4.4.4 Reduce
[align=left] 经过上面的步骤之后,回到ReduceTask中的run方法继续往下执行,调用runNewReducer。创建reducer:[/align]
org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
(org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
复制代码
并执行其run方法,此run方法就是我们的org.apache.hadoop.mapreduce.Reducer中的run方法。
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKey()) {
reduce(context.getCurrentKey(), context.getValues(), context);
// If a back up store is used, reset it
Iterator<VALUEIN> iter = context.getValues().iterator();
if(iter instanceof ReduceContext.ValueIterator) {
((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
}
}
} finally {
cleanup(context);
}
}
}
复制代码
while的循环条件是ReduceContext.nextKey()为真,这个方法就在ReduceContext中实现的,这个方法的目的就是处理下一个唯一的key,因为reduce方法的输入数据是分组的,所以每次都会处理一个key及这个key对应的所有value,又因为已经将所有的Map
Task的输出拷贝过来而且做了排序,所以key相同的KV对都是挨着的。
[align=left] nextKey方法中,又会调用nextKeyValue方法来尝试去获取下一个key值,并且如果没数据了就会返回false,如果还有数据就返回true。防止获取重复的数据就在这里做的处理。[/align]
[align=left]接下来就是调用用户自定义的reduce方法了。[/align]
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
复制代码
相关文章推荐
- hadoop2提交到Yarn: Mapreduce执行过程reduce分析3
- hadoop2提交到Yarn: Mapreduce执行过程reduce分析3
- hadoop2提交到Yarn: Mapreduce执行过程分析
- hadoop2提交到Yarn: Mapreduce执行过程分析1
- hadoop2提交到Yarn: Mapreduce执行过程分析
- hadoop2提交到Yarn: Mapreduce执行过程分析1
- hadoop2提交到Yarn: Mapreduce执行过程分析2
- hadoop2提交到Yarn: Mapreduce执行过程分析1
- hadoop2提交到Yarn: Mapreduce执行过程分析2
- hadoop2提交到Yarn: Mapreduce执行过程分析
- hadoop2提交到Yarn: Mapreduce执行过程分析2
- Hadoop v2(Yarn) 调度分析 (1) JobClient 的提交过程
- hadoop执行mapreduce过程reduce不执行原因
- Mapreduce执行过程分析(基于Hadoop2.4)——(三)
- MapReduce执行过程源码分析(一)——Job任务的提交
- Mapreduce执行过程分析(基于Hadoop2.4)——(一)
- 【hadoop2.2(yarn)】基于yarn成功执行分布式map-reduce,记录问题解决过程。
- hadoop执行mapreduce过程reduce不执行原因
- Hadoop MapReduce之ReduceTask任务执行(三):Merger线程分析
- Mapreduce执行过程分析(基于Hadoop2.4)——(三)