您的位置:首页 > 其它

spark-调优-shuffle

2017-09-21 23:13 162 查看
合并map端输出文件

new SparkConf().set(“spark.shuffle.consolidateFiles”, “true”)

默认情况下,是不开启的,开启shuffle map端输出文件合并的机制之后:

第一个stage,同时就运行cpu core个task,比如cpu core是2个,并行运行2个task;

每个task都创建下一个stage的task数量个文件;

第一个stage,并行运行的2个task执行完以后;就会执行另外两个task;

另外2个task不会再重新创建输出文件;而是复用之前的task创建的map端输出文件,

将数据写入上一批task的输出文件中。

第二个stage,task在拉取数据的时候,就不会去拉取上一个stage每一个task为自己创建的那份输出文件了;

调节map端内存缓冲和reduce端聚合内存占比

如果发现shuffle 磁盘的write和read,很大,可以调节这两个参数

调节上面说的那两个参数。调节的时候的原则。

spark.shuffle.file.buffer,每次扩大一倍,然后看看效果,64,128;

spark.shuffle.memoryFraction,每次提高0.1,看看效果。

不能调节的太大,太大了以后过犹不及,因为内存资源是有限的,你这里调节的太大了,

其他环节的内存使用就会有问题了。

map task内存缓冲变大了,减少spill到磁盘文件的次数;

reduce端聚合内存变大了,减少spill到磁盘的次数,而且减少了后面聚合读取磁盘文件的数量。

尽量不用shuffle操作

spark中会导致shuffle操作的有以下几种算子

1、repartition类的操作:比如repartition、repartitionAndSortWithinPartitions、coalesce等

2、byKey类的操作:比如reduceByKey、groupByKey、sortByKey等

3、join类的操作:比如join、cogroup等

ShuffleManager选择

在Spark的源码中,负责shuffle过程的执行、计算和处理的组件主要就是ShuffleManager,也即shuffle管理器。而随着Spark的版本的发展,ShuffleManager也在不断迭代,变得越来越先进。

在Spark 1.2以前,默认的shuffle计算引擎是HashShuffleManager。该ShuffleManager而HashShuffleManager有着一个非常严重的弊端,就是会产生大量的中间磁盘文件,进而由大量的磁盘IO操作影响了性能。

因此在Spark 1.2以后的版本中,默认的ShuffleManager改成了SortShuffleManager。SortShuffleManager相较于HashShuffleManager来说,有了一定的改进。主要就在于,每个Task在进行shuffle操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并(merge)成一个磁盘文件,因此每个Task就只有一个磁盘文件。在下一个stage的shuffle read task拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。

如果你希望你的每个task输出的文件 最终是合并成一份的 认为可以减少性能开销

可以去调节spark.shuffle.sort.bypassMergeThreshold这个阈值

比如你的reduce task数量是500 默认阈值是200

所以默认还会进行sort和直接merge

可以将阈值调节550

按照hash的做法 每个reduce task创建一份输出文件

最后合并成一份文件

超出这个值会 sort 排序

这个参数 没有经过验证 生产环境进行验证

//默认就是

new SparkConf().set(“spark.shuffle.manager”,”sort”)

new SparkConf().set(“spark.shuffle.manager”,”hash”)

new SparkConf().set(“spark.shuffle.manager”,”tungsten-sort”)

可以手动设置下

new SparkConf().set(“spark.shuffle.sort.bypassMergeThreshold”,”550”)

shuffle相关参数调优

spark.shuffle.file.buffer

● 默认值:32k

● 参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。

● 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。

spark.reducer.maxSizeInFlight

● 默认值:48m

● 参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。

● 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。

spark.shuffle.io.maxRetries

● 默认值:3

● 参数说明:shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。

● 调优建议:对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。

spark.shuffle.io.retryWait

● 默认值:5s

● 参数说明:具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认是5s。

● 调优建议:建议加大间隔时长(比如60s),以增加shuffle操作的稳定性。

spark.shuffle.memoryFraction

● 默认值:0.2

● 参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默认是20%。

● 调优建议:在资源参数调优中讲解过这个参数。如果内存充足,而且很少使用持久化操作,建议调高这个比例,给shuffle read的聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘。在实践中发现,合理调节该参数可以将性能提升10%左右。

spark.shuffle.manager

● 默认值:sort

● 参数说明:该参数用于设置ShuffleManager的类型。Spark 1.5以后,有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2以及之后的版本默认都是SortShuffleManager了。tungsten-sort与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。

● 调优建议:由于SortShuffleManager默认会对数据进行排序,因此如果你的业务逻辑中需要该排序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不需要对数据进行排序,那么建议参考后面的几个参数调优,通过bypass机制或优化的HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort要慎用,因为之前发现了一些相应的bug。

spark.shuffle.sort.bypassMergeThreshold

● 默认值:200

● 参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。

● 调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。

spark.shuffle.consolidateFiles

● 默认值:false

● 参数说明:如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。

● 调优建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。

参考站点:

http://blog.csdn.net/a1043498776/article/details/77323561
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: