Spark Source Code Analysis: Partitioner
2015-09-03 16:57
Every RDD carries an optional partitioning strategy. In the Spark source, partitioning is defined by the abstract class Partitioner:
```scala
/**
 * An object that defines how the elements in a key-value pair RDD are partitioned by key.
 * Maps each key to a partition ID, from 0 to `numPartitions - 1`.
 */
abstract class Partitioner extends Serializable {
  def numPartitions: Int
  def getPartition(key: Any): Int
}
```

Partitioner has a companion object, and that companion object implements the defaultPartitioner method; this is where Spark's default partitioning strategy is chosen:
```scala
def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
  val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
  for (r <- bySize if r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) {
    return r.partitioner.get
  }
  if (rdd.context.conf.contains("spark.default.parallelism")) {
    new HashPartitioner(rdd.context.defaultParallelism)
  } else {
    new HashPartitioner(bySize.head.partitions.size)
  }
}
```
As the code shows, defaultPartitioner walks through Seq(rdd) ++ others (sorted by partition count, largest first); if any of those RDDs already defines a partitioner, that partitioner is returned. Otherwise it constructs a HashPartitioner, so HashPartitioner is the default. The number of partitions for that HashPartitioner depends on whether spark.default.parallelism is set; if it is not, the largest partition count among the input RDDs is used.
Note that HashPartitioner does not support RDDs with array keys, such as RDD[Array[_]] or RDD[(Array[_], _)].
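The reason array keys are unsupported is that JVM arrays use identity-based hashCode: two arrays with identical contents almost always hash differently, so "equal" keys would be scattered across partitions. A quick standalone check (plain Scala, no Spark required; the object name is just for illustration):

```scala
object ArrayHashDemo {
  // Compares two arrays by contents and by hashCode. Arrays inherit the
  // identity-based hashCode from java.lang.Object, so contents-equal arrays
  // virtually never share a hash code.
  def sameContentsSameHash(a: Array[Int], b: Array[Int]): (Boolean, Boolean) =
    (a.sameElements(b), a.hashCode == b.hashCode)

  def main(args: Array[String]): Unit = {
    val (contentsEqual, hashesEqual) = sameContentsSameHash(Array(1, 2, 3), Array(1, 2, 3))
    println(contentsEqual)  // true: the contents match
    println(hashesEqual)    // virtually always false: hashCode is identity-based
  }
}
```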
HashPartitioner extends the abstract class Partitioner and hashes the key; its getPartition(key) method returns a partition ID in the range 0 to numPartitions - 1:
```scala
def getPartition(key: Any): Int = key match {
  case null => 0
  case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
```
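Utils.nonNegativeMod exists because the JVM's % operator can return a negative remainder for a negative hash code, which would be an invalid partition ID. A minimal standalone sketch of the same logic (nonNegativeMod reimplemented here for illustration, outside Spark):

```scala
object HashPartitionDemo {
  // Mirrors the idea of Spark's Utils.nonNegativeMod: if x % mod is
  // negative, shift it back into the range [0, mod).
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }

  // Mirrors HashPartitioner.getPartition: null keys go to partition 0,
  // everything else is hashed into [0, numPartitions).
  def getPartition(key: Any, numPartitions: Int): Int = key match {
    case null => 0
    case _    => nonNegativeMod(key.hashCode, numPartitions)
  }

  def main(args: Array[String]): Unit = {
    println(getPartition(-7, 4))    // Int's hashCode is itself: -7 % 4 = -3, shifted to 1
    println(getPartition(null, 4))  // 0
  }
}
```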
RangePartitioner requires that the keys of the key-value RDD be sortable, i.e. that they satisfy Scala's Ordered[K]. Its constructor is declared as follows:
```scala
class RangePartitioner[K : Ordering : ClassTag, V](
    @transient partitions: Int,
    @transient rdd: RDD[_ <: Product2[K, V]],
    private var ascending: Boolean = true)
  extends Partitioner
```
It computes rangeBounds, an array of upper bounds, so that the number of partitions is numPartitions = rangeBounds.length + 1. rangeBounds is computed as follows:
```scala
// An array of upper bounds for the first (partitions - 1) partitions
private var rangeBounds: Array[K] = {
  if (partitions <= 1) {
    Array.empty
  } else {
    // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
    val sampleSize = math.min(20.0 * partitions, 1e6)
    // Assume the input partitions are roughly balanced and over-sample a little bit.
    val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt
    val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
    if (numItems == 0L) {
      Array.empty
    } else {
      // If a partition contains much more than the average number of items, we re-sample from it
      // to ensure that enough items are collected from that partition.
      val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
      val candidates = ArrayBuffer.empty[(K, Float)]
      val imbalancedPartitions = mutable.Set.empty[Int]
      sketched.foreach { case (idx, n, sample) =>
        if (fraction * n > sampleSizePerPartition) {
          imbalancedPartitions += idx
        } else {
          // The weight is 1 over the sampling probability.
          val weight = (n.toDouble / sample.size).toFloat
          for (key <- sample) {
            candidates += ((key, weight))
          }
        }
      }
      if (imbalancedPartitions.nonEmpty) {
        // Re-sample imbalanced partitions with the desired sampling probability.
        val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
        val seed = byteswap32(-rdd.id - 1)
        val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
        val weight = (1.0 / fraction).toFloat
        candidates ++= reSampled.map(x => (x, weight))
      }
      RangePartitioner.determineBounds(candidates, partitions)
    }
  }
}
```
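The last step, RangePartitioner.determineBounds, takes the weighted sample and picks the (partitions - 1) keys that split the total weight into roughly equal slices. A simplified standalone sketch of that idea (Int keys, names hypothetical; the real Spark implementation is generic in K and handles ties and duplicates more carefully):

```scala
object RangeBoundsDemo {
  // Simplified determineBounds: sort the weighted candidates by key, then
  // walk through them, cutting a boundary each time the accumulated weight
  // passes the next 1/partitions step of the total weight.
  def determineBounds(candidates: Seq[(Int, Float)], partitions: Int): Array[Int] = {
    val ordered = candidates.sortBy(_._1)
    val sumWeights = ordered.map(_._2.toDouble).sum
    val step = sumWeights / partitions
    val bounds = scala.collection.mutable.ArrayBuffer.empty[Int]
    var cumWeight = 0.0
    var target = step
    var i = 0
    while (i < ordered.length && bounds.length < partitions - 1) {
      cumWeight += ordered(i)._2
      if (cumWeight >= target) {
        bounds += ordered(i)._1   // this key becomes an upper bound
        target += step
      }
      i += 1
    }
    bounds.toArray
  }

  def main(args: Array[String]): Unit = {
    // 100 uniformly weighted keys split into 4 partitions.
    val sample = (1 to 100).map(k => (k, 1.0f))
    println(determineBounds(sample, 4).mkString(", "))  // 25, 50, 75
  }
}
```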
```scala
def getPartition(key: Any): Int = {
  val k = key.asInstanceOf[K]
  var partition = 0
  if (rangeBounds.length <= 128) {
    // If we have less than 128 partitions naive search
    while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
      partition += 1
    }
  } else {
    // Determine which binary search method to use only once.
    partition = binarySearch(rangeBounds, k)
    // binarySearch either returns the match location or -[insertion point]-1
    if (partition < 0) {
      partition = -partition - 1
    }
    if (partition > rangeBounds.length) {
      partition = rangeBounds.length
    }
  }
  if (ascending) {
    partition
  } else {
    rangeBounds.length - partition
  }
}
```
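With rangeBounds in hand, mapping a key to a partition reduces to finding where the key falls among the upper bounds. A standalone sketch of the linear-scan branch above (Int keys, no Spark; the binary-search branch used for more than 128 bounds produces the same mapping):

```scala
object RangeGetPartitionDemo {
  // Linear-scan branch of RangePartitioner.getPartition: advance past every
  // upper bound the key strictly exceeds; the resulting index is the
  // partition. Bounds act as inclusive upper bounds, so a key equal to a
  // bound stays in that bound's partition.
  def getPartition(key: Int, rangeBounds: Array[Int], ascending: Boolean = true): Int = {
    var partition = 0
    while (partition < rangeBounds.length && key > rangeBounds(partition)) {
      partition += 1
    }
    if (ascending) partition else rangeBounds.length - partition
  }

  def main(args: Array[String]): Unit = {
    val bounds = Array(25, 50, 75)      // rangeBounds.length + 1 = 4 partitions
    println(getPartition(10, bounds))   // 0
    println(getPartition(25, bounds))   // 0: equal to the bound, stays in partition 0
    println(getPartition(60, bounds))   // 2
    println(getPartition(99, bounds))   // 3
  }
}
```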