Spark算子:RDD行动Action操作(4)–countByKey、foreach
2017-01-19 10:47
435 查看
转载:http://lxw1234.com/archives/2015/07/399.htm
遇到一个在spark shell上执行foreach什么都不显示的问题。
答案在下方:
def countByKey(): Map[K, Long]
countByKey用于统计RDD[K,V]中每个K的数量。
scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("B",3)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at makeRDD at :21
scala> rdd1.countByKey
res5: scala.collection.Map[String,Long] = Map(A -> 2, B -> 3)
def foreach(f: (T) ⇒ Unit): Unit
foreach用于遍历RDD,将函数f应用于每一个元素。
但要注意,如果对RDD执行foreach,只会在Executor端有效,而并不是Driver端。
比如:rdd.foreach(println),只会在Executor的stdout中打印出来,Driver端是看不到的。
我在Spark1.4中是这样,不知道是否真如此。
这时候,使用accumulator共享变量与foreach结合,倒是个不错的选择。
scala> var cnt = sc.accumulator(0)
cnt: org.apache.spark.Accumulator[Int] = 0
scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at :21
scala> rdd1.foreach(x => cnt += x)
scala> cnt.value
res51: Int = 55
scala> rdd1.collect.foreach(println)
1
2
3
4
5
6
7
8
9
10
def foreachPartition(f: (Iterator[T]) ⇒ Unit): Unit
foreachPartition和foreach类似,只不过是对每一个分区使用f。
scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at :21
scala> var allsize = sc.accumulator(0)
size: org.apache.spark.Accumulator[Int] = 0
scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at makeRDD at :21
scala> rdd1.foreachPartition { x => {
| allsize += x.size
| }}
scala> println(allsize.value)
10
def sortBy[K](f: (T) ⇒ K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
sortBy根据给定的排序k函数将RDD中的元素进行排序。
scala> var rdd1 = sc.makeRDD(Seq(3,6,7,1,2,0),2)
scala> rdd1.sortBy(x => x).collect
res1: Array[Int] = Array(0, 1, 2, 3, 6, 7) //默认升序
scala> rdd1.sortBy(x => x,false).collect
res2: Array[Int] = Array(7, 6, 3, 2, 1, 0) //降序
//RDD[K,V]类型
scala>var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))
scala> rdd1.sortBy(x => x).collect
res3: Array[(String, Int)] = Array((A,1), (A,2), (B,3), (B,6), (B,7))
//按照V进行降序排序
scala> rdd1.sortBy(x => x._2,false).collect
res4: Array[(String, Int)] = Array((B,7), (B,6), (B,3), (A,2), (A,1))
更多关于Spark算子的介绍,可参考 Spark算子系列文章 :
http://lxw1234.com/archives/2015/07/363.htm
遇到一个在spark shell上执行foreach什么都不显示的问题。
答案在下方:
countByKey
def countByKey(): Map[K, Long]countByKey用于统计RDD[K,V]中每个K的数量。
scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("B",3)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at makeRDD at :21
scala> rdd1.countByKey
res5: scala.collection.Map[String,Long] = Map(A -> 2, B -> 3)
foreach
def foreach(f: (T) ⇒ Unit): Unitforeach用于遍历RDD,将函数f应用于每一个元素。
但要注意,如果对RDD执行foreach,只会在Executor端有效,而并不是Driver端。
比如:rdd.foreach(println),只会在Executor的stdout中打印出来,Driver端是看不到的。
我在Spark1.4中是这样,不知道是否真如此。
这时候,使用accumulator共享变量与foreach结合,倒是个不错的选择。
scala> var cnt = sc.accumulator(0)
cnt: org.apache.spark.Accumulator[Int] = 0
scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at :21
scala> rdd1.foreach(x => cnt += x)
scala> cnt.value
res51: Int = 55
scala> rdd1.collect.foreach(println)
1
2
3
4
5
6
7
8
9
10
foreachPartition
def foreachPartition(f: (Iterator[T]) ⇒ Unit): UnitforeachPartition和foreach类似,只不过是对每一个分区使用f。
scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at :21
scala> var allsize = sc.accumulator(0)
size: org.apache.spark.Accumulator[Int] = 0
scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at makeRDD at :21
scala> rdd1.foreachPartition { x => {
| allsize += x.size
| }}
scala> println(allsize.value)
10
sortBy
def sortBy[K](f: (T) ⇒ K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]sortBy根据给定的排序k函数将RDD中的元素进行排序。
scala> var rdd1 = sc.makeRDD(Seq(3,6,7,1,2,0),2)
scala> rdd1.sortBy(x => x).collect
res1: Array[Int] = Array(0, 1, 2, 3, 6, 7) //默认升序
scala> rdd1.sortBy(x => x,false).collect
res2: Array[Int] = Array(7, 6, 3, 2, 1, 0) //降序
//RDD[K,V]类型
scala>var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))
scala> rdd1.sortBy(x => x).collect
res3: Array[(String, Int)] = Array((A,1), (A,2), (B,3), (B,6), (B,7))
//按照V进行降序排序
scala> rdd1.sortBy(x => x._2,false).collect
res4: Array[(String, Int)] = Array((B,7), (B,6), (B,3), (A,2), (A,1))
更多关于Spark算子的介绍,可参考 Spark算子系列文章 :
http://lxw1234.com/archives/2015/07/363.htm
相关文章推荐
- Spark算子:RDD行动Action操作(4)–countByKey、foreach、foreachPartition、sortBy
- Spark算子:RDD行动Action操作(4)–countByKey、foreach、foreachPartition、sortBy
- Spark算子:RDD行动Action操作(4)–countByKey、foreach、foreachPartition、sortBy
- Spark算子:RDD行动Action操 4000 作(4)–countByKey、foreach、foreachPartition、sortBy
- Spark算子:RDD行动Action操作(2)–take、top、takeOrdered
- Spark算子:RDD行动Action操作(5)–saveAsTextFile、saveAsSequenceFile、saveAsObjectFile
- Spark算子:RDD行动Action操作(2)–take、top、takeOrdered
- Spark算子:RDD行动Action操作(1)–first、count、reduce、collect
- 3.4 Spark RDD Action操作4-countByKey、foreach、foreachPartition、sortBy
- Spark算子:RDD行动Action操作(1)–first、count、reduce、collect
- Spark算子:RDD行动Action操作(5)–saveAsTextFile、saveAsSequenceFile、saveAsObjectFile
- Spark算子:RDD行动Action操作(6)–saveAsHadoopFile、saveAsHadoopDataset
- Spark算子:RDD行动Action操作(7)–saveAsNewAPIHadoopFile、saveAsNewAPIHadoopDataset
- Spark算子:RDD行动Action操作(7)–saveAsNewAPIHadoopFile、saveAsNewAPIHadoopDataset
- RDD行动Action操作(4)–countByKey、foreach、foreachPartition、sortBy
- Spark算子:RDD行动Action操作(1)–first、count、reduce、collect
- Spark算子:RDD行动Action操作(2)–take、top、takeOrdered
- Spark算子:RDD行动Action操作(3)–aggregate、fold、lookup
- Spark算子:RDD行动Action操作(7)–saveAsNewAPIHadoopFile、saveAsNewAPIHadoopDataset
- Spark算子:RDD行动Action操作(2)–take、top、takeOrdered