您的位置:首页 > 其它

SparkRDD的一些使用经验

2017-01-24 17:10 225 查看
要减少shuffle的开销,主要有两个思路:
减少shuffle次数,尽量不改变key,把数据处理在local完成;
减少shuffle的数据规模。

先去重,再合并

比如有A、B这样两个规模比较大的RDD,如果各自内部有大量重复,那么二者一合并,再去重:
A.union(B).distinct()


这样的操作固然正确,但是如果可以先各自去重,再合并,再去重,可以大幅度减小shuffle的开销(注意Spark的默认union和Oracle里面的“union all”很像——不去重):
A.distinct().union(B.distinct()).distinct()


看起来变复杂了对不对,但是当时我解决这个问题的时候,用第二种方法时间开销从3个小时减到20分钟。

如果中间结果rdd如果被调用多次,可以显式调用cache()和persist(),以告知Spark,保留当前rdd。

类似地,还有filter等等操作,目的也是要先对大的RDD进行“瘦身”操作,然后在做其他操作。

mapValues比map好

明确key不会变的map,就用mapValues来替代,因为这样可以保证Spark不会shuffle你的数据:
A.map{case (A, ((B, C), (D, E))) => (A, (B, C, E))}


改成:
A.map{case ((B, C), (D, E)) => (B, C, E)}


用broadcast + filter来代替join
再比如,如果遇到RDD操作嵌套的情况,通常考虑优化掉,因为只有master才能去理解和执行RDD的操作,slave只能处理被分配的task而已。比如:

A.map{case (keyA, valueA) => doSomething(B.lookup(keyA).head, valueA)}

就可以用join来代替:

A.join(B).map{case (key, (valueA, valueB)) => doSomething(valueB, valueA)}

用reduceByKey代替groupByKey

这一条应该是比较经典的了。reduceByKey会在当前节点(local)中做reduce操作,也就是说,会在shuffle前,尽可能地减小数据量。而groupByKey则不是,它会不做任何处理而直接去shuffle。当然,有一些场景下,功能上二者并不能互相替换。因为reduceByKey要求参与运算的value,并且和输出的value类型要一样,但是groupByKey则没有这个要求。

有一些类似的xxxByKey操作,都比groupByKey好,比如foldByKey和aggregateByKey。

另外,还有一条类似的是用treeReduce来代替reduce,主要是用于单个reduce操作开销比较大,可以条件treeReduce的深度来控制每次reduce的规模。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: