您的位置:首页 > 其它

[Spark_API]Transformation-reduceByKey()和aggregateByKey()

2017-05-10 17:24 495 查看
aggregateByKey()与reduceByKey()(两者都会调用combineByKey()),唯一不同的是aggregateByKey()你会给出一个初始值zeroValue.

之所以reduceByKey()更好,是由于它使用了MapReduce的combiner这个特征,比如在类似+,*的这类combiner函数计算中,由于元素的顺序都无关紧要,使得Spark能够在多个分区上聚合(reduce)数值。

然而aggregateByKey()会聚合一个特定键的值,聚合(combined)的结果可以是任何特定的对象类型。你必须确定这些值在分区内部如何聚合(同一个节点),以及你如何将不同区的结果聚合。ReduceByKey()是一个特殊的例子,特殊之处在于值聚合后的数据类型和值的类型相同,在分区内部聚合的操作和聚合不同分区结果的操作一样。

作为一个例子,假如你有一组数组

val pairs = sc.parallelize(Array(("a", 3), ("a", 1), ("b", 7), ("a", 5)))
现在你想使用对于同一个键,对它所有的值进行加法聚合,在这个例子中,reduceByKey和aggregateByKey()是一样的。

val resReduce = pairs.reduceByKey(_ + _) //the same operation for everything
resReduce.collect
res3: Array[(String, Int)] = Array((b,7), (a,9))

//0 is initial value, _+_ inside partition, _+_ between partitions
val resAgg = pairs.aggregateByKey(0)(_+_,_+_)
resAgg.collect
res4: Array[(String, Int)] = Array((b,7), (a,9))
然而,假如你想要聚合的结果是同一个键的所有值的set,但值本身的数据类型是整数.在这个情况下,你只能使用aggregateByKey()

来确定最后聚合结果的类型为set。

import scala.collection.mutable.HashSet
//the initial value is a void Set. Adding an element to a set is the first
//_+_ Join two sets is the  _++_
val sets = pairs.aggregateByKey(new HashSet[Int])(_+_, _++_)
sets.collect
res5: Array[(String, scala.collection.mutable.HashSet[Int])]  =Array((b,Set(7)), (a,Set(1, 5, 3)))
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐