您的位置:首页 > 其它

Spark算子[10]:foldByKey、fold 源码实例详解

2017-12-13 11:05 691 查看
foldByKey与aggregateByKey,fold与aggregate用法相近,作用相似!

foldByKey是aggregateByKey的简化,fold是aggregate的简化。

foldByKey

源码

/**
* 使用关联函数合并每个K的values 并且一个中立的 "zero value" 将被添加到结果中【任意次数】(分区数决定),
* 并且不能改变结果:
* (例如, list集合的Nil, 累加的0值, 乘法的1值等等)
*/

def foldByKey(
zeroValue: V,
partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = self.withScope {

// 1、序列化 zero value to 为一个字节数组,这样我们就可以在每个Key上得到一个新的克隆体
val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
val zeroArray = new Array[Byte](zeroBuffer.limit)
zeroBuffer.get(zeroArray)

// 2、当反序列化时, 用 lazy val 为每一个task近创建一个序列化器实例
lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))

// 3、调用combineByKeyWithClassTag实现聚合
val cleanedFunc = self.context.clean(func)
combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v),
cleanedFunc, cleanedFunc, par
4000
titioner)
}


def foldByKey(zeroValue: V)(func: (V, V) ⇒ V): RDD[(K, V)]

def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) ⇒ V): RDD[(K, V)]

def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) ⇒ V): RDD[(K, V)]

Scala 实例

def foldByKey(): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("FoldByKey")
val sc = new SparkContext(conf)
val rdd = sc.makeRDD(List(("a",0),("a",2),("b",1),("b",2)),4)

val res =rdd.foldByKey(0)(_+_)
res.foreach(println)
}


结果:

(a,2)

(b,3)

Java 实例

public static void foldByKeyOp() {
SparkConf conf = new SparkConf().setMaster("local").setAppName("foldByKeyOp");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<Tuple2<String, Integer>> rdd = sc.parallelize(Arrays.asList(
new Tuple2<String, Integer>("a", 0),
new Tuple2<String, Integer>("a", 2),
new Tuple2<String, Integer>("b", 1),
new Tuple2<String, Integer>("b", 2)), 4);
//转pairRDD
JavaPairRDD<String, Integer> pairRDD = rdd.mapToPair(pair -> new Tuple2<String, Integer>(pair._1(), pair._2()));

//func: JFunction2[V, V, V]
Function2<Integer, Integer, Integer> func = new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
};
//foldByKey
JavaPairRDD<String, Integer> res = pairRDD.foldByKey(0, func);
//打印结果
res.foreach(x -> System.out.println(x));
}


fold

/**
* @param zeroValue 对于"op"运算符中的每个分区的累积结果的初始值,同时也是组合每个分区累加值的结果操作"op"的初始值。
* @param op 一个操作符用于在一个分区中积累结果,并将来自不同分区的结果组合在一起
*
*/
def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
// 克隆零值,因为我们也将把它序列化为tasks的一部分
var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
val cleanOp = sc.clean(op)
val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
val mergeResult = (index: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
sc.runJob(this, foldPartition, mergeResult)
jobResult
}


def fold(zeroValue: T)(f: Function2[T, T, T]): T

Scala 实例

def fold(): Unit ={
val conf = new SparkConf().setMaster("local").setAppName("FoldByKey")
val sc = new SparkContext(conf)
val rdd = sc.makeRDD(List("a","a","b","b"),4)
val res = rdd.fold("A")(_+_)
println(res)
}


结果:AAaAaAbAb
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: