Spark算子:统计RDD分区中的元素及数量
2017-04-08 19:24
441 查看
原文地址:
http://www.cnblogs.com/seaspring/articles/5641895.html
//创建一个RDD,默认分区15个,因为我的spark-shell指定了一共使用15个CPU资源
//--total-executor-cores 15
scala> var rdd1 = sc.makeRDD(1 to 50)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[17] at makeRDD at <console>:21
scala> rdd1.partitions.size
res15: Int = 15
//统计rdd1每个分区中元素数量
// Count the number of elements in each partition of rdd1.
// Emits one ("part_<idx>", count) pair per non-empty partition.
// The hasNext guard matches the original loop, which never inserted a
// map entry for an empty partition, so empty partitions emit nothing.
// This replaces the original var/mutable.Map/while version: the map
// only ever held the single key "part_" + partIdx, so it reduced to a
// plain element count, which Iterator.size computes directly.
rdd1.mapPartitionsWithIndex { (partIdx, iter) =>
  if (iter.hasNext) Iterator(("part_" + partIdx, iter.size))
  else Iterator.empty
}.collect
res16: Array[(String, Int)] = Array((part_0,3), (part_1,3), (part_2,4), (part_3,3), (part_4,3), (part_5,4), (part_6,3),
(part_7,3), (part_8,4), (part_9,3), (part_10,3), (part_11,4), (part_12,3), (part_13,3), (part_14,4))
//从part_0到part_14,每个分区中的元素数量
//统计rdd1每个分区中有哪些元素
// List the elements contained in each partition of rdd1.
// Emits one ("part_<idx>", elements) pair per non-empty partition.
// Elements are accumulated by prepending (::), so each list comes out
// in REVERSE partition order -- identical to the original mutable-Map
// loop (see the res17 output below: part_0 is List(3, 2, 1)).
// This replaces the var/mutable.Map/while version, whose map only ever
// held the single key "part_" + partIdx, with one foldLeft.
rdd1.mapPartitionsWithIndex { (partIdx, iter) =>
  if (iter.hasNext)
    Iterator(("part_" + partIdx, iter.foldLeft(List.empty[Int])((acc, e) => e :: acc)))
  else
    Iterator.empty
}.collect
res17: Array[(String, List[Int])] = Array((part_0,List(3, 2, 1)), (part_1,List(6, 5, 4)), (part_2,List(10, 9, 8, 7)), (part_3,List(13, 12, 11)),
(part_4,List(16, 15, 14)), (part_5,List(20, 19, 18, 17)), (part_6,List(23, 22, 21)), (part_7,List(26, 25, 24)), (part_8,List(30, 29, 28, 27)),
(part_9,List(33, 32, 31)), (part_10,List(36, 35, 34)), (part_11,List(40, 39, 38, 37)), (part_12,List(43, 42, 41)), (part_13,List(46, 45, 44)),
(part_14,List(50, 49, 48, 47)))
//从part_0到part_14,每个分区中包含的元素
http://www.cnblogs.com/seaspring/articles/5641895.html
//创建一个RDD,默认分区15个,因为我的spark-shell指定了一共使用15个CPU资源
//--total-executor-cores 15
scala> var rdd1 = sc.makeRDD(1 to 50)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[17] at makeRDD at <console>:21
scala> rdd1.partitions.size
res15: Int = 15
//统计rdd1每个分区中元素数量
// Count the number of elements in each partition of rdd1.
// Emits one ("part_<idx>", count) pair per non-empty partition.
// The hasNext guard matches the original loop, which never inserted a
// map entry for an empty partition, so empty partitions emit nothing.
// This replaces the original var/mutable.Map/while version: the map
// only ever held the single key "part_" + partIdx, so it reduced to a
// plain element count, which Iterator.size computes directly.
rdd1.mapPartitionsWithIndex { (partIdx, iter) =>
  if (iter.hasNext) Iterator(("part_" + partIdx, iter.size))
  else Iterator.empty
}.collect
res16: Array[(String, Int)] = Array((part_0,3), (part_1,3), (part_2,4), (part_3,3), (part_4,3), (part_5,4), (part_6,3),
(part_7,3), (part_8,4), (part_9,3), (part_10,3), (part_11,4), (part_12,3), (part_13,3), (part_14,4))
//从part_0到part_14,每个分区中的元素数量
//统计rdd1每个分区中有哪些元素
// List the elements contained in each partition of rdd1.
// Emits one ("part_<idx>", elements) pair per non-empty partition.
// Elements are accumulated by prepending (::), so each list comes out
// in REVERSE partition order -- identical to the original mutable-Map
// loop (see the res17 output below: part_0 is List(3, 2, 1)).
// This replaces the var/mutable.Map/while version, whose map only ever
// held the single key "part_" + partIdx, with one foldLeft.
rdd1.mapPartitionsWithIndex { (partIdx, iter) =>
  if (iter.hasNext)
    Iterator(("part_" + partIdx, iter.foldLeft(List.empty[Int])((acc, e) => e :: acc)))
  else
    Iterator.empty
}.collect
res17: Array[(String, List[Int])] = Array((part_0,List(3, 2, 1)), (part_1,List(6, 5, 4)), (part_2,List(10, 9, 8, 7)), (part_3,List(13, 12, 11)),
(part_4,List(16, 15, 14)), (part_5,List(20, 19, 18, 17)), (part_6,List(23, 22, 21)), (part_7,List(26, 25, 24)), (part_8,List(30, 29, 28, 27)),
(part_9,List(33, 32, 31)), (part_10,List(36, 35, 34)), (part_11,List(40, 39, 38, 37)), (part_12,List(43, 42, 41)), (part_13,List(46, 45, 44)),
(part_14,List(50, 49, 48, 47)))
//从part_0到part_14,每个分区中包含的元素
相关文章推荐
- Spark算子:统计RDD分区中的元素及数量
- Spark算子:统计RDD分区中的元素及数量
- Spark运算:统计RDD分区中的元素及数量
- 统计RDD分区中的元素及数量
- Spark RDD 默认分区数量 - repartitions和coalesce异同
- Spark---算子调优之filter过后使用coalesce减少分区数量
- 【Spark】RDD操作详解2——值型Transformation算子
- 在Spark集群中,集群的节点个数、RDD分区个数、cpu内核个数三者与并行度的关系??
- Spark算子:RDD行动Action操作(4)–countByKey、foreach、foreachPartition、sortBy
- Spark算子:RDD基本转换操作(7)–zipWithIndex、zipWithUniqueId
- Spark算子:RDD行动Action操作(4)–countByKey、foreach、foreachPartition、sortBy
- spark累加器介绍-案例通过累加器统计文章中空白行数量
- Hive压缩和spark算子,RDD总结
- Spark算子:RDD行动Action操作(7)–saveAsNewAPIHadoopFile、saveAsNewAPIHadoopDataset
- Spark算子--RDD的基本转换
- spark RDD算子(十二)之RDD 分区操作上mapPartitions, mapPartitionsWithIndex
- Spark算子:RDD基本转换操作(5)–mapPartitions、mapPartitionsWithIndex
- Spark算子:RDD基本转换操作(5)–mapPartitions、mapPartitionsWithIndex
- 一起学spark(6)-- 仅在数值RDD上的统计操作