
Spark Programming: Basic RDD Operators zip, zipPartitions, zipWithIndex, zipWithUniqueId

2017-07-22 23:24


1) zip

First, let's look at the basic API.

def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]


The source RDD has elements of type T and the other RDD has elements of type U. zip pairs them up into tuples, so the resulting RDD has elements of type (T, U).

The i-th element of one RDD is paired with the i-th element of the other.

The zip function combines two RDDs into a key/value RDD. It assumes that the two RDDs have the same number of partitions and the same number of elements in each partition; otherwise an exception is thrown (see the failure sketch after the examples below).

val a = sc.parallelize(1 to 100, 3)
val b = sc.parallelize(101 to 200, 3)
a.zip(b).collect
//each element is paired with its counterpart
res1: Array[(Int, Int)] = Array((1,101), (2,102), (3,103), (4,104),
(5,105), (6,106), (7,107), (8,108), (9,109), (10,110), (11,111), (12,112),
(13,113), (14,114), (15,115), (16,116), (17,117), (18,118), (19,119),
(20,120), (21,121), (22,122), (23,123), (24,124), (25,125), (26,126),
(27,127), (28,128), (29,129), (30,130), (31,131), (32,132), (33,133)...


val a = sc.parallelize(1 to 100, 3)
val b = sc.parallelize(101 to 200, 3)
val c = sc.parallelize(201 to 300, 3)
//zip can also be chained; the resulting tuples then contain more than two values
a.zip(b).zip(c).map((x) => (x._1._1, x._1._2, x._2 )).collect
res12: Array[(Int, Int, Int)] = Array((1,101,201), (2,102,202),
(3,103,203), (4,104,204), (5,105,205), (6,106,206), (7,107,207),
(8,108,208), (9,109,209), (10,110,210), (11,111,211), (12,112,212),
(13,113,213), (14,114,214), (15,115,215), (16,116,216), (17,117,217),
(18,118,218), (19,119,219), (20,120,220), (21,121,221), (22,122,222)...
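
If the per-partition element counts do not match, zip fails at runtime. A minimal failure sketch (the exact exception message may differ between Spark versions):

val a = sc.parallelize(1 to 100, 3)
val b = sc.parallelize(101 to 150, 3)   //fewer elements than a
//a.zip(b).collect
//throws org.apache.spark.SparkException:
//  Can only zip RDDs with same number of elements in each partition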


2) zipPartitions

First, let's look at the API.

The zipPartitions function combines multiple RDDs partition by partition into a new RDD. The RDDs being combined must have the same number of partitions, but there is no requirement on the number of elements within each partition (a sketch of the unequal case follows the example below).

preservesPartitioning indicates whether the partitioner information of the parent RDD is preserved.

def zipPartitions[B: ClassTag, V: ClassTag](rdd2: RDD[B])(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V]
def zipPartitions[B: ClassTag, V: ClassTag](rdd2: RDD[B], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V]
def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag](rdd2: RDD[B], rdd3: RDD[C])(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V]
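
As the signatures show, zipPartitions can also combine three RDDs at once, and preservesPartitioning controls whether the parent's partitioner is kept. A minimal sketch of the three-RDD form (illustrative, not from the original post):

val x = sc.makeRDD(1 to 4, 2)
val y = sc.makeRDD(Seq("a", "b", "c", "d"), 2)
val z = sc.makeRDD(Seq(1.0, 2.0, 3.0, 4.0), 2)

//pair up the i-th element of each partition of the three RDDs
val zipped = x.zipPartitions(y, z) { (xi, yi, zi) =>
  xi.zip(yi).zip(zi).map { case ((a, b), c) => s"${a}_${b}_${c}" }
}
//zipped.collect should give Array(1_a_1.0, 2_b_2.0, 3_c_3.0, 4_d_4.0)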


scala> var rdd1 = sc.makeRDD(1 to 5,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[22] at makeRDD at <console>:21

scala> var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[23] at makeRDD at <console>:21

//element distribution across rdd1's two partitions:
scala> rdd1.mapPartitionsWithIndex{
|         (x,iter) => {
|           var result = List[String]()
|             while(iter.hasNext){
|               result ::= ("part_" + x + "|" + iter.next())
|             }
|             result.iterator
|
|         }
|       }.collect
res17: Array[String] = Array(part_0|2, part_0|1, part_1|5, part_1|4, part_1|3)

//element distribution across rdd2's two partitions:
scala> rdd2.mapPartitionsWithIndex{
|         (x,iter) => {
|           var result = List[String]()
|             while(iter.hasNext){
|               result ::= ("part_" + x + "|" + iter.next())
|             }
|             result.iterator
|
|         }
|       }.collect
res18: Array[String] = Array(part_0|B, part_0|A, part_1|E, part_1|D, part_1|C)

//zipPartitions of rdd1 and rdd2
//within each matching partition pair, the elements of the two RDDs are combined in order
scala> rdd1.zipPartitions(rdd2){
|       (rdd1Iter,rdd2Iter) => {
|         var result = List[String]()
|         while(rdd1Iter.hasNext && rdd2Iter.hasNext) {
|           result::=(rdd1Iter.next() + "_" + rdd2Iter.next())
|         }
|         result.iterator
|       }
|     }.collect
res19: Array[String] = Array(2_B, 1_A, 5_E, 4_D, 3_C)



Example taken from RDD_zipPartitions.
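
Unlike zip, zipPartitions only checks the number of partitions; the element counts inside each partition may differ, and the user-supplied function decides how mismatched iterators are handled. A small sketch of the unequal case (the while loop simply stops at the shorter iterator, so the extra elements are dropped):

val rdd3 = sc.makeRDD(1 to 6, 2)                 //3 elements per partition
val rdd4 = sc.makeRDD(Seq("A","B","C","D"), 2)   //2 elements per partition
rdd3.zipPartitions(rdd4){
  (iter1, iter2) => {
    var result = List[String]()
    while(iter1.hasNext && iter2.hasNext) {
      result ::= (iter1.next() + "_" + iter2.next())
    }
    result.iterator
  }
}.collect
//expected: Array(2_B, 1_A, 5_D, 4_C); elements 3 and 6 of rdd3 have no counterpart and are skipped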

3) zipWithIndex

def zipWithIndex(): RDD[(T, Long)]


This zips the elements of the RDD with their index values, starting from 0, producing key/value pairs.

val z = sc.parallelize(Array("A", "B", "C", "D"))
val r = z.zipWithIndex
r.collect
res110: Array[(String, Long)] = Array((A,0), (B,1), (C,2), (D,3))

val z = sc.parallelize(100 to 120, 5)
val r = z.zipWithIndex
r.collect
res11: Array[(Int, Long)] = Array((100,0), (101,1), (102,2), (103,3), (104,4), (105,5), (106,6), (107,7), (108,8), (109,9), (110,10), (111,11), (112,12), (113,13), (114,14), (115,15), (116,16), (117,17), (118,18), (119,19), (120,20))
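
Because the index reflects an element's overall position in the RDD, zipWithIndex is handy for position-based filtering. A small usage sketch (the header-dropping scenario is just an illustration, not from the original post):

val lines = sc.parallelize(Seq("header", "row1", "row2"))
val withoutHeader = lines.zipWithIndex
  .filter { case (_, idx) => idx > 0 }   //keep everything after index 0
  .map { case (line, _) => line }
//withoutHeader.collect should give Array(row1, row2)

Note that when the RDD has more than one partition, zipWithIndex has to trigger a Spark job to count the elements of the preceding partitions before the indices can be assigned.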


4) zipWithUniqueId

def zipWithUniqueId(): RDD[(T, Long)]


This assigns each element a new unique id; this id does not necessarily match the element's actual index. The result is again an RDD of tuples.

The unique ids are generated as follows:

the unique id of the first element in each partition is the partition index;

the unique id of each subsequent element in a partition is (the previous element's unique id) + (the total number of partitions of the RDD).

val z = sc.parallelize(100 to 120, 5)
val r = z.zipWithUniqueId
r.collect
res12: Array[(Int, Long)] = Array((100,0), (101,5), (102,10), (103,15), (104,1), (105,6), (106,11), (107,16), (108,2), (109,7), (110,12), (111,17), (112,3), (113,8), (114,13), (115,18), (116,4), (117,9), (118,14), (119,19), (120,24))


scala> var rdd1 = sc.makeRDD(Seq("A","B","C","D","E","F"),2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[44] at makeRDD at <console>:21
//rdd1 has two partitions
scala> rdd1.zipWithUniqueId().collect
res32: Array[(String, Long)] = Array((A,0), (B,2), (C,4), (D,1), (E,3), (F,5))
//total number of partitions is 2
//the first element of partition 0 gets id 0, the first element of partition 1 gets id 1
//the second element of partition 0 gets id 0+2=2, the third gets 2+2=4
//the second element of partition 1 gets id 1+2=3, the third gets 3+2=5
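
To double-check this rule against the output, the ids can be recomputed per partition from the partition index and each element's position within it. A small verification sketch (just a recomputation of the rule above, not Spark's internal implementation):

val n = rdd1.getNumPartitions          //total number of partitions (2 here)
//id = positionInPartition * n + partitionIndex
val manual = rdd1.mapPartitionsWithIndex { (pi, iter) =>
  iter.zipWithIndex.map { case (elem, pos) => (elem, pos.toLong * n + pi) }
}
//manual.collect should give Array((A,0), (B,2), (C,4), (D,1), (E,3), (F,5)), matching zipWithUniqueId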


Reference: http://lxw1234.com/archives/2015/07/352.htm