您的位置:首页 > 其它

MapReduce工作机制详解

2018-03-02 23:15 330 查看

一、MapTask工作机制



详细步骤:

 首先,读取数据组件 InputFormat (默认 TextInputFormat)会通过 getSplits
方法对输入目录中文件进行逻辑切片规划得到 splits,有多少个 split 就对
应启动多少个 MapTask。split 与 block 的对应关系默认是一对一。

 将 输 入 文 件 切 分 为 splits 之 后 , 由 RecordReader 对 象 ( 默 认
LineRecordReader)进行读取,以\n 作为分隔符,读取一行数据,返回<key,
value>。Key 表示每行首字符偏移值,value 表示这一行文本内容。

 读取 split 返回<key,value>,进入用户自己继承的 Mapper 类中,执行用户
重写的 map 函数。RecordReader 读取一行这里调用一次。

 map 逻辑完之后,将 map 的每条结果通过 context.write 进行 collect 数据
收集。在 collect 中,会先对其进行分区处理,默认使用 HashPartitioner。
MapReduce 提供 Partitioner 接口,它的作用就是根据 key 或 value 及 reduce 的数量
来决定当前的这对输出数据最终应该交由哪个 reduce task 处理。默认对 key hash 后再以
reduce task 数量取模。默认的取模方式只是为了平均 reduce 的处理能力,如果用户自己
对 Partitioner 有需求,可以订制并设置到 job 上。

 接下来,会将数据写入内存,内存中这片区域叫做环形缓冲区,缓冲区的作
用是批量收集 map 结果,减少磁盘 IO 的影响。我们的 key/value 对以及
Partition 的结果都会被写入缓冲区。当然写入之前,key 与 value 值都会
被序列化成字节数组。
环形缓冲区其实是一个数组,数组中存放着 key、value 的序列化数据和 key、value 的
元数据信息,包括 partition、key 的起始位置、value 的起始位置以及 value 的长度。环
形结构是一个抽象概念。
缓冲区是有大小限制,默认是 100MB。当 map task 的输出结果很多时,就可能会撑爆
内存,所以需要在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。
这个从内存往磁盘写数据的过程被称为 Spill,中文可译为溢写。这个溢写是由单独线程来
完成,不影响往缓冲区写 map 结果的线程。溢写线程启动时不应该阻止 map 的结果输出,所
以整个缓冲区有个溢写的比例 spill.percent。这个比例默认是 0.8,也就是当缓冲区的数
据已经达到阈值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢写线程启动,
锁定这 80MB 的内存,执行溢写过程。Map task 的输出结果还可以往剩下的 20MB 内存中写,
互不影响。

 当溢写线程启动后,需要对这 80MB 空间内的 key 做排序(Sort)。排序是
MapReduce 模型默认的行为,这里的排序也是对序列化的字节做的排序。
如果 job 设置过 Combiner,那么现在就是使用 Combiner 的时候了。将
有相同 key 的 key/value 对的 value 加起来,减少溢写到磁盘的数据量。
Combiner 会优化 MapReduce 的中间结果,所以它在整个模型中会多次使用。
那哪些场景才能使用 Combiner 呢?从这里分析,Combiner 的输出是 Reducer 的输入,
Combiner 绝不能改变最终的计算结果。Combiner 只应该用于那种 Reduce 的输入 key/value
与输出 key/value 类型完全一致,且不影响最终结果的场景。比如累加,最大值等。Combiner
的使用一定得慎重,如果用好,它对 job 执行效率有帮助,反之会影响 reduce 的最终结果。

 每次溢写会在磁盘上生成一个临时文件(写之前判断是否有 combiner),如
果 map 的输出结果真的很大,有多次这样的溢写发生,磁盘上相应的就会有
多个临时文件存在。当整个数据处理结束之后开始对磁盘中的临时文件进行
merge 合并,因为最终的文件只有一个,写入磁盘,并且为这个文件提供了
一个索引文件,以记录每个 reduce 对应数据的偏移量。
至此 map 整个阶段结束。


二、ReducerTask工作机制



详细步骤

 Copy 阶段,简单地拉取数据。Reduce 进程启动一些数据 copy 线程(Fetcher),
通过 HTTP 方式请求 maptask 获取属于自己的文件。

 Merge 阶段。这里的 merge 如 map 端的 merge 动作,只是数组中存放的是不
同 map 端 copy 来的数值。Copy 过来的数据会先放入内存缓冲区中,这里的
缓冲区大小要比 map 端的更为灵活。merge 有三种形式:内存到内存;内存
到磁盘;磁盘到磁盘。默认情况下第一种形式不启用。当内存中的数据量到
达一定阈值,就启动内存到磁盘的 merge。与 map 端类似,这也是溢写的过
程,这个过程中如果你设置有 Combiner,也是会启用的,然后在磁盘中生成
了众多的溢写文件。第二种 merge 方式一直在运行,直到没有 map 端的数据
时才结束,然后启动第三种磁盘到磁盘的 merge 方式生成最终的文件。

  把分散的数据合并成一个大的数据后,还会再对合并后的数据排序。

 对排序后的键值对调用 reduce 方法,键相等的键值对调用一次 reduce 方法,
每次调用会产生零个或者多个键值对,最后把这些输出的键值对写入到 HDFS
文件中。


三、Shuffle机制

map  阶段处理的数据如何传递给 reduce  阶段,是 MapReduce 框架中最关键
的一个流程,这个流程就叫shuffle。

shuffle: 洗牌、发牌——(核心机制:数据分区,排序,合并)。




shuffle 是 Mapreduce 的核心,它分布在 Mapreduce 的 map 阶段和 reduce
阶段。一般把从 p Map  产生输出开始到 e Reduce  取得数据 作为输入 之前 的过程 称
作 shuffle。

1).Collect 阶段:将 MapTask 的结果输出到默认大小为 100M 的环形缓冲区,
保存的是 key/value,Partition 分区信息等。

2).Spill 阶段:当内存中的数据量达到一定的阀值的时候,就会将数据写
入本地磁盘,在将数据写入磁盘之前需要对数据进行一次排序的操作,如果配置
了 combiner,还会将有相同分区号和 key 的数据进行排序。

3).Merge 阶段:把所有溢出的临时文件进行一次合并操作,以确保一个
MapTask 最终只产生一个中间数据文件。

4).Copy 阶段: ReduceTask 启动 Fetcher 线程到已经完成 MapTask 的节点
上复制一份属于自己的数据,这些数据默认会保存在内存的缓冲区中,当内存的
缓冲区达到一定的阀值的时候,就会将数据写到磁盘之上。

5).Merge 阶段:在 ReduceTask 远程复制数据的同时,会在后台开启两个线
程对内存到本地的数据文件进行合并操作。

6).Sort 阶段:在对数据进行合并的同时,会进行排序操作,由于 MapTask
阶段已经对数据进行了局部的排序,ReduceTask 只需保证 Copy 的数据的最终整
体有效性即可。

Shuffle 中的缓冲区大小会影响到 mapreduce 程序的执行效率,原则上说,
缓冲区越大,磁盘 io 的次数越少,执行速度就越快
缓冲区的大小可以通过参数调整, 参数:io.sort.mb 默认 100M


四、MapReduce并行度机制

MapTask 的并行度指的是 map 阶段有多少个并行的 task 共同处理任务。map
阶段的任务处理并行度,势必影响到整个 job 的处理速度。那么,MapTask 并行
实例是否越多越好呢?其并行度又是如何决定呢?

4000
一个 MapReducejob 的 p map  阶段并行度由客户端在提交 b job  时决定,即客户
端提交 job 之前会对待处理数据进行 逻辑切片。切片完成会形成 切片规划 文件
( job.split) ),每个逻辑切片最终对应启动一个 maptask。
逻辑切片机制由 FileInputFormat 实现类的  getSplits()方法完成。


FileInputFormat中默认的切片机制:

A. 简单地按照文件的内容长度进行切片
B. 切片大小,默认等于 block 大小
C. 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
比如待处理数据有两个文件:
file1.txt 320M
file2.txt 10M
经过 FileInputFormat 的切片机制运算后,形成的切片信息如下:
file1.txt.split1—0M~128M
file1.txt.split2—128M~256M
file1.txt.split3—256M~320M
file2.txt.split1—0M~10M


FileInputFormat 中切片的大小的参数配置

在 FileInputFormat 中,计算切片大小的逻辑:
Math.max(minSize, Math.min(maxSize, blockSize));
切片主要由这几个值来运算决定:
minsize:默认值:1
配置参数: mapreduce.input.fileinputformat.split.minsize
maxsize:默认值:Long.MAXValue
配置参数:mapreduce.input.fileinputformat.split.maxsize
blocksize
因此, 默认情况下, split size =block size,在 hadoop 2.x 中为 128M。
maxsize(切片最大值):参数如果调得比 blocksize 小,则会让切片变小,
而且就等于配置的这个参数的。
minsize (切片最小值):参数调的比 blockSize 大,则可以让切片变得比
blocksize 还大。
但是,不论怎么调参数,都不能让多个小文件“划入”一个  split 。
还有个细节就是:
当 bytesRemaining/splitSize > 1.1 不满足的话,那么最后所有剩余
的会作为一个切片。从而不会形成例如 129M 文件规划成两个切片的局面。


Reducetask并行度机制

reducetask 并行度同样影响整个 job 的执行并发度和执行效率,与 maptask
的并发数由切片数决定不同,Reducetask 数量的决定是可以直接手动设置:
job.setNumReduceTasks(4);
如果数据分布不均匀,就有可能在 reduce 阶段产生数据倾斜。
注意: reducetask 数量并不是任意设置,还要考虑业务逻辑需求,有些情
况下,需要计算全局汇总结果,就只能有 1 个 reducetask。


Task并行度经验之谈

最好每个 task 的执行时间至少一分钟。
如果 job 的每个 map 或者 reduce task 的运行时间都只有 30-40 秒钟,那
么就减少该 job 的 map 或者 reduce 数,每一个 task(map|reduce)的 setup 和加
入到调度器中进行调度,这个中间的过程可能都要花费几秒钟,所以如果每个
task 都非常快就跑完了,就会在 task 的开始和结束的时候浪费太多的时间。
此外,默认情况下,每一个 task 都是一个新的 JVM 实例,都需要开启和销
毁的开销。在一些情况下,JVM 开启和销毁的时间可能会比实际处理数据的时间
要消耗的长,配置 task 的 M JVM  重用可以改善该问题:
(mapred.job.reuse.jvm.num.tasks,默认是 1,表示一个 JVM 上最多可以
顺序执行的 task 数目(属于同一个 Job)是 1。也就是说一个 task 启一个 JVM)
如果 input 的文件非常的大,比如 1TB,可以考虑将 hdfs 上的每个 block
size 设大,比如设成 256MB 或者 512MB


五、MapReduce优化参数

1.资源相关参数

//以下参数是在用户自己的 MapReduce 应用程序中配置就可以生效
(1) mapreduce.map.memory.mb: 一个 Map Task 可使用的内存上限(单位:MB),默认为 1024。
如果 Map Task 实际使用的资源量超过该值,则会被强制杀死。
(2) mapreduce.reduce.memory.mb: 一个 Reduce Task 可使用的资源上限(单位:MB),默认
为 1024。如果 Reduce Task 实际使用的资源量超过该值,则会被强制杀死。
(3) mapreduce.map.c
pu.vcores: 每个 Maptask 可用的最多 cpu core 数目, 默认值: 1
(4) mapreduce.reduce.cpu.vcores: 每个 Reducetask 可用最多 cpu core 数目默认值: 1
(5) mapreduce.map.java.opts: Map Task 的 JVM 参数,你可以在此配置默认的 java heap
size 等参数, 例如:“-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc”
(@taskid@会被 Hadoop 框架自动换为相应的 taskid), 默认值: “”
(6) mapreduce.reduce.java.opts: Reduce Task 的 JVM 参数,你可以在此配置默认的 java
heap size 等参数, 例如:“-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc”, 默
认值: “”

//应该在 yarn 启动之前就配置在服务器的配置文件中才能生效
(1) yarn.scheduler.minimum-allocation-mb RM 中每个容器请求的最小配置,以 MB 为单
位,默认 1024。
(2) yarn.scheduler.maximum-allocation-mb RM 中每个容器请求的最大分配,以 MB 为单
位,默认 8192。
(3) yarn.scheduler.minimum-allocation-vcores  1
(4)yarn.scheduler.maximum-allocation-vcores 32
(5) yarn.nodemanager.resource.memory-mb 表示该节点上YARN可使用的物理内存总量,
默认是 8192(MB),注意,如果你的节点内存资源不够 8GB,则需要调减小这个值,而 YARN
不会智能的探测节点的物理内存总量。

//shuffle 性能优化的关键参数,应在 yarn 启动之前就配置好
(1) mapreduce.task.io.sort.mb 100 shuffle 的环形缓冲区大小,默认 100m
(2) mapreduce.map.sort.spill.percent 0.8 环形缓冲区溢出的阈值,默认 80%


2.容错相关参数

(1) mapreduce.map.maxattempts: 每个 Map Task 最大重试次数,一旦重试参数超过该值,则
认为 Map Task 运行失败,默认值:4。
(2) mapreduce.reduce.maxattempts: 每个Reduce Task最大重试次数,一旦重试参数超过该值,
则认为 Map Task 运行失败,默认值:4。
(3) mapreduce.map.failures.maxpercent: 当失败的 Map Task 失败比例超过该值,整个作业则
失败,默认值为 0. 如果你的应用程序允许丢弃部分输入数据,则该该值设为一个大于 0 的
值,比如 5,表示如果有低于 5%的 Map Task 失败(如果一个 Map Task 重试次数超过
mapreduce.map.maxattempts,则认为这个 Map Task 失败,其对应的输入数据将不会产生任
何结果),整个作业扔认为成功。
(4) mapreduce.reduce.failures.maxpercent: 当失败的 Reduce Task 失败比例超过该值为,整个
作业则失败,默认值为 0.
(5) mapreduce.task.timeout:如果一个task在一定时间内没有任何进入,即不会读取新的数据,
也没有输出数据,则认为该 task 处于 block 状态,可能是临时卡住,也许永远会卡住。为了
防止因为用户程序永远 block 不退出,则强制设置了一个超时时间(单位毫秒),默认是
600000,值为 0 将禁用超时。。


3.效率跟稳定性参数

(1) mapreduce.map.speculative: 是否为 Map Task 打开推测执行机制,默认为 true, 如
果为 true,则可以并行执行一些 Map 任务的多个实例。
(2) mapreduce.reduce.speculative: 是否为 Reduce Task 打开推测执行机制,默认为 true
(3)mapreduce.input.fileinputformat.split.minsize: FileInputFormat做切片时最小切
片大小,默认 1。
(5)mapreduce.input.fileinputformat.split.maxsize: FileInputFormat做切片时最大切
片大小
推测执行机制(Speculative Execution):它根据一定的法则推测出“拖后腿”的任务,并
为这样的任务启动一个备份任务,让该任务与原始任务同时处理同一份数据,并最终选用最先成
功运行完成任务的计算结果作为最终结果。


六、MapReduce其他功能

1.计数器应用

计数器是用来记录 job 的执行进度和状态的。MapReduce 计数器(Counter)

为我们提供一个窗口,用于观察 MapReduce Job 运行期的各种细节数据。对

MapReduce 性能调优很有帮助,MapReduce 性能优化的评估大部分都是基于这些

Counter 的数值表现出来的。

MapReduce 自带了许多默认 Counter。在执行 mr 程序的日志上,大家也许

注意到了类似以下这样的信息:

Shuffle Errors
BAD_ID=0
CONNECTION=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=89
File Output Format Counters
Bytes Written=86


内置计数器包括:

文件系统计数器(File System Counters)
作业计数器(Job Counters)
MapReduce 框架计数器(Map-Reduce Framework)
Shuffle 错误计数器(Shuffle Errors)
文件输入格式计数器(File Output Format Counters)
文件输出格式计数器(File Input Format Counters)


当然, Hadoop 也支持自定义计数器。在实际生产代码中,常常需要将数据

处理过程中遇到的不合规数据行进行全局计数,类似这种需求可以借助

mapreduce 框架中提供的全局计数器来实现。

示例代码:

public class WordCount{
static class WordCount Mapper extends Mapper<LongWritable, Text, Text, LongWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException,
InterruptedException {
Counter counter =context.getCounter(“SelfCounters”,”myCounters”);
String[] words = value.toString().split(",");
for (String word : words) {
if("hello".equals(word)){counter.increment(1)};
context.write(new Text(word), new LongWritable(1));
}
}
}


2.多job关联

ControlledJob controlledJob1 = new ControlledJob(job1.getConfiguration());
controlledJob1.setJob(job1);
ControlledJob controlledJob2 = new ControlledJob(job2.getConfiguration());
controlledJob2.setJob(job2);
controlledJob2.addDependingJob(controlledJob1); // job2 依赖于 job1
JobControl jc = new JobControl(chainName);
jc.addJob(controlledJob1);
jc.addJob(controlledJob2);
Thread jcThread = new Thread(jc);
jcThread.start();
while(true){
if(jc.allFinished()){
System.out.println(jc.getSuccessfulJobList());
jc.stop();
return 0;
}
if(jc.getFailedJobList().size() > 0){
System.out.println(jc.getFailedJobList());
jc.stop();
return 1;
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: