Usage of the Spark transformations map, flatMap, mapPartitions, and mapPartitionsWithIndex
2016-09-23 11:19
Spark version: 1.6.1
Spark's programming model is quite similar to MapReduce's, and all of these functions can be seen as map-side operations.
An RDD (Resilient Distributed Dataset) consists of n partitions, and processing each partition can be thought of as one map-like task.
Transformation / Meaning:
map(func): Return a new distributed dataset formed by passing each element of the source through a function func.
flatMap(func): Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
mapPartitions(func): Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.
mapPartitionsWithIndex(func): Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T.
The main differences among these transformations:
- For map and flatMap, func takes a single item; map returns a single item, while flatMap returns a Seq.
- For mapPartitions, func takes an iterator over a whole partition and returns another iterator.
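The map/flatMap distinction is easiest to see on a plain Scala collection first, since RDDs follow the same element-wise semantics. This is a local sketch, not Spark code:

```scala
// map vs flatMap on a plain Scala List (same semantics as on an RDD)
val xs = List(1, 2, 3)

// map: exactly one output element per input element
val mapped = xs.map(_ + 10)              // List(11, 12, 13)

// flatMap: each input element expands to a sequence,
// and the resulting sequences are concatenated
val flatMapped = xs.flatMap(x => 1 to x) // List(1, 1, 2, 1, 2, 3)

println(mapped)
println(flatMapped)
```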
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import scala.collection.mutable.ArrayBuffer

object Test2 {
  def main(args: Array[String]) {
    val sparkconf = new SparkConf().setAppName("test2").setMaster("local")
    val sc = new SparkContext(sparkconf)
    val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 0)
    val rdd = sc.parallelize(list, 3)

    // map (transformation):
    // Return a new distributed dataset formed by passing each element of
    // the source through a function func. map turns each T into a U.
    val rddmap = rdd.map(_ + 10)
    rddmap.foreach(println)

    // flatMap (transformation):
    // Similar to map, but each input item can be mapped to 0 or more output items
    // (so func should return a Seq rather than a single item).
    // flatMap: T => Seq[U]. Unlike map, which emits exactly one output item per
    // input item, flatMap emits a sequence per input item and concatenates the results.
    val rddflatmap = rdd.flatMap(x => for (y <- 1 to x) yield y)
    println("count=" + rddflatmap.count())
    rddflatmap.foreach(x => println("rddflatmap=" + x))

    // mapPartitions (transformation):
    // Similar to map, but runs separately on each partition (block) of the RDD,
    // so func must be of type Iterator[T] => Iterator[U] when running on an RDD
    // of type T. The function receives an iterator over one partition and returns
    // another iterator; it is invoked once per partition.
    val rddmappartitions = rdd.mapPartitions(iter => {
      val s = new ArrayBuffer[Int]()
      while (iter.hasNext) {
        val m = iter.next()
        for (i <- 1 to m) s += i
      }
      s.iterator
    }, false)
    rddmappartitions.foreach(println)

    // mapPartitionsWithIndex (transformation):
    // Similar to mapPartitions, but also provides func with an integer value
    // representing the index of the partition, so func must be of type
    // (Int, Iterator[T]) => Iterator[U] when running on an RDD of type T.
    val mapPartitionsWithIndex = rdd.mapPartitionsWithIndex((x, y) => {
      println("partition id is " + x)
      if (x == 0) for (t <- y) yield t + 10
      else for (t <- y) yield 0
    }, false)
    mapPartitionsWithIndex.foreach(println)
  }
}
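The mapPartitions example above materializes each partition into an ArrayBuffer before returning an iterator. A partition function can instead stay lazy by composing iterators, which avoids buffering a large partition in memory. The sketch below shows such an Iterator[Int] => Iterator[Int] function applied to a plain iterator standing in for one partition (local code, not a Spark call):

```scala
// A function of type Iterator[Int] => Iterator[Int], the shape mapPartitions requires.
// Each element m expands lazily to 1..m; no intermediate buffer is built.
val expand: Iterator[Int] => Iterator[Int] =
  iter => iter.flatMap(m => (1 to m).iterator)

// Applied to one "partition" of the example data:
val partition = Iterator(1, 2, 3)
val result = expand(partition).toList // List(1, 1, 2, 1, 2, 3)

println(result)
```

In real Spark code this would be passed as `rdd.mapPartitions(expand)`; the laziness matters because Spark pulls elements from the returned iterator on demand.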