Spark Operators: RDD Key-Value Transformations (1) – partitionBy, mapValues, flatMapValues
Abstract: Keywords: Spark operators, Spark RDD key-value transformations, partitionBy, mapValues, flatMapValues
partitionBy
def partitionBy(partitioner: Partitioner): RDD[(K, V)]

This function repartitions the original RDD according to the given partitioner, producing a new ShuffledRDD.
def main(args: Array[String]) {
  // Default parallelism is set to 12; the RDD below overrides it with 2 partitions
  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test").set("spark.default.parallelism", "12"))
  var rdd1 = sc.makeRDD(Array((1,"A"),(2,"B"),(3,"C"),(4,"D")), 2)
  // For each partition, collect its elements into a list keyed by "part_<index>"
  val func = (partIdx: Int, iter: Iterator[(Int,String)]) => {
    var part_map = scala.collection.mutable.Map[String, List[(Int,String)]]()
    while (iter.hasNext) {
      var part_name = "part_" + partIdx
      var elem = iter.next()
      if (part_map.contains(part_name)) {
        var elems = part_map(part_name)
        elems ::= elem
        part_map(part_name) = elems
      } else {
        part_map(part_name) = List[(Int,String)](elem)
      }
    }
    part_map.iterator
  }
  rdd1.mapPartitionsWithIndex(func).collect.foreach(println(_))
  // Repartition into 4 partitions with partitionBy
  var rdd2 = rdd1.partitionBy(new org.apache.spark.HashPartitioner(4))
  rdd2.mapPartitionsWithIndex(func).collect.foreach(println(_))
}
16/12/19 16:01:52 INFO DAGScheduler: Job 0 finished: collect at ShellTest.scala:40, took 0.592829 s
(part_0,List((2,B), (1,A)))
(part_1,List((4,D), (3,C)))
16/12/19 16:01:52 INFO SparkContext: Starting job: collect at ShellTest.scala:43
...........
16/12/19 16:01:52 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
16/12/19 16:01:52 INFO DAGScheduler: ResultStage 2 (collect at ShellTest.scala:43) finished in 0.144 s
16/12/19 16:01:52 INFO DAGScheduler: Job 1 finished: collect at ShellTest.scala:43, took 0.630514 s
(part_0,List((4,D)))
(part_1,List((1,A)))
(part_2,List((2,B)))
(part_3,List((3,C)))
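In the output above, HashPartitioner(4) assigns each record to partition key.hashCode % 4 (an Int hashes to itself), which is why key 4 lands in part_0. Any org.apache.spark.Partitioner can be passed to partitionBy. As a minimal, hypothetical sketch (the ModPartitioner class below is illustrative, not part of Spark):

import org.apache.spark.Partitioner

// Hypothetical partitioner: route Int keys by a non-negative modulo of the key itself
class ModPartitioner(parts: Int) extends Partitioner {
  override def numPartitions: Int = parts
  override def getPartition(key: Any): Int = key match {
    case i: Int => ((i % parts) + parts) % parts  // guard against negative keys
    case _      => 0
  }
}

// Usage, assuming the rdd1 and func from the example above:
// rdd1.partitionBy(new ModPartitioner(4)).mapPartitionsWithIndex(func).collect.foreach(println(_))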
mapValues
def mapValues[U](f: (V) => U): RDD[(K, U)]

Like map among the basic transformations, except that mapValues applies the function only to the V of each [K, V] pair; the keys are left unchanged.
def main(args: Array[String]): Unit = {
  // Default parallelism is set to 12; the RDD below overrides it with 2 partitions
  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test").set("spark.default.parallelism", "12"))
  var rdd1 = sc.makeRDD(Array((1,"A"),(2,"B"),(3,"C"),(4,"D")), 2)
  // Append "_" to every value; keys stay the same
  rdd1.mapValues(x => x + "_").collect.foreach(println(_))
}
16/12/19 16:05:46 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 42 ms on localhost (2/2)
(1,A_)
(2,B_)
(3,C_)
(4,D_)
16/12/19 16:05:46 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
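Because mapValues cannot change keys, it also preserves the parent RDD's partitioner, whereas a plain map does not. A minimal sketch, assuming an existing SparkContext sc:

// mapValues keeps the HashPartitioner; map discards it
val rdd = sc.makeRDD(Array((1,"A"),(2,"B"),(3,"C"),(4,"D")))
            .partitionBy(new org.apache.spark.HashPartitioner(4))
println(rdd.mapValues(_ + "_").partitioner)                   // Some(...HashPartitioner...)
println(rdd.map { case (k, v) => (k, v + "_") }.partitioner)  // None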
flatMapValues
def flatMapValues[U](f: (V) => TraversableOnce[U]): RDD[(K, U)]

Like flatMap among the basic transformations, except that flatMapValues applies the function only to the V of each [K, V] pair and pairs every element of the result with the original key. Note that in the example below f returns a String (e.g. "A*"), and a String is a TraversableOnce[Char], so it is flattened into its individual characters: each key ends up paired with 'A' and '*' separately, as the output shows.
def main(args: Array[String]): Unit = {
  // Default parallelism is set to 12; the RDD below overrides it with 2 partitions
  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test").set("spark.default.parallelism", "12"))
  var rdd1 = sc.makeRDD(Array((1,"A"),(2,"B"),(3,"C"),(4,"D")), 2)
  // x + "*" yields a String, which is flattened character by character
  rdd1.flatMapValues(x => x + "*").collect.foreach(println(_))
}
16/12/19 16:08:25 INFO DAGScheduler: Job 0 finished: collect at ShellTest.scala:24, took 0.696869 s
(1,A)
(1,*)
(2,B)
(2,*)
(3,C)
(3,*)
(4,D)
(4,*)
16/12/19 16:08:25 INFO SparkContext: Invoking stop() from shutdown hook
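To emit whole strings instead of characters, return an explicit collection from f. A minimal sketch, assuming the rdd1 from the example above:

// Each value expands to two full strings, each paired with its key
rdd1.flatMapValues(x => List(x, x + "*")).collect.foreach(println(_))
// (1,A)
// (1,A*)
// (2,B)
// (2,B*)
// ...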