MapReduce原理总结
2017-09-12 11:42
176 查看
做大数据已经有一年了,在这一年中始终没有去好好看看Google的三驾马车,现在回校学习,好好的看看论文,同时也自己做做笔记,记录一下自己的学习过程。
MapReduce作为Google的曾经三驾马车之一,广为大家所知,但是很多人都只知道他能做一些数据处理的工作,并不清楚他其中的原理。这次就让我们来总结一下MapReduce的原理,进行学习。
问题描述
思考
什么是MapReduce
MapReduce做了什么
一个简单的例子
处理的过程
Talk is cheapShow me the code
Shuffle
什么是shuffle
Map Shuffle
Reduce shuffle
Shuffle总结
总结
我需要最常访问的页面 TOPN
随意上点日志demo:
好吧,我们用python脚本来试试。大概的处理速度如下:
速度好像也能接受呀。
那么 0.7TB?
经过计算,时间耗费约18小时
这种时间消耗已经不能够被接受了。
分而治之
mapreduce就是一个不错的分而治之的框架
Google用来建立海量数据索引 提高查询速度
最早来自原Lisp语言,Python也有map和reduce。
计算任务调度
错误处理
机器内部通信管理
对于这一段话,我们来统计词频
首先map转化为键值对
reduce收到的数据
reduce处理后的数据
也就是像下面的图所示一样:
好的,你以为就这么简单就结束了吗?
不,还有一个大的内容没讲。
我们知道map生成了大量的键值对如:
但是reduce收到的却是:
WHY?
上图
reduce在向map取数据的过程中,做了排序、合并的工作。这个过程真的只是看起来这么简单吗?
让我们一步一步来分析
map阶段的shuffle主要完成了以下工作:
partition
我们有多个reduce,分配到reduce的过程(一般采用Hash),将map的结果发送到相应的reduce端,总的partition的数目等于reducer的数量。也就在这个阶段去往不同reduce的数据分区了。
spill
什么是spill?内存memory buffer塞满了,要写到磁盘上去,在写入本地磁盘时先按照partition、再按照key进行排序。
combine
执行combine操作要求开发者必须在程序中设置了combine,就会进行一次combine(写之前)如:将 (a,1) (a,2) 变成 (a,3)
merge
当map很大时,每次溢写会产生一个spill_file,这样会有多个spill_file,而最终的一个map task输出只有一个文件,因此,最终的结果输出之前会对多个中间过程进行多次溢写文件(spill_file)的合并,此过程就是merge过程。
如果生成的文件太多,可能会执行多次合并,每次最多能合并的文件数默认为10,可以通过属性min.num.spills.for.combine配置;
多个溢出文件合并时,会进行一次排序,排序算法是多路归并排序;
是否还需要做combine操作,一是看是否设置了combine,二是看溢出的文件数是否大于等于3;
最终生成的文件格式与单个溢出文件一致,也是按分区顺序存储,并且输出文件会有一个对应的索引文件,记录每个分区数据的起始位置,长度以及压缩长度,这个索引文件名叫做file.out.index。
作用:拉取数据;
过程:Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求maptask所在的TaskTracker获取maptask的输出文件。因为这时maptask早已结束,这些文件就归TaskTracker管理在本地磁盘中。
默认情况下,当整个MapReduce作业的所有已执行完成的MapTask任务数超过MapTask总数的5%后,JobTracker便会开始调度执行ReduceTask任务。然后ReduceTask任务默认启动mapred.reduce.parallel.copies(默认为5)个MapOutputCopier线程到已完成的MapTask任务节点上分别copy一份属于自己的数据。这些copy的数据会首先保存的内存缓冲区中,当内冲缓冲区的使用率达到一定阀值后,则写到磁盘上。
Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活,它基于JVM的heapsize设置,因为Shuffle阶段Reducer不运行,所以应该把绝大部分的内存都给Shuffle用。
这里需要强调的是,merge有三种形式:
1)内存到内存
2)内存到磁盘
3)磁盘到磁盘。
默认情况下第一种形式是不启用的。当内存中的数据量到达一定阈值,就启动内存到磁盘的merge(图中的第一个merge,之所以进行merge是因为reduce端在从多个map端copy数据的时候,并没有进行sort,只是把它们加载到内存,当达到阈值写入磁盘时,需要进行merge) 。这和map端的很类似,这实际上就是溢写的过程,在这个过程中如果你设置有Combiner,它也是会启用的,然后在磁盘中生成了众多的溢写文件,这种merge方式一直在运行,直到没有map端的数据时才结束,然后才会启动第三种磁盘到磁盘的merge(图中的第二个merge)方式生成最终的那个文件。
这就是总体的shuffle的过程,发现了什么:
基本就是 分区+排序+合并
而且在shuffle的过程中发生了大量的溢写
代表了,在这个过程中产生了大量的磁盘IO操作。这也是造成MR速度慢的原因。要知道,内存比磁盘要快的多。
基本思想是分而治之
屏蔽了错误处理、任务调度,对用户很友好
不足:
shuffle的过程中大量的磁盘IO产生,拉低了速度
编程模型的缺陷待我阅读了Spark的论文后再来补上
MapReduce作为Google的曾经三驾马车之一,广为大家所知,但是很多人都只知道他能做一些数据处理的工作,并不清楚他其中的原理。这次就让我们来总结一下MapReduce的原理,进行学习。
问题描述
思考
什么是MapReduce
MapReduce做了什么
一个简单的例子
处理的过程
Talk is cheapShow me the code
Shuffle
什么是shuffle
Map Shuffle
Reduce shuffle
Shuffle总结
总结
问题描述
我是一个新闻客户端的负责人,我有一个一大堆的访问日志,但是我现在要对我的访问做优化?我需要最常访问的页面 TOPN
随意上点日志demo:
127.0.0.1 - - [22/Jul/2017 23:39:02] "GET /favicon.ico HTTP/1.1" 404 – 127.0.0.1 - - [22/Jul/2017 23:39:01] "GET / HTTP/1.1" 200 – 127.0.0.1 - - [22/Jul/2017 23:39:01] "GET / HTTP/1.1" 200 –
好吧,我们用python脚本来试试。大概的处理速度如下:
速度好像也能接受呀。
那么 0.7TB?
经过计算,时间耗费约18小时
这种时间消耗已经不能够被接受了。
思考
可有什么比较好的处理思路?分而治之
mapreduce就是一个不错的分而治之的框架
什么是MapReduce
目前仍然是大数据的主要处理方式之一Google用来建立海量数据索引 提高查询速度
最早来自原Lisp语言,Python也有map和reduce。
MapReduce做了什么
数据分区(HDFS)计算任务调度
错误处理
机器内部通信管理
一个简单的例子
I have a pen and a apple. You have a pen
对于这一段话,我们来统计词频
首先map转化为键值对
(I,1),(have,1) ,(a,1) , (pen,1),(and,1) ,(a,1) (apple,1)
(You,1) , (have,1) , (a,1) ,( pen,1)
reduce收到的数据
(I,[1]), (have,[1,1]),(a,[1,1,1]),(pen,[1,1]),(and,[1]),(apple,[1])
reduce处理后的数据
(I,1), (have,2) , (a,3) , ( pen,2) ,(and,1), (apple,1)
处理的过程
map(String key, String value): // key: 文档名 // value: 文档的内容 for each word w in value: EmitIntermediate(w, "1"); //输出单词与次数的 map reduce(String key, Iterator values): // key: 单词名 // values: 次数的列表 int result = 0; for each v in values: result += ParseInt(v); Emit(AsString(result));
也就是像下面的图所示一样:
Talk is cheap.Show me the code!
public void map(Object key, Text value, Context context) { StringTokenizer itr = new StringTokenizer(value.toString()); while(itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } public void reduce(Text key, Iterable<IntWritable> values, Context context) { int sum = 0; for(IntWritable val:values) { sum += val.get(); } result.set(sum); context.write(key,result); }
好的,你以为就这么简单就结束了吗?
不,还有一个大的内容没讲。
我们知道map生成了大量的键值对如:
(I,1),(have,1) ,(a,1) , (pen,1),(and,1) ,(a,1) (apple,1)
(You,1) , (have,1) , (a,1) ,( pen,1)
但是reduce收到的却是:
(I,[1]), (have,[1,1]),(a,[1,1,1]),(pen,[1,1]),(and,[1]),(apple,[1])
WHY?
Shuffle
什么是shuffle?
shuffle是MR中一个非常重要的过程,缺了这个过程就不算mapreduce。如何简单的来描述这个过程呢?上图
reduce在向map取数据的过程中,做了排序、合并的工作。这个过程真的只是看起来这么简单吗?
让我们一步一步来分析
Map Shuffle
map阶段的shuffle主要完成了以下工作:
partition
我们有多个reduce,分配到reduce的过程(一般采用Hash),将map的结果发送到相应的reduce端,总的partition的数目等于reducer的数量。也就在这个阶段去往不同reduce的数据分区了。
spill
什么是spill?内存memory buffer塞满了,要写到磁盘上去,在写入本地磁盘时先按照partition、再按照key进行排序。
combine
执行combine操作要求开发者必须在程序中设置了combine,就会进行一次combine(写之前)如:将 (a,1) (a,2) 变成 (a,3)
merge
当map很大时,每次溢写会产生一个spill_file,这样会有多个spill_file,而最终的一个map task输出只有一个文件,因此,最终的结果输出之前会对多个中间过程进行多次溢写文件(spill_file)的合并,此过程就是merge过程。
如果生成的文件太多,可能会执行多次合并,每次最多能合并的文件数默认为10,可以通过属性min.num.spills.for.combine配置;
多个溢出文件合并时,会进行一次排序,排序算法是多路归并排序;
是否还需要做combine操作,一是看是否设置了combine,二是看溢出的文件数是否大于等于3;
最终生成的文件格式与单个溢出文件一致,也是按分区顺序存储,并且输出文件会有一个对应的索引文件,记录每个分区数据的起始位置,长度以及压缩长度,这个索引文件名叫做file.out.index。
Reduce shuffle
作用:拉取数据;
过程:Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求maptask所在的TaskTracker获取maptask的输出文件。因为这时maptask早已结束,这些文件就归TaskTracker管理在本地磁盘中。
默认情况下,当整个MapReduce作业的所有已执行完成的MapTask任务数超过MapTask总数的5%后,JobTracker便会开始调度执行ReduceTask任务。然后ReduceTask任务默认启动mapred.reduce.parallel.copies(默认为5)个MapOutputCopier线程到已完成的MapTask任务节点上分别copy一份属于自己的数据。这些copy的数据会首先保存的内存缓冲区中,当内冲缓冲区的使用率达到一定阀值后,则写到磁盘上。
Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活,它基于JVM的heapsize设置,因为Shuffle阶段Reducer不运行,所以应该把绝大部分的内存都给Shuffle用。
这里需要强调的是,merge有三种形式:
1)内存到内存
2)内存到磁盘
3)磁盘到磁盘。
默认情况下第一种形式是不启用的。当内存中的数据量到达一定阈值,就启动内存到磁盘的merge(图中的第一个merge,之所以进行merge是因为reduce端在从多个map端copy数据的时候,并没有进行sort,只是把它们加载到内存,当达到阈值写入磁盘时,需要进行merge) 。这和map端的很类似,这实际上就是溢写的过程,在这个过程中如果你设置有Combiner,它也是会启用的,然后在磁盘中生成了众多的溢写文件,这种merge方式一直在运行,直到没有map端的数据时才结束,然后才会启动第三种磁盘到磁盘的merge(图中的第二个merge)方式生成最终的那个文件。
Shuffle总结
这就是总体的shuffle的过程,发现了什么:
基本就是 分区+排序+合并
而且在shuffle的过程中发生了大量的溢写
代表了,在这个过程中产生了大量的磁盘IO操作。这也是造成MR速度慢的原因。要知道,内存比磁盘要快的多。
总结
MR是一个简单高效的计算模型基本思想是分而治之
屏蔽了错误处理、任务调度,对用户很友好
不足:
shuffle的过程中大量的磁盘IO产生,拉低了速度
编程模型的缺陷待我阅读了Spark的论文后再来补上
相关文章推荐
- 【hadoop】mapreduce原理总结
- hadoop学习之路----MapReduce原理与基本架构总结(第三讲)
- Hbase框架原理及相关的知识点理解、Hbase访问MapReduce、Hbase访问Java API、Hbase shell及Hbase性能优化总结
- 使用HADOOP演示MAPREDUCE配置总结
- java多线程总结五:线程池的原理及实现
- Dubbo原理总结
- 【Apache Hadoop】MapReuce 编程总结-多MapReduce执行
- 总结Java垃圾回收器的方法和原理
- Atitit.数据库存储引擎的原理与attilax 总结
- hadoop mapreduce排序原理
- 转载和积累系列 - MapReduce原理
- 手机充电原理分析及问题总结
- 数据库系统原理及其应用总结---ShinePans
- java多线程学习总结之一:基础原理
- Hadoop MapReduce原理
- hadoop作业调优参数整理及原理(整个mapreduce运行流程都讲的清楚,一步一步优化)
- Hadoop MapReduce原理
- Android之ListView原理学习与优化总结
- Android之ListView原理学习与优化总结
- oauth2 原理和理解(转载阮一峰的记录一下,后面自己再总结)