您的位置:首页 > 移动开发

Spark编程之基本的RDD算子之map,mapPartitions, mapPartitionsWithIndex.

2017-07-22 09:19 561 查看

Spark编程之基本的RDD算子之map,mapPartitions, mapPartitionsWithIndex.

1) map

首先来看一下基本的api:

def map[U: ClassTag](f: T => U): RDD[U]


这个是spark算子中非常经典的一个api,接收一个函数,将T类型的值变为U类型的值。返回值放在一个RDD里面。是一个transformation的操作。返回一个新的RDD。

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.map(_.length) //b会生成一个新的rdd,类型为RDD[Int]。值为a的每个元素的长度。
val c = a.zip(b) //这个是一个拉链操作,会将和b相同位置的元素给连接在一起。
c.collect
res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))


2) mapPartitions:

首先来看一下基本的api:

def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]


这个是一个比较特别的map函数,这个算子在每个分区只被调用一次。它传入的参数是一个迭代器,函数将迭代器的T类型的值变换为U类型的值。

如果在映射的过程中需要频繁创建额外的对象,使用mapPartitions要比map高效。

val a = sc.parallelize(1 to 9, 3)
def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
var res = List[(T, T)]() //自己顶一个这个迭代器,这个迭代器将原来为T类型的RDD变换为值为(T,T)类型的RDD。
var pre = iter.next
while (iter.hasNext)
{
val cur = iter.next;
res .::= (pre, cur) //每次将前一个值和后一个值放在一个元祖里面,然后放到list里面去,最后返回一个list的迭代器
pre = cur;
}
res.iterator
}
a.mapPartitions(myfunc).collect  //由于有3个分区,可以看到,(1,2) (2,3)在一起,(4,5) (5,6)在一起。
res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))


var rdd1 = sc.makeRDD(1 to 5,2)
//rdd1有两个分区
scala> var rdd3 = rdd1.mapPartitions{ x => {
| var result = List[Int]()
|     var i = 0
|     while(x.hasNext){
|       i += x.next()
|     }
|     result.::(i).iterator
| }}
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[84] at mapPartitions at :23

//rdd3将rdd1中每个分区中的数值累加
scala> rdd3.collect
res65: Array[Int] = Array(3, 12)
scala> rdd3.partitions.size
res66: Int = 2


3) mapPartitionsWithIndex

mapPartitionsWithIndex和mapPartitions类似,不过这个算子接收2个参数。第一个是一个迭代器,用于迭代处理分区的元素。

第二个是一个index,表示的是分区的索引值。输出值是一个迭代器,包含的是变换过后的新的rdd。

def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]


val x = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3)
def myfunc(index: Int, iter: Iterator[Int]) : Iterator[String] = {
iter.toList.map(x => index + "," + x).iterator //这个函数返回的是一个元素的索引值和本身的值。
}
x.mapPartitionsWithIndex(myfunc).collect()
res10: Array[String] = Array(0,1, 0,2, 0,3, 1,4, 1,5, 1,6, 2,7, 2,8, 2,9, 2,10)


var rdd1 = sc.makeRDD(1 to 5,2)
//rdd1有两个分区
var rdd2 = rdd1.mapPartitionsWithIndex{
(x,iter) => {
var result = List[String]()
var i = 0
while(iter.hasNext){
i += iter.next()
}
result.::(x + "|" + i).iterator //第x号分区和分区的元素的值的求和。

}
}
//rdd2将rdd1中每个分区的数字累加,并在每个分区的累加结果前面加了分区索引
scala> rdd2.collect
res13: Array[String] = Array(0|3, 1|12)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark rdd api map