Spark Shuffle六大问题 fetch操作、数据存储、文件个数、什么排序算法简单介绍
2017-09-07 19:52
771 查看
1. 问题一:什么时候进行Shuffle的fetch操作?Shuffle是一边Mapper的Map操作同时进行Reducer端的Shuffle和Reduce操作吗?
错误的观点:Spark是一边Mapper一边Shuffle的,而Hadoop的MapReduce是先完成Mapper然后才开始Reducer的Shuffle。
事实是:Spark一定是先完成Mapper端所有的Tasks才会进行Reducer端的Shuffle过程的。
原因:Spark的Job是按照stage线性执行的,前面的stage必须执行完成才能够执行后面的Reducer的Shuffle过程。为了更好的保证血缘关系,重新计算丢失数据;stage内部最大化的pipeline,并且这个pipeline,f(records) 函数f 作用于集合中的每一条记录,每次只作用于一条记录。
补充说明:Spark的Shuffle是边拉取数据边进行Aggregate操作的(Hadoop不是这样的),其实与Hadoop MapReduce相比其优势确实是在速度上,但是也会导致一些算法不好实现,例如求平均值等(但是Spark帮你内置了这些算子)。
2. 问题二:Shufflefetch过来的数据到底放在了哪里?
抓过来的数据首先肯定是放在Reducer端的内存缓存区中的(Spark曾经有版本要求只放在内存缓存中,数据结构类似于HashMap(AppendOnlyMap)显然特别消耗内存和极易出现OOM,同时也从Reducer端极大的限制了Spark集群的规模),现在的实现都是内存+磁盘的方式(数据结构使用ExternalAppendOnlyMap),当然也可以通过Spark.shuffle.spill=false来设置只能使用内存。使用ExternalAppendOnlyMap的方式时候如果内存使用达到一定临界值,会首先尝试在内存中扩大ExternalAppendOnlyMap(内部有实现算法),如果不能扩容的话才会spill到磁盘。
3. 问题三:Shuffle的数据在Mapper端如何存储,在Reducer端又是如何知道数据具体在哪里的?
在Spark的实现中每一个Stage(里面是ShuffleMapTask)中的Task在Stage的最后一个RDD上一定会注册给Driver上的MapOutputTrackerMaster,Mapper通过和MapOutputTrackerMaster来汇报ShuffleMapTask具体输出数据的位置(具体的输出文件及其内容和Reducer有关的),Reducer是向Driver中的MapOutputTrackerMaster请求数据的元数据,然后和Mapper所在的Executor进行通信。
4. 问题四:仅仅从HashShuffle的角度讲,我们在Shuffle的时候到底可以产生多少Mapper端的中间文件?
例如说:有M和Mapper,有R个Reducer、有C个Core,那么在最原始的Hash Shuffle中可以产生多少个Mapper中的中间文件?
答案是:在没有consilidation机制的情况下会产生M*R个中间文件。
例如在实际生产环境下有E个Excutors(例如100个),每个Excutor有C个Core(例如10个),同时有R个Reducer。有consilidation机制的情况下产生多少个Mapper端的中间文件呢?是否可以回答E*C*R个临时文件呢?
有consilidation机制的情况下第一个问题会产生E*C*R个文件吗?不一定!这取决于一个越来越重要的配置参数”spark.task.cpus”(这个参数决定了运行Spark的每个Task需要多少个Cores,默认情况下是1个),假设”spark.task.cpus”为T,那么第一个问题的答案是:实际可用Cores的个数C/T
* R 。
第二个问题的答案是E*C/T*R 如何理解Consolidation机制,你可以认为是文件池的复用。
5. 问题五:Spark中Sorted-BasedShuffle数据默认的排序算法是什么?好处是什么?
Spark中Sorted-Based Shuffle在Mapper端是进行排序的,包括partition的排序和每个partition内部元素进行排序。但是在Reducer端没有进
a56c
行排序,所以job的结果默认情况下不是排序的。 Sorted-Based
Shuffle 采用Tim-Sort排序算法,好处是可以极为高效的使用Mapper端的排序成果完成全局排序。
6. 问题六:Spark中Tungsten-Sort Shuffle在Mapper中会对内部元素进行排序吗?
Tungsten-Sort Shuffle
在Mapper中不会对内部元素进行排序(只是对partition进行排序),(思考排序会发生在什么时候,是在buffer spill到磁盘的时候,不是发生在进入buffer的是时候),原因是Tungsten-Sort Shuffle是自己管理的二进制序列化后的数据。
Tungsten-Sort Shuffle
什么时候会退化成为Sorted-BasedShuffle:在有Aggregate操作的时候或者Mapper端输出partitions高于16777216或record大于128M等的时候,原因是Tungsten-SortShuffle是自己管理二进制序列化后的数据,以及数组指针管理范围等。(还和序列化器有关系)
错误的观点:Spark是一边Mapper一边Shuffle的,而Hadoop的MapReduce是先完成Mapper然后才开始Reducer的Shuffle。
事实是:Spark一定是先完成Mapper端所有的Tasks才会进行Reducer端的Shuffle过程的。
原因:Spark的Job是按照stage线性执行的,前面的stage必须执行完成才能够执行后面的Reducer的Shuffle过程。为了更好的保证血缘关系,重新计算丢失数据;stage内部最大化的pipeline,并且这个pipeline,f(records) 函数f 作用于集合中的每一条记录,每次只作用于一条记录。
补充说明:Spark的Shuffle是边拉取数据边进行Aggregate操作的(Hadoop不是这样的),其实与Hadoop MapReduce相比其优势确实是在速度上,但是也会导致一些算法不好实现,例如求平均值等(但是Spark帮你内置了这些算子)。
2. 问题二:Shufflefetch过来的数据到底放在了哪里?
抓过来的数据首先肯定是放在Reducer端的内存缓存区中的(Spark曾经有版本要求只放在内存缓存中,数据结构类似于HashMap(AppendOnlyMap)显然特别消耗内存和极易出现OOM,同时也从Reducer端极大的限制了Spark集群的规模),现在的实现都是内存+磁盘的方式(数据结构使用ExternalAppendOnlyMap),当然也可以通过Spark.shuffle.spill=false来设置只能使用内存。使用ExternalAppendOnlyMap的方式时候如果内存使用达到一定临界值,会首先尝试在内存中扩大ExternalAppendOnlyMap(内部有实现算法),如果不能扩容的话才会spill到磁盘。
3. 问题三:Shuffle的数据在Mapper端如何存储,在Reducer端又是如何知道数据具体在哪里的?
在Spark的实现中每一个Stage(里面是ShuffleMapTask)中的Task在Stage的最后一个RDD上一定会注册给Driver上的MapOutputTrackerMaster,Mapper通过和MapOutputTrackerMaster来汇报ShuffleMapTask具体输出数据的位置(具体的输出文件及其内容和Reducer有关的),Reducer是向Driver中的MapOutputTrackerMaster请求数据的元数据,然后和Mapper所在的Executor进行通信。
4. 问题四:仅仅从HashShuffle的角度讲,我们在Shuffle的时候到底可以产生多少Mapper端的中间文件?
例如说:有M和Mapper,有R个Reducer、有C个Core,那么在最原始的Hash Shuffle中可以产生多少个Mapper中的中间文件?
答案是:在没有consilidation机制的情况下会产生M*R个中间文件。
例如在实际生产环境下有E个Excutors(例如100个),每个Excutor有C个Core(例如10个),同时有R个Reducer。有consilidation机制的情况下产生多少个Mapper端的中间文件呢?是否可以回答E*C*R个临时文件呢?
有consilidation机制的情况下第一个问题会产生E*C*R个文件吗?不一定!这取决于一个越来越重要的配置参数”spark.task.cpus”(这个参数决定了运行Spark的每个Task需要多少个Cores,默认情况下是1个),假设”spark.task.cpus”为T,那么第一个问题的答案是:实际可用Cores的个数C/T
* R 。
第二个问题的答案是E*C/T*R 如何理解Consolidation机制,你可以认为是文件池的复用。
5. 问题五:Spark中Sorted-BasedShuffle数据默认的排序算法是什么?好处是什么?
Spark中Sorted-Based Shuffle在Mapper端是进行排序的,包括partition的排序和每个partition内部元素进行排序。但是在Reducer端没有进
a56c
行排序,所以job的结果默认情况下不是排序的。 Sorted-Based
Shuffle 采用Tim-Sort排序算法,好处是可以极为高效的使用Mapper端的排序成果完成全局排序。
6. 问题六:Spark中Tungsten-Sort Shuffle在Mapper中会对内部元素进行排序吗?
Tungsten-Sort Shuffle
在Mapper中不会对内部元素进行排序(只是对partition进行排序),(思考排序会发生在什么时候,是在buffer spill到磁盘的时候,不是发生在进入buffer的是时候),原因是Tungsten-Sort Shuffle是自己管理的二进制序列化后的数据。
Tungsten-Sort Shuffle
什么时候会退化成为Sorted-BasedShuffle:在有Aggregate操作的时候或者Mapper端输出partitions高于16777216或record大于128M等的时候,原因是Tungsten-SortShuffle是自己管理二进制序列化后的数据,以及数组指针管理范围等。(还和序列化器有关系)
相关文章推荐
- 大数据:Spark Shuffle(三)Executor是如何fetch shuffle的数据文件
- Android开发7:简单的数据存储(使用SharedPreferences)和文件操作
- spark shuffle Read fetch过来的数据以ManagedBuffer形式存在时,该底层数据时在堆外内存中还是文件中,是否受memorymanager管理
- 老三章(绪论+简单程序介绍+数据类型),没遇到啥大问题。。
- C#下把txt文件数据读进sql server中存储所遇到的乱码问题
- GFS安装实例-解决共享存储文件系统数据不同步问题
- Ado.Net操作Excel文件数据常见问题及解决
- [java]增删改查后给出操作提示后跳转到数据列表的小问题解决[简单实现]
- [120_移动开发Android]005_android开发之数据存储之文件操作
- Oracle10g技术问题总结(四)SQL的基本操作、脚本文件操作、数据的自动化导出
- 黑马程序员------IO流 操作字节数组的流对象,对象序列化,打印流,随机存储文件,合并流,管道流,操作基本数据类型的流对象
- 对象文件[置顶] 如何在 ASM 存储中使用 Data Pump (expdp impdp)进行数据导出导入操作
- 数据库操作_连接SQL Server数据库示例;连接ACCESS数据库;连接到 Oracle 数据库示例;SqlCommand 执行SQL命令示例;SqlDataReader 读取数据示例;使用DataAdapter填充数据到DataSet;使用DataTable存储数据库表;将数据库数据填充到 XML 文件;10 使用带输入参数的存储过程;11 使用带输入、输出参数的存储过程示;12 获得数据库中表的数目和名称;13 保存图片到SQL Server数据库示例;14 获得插入记录标识号;Exce
- 数据存储之文件操作
- MySql数据文件以及存储位置介绍
- java对cookie的操作比较简单,主要介绍下建立cookie和读取cookie,以及如何设定cookie的生命周期和cookie的路径问题。
- [原创] 不仅拥有XmlDocument一样简单的XML操作方法,并且实现数据文件安全存储功能——XmlEDocument
- Android中使用SharedPreferences进行数据存储及文件操作模式
- Andrid数据存储体系简单介绍
- 文件操作简单举例,从文件读数据,排好序,输出到另一个文件