Spark编程之基本的RDD算子之fold,foldByKey,treeAggregate, treeReduce
2017-07-21 14:57
597 查看
Spark编程之基本的RDD算子之fold,foldByKey,treeAggregate, treeReduce
1) folddef fold(zeroValue: T)(op: (T, T) => T): T
这个api算子接收一个初始值,fold算子传入一个函数,合并两个同类型的值,同时返回一个相同类型的值
这个算子在每个分区对值进行合并。在每个分区合并时以一个zeroValue作为在每个在每个分区合并的时候的初始值。
val a = sc.parallelize(List(1,2,3), 3) a.fold(0)(_ + _) //这个有点类似于reduce函数。只不过加了一个初始值。 res59: Int = 6
2) foldByKey
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
首先来看它的api,foldByKey接收一个初始值,zeroValue类型,最后返回的键值对的value的类型也是和初始值的类型保持一致。和redeuceByKey相比,foldByKey加入了初始值。
可以看几个例子
val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2) val b = a.map(x => (x.length, x)) //b的类型为(Int, String) b.foldByKey("")(_ + _).collect //这个表示根据key相同进行聚合。由于长度都是3所以聚合后的结果为如下所示。 res84: Array[(Int, String)] = Array((3,dogcatowlgnuant) val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2) val b = a.map(x => (x.length, x)) b.foldByKey("")(_ + _).collect res85: Array[(Int, String)] = Array((4,lion), (3,dogcat), (7,panther), (5,tigereagle)) scala> val rdd= sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)),2) rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[1] at makeRDD at <console>:27 scala> rdd.foldByKey(100)(_+_).collect.foreach(println) (B,103) (A,102) (C,101) //其实foldByKey内部调用的是combineByKey。zeroValue其实类似于createCombiner,而mergeValue和mergeCombiner是一样的(都是我们传入的这个函数),都是先在分区内进行操作,然后接着将分区内合并的结果再次进行合并。
3) treeAggregate
首先来看看这个treeAggregate的算子的api:
def treeAggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U, depth: Int = 2)(implicit arg0: ClassTag[U]): U
这个算子返回的结果为U类型。首先传入一个初始值,同样的,第一个函数式首先在分区被进行操作,seqOp。将在分区内遇到的T类型的合并为U类型的,最后将不同分区的合并后的U类型的结果进行合并。第一个函数作用于分区内,第二个函数作用于分区间。
treeAggregate和aggregate类似,只不过,它是通过一个多层树的形式进行聚合的。还有一个就是这个初始值不作用于第二个函数,只是在第一个函数起作用。默认的深度为2.
val z = sc.parallelize(List(1,2,3,4,5,6), 2) def myfunc(index: Int, iter: Iterator[(Int)]) : Iterator[String] = { iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator } z.mapPartitionsWithIndex(myfunc).collect res28: Array[String] = Array([partID:0, val: 1], [partID:0, val: 2], [partID:0, val: 3], [partID:1, val: 4], [partID:1, val: 5], [partID:1, val: 6]) z.treeAggregate(0)(math.max(_, _), _ + _) res40: Int = 9 //同样的,首先在每个分区求出最大值,然后在分区间进行合并。初始值不作用于第二个函数。 //如果将初始值变换为5的话,则首先在第一个分区max(5,1,2,3)=5 //如果将初始值变换为5的话,则在第2个分区max(5,4,5,6)=6 // 最后的结果是 5 + 6 = 11,没有引入初始值 z.treeAggregate(5)(math.max(_, _), _ + _) res42: Int = 11
4)treeReduce
def treeReduce(f: (T, T) ⇒ T, depth: Int = 2): T
treeReduce有点类似于reduce函数,也不需要传入初始值,只不过这个算子使用一个多层树的形式来进行reduce操作。
val z = sc.parallelize(List(1,2,3,4,5,6), 2) z.treeReduce(_+_) res49: Int = 21
相关文章推荐
- Spark编程之基本的RDD算子之fold,foldByKey,treeAggregate, treeReduce
- Spark编程之基本的RDD算子-aggregate和aggregateByKey
- Spark编程之基本的RDD算子之glom,substract,substractByKey,intersection,distinct,union
- Spark编程之基本的RDD算子之cogroup,groupBy,groupByKey
- Spark算子:RDD键值转换操作(2)–combineByKey、foldByKey
- Spark算子:RDD键值转换操作(2)–combineByKey、foldByKey
- Spark编程之基本的RDD算子count, countApproxDistinct, countByValue等
- Spark算子:RDD键值转换操作(2)–combineByKey、foldByKey
- spark RDD算子(六)之键值对聚合操作reduceByKey,foldByKey,排序操作sortByKey
- Spark算子:RDD键值转换操作(2)–combineByKey、foldByKey
- Spark编程的基本的算子之:combineByKey,reduceByKey,groupByKey
- Spark算子:RDD键值转换操作(2)–combineByKey、foldByKey
- Spark RDD/Core 编程 API入门系列 之rdd案例(map、filter、flatMap、groupByKey、reduceByKey、join、cogroupy等)(四)
- Spark算子:RDD键值转换操作(3)–groupByKey、reduceByKey、reduceByKeyLocally
- Spark RDD/Core 编程 API入门系列之map、filter、textFile、cache、对Job输出结果进行升和降序、union、groupByKey、join、reduce、lookup(一)
- spark2.x由浅入深深到底系列六之RDD api reduceByKey与foldByKey对比
- Spark算子:RDD键值转换操作(3)–groupByKey、reduceByKey、reduceByKeyLocally
- Spark算子[10]:foldByKey、fold 源码实例详解
- Spark编程之基本的RDD算子之join,rightOuterJoin, leftOuterJoin
- Spark编程之基本的RDD算子之map,mapPartitions, mapPartitionsWithIndex.