您的位置:首页 > 其它

Spark Sort Based Shuffle源码详细解析----数据流篇----Shuffle Write流

2017-01-05 13:02 666 查看
Spark Shuffle这部分的内容比较多,  可以通过以下图示的角度对内容做细分:

Spark Shuffle

    ----> 模块架构

    ----> 模块交互关系及时序

    ----> 数据流

        ----> Hash Based Shuffle

        ----> Sort Based Shuffle

            ----> Shuffle Write

            ----> Shuffle Read

本篇文章主要从Sort Based Shuffle的整体数据流角度出发, 详细解析Spark Shuffle数据处理的整体流程.

这两天突然问了自己一个问题:"如今Spark shuffle已经使用实现了spill到磁盘的机制", 那么groupByKey()操作, 还可能会出现官方文档上描述的OOM问题么"?

* Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any
* key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
*/

回答这个问题需要了解Spark Shuffle的内部实现原理.

总体调用框架

SortShuffleWriter::write() ----> ExternalSorter::insertAll() ----> map.changeValue() //从RDD的iterator中读取数据, 并做本地combiner; ----> maybeSpill() //判断是否需要将内存数据spill到磁盘; ----> spill(collection) //内存数据spill到磁盘; ----> sorter.writePartitionedFile(blockId, context, outputFile) //写最终的shuffle文件; -> merge(spills, collection.partitionedDestructiveSortedIterator(comparator)) //merge内存的数据和磁盘的spill文件的数据; -> mergeWithAggregation(iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined)) // 对数据再一次做combiner操作; -> writer.write(elem._1, elem._2) //真正写入最终的shuffle文件;

源码详细分析

Spark Shuffle的入口从SortShuffleWriter::write()开始:

   ---->dep.mapSideCombine:是否需要map阶段进行本地combiner;       1 如果需要,那么需要传入aggregator和keyOrdering;            aggregator用于指示进行combiner的操作( keyOrdering用于传递key的排序规则);       2 如果不需要, 那么本地每个分区的数据不会做sort;  //groupByKey()会走这里;            这个原因应该是: 对于不需要本地做combiner的操作, 是不需要对多个spill文件进行merge的, 所以不需要排序.  -----> ExternalSorter::insertAll():      根据上面的条件, 决定传入insertAll()的参数; insertAll会生成spill文件, 所以需要知道是否要求排序;     ----> 如果定义了aggregator, 那么需要进行数据的本地combiner: createCombiner或者mergeValue;         ----> 从RDD的iterator中不断读取数据;         ----> map.changeValue((getPartition(kv._1),kv._1),update)             ----> 在data数组中查找是否已经存在这个key, 存在则调用mergeValue, 不存在则调用createCombiner, hash冲突则继续找下一个位置的key;         ----> maybeSpillCollection()             ----> maybeSpill(map,map.estimateSize()))         ----> 判断内存是否够用: 尝试向shuffleMemoryManager申请内存, 如果申请到了, 则返回,申请不到, 就需要spill到磁盘;             -----> spill(collection)             -----> 根据bypassMergeSort决定是调用哪个方法:                 ----->spillToMergeableFile(collection)                           文件名为: temp_shuffle_f05eddd9-0705-462e-b595-74258a4748a7                     文件路径为: /private/var/folders/_s/ld8n_xld75j4qrt0vmk1653w0000gn/T/spark-16999b3c-5035-41ab-99ef-04db1083e24f/blockmgr-f5074a9f-8d62-491a-9de7-8d33276e7f8a/20/temp_shuffle_f05eddd9-0705-462e-b595-74258a4748a7                          -----> collection.destructiveSortedWritablePartitionedIterator(comparator)                              ----> partitionedDestructiveSortedIterator(keyComparator)                                  -----> destructiveSortedIterator(comparator)                                     ----> 如果是用PartitionedAppendOnlyMap做spill,则会将其内部的data::AppendOnlyMap做重组排序, 使用的是partitionKeyComparator,然后返回一个可以从data顺次读取数据的iterator;                             ----> WritablePartitionedIterator.fromIterator() //返回一个可以写文件的iterator对象;                         ----> it.writeNext(writer) //开始写文件;                 ---->spills.append(SpilledFile(file,blockId,batchSizes.toArray,elementsPerPartition))     ----> 如果没定义aggregator, 就不需要combiner:   // groupByKey()会走这里;         ----> 如果也不需要排序,而且reduce分区小于200(bypassMergeSort):             ----> 直接写分区文件, 每个分区一个文件;这样可以减少序列化与反序列化的操作, 提高效率; 类似HashShuffleWriter,只是最后也会合并为一个大文件.这种优化会为一些如groupBy,sort等操作提速, 因为这些操作没有本地combiner. 最后生成的文件也算:  a single sorted file         ----> 如果不需要本地combiner, 但是reduce分区数较多, 或者需要排序,那么使用替代map的数据结构:buffer, 它不需要支持持续的combiner运算. 这种情况后面需要spill操作;    ----> valpartitionLengths=sorter.writePartitionedFile(blockId,context,outputFile):        ----> 如果是bypassMergeSort, 且有分区文件,则将内存的数据调用spillToPartitionFiles()写入spill文件, 然后简单地合并所有spill file;  //写bypassMergeSort"的分支.        ----> 如果只有内存数据, 没有spill file,那么也很简单,只用把内存数据进行排序(按用户排序规则或者hashcode规则), 然后写入最终磁盘文件即可;        ----> 调用this.partitionedIterator()获得最终的所有数据, 包含在内存的数据和在spillfile的数据;            ----> 如果只有内存数据, 没有spille file:                ----> 根据是否需要排序调用groupByPartition():                    如果不用排序, 则直接按分区对数据进行排序, 然后返回数据的iterator;                     如果需要排序,则按照用户设定的规则和分区值进行排序, 默认使用hashcode方式对数据进行排序, 然后返回数据的iterator;                    由于reduce task读取的时候是不关心顺序的,为什么这里还需要排序??            ----> bypassMergeSort的情况, 且有spill file:                     这块没看懂, 按理说不应该能走进来,因为这种条件在外部的"写bypassMergeSort"的分支包含了.            ----> 其他情况会调用merge(spills,collection.partitionedDestructiveSortedIterator(comparator)); //按照用户设定的规则和分区值进行排序,默认使用hashcode方式对数据进行排序;
merge(spills:Seq[SpilledFile],inMemory:Iterator[((Int,K),C)]) 需要combiner或者设定了需要排序的,才会进行排序输出生成有序文件;    ----> valreaders=spills.map(newSpillReader(_)) //获得所有的spill file; 磁盘的数据是按照分区进行排序的,分区内也是有序(但是这个顺序有可能是total order,有可能是partial order);    ----> valinMemBuffered=inMemory.buffered //获得memory内的数据;内存的数据是按照分区进行排序的, 分区内也是有序(但是这个顺序有可能是total order,有可能是partial order);    ----> 定义并返回一个(分区号, Iterator(K,C))的iterator, 这个iterator的next()操作:        ----> 将内存中指定分区的数据读取出来, 再通过SpillReader将当前该读取的分区数据读取出来,然后合并, 形成新的iterator,这个iterator只有单分区的数据;        ----> 如果定义了aggregator, 那么就需要做mergeCombiner操作;             这里是shuffle write阶段最后的combiner操作了;            也 可以看到, 这里和之前的insertAll()方法中, 读取数据然后做mergeValue和createCombiner是遥相呼应的, 只有做了这2个的数据,才能做mergeCombiner, 因为处理方法的参数不同;            mergeWithAggregation(iterators,aggregator.get.mergeCombiners,keyComparator,ordering.isDefined));               ----> 如果没有定义totalOrder,那么生成一个Iterator对象, 这个iterator对象的next()方法:                    ---->sorted是一个从小根对读取数据的iterator, 每次从 sorted中读取数据; 用keyComparator对多路数据进行归并,取出当前key最小的数据,以及和这个key的hashcode相等的所有key, 然后精确比较找出和每个key相等的所有数据, 找到一条数据就会进行mergeCombiners合并;                    可以看到,这里是需要将key所有的数据放入内存的,所以可能由于value过多导致OOM;                 ----> 如果定义了totalOrder, 那么生成一个Iterator对象, 这个iterator对象的next()方法:                     valsorted=mergeSort(iterators,comparator).buffered;  //还是用keyComparator生成读取小根对的iterator对象;                     后面的操作, 认为数据是已经按照totalorder 做好了排序, 所以hashcode相等的key必定相等, 这样就是顺序读取key相同的数据, 做mergeCombiner操作;        ---->如果没有定义aggregator, 但是定义了排序: ordering.isDefined       调用valsorted=mergeSort(iterators,comparator).buffered; 就生成小根对的iterator对象就可以了, 因为不需要做mergeCombiner; 排序方式是由用户指定的;       ----> 不用排序,也不需要做aggregator,那么直接按照key的hashcode的顺序依次输出就好;至此, Spark Shuffle的写操作代码就完全分析完了.

原理阐述

关于Spark实现Sort based shuffle的原因

    1 Spark也是实现了类似MR的机制, 每个maptask输出到内存, 当内存中的数据达到阈值后, 就会spill到磁盘. 最后, 需要将所有spill的小文件合并成1个大文件, 一个maptask生成一个shuffle结果文件.  每个spill小文件和最后的大文件,包含所有的reduce分区数据, 按照分区先后存放在文件中; 每个spill的小文件是分区内sorted的, 最终合成的大文件, 也是分区内sorted的.    2 为什么需要在map task阶段对数据进行排序?          我理解这个原因也是和MR一样:多个spill文件之间进行merge的时候, 只能使用外部归并排序的方式, 从spill文件中读取数据, 然后互相比较; 磁盘数据的读取, 只能是顺序读取,不能通过key值随机定位到磁盘位置,然后读取; 例如spillFile1有1个abc这个key,想和spillFile2的相同abc这个key做combiner计算时, 就必须这2个文件都有序,然后读取spillFile1的这个key时,正好也读取到spillFile2的这个key.

Spark Shullfe组件的关系

ShuffleManager     ---派生--->  SortShuffleManager                       ----> IndexShuffleBlockResolver indexShuffleBlockResolver; //负责实际读写block逻辑;                        ----> getReader()                             ----> new HashShuffleReader()                        ----> getWriter()                            ----> new SortShuffleWriter()    ---派生--->  HashShuffleManager                       ----> FileShuffleBlockResolver fileShuffleBlockResolver; // 负责实际读写block逻辑;                        ----> getReader()                             ----> new HashShuffleReader()                        ----> getWriter()                            ----> new HashShuffleWriter()spark.shuffle.consolidateFiles只与Hash Based Shuffle有关;

本地Combine, Sort, Spill, Merge的相互关系

1 是否需要本地combiner, 决定了是否需要数据做排序: 因为spill文件最后合并时需要做mergeCominer; maptask做sort的唯一原因就是为了上面的combiner操作, 需要磁盘小文件有序; 至于reduce task读取数据,是不需要有序的,因为也是先读取到map结构中的内存中, 然后再spill到有序文件中, 最后合并.2 spill文件最后合并时需要做mergeCominer   -----> 决定了每个spill file要有序;    ----> 决定了spill操作要排序;3 如果设定了aggregator, 表明需要做combiner, 则需要排序:      ----> 如果用户设定了ordering,则使用用户的排序规则;      ----> 如果用户没有设定,则使用内置的hashcode的排序规则;

对于Spark Sort Based Shuffle的疑问

既然shuffleread时是不管读取的数据是否有序的, 那么为什么对于map阶段不做combiner的时候, 还需要按照key做排序? 比如在merge()时;

关于groupByKey()是否会存在OOM的问题

虽然Spark Shuffle通过将内存数据spill磁盘的方式很大程度上解决了OOM的问题, 但是通过上面的源码分析可知, 在多个spill文件做merge
合并的时候, 会将hashcode值相等的key的数据都读取出来, 做combiner,才会写入磁盘.
所以, 其实需要做combiner的transform还是会有OOM的风险的.
但是groupByKey()不会有这个问题, 因为groupByKey不会进行map端的combiner, 所以根本不会进行排序, 这样就不会在map
task阶段OOM.

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
相关文章推荐