通过 “由对象V到对象C的转换” 来说明 Spark_Streaming api中 reduceByKey 与 combineByKey 注意事项
2016-08-11 08:51
701 查看
今年以来一直在学习通过Spark Streaming 来处理公司大数据相关的业务需求。去重\汇总 在大数据统计中是很常见的。而reduceByKey、combineByKey 在 Spark Streaming 中做合并操作时(由对象V到对象C的转换)很重要的两个api . [b]网上的事例大部分太过简单,或者讲解过于皮毛。[/b]
总体来看, reduceByKey 比 combineByKey 更简单,combineByKey 比 reduceByKey 更通用,另外,在实际转换(从对象V到对象C)过程当中,对开发员来讲是非常讲究的。而我将就实际的需求事例,来做讲解:
现有 V 与 C 两个对象:
现有数据内容如下:
现在需要按 V对象的值进行分组合并,将转化成 C对象,现有两种实现方式:
第一种:
第二种:
结果都会如下:
2-1
3-1
1-3
其中,这两种实现逻辑是等价的。但是根据实际消耗的时间来看,第二种要好于第一种。
另外,重点讲一个严重错误事例,有的童鞋为了省事,只用V一个对象做合并,然后 V对象内新建一个集合属性,如下:
然后,在代码中如下:
却发现结果为:
2-0
3-0
1-2
所以,上面的这种事例是严重错误的。因为,如果结果集中如果只有一个对象,那reduce的时候就不会参于里面的函数运算。
先对比下二者的函数签名: class PairRDDFunctions[K, C](...) { def reduceByKey(func: (C, C) => C): RDD[(K, C)] def combineByKey[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)] }
总体来看, reduceByKey 比 combineByKey 更简单,combineByKey 比 reduceByKey 更通用,另外,在实际转换(从对象V到对象C)过程当中,对开发员来讲是非常讲究的。而我将就实际的需求事例,来做讲解:
现有 V 与 C 两个对象:
classVobj(vId: String, vName: String, vType : String) extends Serializable{ var id:String = vId val name:String = vName val stype:String = vType }
classCobj extends Serializable{ var lists = new ObjectArraySet[Vobj]() }
现有数据内容如下:
val c01 = new Vobj("1","hu","1") val c02 = new Vobj("2","hu","2") val c03 = new Vobj("3","hu","3") val c04 = new Vobj("4","hu","1") val c05 = new Vobj("5","hu","1")
现在需要按 V对象的值进行分组合并,将转化成 C对象,现有两种实现方式:
第一种:
val testCobjRdd = sc.parallelize(List(c01,c02,c03,c04,c05)) testCobjRdd.map(x => { //在map阶段就进行对象的转换 val cj = new Cobj() cj.lists.add(x) (x.stype,cj) }).reduceByKey((pre:Cobj,aft:Cobj) => { //构造一个新的中间对象并间数据进行汇集 val mid = new Cobj() mid.lists.addAll(pre.lists) mid.lists.addAll(aft.lists) mid }).foreach(x => { println(x._1+"-"+x._2.lists.size()) })
第二种:
testCobjRdd.map(x => { (x.stype,x) }).combineByKey[Cobj]((v:Vobj)=>{ //创建V对象的初始化[在第一个RDD中的第一条C记录中逻辑] val midC = new Cobj() midC.lists.add(v) midC },(c:Cobj,v:Vobj) => { //由C到V汇集业务逻辑[在第一个RDD中的第一条V记录与第二条C记录的逻辑] c.lists.add(v) c },(cPre:Cobj,cAft:Cobj) => { //由V到V汇集业务逻辑[在第一个与第二个RDD中汇集时的逻辑] val midC = new Cobj() midC.lists.addAll(cPre.lists) midC.lists.addAll(cAft.lists) midC }).foreach(x => { println(x._1+"-"+x._2.lists.size()) })
结果都会如下:
2-1
3-1
1-3
其中,这两种实现逻辑是等价的。但是根据实际消耗的时间来看,第二种要好于第一种。
另外,重点讲一个严重错误事例,有的童鞋为了省事,只用V一个对象做合并,然后 V对象内新建一个集合属性,如下:
classVobj(vId: String, vName: String, vType : String) extends Serializable{ var id:String = vId val name:String = vName val stype:String = vType var lists = new ObjectArraySet[Vobj]() }
然后,在代码中如下:
val testCobjRdd = sc.parallelize(List(c01,c02,c03,c04,c05)) testCobjRdd.map(x => { (x.stype,x) }).reduceByKey((pre,aft) => { val cMid = new Vobj(null,null,null) cMid.lists.add(pre) cMid.lists.add(aft) cMid }).foreach(x => { println(x._1+"-"+x._2.lists.size()) })
却发现结果为:
2-0
3-0
1-2
所以,上面的这种事例是严重错误的。因为,如果结果集中如果只有一个对象,那reduce的时候就不会参于里面的函数运算。
相关文章推荐
- Spark算子:RDD键值转换操作(3)–groupByKey、reduceByKey、reduceByKeyLocally
- Spark RDD中Transformation的combineByKey、reduceByKey,join详解
- 请教Spark 中 combinebyKey 和 reduceByKey的传入函数参数的区别?
- 结合Spark源码分析, combineByKey, aggregateByKey, foldByKey, reduceByKey
- Spark算子:RDD键值转换操作(3)–groupByKey、reduceByKey、reduceByKeyLocally
- Spark算子:RDD键值转换操作(2)–combineByKey、foldByKey
- Spark算子:RDD键值转换操作(2)–combineByKey、foldByKey
- sparkrdd自动转换能用pairfun(否则无法用reducebykey,groupbykey)
- Spark编程的基本的算子之:combineByKey,reduceByKey,groupByKey
- Spark算子:RDD键值转换操作(2)–combineByKey、foldByKey
- [Spark--基础]--聚合操作-reduceByKey、combineBykey、groupBy和AggregateByKey
- Spark算子:RDD键值转换操作(2)–combineByKey、foldByKey
- Spark算子:RDD键值转换操作(3)–groupBy、keyBy、groupByKey、reduceByKey、reduceByKeyLocally
- Spark算子:RDD键值转换操作(3)–groupByKey、reduceByKey、reduceByKeyLocally
- Spark算子:RDD键值转换操作(2)–combineByKey、foldByKey
- Spark代码3之Action:reduce,reduceByKey,sorted,lookup,take,saveAsTextFile
- Spark—聚合操作—combineByKey
- Spark函数讲解:combineByKey
- spark新能优化之reduceBykey和groupBykey的使用
- spark之combineByKey