您的位置:首页 > 编程语言

Spark编程之基本的RDD算子之glom,substract,substractByKey,intersection,distinct,union

2017-07-27 19:09 851 查看

Spark编程之基本的RDD算子之glom,substract,substractByKey,intersection,distinct,union等。

1) glom

这个算子会将每一个分区的元素放在一个数组里面然后存储在RDD里面。

def glom(): RDD[Array[T]]


val a = sc.parallelize(1 to 100, 3)
a.glom.collect //可以看到,glom将每个分区都放在了一个数组里面。调用collect方法后,会继续将返回的结果放在一个数组里面。

res8: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33), Array(34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66), Array(67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100))


2) substract

实现一个类似于集合A-B的操作。

def subtract(other: RDD[T]): RDD[T]
def subtract(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T], p: Partitioner): RDD[T]


接收一个RDD[T],或者再传入分区数,或者自己定义的Partitioner。

val a = sc.parallelize(1 to 9, 3)
val b = sc.parallelize(1 to 3, 3)
val c = a.subtract(b)
c.collect
res3: Array[Int] = Array(6, 9, 4, 7, 5, 8) //将剩下的元祖返回。


3) subtractByKey

根据key值来进行substract。

def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]
def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)]
def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)]


val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2)
val b = a.keyBy(_.length)
val c = sc.parallelize(List("ant", "falcon", "squid"), 2)
val d = c.keyBy(_.length)
b.subtractByKey(d).collect //d中没有key值为4的键值对,所以返回的结果为(4,lion)
res15: Array[(Int, String)] = Array((4,lion))


4) intersection

返回两个RDD间元素的交集。

def intersection(other: RDD[T], numPartitions: Int): RDD[T]
def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
def intersection(other: RDD[T]): RDD[T]


val x = sc.parallelize(1 to 20)
val y = sc.parallelize(10 to 30)
val z = x.intersection(y)

z.collect //可以看到两个RDD间元素的交集为10-20
res74: Array[Int] = Array(16, 12, 20, 13, 17, 14, 18, 10, 19, 15, 11)


5) union, ++

将两个RDD的元素连接起来。

def ++(other: RDD[T]): RDD[T]
def union(other: RDD[T]): RDD[T]


val a = sc.parallelize(1 to 3, 1)
val b = sc.parallelize(5 to 7, 1)
(a ++ b).collect
res0: Array[Int] = Array(1, 2, 3, 5, 6, 7)


6) distinct

返回一个全新的RDD,每个元素只包含一次。

def distinct(): RDD[T]
def distinct(numPartitions: Int): RDD[T]


val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
c.distinct.collect
res6: Array[String] = Array(Dog, Gnu, Cat, Rat)  //去除了里面重复的元素。

val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10))
a.distinct(2).partitions.length   //去除元素以后重新分区。
res16: Int = 2

a.distinct(3).partitions.length
res17: Int = 3
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
相关文章推荐