
Spark transformations: how to use map, flatMap, mapPartitions, and mapPartitionsWithIndex

2016-09-23 11:19



Transformation    Meaning
map(func)    Return a new distributed dataset formed by passing each element of the source through a function func.
flatMap(func)    Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
mapPartitions(func)    Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.
mapPartitionsWithIndex(func)    Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T.

The following code shows the main differences between these transformations:


import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import scala.collection.mutable.ArrayBuffer

object Test2 {
  def main(args: Array[String]): Unit = {
    val sparkconf = new SparkConf().setAppName("test2").setMaster("local")
    val sc = new SparkContext(sparkconf)

    val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 0)
    val rdd = sc.parallelize(list, 3)

    // map: transformation
    // map(func) Return a new distributed dataset formed by passing each element of
    // the source through a function func.
    // map turns each element T into exactly one U (T => U).
    val rddmap = rdd.map(_ + 10)
    rddmap.foreach(println)

    // flatMap: transformation
    // flatMap(func) Similar to map, but each input item can be mapped to 0 or more output items
    // (so func should return a Seq rather than a single item).
    // flatMap: T => Seq[U]. Here x yields 1 to x, so the element 0 yields nothing.
    val rddflatmap = rdd.flatMap(x => for (y <- 1 to x) yield y)
    rddflatmap.foreach(x => println("rddflatmap=" + x))

    // mapPartitions: transformation
    // mapPartitions(func) Similar to map, but runs separately on each partition (block) of the RDD,
    // so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.
    // Note: this version buffers each partition's whole output in memory before returning it.
    val rddmappartitions = rdd.mapPartitions(iter => {
      val s = new ArrayBuffer[Int]()
      while (iter.hasNext) {
        val m = iter.next()
        for (i <- 1 to m) s += i
      }
      s.iterator
    }, false)
    rddmappartitions.foreach(println)

    // mapPartitionsWithIndex: transformation
    // mapPartitionsWithIndex(func) Similar to mapPartitions, but also provides func with an integer value
    // representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U>
    // when running on an RDD of type T.
    // Compared with mapPartitions, mapPartitionsWithIndex only adds the partition id.
    val mapPartitionsWithIndex = rdd.mapPartitionsWithIndex((x, y) => {
      println("partition id is " + x)
      if (x == 0) for (t <- y) yield t + 10
      else for (t <- y) yield 0
    }, false)
    // Transformations are lazy: without an action, the function above never runs.
    mapPartitionsWithIndex.foreach(println)

    sc.stop()
  }
}
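
On a cluster, foreach(println) prints on the executors rather than on the driver, so the output of the program above does not make the partition boundaries easy to see. As a minimal sketch, the same four transformations can be collected back to the driver, with glom() used to show how the ten elements are split across the three partitions. The names TransformationShapes, Results, and compute are hypothetical, introduced here only for illustration, and the sketch assumes a local Spark installation on the classpath.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical helper names (not from the original post).
object TransformationShapes {

  // Holds the collected result of each transformation for inspection on the driver.
  case class Results(
    layout: List[List[Int]],   // contents of each partition, via glom()
    mapped: List[Int],         // map: one output per input
    flat: List[Int],           // flatMap: zero or more outputs per input
    sums: List[Int],           // mapPartitions: one output per partition
    tagged: List[(Int, Int)]   // mapPartitionsWithIndex: (partition index, element)
  )

  val input: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 0)

  def compute(sc: SparkContext): Results = {
    val rdd = sc.parallelize(input, 3)

    // glom() turns every partition into a single array, exposing the split.
    val layout = rdd.glom().collect().map(_.toList).toList

    // map is applied element by element: T => U.
    val mapped = rdd.map(_ + 10).collect().toList

    // flatMap flattens the sequence returned for each element; 0 contributes nothing.
    val flat = rdd.flatMap(x => 1 to x).collect().toList

    // mapPartitions calls the function once per partition with an iterator
    // over that partition; here each partition is reduced to its sum.
    val sums = rdd.mapPartitions(iter => Iterator(iter.sum)).collect().toList

    // mapPartitionsWithIndex additionally receives the partition index.
    val tagged = rdd
      .mapPartitionsWithIndex((idx, iter) => iter.map(x => (idx, x)))
      .collect()
      .toList

    Results(layout, mapped, flat, sums, tagged)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("transformation-shapes").setMaster("local")
    val sc = new SparkContext(conf)
    try {
      val r = compute(sc)
      println("partitions: " + r.layout)
      println("map: " + r.mapped) // List(11, 12, 13, 14, 15, 16, 17, 18, 19, 10)
      println("flatMap size: " + r.flat.size)
      println("per-partition sums: " + r.sums)
      println("tagged: " + r.tagged)
    } finally {
      sc.stop()
    }
  }
}
```

Passing the iterator straight through (iter.map, iter.sum) keeps the per-partition function lazy, whereas collecting into an ArrayBuffer first holds the whole partition's output in memory. Which one is preferable depends on the partition size and on whether the function needs to see all elements before emitting any.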
