您的位置:首页 > 其它

Spark RDD系列-------1. 决定Spark RDD分区算法因素的总结

2015-11-25 17:51 435 查看
    RDD在调用引起Shuffle的方法的时候,如果没有显示指定ShuffledRDD的分区,那么会调用Partitioner.defaultPartitioner方法来确定ShuffledRDD的分区,比如RDD.combineByKey:

def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
: RDD[(K, C)] = self.withScope {
combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
}


RDD.groupByKey:

def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
groupByKey(defaultPartitioner(self))
}


RDD.join:

def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = self.withScope {
join(other, defaultPartitioner(self, other))
}


Partitioner.defaultPartitioner代码如下:

def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse//从大到小排序
for (r <- bySize if r.partitioner.isDefined) {
return r.partitioner.get//如果依赖的RDD中存在RDD已经设置了分区,则从设置了分区的RDD中则挑选出分区数最大的RDD.partitioner
}
if (rdd.context.conf.contains("spark.default.parallelism")) {
new HashPartitioner(rdd.context.defaultParallelism)
} else {
new HashPartitioner(bySize.head.partitions.size)//默认取分区个数最大的RDD作为hashcode
}
}
可见默认情况下,ShuffledRDD的分区确定算法为:
1. 如果依赖的RDD中存在RDD已经设置了RDD.partitioner,则从设置了分区的RDD中则挑选出分区数最大的RDD.partitioner

2. 如果依赖的所有RDD都没有设置RDD.partitioner,但是设置了spark.default.parallelism,那么根据spark.default.parallelism设置创建HashPartitioner,作为ShuffledRDD的分区依据

3. 以上2点都不满足,则从依赖的RDD中,去除分区数最大的RDD的分区个数,创建HashPartitioner,作为ShuffledRDD的分区依据

一个Spark应用可能包含多个Stage,假设Stage1经过Shuffle后生成新的ShuffledRDD1,然后再开始Stage2,但是从Stage2到Stage3的时候,在确定ShuffledRDD2的时候,ShuffledRDD1.partitioner是不是还存在呢?RDD.map是RDD最常调用的方法,RDD.map会创建MapPartitionsRDD,下面看看MapPartitionsRDD的创建:

def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}


可见MapPartitionsRDD.partitioner为None,由此可见ShuffledRDD1执行完map方法转换之后,ShuffledRDD1的partitioner没有传递到MapPartitionsRDD

为什么呢?

因为进行map操作之后,RDD中的数据已经发生了变化,每个数据如果shuffle的话计算出来的Hash值已经发生了改变,而Hash值确定了RDD中一个元素所在的分区,RDD的每个元素在下一个Stage所在的分区很可能发生了变化,所以默认情况下MapPartitionsRDD.partitioner设置为None,这样即使shuffle前的Stage1和Shuffle后的Stage2的分区个数相同,也需要Shuffle来重新确定RDD中每个元素所在的分区。

假设Shuffle后的RDD依赖多个RDD,比如说CoGroupedRDD,会不会两个RDD都会发生Shuffle呢?下面以CoGroupedRDD为例看一看。CoGroupedRDD.getDependencies的定义如下:override def getDependencies: Seq[Dependency[_]] = {
rdds.map { rdd: RDD[_] =>
if (rdd.partitioner == Some(part)) {
logDebug("Adding one-to-one dependency with " + rdd)
new OneToOneDependency(rdd)
} else {
logDebug("Adding shuffle dependency with " + rdd)
new ShuffleDependency[K, Any, CoGroupCombiner](
rdd.asInstanceOf[RDD[_ <: Product2[K, _]]], part, serializer)
}
}
}
可见如果CoGroupedRDD依赖的某个RDD和CoGroupedRDD分区相同,则这个被依赖的RDD就不会进行Shuffle。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: