Spark Programming: Basic RDD Operators cogroup, groupBy, and groupByKey
2017-07-24 14:50
1) cogroup [Pair], groupWith [Pair]

First, let's look at the API.
```scala
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
```
cogroup collects, for each key, the values that the key maps to in each of several RDDs. It can group up to three key-value RDDs at once (the receiver plus two others).
```scala
val a = sc.parallelize(List(1, 2, 1, 3), 1)
val b = a.map((_, "b"))  // turn the original data into key-value pairs
val c = a.map((_, "c"))
b.cogroup(c).collect
res7: Array[(Int, (Iterable[String], Iterable[String]))] = Array(
  (2,(ArrayBuffer(b),ArrayBuffer(c))),
  (3,(ArrayBuffer(b),ArrayBuffer(c))),
  (1,(ArrayBuffer(b, b),ArrayBuffer(c, c))))
```

As the output shows, the values that share a key within each RDD are first collected into an ArrayBuffer, and those buffers are then paired up under that key.
```scala
val d = a.map((_, "d"))
b.cogroup(c, d).collect  // cogroup can also operate on several RDDs at once, and behaves the same way
res9: Array[(Int, (Iterable[String], Iterable[String], Iterable[String]))] = Array(
  (2,(ArrayBuffer(b),ArrayBuffer(c),ArrayBuffer(d))),
  (3,(ArrayBuffer(b),ArrayBuffer(c),ArrayBuffer(d))),
  (1,(ArrayBuffer(b, b),ArrayBuffer(c, c),ArrayBuffer(d, d))))
```
```scala
val x = sc.parallelize(List((1, "apple"), (2, "banana"), (3, "orange"), (4, "kiwi")), 2)
val y = sc.parallelize(List((5, "computer"), (1, "laptop"), (1, "desktop"), (4, "iPad")), 2)
// x may contain keys that y lacks, and vice versa
x.cogroup(y).collect
// x has key 2 but y does not, so after the cogroup the ArrayBuffer on y's side is empty
res23: Array[(Int, (Iterable[String], Iterable[String]))] = Array(
  (4,(ArrayBuffer(kiwi),ArrayBuffer(iPad))),
  (2,(ArrayBuffer(banana),ArrayBuffer())),
  (3,(ArrayBuffer(orange),ArrayBuffer())),
  (1,(ArrayBuffer(apple),ArrayBuffer(laptop, desktop))),
  (5,(ArrayBuffer(),ArrayBuffer(computer))))
```
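The groupWith signatures listed above are simply aliases for the corresponding two- and three-RDD forms of cogroup. A minimal sketch, reusing x and y from the previous example; since groupWith delegates to cogroup, the result should be identical:

```scala
// groupWith is an alias for cogroup, so this yields the same grouping as x.cogroup(y)
x.groupWith(y).collect
```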
2) groupBy
```scala
def groupBy[K: ClassTag](f: T => K): RDD[(K, Iterable[T])]
def groupBy[K: ClassTag](f: T => K, numPartitions: Int): RDD[(K, Iterable[T])]
def groupBy[K: ClassTag](f: T => K, p: Partitioner): RDD[(K, Iterable[T])]
```
The groupBy operator takes a function; the function's return value serves as the key, and the RDD's elements are grouped under that key.
```scala
val a = sc.parallelize(1 to 9, 3)
a.groupBy(x => { if (x % 2 == 0) "even" else "odd" }).collect
// the returned "even"/"odd" strings act as the keys used to group the RDD's values
res42: Array[(String, Seq[Int])] = Array(
  (even,ArrayBuffer(2, 4, 6, 8)),
  (odd,ArrayBuffer(1, 3, 5, 7, 9)))
```
```scala
val a = sc.parallelize(1 to 9, 3)
def myfunc(a: Int): Int = { a % 2 }
a.groupBy(myfunc).collect
// likewise, a return value of 0 marks the even numbers and 1 marks the odd ones
res3: Array[(Int, Seq[Int])] = Array(
  (0,ArrayBuffer(2, 4, 6, 8)),
  (1,ArrayBuffer(1, 3, 5, 7, 9)))
```
```scala
class MyPartitioner extends Partitioner {
  def numPartitions: Int = 2

  def getPartition(key: Any): Int = {
    key match {
      case null     => 0
      case key: Int => key % numPartitions
      case _        => key.hashCode % numPartitions
    }
  }

  override def equals(other: Any): Boolean = {
    other match {
      case h: MyPartitioner => true
      case _                => false
    }
  }
}

val a = sc.parallelize(1 to 9, 3)
val p = new MyPartitioner()
// group with the custom partitioner: even keys land in partition 0, odd keys in partition 1
val b = a.groupBy((x: Int) => { x }, p)
// mapWith exposes the partition index; here each group is tagged with the partition it landed in
val c = b.mapWith(i => i)((a, b) => (b, a))
c.collect
res42: Array[(Int, (Int, Seq[Int]))] = Array(
  (0,(4,ArrayBuffer(4))), (0,(2,ArrayBuffer(2))), (0,(6,ArrayBuffer(6))), (0,(8,ArrayBuffer(8))),
  (1,(9,ArrayBuffer(9))), (1,(3,ArrayBuffer(3))), (1,(1,ArrayBuffer(1))), (1,(7,ArrayBuffer(7))), (1,(5,ArrayBuffer(5))))
```
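Note that mapWith was deprecated and later removed from the RDD API. On current Spark versions the same (partition index, group) pairing can be written with mapPartitionsWithIndex; a minimal sketch, reusing b from above:

```scala
// mapPartitionsWithIndex hands the function each partition's index
// along with an iterator over that partition's elements
val c2 = b.mapPartitionsWithIndex { (idx, iter) =>
  iter.map(group => (idx, group))
}
c2.collect
```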
Example source: groupBy
3) groupByKey [Pair]
```scala
def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
```
This operator is similar to groupBy, except that it does not take a function: it groups directly by the key of each key-value pair. Like the others, it also accepts extra parameters such as a partition count or a partitioner (see the sketch after the example below).
```scala
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2)
val b = a.keyBy(_.length)  // use each string's length as its key
b.groupByKey.collect       // group the values that share a key
res11: Array[(Int, Seq[String])] = Array(
  (4,ArrayBuffer(lion)),
  (6,ArrayBuffer(spider)),
  (3,ArrayBuffer(dog, cat)),
  (5,ArrayBuffer(tiger, eagle)))
```
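As the signatures above show, groupByKey also accepts an explicit partition count or a custom Partitioner. A minimal sketch of the numPartitions variant, reusing b from the example above:

```scala
// produce the same groups, but spread across 4 result partitions
val grouped = b.groupByKey(4)
grouped.partitions.length  // 4
grouped.collect
```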