您的位置:首页 > 编程语言

Spark编程之基本的RDD算子之fold,foldByKey,treeAggregate, treeReduce

2017-07-21 14:57 597 查看

Spark编程之基本的RDD算子之fold,foldByKey,treeAggregate, treeReduce

1) fold

def fold(zeroValue: T)(op: (T, T) => T): T


这个api算子接收一个初始值,fold算子传入一个函数,合并两个同类型的值,同时返回一个相同类型的值

这个算子在每个分区对值进行合并。在每个分区合并时以一个zeroValue作为在每个在每个分区合并的时候的初始值。

val a = sc.parallelize(List(1,2,3), 3)
a.fold(0)(_ + _) //这个有点类似于reduce函数。只不过加了一个初始值。
res59: Int = 6


2) foldByKey

def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]


首先来看它的api,foldByKey接收一个初始值,zeroValue类型,最后返回的键值对的value的类型也是和初始值的类型保持一致。和redeuceByKey相比,foldByKey加入了初始值。

可以看几个例子

val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
val b = a.map(x => (x.length, x)) //b的类型为(Int, String)
b.foldByKey("")(_ + _).collect //这个表示根据key相同进行聚合。由于长度都是3所以聚合后的结果为如下所示。
res84: Array[(Int, String)] = Array((3,dogcatowlgnuant)

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
b.foldByKey("")(_ + _).collect
res85: Array[(Int, String)] = Array((4,lion), (3,dogcat), (7,panther), (5,tigereagle))

scala> val rdd= sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)),2)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[1] at makeRDD at <console>:27

scala> rdd.foldByKey(100)(_+_).collect.foreach(println)
(B,103)
(A,102)
(C,101)

//其实foldByKey内部调用的是combineByKey。zeroValue其实类似于createCombiner,而mergeValue和mergeCombiner是一样的(都是我们传入的这个函数),都是先在分区内进行操作,然后接着将分区内合并的结果再次进行合并。


3) treeAggregate

首先来看看这个treeAggregate的算子的api:

def treeAggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U, depth: Int = 2)(implicit arg0: ClassTag[U]): U


这个算子返回的结果为U类型。首先传入一个初始值,同样的,第一个函数式首先在分区被进行操作,seqOp。将在分区内遇到的T类型的合并为U类型的,最后将不同分区的合并后的U类型的结果进行合并。第一个函数作用于分区内,第二个函数作用于分区间。

treeAggregate和aggregate类似,只不过,它是通过一个多层树的形式进行聚合的。还有一个就是这个初始值不作用于第二个函数,只是在第一个函数起作用。默认的深度为2.

val z = sc.parallelize(List(1,2,3,4,5,6), 2)

def myfunc(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {
iter.toList.map(x => "[partID:" +  index + ", val: " + x + "]").iterator
}

z.mapPartitionsWithIndex(myfunc).collect
res28: Array[String] = Array([partID:0, val: 1], [partID:0, val: 2], [partID:0, val: 3], [partID:1, val: 4], [partID:1, val: 5], [partID:1, val: 6])

z.treeAggregate(0)(math.max(_, _), _ + _)
res40: Int = 9 //同样的,首先在每个分区求出最大值,然后在分区间进行合并。初始值不作用于第二个函数。
//如果将初始值变换为5的话,则首先在第一个分区max(5,1,2,3)=5
//如果将初始值变换为5的话,则在第2个分区max(5,4,5,6)=6
// 最后的结果是 5 + 6 = 11,没有引入初始值
z.treeAggregate(5)(math.max(_, _), _ + _)
res42: Int = 11


4)treeReduce

def  treeReduce(f: (T, T) ⇒ T, depth: Int = 2): T


treeReduce有点类似于reduce函数,也不需要传入初始值,只不过这个算子使用一个多层树的形式来进行reduce操作。

val z = sc.parallelize(List(1,2,3,4,5,6), 2)
z.treeReduce(_+_)
res49: Int = 21
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark rdd fold reduce aggregate
相关文章推荐