您的位置:首页 > 其它

spark 算子combineByKey 详解

2017-10-05 12:22 441 查看
combineByKey 作为spark 的核心算子之一,有必要详细了解。reduceByKey 和groupByKey 等健值对算子底层都实现该算子。(1.6.0版更新为combineByKeyWithClassTag)

combineByKey 源码定义:

def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]

def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]

def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]


createCombine: 对每个分区中每个key中value中的第一个值,进行处理(可以理解为,分区内部进行按Key分组,每个Key的第一个value进行预处理)预处理逻辑。

mergeValue:分区内部进行聚合,相同的Key的value进行局部运算,区间内部聚合逻辑。

mergeCombiners:此阶段发生shuffer,对不同区间局部运算后的结果再做运算。分区之间聚合逻辑。

测试样例:

val a = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
val b = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
val c = b.zip(a)
val d = c.combineByKey(List(_), (x:List[String], y:String) => y :: x, (x:List[String], y:List[String]) => x ::: y)
d.collect
res16: Array[(Int, List[String])] = Array((1,List(cat, dog, turkey)), (2,List(gnu, rabbit, salmon, bee, bear, wolf)))


过一下下图,逻辑应该就清晰了



注意点:createCombine是对每个分区中每个key中value中的第一个值,进行处理(可以理解为,分区内部进行按Key分组,每个Key的第一个value进行预处理)预处理逻辑。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: