spark API 之 combineByKey
2016-04-28 12:16
309 查看
以下代码是combineByKey的一个例子,把执行过程展示出来。
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext} /** * Created by luckuan on 16/4/19. */ object TT { def main(args: Array[String]) { val sparkConf = new SparkConf() sparkConf.setMaster("local[*]") // sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") sparkConf.setAppName("ts") val sc: SparkContext = new SparkContext(sparkConf) val partitionNum = 2 val rdd = sc.makeRDD(Seq(("a1", "11"), ("a1", "1"), ("a1", "111"), ("a2", "2"), ("a2", "22"), ("a3", "3"), ("a4", "4"), ("a1", "1111"), ("a2", "222")), partitionNum) val zero = () => scala.collection.mutable.HashSet.empty[String] val seq = (s: scala.collection.mutable.HashSet[String], v: String) => { val s_clone = s.clone() //此处clone是为了记录原有的值,否则在下次打印的时候是最终结果,不太直观,线上不需要用到clone. val ret = s += v println(s"seq-s:${s_clone}--seq-v:${v}---seq-rs:${ret}") ret } val comb = (u: scala.collection.mutable.HashSet[String], v: scala.collection.mutable.HashSet[String]) => { { val u_clone = u.clone() val ret = u ++ v println(s"comb-s:${u_clone}--comb-v:${v}---comb-rs:${ret}") ret } } val ret = rdd.combineByKey((v: String) => { println(s"根据[${v}]进行初始化") seq(zero(), v) }, seq, comb, new HashPartitioner(partitionNum)).collect() ret.foreach(println) } }
RDD分区为1
根据[11]进行初始化 seq-s:Set()--seq-v:11---seq-rs:Set(11) seq-s:Set(11)--seq-v:1---seq-rs:Set(1, 11) seq-s:Set(1, 11)--seq-v:111---seq-rs:Set(1, 111, 11) 根据[2]进行初始化 seq-s:Set()--seq-v:2---seq-rs:Set(2) seq-s:Set(2)--seq-v:22---seq-rs:Set(2, 22) 根据[3]进行初始化 seq-s:Set()--seq-v:3---seq-rs:Set(3) 根据[4]进行初始化 seq-s:Set()--seq-v:4---seq-rs:Set(4) seq-s:Set(1, 111, 11)--seq-v:1111---seq-rs:Set(1, 1111, 111, 11) seq-s:Set(2, 22)--seq-v:222---seq-rs:Set(222, 2, 22) (a3,Set(3)) (a4,Set(4)) (a1,Set(1, 1111, 111, 11)) (a2,Set(222, 2, 22)) 分区是1的情况 首先判断当前key是否存在,如果存在,那么执行seq代码,将新值追加到已经存在的set中。如果不存在 调用zero的代码生成一个新的set 这里没有执行comb方法,因为我们只有一个分区。
RDD分区为2
根据[11]进行初始化 根据[22]进行初始化 seq-s:Set()--seq-v:11---seq-rs:Set(11) seq-s:Set()--seq-v:22---seq-rs:Set(22) 根据[3]进行初始化 seq-s:Set(11)--seq-v:1---seq-rs:Set(1, 11) seq-s:Set()--seq-v:3---seq-rs:Set(3) seq-s:Set(1, 11)--seq-v:111---seq-rs:Set(1, 111, 11) 根据[4]进行初始化 seq-s:Set()--seq-v:4---seq-rs:Set(4) 根据[2]进行初始化 seq-s:Set()--seq-v:2---seq-rs:Set(2) 根据[1111]进行初始化 seq-s:Set()--seq-v:1111---seq-rs:Set(1111) seq-s:Set(22)--seq-v:222---seq-rs:Set(222, 22) comb-s:Set(2)--comb-v:Set(222, 22)---comb-rs:Set(222, 2, 22) comb-s:Set(1, 111, 11)--comb-v:Set(1111)---comb-rs:Set(1, 1111, 111, 11) (a3,Set(3)) (a1,Set(1, 1111, 111, 11)) (a4,Set(4)) (a2,Set(222, 2, 22)) “2”和“22”被初始化了两次,说明“a2"被初始化了2次,因为“a2"的数据划分到2个分区中,导致被初始化两次。
相关文章推荐
- java学习第一步 sublime环境配置
- Android:EditText插入图片实现图文混排
- 闭包
- 【vim】mac配置vim,molokai配色
- 【BZOJ2773】ispiti【CDQ分治】【线段树】
- office转pdf(在线预览)思路(java示例)
- JS常用的兼容方法
- maven
- 虚幻4 字符串char utf8转换
- hdu1418 欧拉公式
- JavaScript:内存泄露、性能调优
- win10 安装vagrant + VirtualBox并安装系统(linux/centos/ubuntu)
- HDU 2005 第几天?(水题)
- bitmap与2bitmap实现
- preference-->java-->code style-->code templates
- 343. Integer Break
- 26-Palindrome Number
- android 自学日记(五) ——ListView
- 我的第四次上机作业
- this初步深入学习