Spark Operators [10]: foldByKey and fold, Source Code and Examples Explained
2017-12-13 11:05
foldByKey is used much like aggregateByKey, and fold much like aggregate; each pair behaves similarly.
foldByKey is a simplified form of aggregateByKey, and fold is a simplified form of aggregate.
foldByKey
Source:

/**
 * Merge the values for each key using an associative function and a neutral
 * "zero value" which may be added to the result an arbitrary number of times
 * (determined by the number of partitions) and must not change the result
 * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication).
 */
def foldByKey(
    zeroValue: V,
    partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
  // 1. Serialize the zero value to a byte array so that we can get a new clone of it on each key
  val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
  val zeroArray = new Array[Byte](zeroBuffer.limit)
  zeroBuffer.get(zeroArray)

  // 2. When deserializing, use a lazy val to create just one instance of the serializer per task
  lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
  val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))

  // 3. Delegate to combineByKeyWithClassTag to do the actual aggregation
  val cleanedFunc = self.context.clean(func)
  combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v),
    cleanedFunc, cleanedFunc, partitioner)
}
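Step 1 of the source serializes the zero value to bytes so that every key can deserialize a fresh clone of it. This matters when the zero value is mutable: if all keys shared one instance, folds for different keys would corrupt each other's accumulator. A minimal local sketch of this idea (plain Java, no Spark; copying the list plays the role of deserializing the zero bytes, and all names here are illustrative):

```java
import java.util.*;

public class ZeroClone {
    // Fold values into a per-key accumulator, cloning the zero value for each
    // new key -- mimicking how foldByKey deserializes a fresh zero per key.
    static Map<String, List<Integer>> foldWithClone(
            List<Map.Entry<String, Integer>> pairs, List<Integer> zero) {
        Map<String, List<Integer>> acc = new HashMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            // new ArrayList<>(zero) stands in for deserializing the zero bytes;
            // sharing `zero` itself across keys would mix their values together
            acc.computeIfAbsent(p.getKey(), k -> new ArrayList<>(zero))
               .add(p.getValue());
        }
        return acc;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> pairs = List.of(
                Map.entry("a", 1), Map.entry("b", 2), Map.entry("a", 3));
        Map<String, List<Integer>> res = foldWithClone(pairs, new ArrayList<>());
        System.out.println(res.get("a")); // [1, 3]
        System.out.println(res.get("b")); // [2] -- keys stay independent
    }
}
```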
def foldByKey(zeroValue: V)(func: (V, V) ⇒ V): RDD[(K, V)]
def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) ⇒ V): RDD[(K, V)]
def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) ⇒ V): RDD[(K, V)]
Scala example
def foldByKey(): Unit = {
  val conf = new SparkConf().setMaster("local").setAppName("FoldByKey")
  val sc = new SparkContext(conf)
  val rdd = sc.makeRDD(List(("a", 0), ("a", 2), ("b", 1), ("b", 2)), 4)
  val res = rdd.foldByKey(0)(_ + _)
  res.foreach(println)
}
Result:
(a,2)
(b,3)
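As the source comment warns, the zero value is folded in once per partition that holds the key, not once per key overall, which is why it must be neutral for func. A local sketch that simulates foldByKey over explicit partitions (plain Java, no Spark; names are illustrative) makes this visible: with zero 0 the result matches the example above, but a non-neutral zero such as 10 gets added once for every partition a key appears in.

```java
import java.util.*;

public class FoldByKeySim {
    // Simulate foldByKey: within each partition, fold the values for each key
    // starting from the zero value, then merge the per-partition results with
    // the same function but WITHOUT applying zero again.
    static Map<String, Integer> foldByKey(
            List<List<Map.Entry<String, Integer>>> partitions, int zero) {
        Map<String, Integer> merged = new HashMap<>();
        for (List<Map.Entry<String, Integer>> part : partitions) {
            Map<String, Integer> local = new HashMap<>();
            for (Map.Entry<String, Integer> p : part) {
                // zero seeds the accumulator once per key per partition
                local.compute(p.getKey(),
                        (k, acc) -> (acc == null ? zero : acc) + p.getValue());
            }
            local.forEach((k, v) -> merged.merge(k, v, Integer::sum));
        }
        return merged;
    }

    public static void main(String[] args) {
        // same data as the example above, spread over 4 partitions
        List<List<Map.Entry<String, Integer>>> parts = List.of(
                List.of(Map.entry("a", 0)), List.of(Map.entry("a", 2)),
                List.of(Map.entry("b", 1)), List.of(Map.entry("b", 2)));
        System.out.println(foldByKey(parts, 0));  // a=2, b=3 as above
        System.out.println(foldByKey(parts, 10)); // a=22, b=23: 10 added once per partition
    }
}
```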
Java example
public static void foldByKeyOp() {
    SparkConf conf = new SparkConf().setMaster("local").setAppName("foldByKeyOp");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<Tuple2<String, Integer>> rdd = sc.parallelize(Arrays.asList(
            new Tuple2<String, Integer>("a", 0),
            new Tuple2<String, Integer>("a", 2),
            new Tuple2<String, Integer>("b", 1),
            new Tuple2<String, Integer>("b", 2)), 4);

    // Convert to a pair RDD
    JavaPairRDD<String, Integer> pairRDD =
            rdd.mapToPair(pair -> new Tuple2<String, Integer>(pair._1(), pair._2()));

    // func: Function2[V, V, V]
    Function2<Integer, Integer, Integer> func = new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer v1, Integer v2) throws Exception {
            return v1 + v2;
        }
    };

    // foldByKey
    JavaPairRDD<String, Integer> res = pairRDD.foldByKey(0, func);

    // Print the results
    res.foreach(x -> System.out.println(x));
}
fold
Source:

/**
 * @param zeroValue the initial value for the accumulated result of each partition
 *                  for the `op` operator, and also the initial value for combining
 *                  the per-partition results with `op`
 * @param op an operator used both to accumulate results within a partition and to
 *           combine results from different partitions
 */
def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
  // Clone the zero value since we will also be serializing it as part of tasks
  var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
  val cleanOp = sc.clean(op)
  val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
  val mergeResult = (index: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
  sc.runJob(this, foldPartition, mergeResult)
  jobResult
}
def fold(zeroValue: T)(f: Function2[T, T, T]): T
Scala example
def fold(): Unit = {
  val conf = new SparkConf().setMaster("local").setAppName("Fold")
  val sc = new SparkContext(conf)
  val rdd = sc.makeRDD(List("a", "a", "b", "b"), 4)
  val res = rdd.fold("A")(_ + _)
  println(res)
}
Result: AAaAaAbAb
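The string result makes the two-level semantics visible: with 4 single-element partitions, the zero "A" is prepended once in each partition ("Aa", "Aa", "Ab", "Ab") and once more when the driver merges the partition results, giving "A" + "Aa" + "Aa" + "Ab" + "Ab" = "AAaAaAbAb". A minimal local sketch of the same two-level fold (plain Java, no Spark; names are illustrative):

```java
import java.util.*;

public class FoldSim {
    // Simulate RDD.fold: fold each partition starting from zero, then fold the
    // partition results on the "driver", starting from zero yet again.
    static String fold(List<List<String>> partitions, String zero) {
        String jobResult = zero; // driver-side accumulator also starts at zero
        for (List<String> part : partitions) {
            String partResult = zero; // per-partition accumulator starts at zero
            for (String s : part) partResult = partResult + s;
            jobResult = jobResult + partResult;
        }
        return jobResult;
    }

    public static void main(String[] args) {
        // List("a","a","b","b") spread over 4 partitions: one element each
        List<List<String>> parts = List.of(
                List.of("a"), List.of("a"), List.of("b"), List.of("b"));
        System.out.println(fold(parts, "A")); // AAaAaAbAb
    }
}
```

This is also why fold's zero value, like foldByKey's, should be neutral for the operator: here the non-neutral "A" appears numPartitions + 1 times in the output, so the result depends on partitioning.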