
Spark Operators: RDD Key-Value Transformations (1) – partitionBy, mapValues, flatMapValues

2016-12-19 00:00
Keywords: Spark operators, Spark RDD key-value transformations, partitionBy, mapValues, flatMapValues

partitionBy

def partitionBy(partitioner: Partitioner): RDD[(K, V)]

This function repartitions the original RDD according to the given partitioner, producing a new ShuffledRDD.

import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
  // Default parallelism of 12
  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test").set("spark.default.parallelism", "12"))

  var rdd1 = sc.makeRDD(Array((1, "A"), (2, "B"), (3, "C"), (4, "D")), 2)

  // Collect each partition's elements into a map keyed by partition name
  val func = (partIdx: Int, iter: Iterator[(Int, String)]) => {
    var part_map = scala.collection.mutable.Map[String, List[(Int, String)]]()
    while (iter.hasNext) {
      var part_name = "part_" + partIdx
      var elem = iter.next()
      if (part_map.contains(part_name)) {
        var elems = part_map(part_name)
        elems ::= elem
        part_map(part_name) = elems
      } else {
        part_map(part_name) = List[(Int, String)](elem)
      }
    }
    part_map.iterator
  }
  rdd1.mapPartitionsWithIndex{func}.collect.foreach(println(_))

  // Repartition using partitionBy
  var rdd2 = rdd1.partitionBy(new org.apache.spark.HashPartitioner(4))
  rdd2.mapPartitionsWithIndex{func}.collect.foreach(println(_))
}

16/12/19 16:01:52 INFO DAGScheduler: Job 0 finished: collect at ShellTest.scala:40, took 0.592829 s
(part_0,List((2,B), (1,A)))
(part_1,List((4,D), (3,C)))
16/12/19 16:01:52 INFO SparkContext: Starting job: collect at ShellTest.scala:43
...........
16/12/19 16:01:52 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
16/12/19 16:01:52 INFO DAGScheduler: ResultStage 2 (collect at ShellTest.scala:43) finished in 0.144 s
16/12/19 16:01:52 INFO DAGScheduler: Job 1 finished: collect at ShellTest.scala:43, took 0.630514 s
(part_0,List((4,D)))
(part_1,List((1,A)))
(part_2,List((2,B)))
(part_3,List((3,C)))
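
In this output, HashPartitioner(4) places each record in the partition given by the non-negative value of key.hashCode % 4, which for these Int keys is simply key % 4; that is why key 4 lands in part_0. Besides HashPartitioner, any org.apache.spark.Partitioner can be passed to partitionBy. Below is a minimal sketch of a hypothetical custom partitioner (EvenOddPartitioner is not part of Spark, it is only an illustration) that routes even keys to one partition and odd keys to another, assuming the rdd1 and func defined above:

import org.apache.spark.Partitioner

// Hypothetical partitioner for illustration: even keys go to partition 0, odd keys to partition 1
class EvenOddPartitioner extends Partitioner {
  override def numPartitions: Int = 2
  override def getPartition(key: Any): Int = key match {
    case k: Int => if (k % 2 == 0) 0 else 1
    case _      => 0
  }
}

var rdd3 = rdd1.partitionBy(new EvenOddPartitioner)
rdd3.mapPartitionsWithIndex{func}.collect.foreach(println(_))
// Expected: part_0 holds keys 2 and 4, part_1 holds keys 1 and 3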

mapValues

def mapValues[U](f: (V) => U): RDD[(K, U)]

Like map in the basic transformations, except that mapValues applies the function only to the values (V) of the [K, V] pairs, leaving the keys unchanged.

import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
  // Default parallelism of 12
  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test").set("spark.default.parallelism", "12"))

  var rdd1 = sc.makeRDD(Array((1, "A"), (2, "B"), (3, "C"), (4, "D")), 2)
  rdd1.mapValues(x => x + "_").collect.foreach(println(_))
}

16/12/19 16:05:46 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 42 ms on localhost (2/2)
(1,A_)
(2,B_)
(3,C_)
(4,D_)
16/12/19 16:05:46 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
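
A practical difference from calling map on the whole pair: mapValues preserves the parent RDD's partitioner, so it does not force an extra shuffle when used after partitionBy. A minimal sketch, reusing the rdd1 defined above:

// mapValues keeps the partitioner; map on the whole pair discards it
val partitioned = rdd1.partitionBy(new org.apache.spark.HashPartitioner(4))
println(partitioned.mapValues(_ + "_").partitioner)                  // Some(...) -- partitioner preserved
println(partitioned.map { case (k, v) => (k, v + "_") }.partitioner) // None -- partitioner lost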

flatMapValues

def flatMapValues[U](f: (V) => TraversableOnce[U]): RDD[(K, U)]

Like flatMap in the basic transformations, except that flatMapValues applies the function only to the values (V) of the [K, V] pairs and pairs each element of the flattened result with the original key.

import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
  // Default parallelism of 12
  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test").set("spark.default.parallelism", "12"))

  var rdd1 = sc.makeRDD(Array((1, "A"), (2, "B"), (3, "C"), (4, "D")), 2)
  rdd1.flatMapValues(x => x + "*").collect.foreach(println(_))
}

16/12/19 16:08:25 INFO DAGScheduler: Job 0 finished: collect at ShellTest.scala:24, took 0.696869 s
(1,A)
(1,*)
(2,B)
(2,*)
(3,C)
(3,*)
(4,D)
(4,*)
16/12/19 16:08:25 INFO SparkContext: Invoking stop() from shutdown hook
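
In the output above, each value such as "A*" is a String, and Scala treats a String as a sequence of characters, so flatMapValues emits one (key, character) pair per character. Returning an explicit collection per value is the more typical use; a minimal sketch, reusing the rdd1 defined above:

// Each element of the returned collection is paired with the original key
rdd1.flatMapValues(x => List(x, x.toLowerCase)).collect.foreach(println(_))
// Expected output:
// (1,A)
// (1,a)
// (2,B)
// (2,b)
// (3,C)
// (3,c)
// (4,D)
// (4,d)
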
Tags: Spark operators