SparkRDD的一些使用经验
2017-01-24 17:10
225 查看
要减少shuffle的开销,主要有两个思路:
减少shuffle次数,尽量不改变key,把数据处理在local完成;
减少shuffle的数据规模。
先去重,再合并
比如有A、B这样两个规模比较大的RDD,如果各自内部有大量重复,那么二者一合并,再去重:
这样的操作固然正确,但是如果可以先各自去重,再合并,再去重,可以大幅度减小shuffle的开销(注意Spark的默认union和Oracle里面的“union all”很像——不去重):
看起来变复杂了对不对,但是当时我解决这个问题的时候,用第二种方法时间开销从3个小时减到20分钟。
如果中间结果rdd如果被调用多次,可以显式调用cache()和persist(),以告知Spark,保留当前rdd。
类似地,还有filter等等操作,目的也是要先对大的RDD进行“瘦身”操作,然后在做其他操作。
mapValues比map好
明确key不会变的map,就用mapValues来替代,因为这样可以保证Spark不会shuffle你的数据:
改成:
用broadcast + filter来代替join
再比如,如果遇到RDD操作嵌套的情况,通常考虑优化掉,因为只有master才能去理解和执行RDD的操作,slave只能处理被分配的task而已。比如:
A.map{case (keyA, valueA) => doSomething(B.lookup(keyA).head, valueA)}
就可以用join来代替:
A.join(B).map{case (key, (valueA, valueB)) => doSomething(valueB, valueA)}
用reduceByKey代替groupByKey
这一条应该是比较经典的了。reduceByKey会在当前节点(local)中做reduce操作,也就是说,会在shuffle前,尽可能地减小数据量。而groupByKey则不是,它会不做任何处理而直接去shuffle。当然,有一些场景下,功能上二者并不能互相替换。因为reduceByKey要求参与运算的value,并且和输出的value类型要一样,但是groupByKey则没有这个要求。
有一些类似的xxxByKey操作,都比groupByKey好,比如foldByKey和aggregateByKey。
另外,还有一条类似的是用treeReduce来代替reduce,主要是用于单个reduce操作开销比较大,可以条件treeReduce的深度来控制每次reduce的规模。
减少shuffle次数,尽量不改变key,把数据处理在local完成;
减少shuffle的数据规模。
先去重,再合并
比如有A、B这样两个规模比较大的RDD,如果各自内部有大量重复,那么二者一合并,再去重:
A.union(B).distinct()
这样的操作固然正确,但是如果可以先各自去重,再合并,再去重,可以大幅度减小shuffle的开销(注意Spark的默认union和Oracle里面的“union all”很像——不去重):
A.distinct().union(B.distinct()).distinct()
看起来变复杂了对不对,但是当时我解决这个问题的时候,用第二种方法时间开销从3个小时减到20分钟。
如果中间结果rdd如果被调用多次,可以显式调用cache()和persist(),以告知Spark,保留当前rdd。
类似地,还有filter等等操作,目的也是要先对大的RDD进行“瘦身”操作,然后在做其他操作。
mapValues比map好
明确key不会变的map,就用mapValues来替代,因为这样可以保证Spark不会shuffle你的数据:
A.map{case (A, ((B, C), (D, E))) => (A, (B, C, E))}
改成:
A.map{case ((B, C), (D, E)) => (B, C, E)}
用broadcast + filter来代替join
再比如,如果遇到RDD操作嵌套的情况,通常考虑优化掉,因为只有master才能去理解和执行RDD的操作,slave只能处理被分配的task而已。比如:
A.map{case (keyA, valueA) => doSomething(B.lookup(keyA).head, valueA)}
就可以用join来代替:
A.join(B).map{case (key, (valueA, valueB)) => doSomething(valueB, valueA)}
用reduceByKey代替groupByKey
这一条应该是比较经典的了。reduceByKey会在当前节点(local)中做reduce操作,也就是说,会在shuffle前,尽可能地减小数据量。而groupByKey则不是,它会不做任何处理而直接去shuffle。当然,有一些场景下,功能上二者并不能互相替换。因为reduceByKey要求参与运算的value,并且和输出的value类型要一样,但是groupByKey则没有这个要求。
有一些类似的xxxByKey操作,都比groupByKey好,比如foldByKey和aggregateByKey。
另外,还有一条类似的是用treeReduce来代替reduce,主要是用于单个reduce操作开销比较大,可以条件treeReduce的深度来控制每次reduce的规模。
相关文章推荐
- CVS的配置和使用的一些经验
- "[转]以下一些使用ASP.NET和VS.NET2003的经验和技巧," 一文的评论:
- 基于myFaces的JSF技术使用中的一些经验(三)[godroad原创]--使用myFaces的建立一个xml文件的树形表示
- 基于myFaces的JSF技术使用中的一些经验(三)[godroad原创]--使用myFaces的建立一个xml文件的树形表示
- 基于myFaces的JSF技术使用中的一些经验(二)[godroad原创]--dataTable的使用
- 以下一些使用ASP.NET和VISUAL STUDIO.NET2003的经验和技巧
- 分享一下ExpressQuantumGrid4的cxGrid的一些使用方法和经验
- 基于myFaces的JSF技术使用中的一些经验(一)[godroad原创]--乱码的处理
- JfreeChart使用经验总结(分析了使用中碰到的一些常见问题)
- 使用OpenCV的一些经验
- 以下一些使用ASP.NET和VISUAL STUDIO.NET2003的经验和技巧
- .net 下验证控件[validation server control]的一些吐血使用经验
- TestDirector使用的一些经验(三)
- 基于myFaces的JSF技术使用中的一些经验(二)[godroad原创]--dataTable的使用
- Java的一些类的使用经验
- TestDirector使用的一些经验(二)
- 使用自定义用户控件的一些经验
- Java的一些类的使用经验
- 转贴:电容的使用:一些经验和误区
- 网管使用Cisco设备时一些经验