Spark编程之基本的RDD算子之map,mapPartitions, mapPartitionsWithIndex.
2017-07-22 09:19
561 查看
Spark编程之基本的RDD算子之map,mapPartitions, mapPartitionsWithIndex.
1) map首先来看一下基本的api:
def map[U: ClassTag](f: T => U): RDD[U]
这个是spark算子中非常经典的一个api,接收一个函数,将T类型的值变为U类型的值。返回值放在一个RDD里面。是一个transformation的操作。返回一个新的RDD。
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3) val b = a.map(_.length) //b会生成一个新的rdd,类型为RDD[Int]。值为a的每个元素的长度。 val c = a.zip(b) //这个是一个拉链操作,会将和b相同位置的元素给连接在一起。 c.collect res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))
2) mapPartitions:
首先来看一下基本的api:
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
这个是一个比较特别的map函数,这个算子在每个分区只被调用一次。它传入的参数是一个迭代器,函数将迭代器的T类型的值变换为U类型的值。
如果在映射的过程中需要频繁创建额外的对象,使用mapPartitions要比map高效。
val a = sc.parallelize(1 to 9, 3) def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = { var res = List[(T, T)]() //自己顶一个这个迭代器,这个迭代器将原来为T类型的RDD变换为值为(T,T)类型的RDD。 var pre = iter.next while (iter.hasNext) { val cur = iter.next; res .::= (pre, cur) //每次将前一个值和后一个值放在一个元祖里面,然后放到list里面去,最后返回一个list的迭代器 pre = cur; } res.iterator } a.mapPartitions(myfunc).collect //由于有3个分区,可以看到,(1,2) (2,3)在一起,(4,5) (5,6)在一起。 res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
var rdd1 = sc.makeRDD(1 to 5,2) //rdd1有两个分区 scala> var rdd3 = rdd1.mapPartitions{ x => { | var result = List[Int]() | var i = 0 | while(x.hasNext){ | i += x.next() | } | result.::(i).iterator | }} rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[84] at mapPartitions at :23 //rdd3将rdd1中每个分区中的数值累加 scala> rdd3.collect res65: Array[Int] = Array(3, 12) scala> rdd3.partitions.size res66: Int = 2
3) mapPartitionsWithIndex
mapPartitionsWithIndex和mapPartitions类似,不过这个算子接收2个参数。第一个是一个迭代器,用于迭代处理分区的元素。
第二个是一个index,表示的是分区的索引值。输出值是一个迭代器,包含的是变换过后的新的rdd。
def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
val x = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3) def myfunc(index: Int, iter: Iterator[Int]) : Iterator[String] = { iter.toList.map(x => index + "," + x).iterator //这个函数返回的是一个元素的索引值和本身的值。 } x.mapPartitionsWithIndex(myfunc).collect() res10: Array[String] = Array(0,1, 0,2, 0,3, 1,4, 1,5, 1,6, 2,7, 2,8, 2,9, 2,10)
var rdd1 = sc.makeRDD(1 to 5,2) //rdd1有两个分区 var rdd2 = rdd1.mapPartitionsWithIndex{ (x,iter) => { var result = List[String]() var i = 0 while(iter.hasNext){ i += iter.next() } result.::(x + "|" + i).iterator //第x号分区和分区的元素的值的求和。 } } //rdd2将rdd1中每个分区的数字累加,并在每个分区的累加结果前面加了分区索引 scala> rdd2.collect res13: Array[String] = Array(0|3, 1|12)
相关文章推荐
- Spark算子:RDD基本转换操作(5)–mapPartitions/mapPartitionsWithIndex
- Spark编程之基本的RDD算子-aggregate和aggregateByKey
- Spark算子:RDD基本转换操作(1)–map、flagMap、distinct
- Spark编程之基本的RDD算子之glom,substract,substractByKey,intersection,distinct,union
- Spark编程之基本的RDD算子之fold,foldByKey,treeAggregate, treeReduce
- Spark算子:RDD基本转换操作(5)–mapPartitions、mapPartitionsWithIndex
- Spark编程之基本的RDD算子之fold,foldByKey,treeAggregate, treeReduce
- Spark编程之基本的RDD算子之zip,zipPartitions,zipWithIndex,zipWithUniqueId
- Spark算子:RDD基本转换操作(5)–mapPartitions、
- Spark编程之基本的RDD算子coalesce, repartition, checkpoint
- Spark算子:RDD基本转换操作(1)–map、flagMap、distinct
- Spark算子:RDD基本转换操作(1)–map、flagMap、distinct
- Spark算子:RDD基本转换操作(1)–map、flagMap、distinct
- Spark算子:RDD基本转换操作(5)–mapPartitions、mapPartitionsWithIndex
- Spark算子:RDD基本转换操作(mapPartitions、mapPartitionsWithIndex)
- Spark编程之基本的RDD算子之join,rightOuterJoin, leftOuterJoin
- Spark算子:RDD基本转换操作(1)–map、flagMap、distinct
- Spark编程之基本的RDD算子count, countApproxDistinct, countByValue等
- Spark算子:RDD基本转换操作(1)–map、flagMap、distinct
- Spark算子:RDD基本转换操作(5)–mapPartitions、mapPartitionsWithIndex