Spark RDD系列-------1. 决定Spark RDD分区算法因素的总结
2015-11-25 17:51
435 查看
RDD在调用引起Shuffle的方法的时候,如果没有显示指定ShuffledRDD的分区,那么会调用Partitioner.defaultPartitioner方法来确定ShuffledRDD的分区,比如RDD.combineByKey:
RDD.groupByKey:
RDD.join:
Partitioner.defaultPartitioner代码如下:
1. 如果依赖的RDD中存在RDD已经设置了RDD.partitioner,则从设置了分区的RDD中则挑选出分区数最大的RDD.partitioner
2. 如果依赖的所有RDD都没有设置RDD.partitioner,但是设置了spark.default.parallelism,那么根据spark.default.parallelism设置创建HashPartitioner,作为ShuffledRDD的分区依据
3. 以上2点都不满足,则从依赖的RDD中,去除分区数最大的RDD的分区个数,创建HashPartitioner,作为ShuffledRDD的分区依据
一个Spark应用可能包含多个Stage,假设Stage1经过Shuffle后生成新的ShuffledRDD1,然后再开始Stage2,但是从Stage2到Stage3的时候,在确定ShuffledRDD2的时候,ShuffledRDD1.partitioner是不是还存在呢?RDD.map是RDD最常调用的方法,RDD.map会创建MapPartitionsRDD,下面看看MapPartitionsRDD的创建:
可见MapPartitionsRDD.partitioner为None,由此可见ShuffledRDD1执行完map方法转换之后,ShuffledRDD1的partitioner没有传递到MapPartitionsRDD
为什么呢?
因为进行map操作之后,RDD中的数据已经发生了变化,每个数据如果shuffle的话计算出来的Hash值已经发生了改变,而Hash值确定了RDD中一个元素所在的分区,RDD的每个元素在下一个Stage所在的分区很可能发生了变化,所以默认情况下MapPartitionsRDD.partitioner设置为None,这样即使shuffle前的Stage1和Shuffle后的Stage2的分区个数相同,也需要Shuffle来重新确定RDD中每个元素所在的分区。
假设Shuffle后的RDD依赖多个RDD,比如说CoGroupedRDD,会不会两个RDD都会发生Shuffle呢?下面以CoGroupedRDD为例看一看。CoGroupedRDD.getDependencies的定义如下:override def getDependencies: Seq[Dependency[_]] = {
rdds.map { rdd: RDD[_] =>
if (rdd.partitioner == Some(part)) {
logDebug("Adding one-to-one dependency with " + rdd)
new OneToOneDependency(rdd)
} else {
logDebug("Adding shuffle dependency with " + rdd)
new ShuffleDependency[K, Any, CoGroupCombiner](
rdd.asInstanceOf[RDD[_ <: Product2[K, _]]], part, serializer)
}
}
}
可见如果CoGroupedRDD依赖的某个RDD和CoGroupedRDD分区相同,则这个被依赖的RDD就不会进行Shuffle。
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C) : RDD[(K, C)] = self.withScope { combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self)) }
RDD.groupByKey:
def groupByKey(): RDD[(K, Iterable[V])] = self.withScope { groupByKey(defaultPartitioner(self)) }
RDD.join:
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = self.withScope { join(other, defaultPartitioner(self, other)) }
Partitioner.defaultPartitioner代码如下:
def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = { val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse//从大到小排序 for (r <- bySize if r.partitioner.isDefined) { return r.partitioner.get//如果依赖的RDD中存在RDD已经设置了分区,则从设置了分区的RDD中则挑选出分区数最大的RDD.partitioner } if (rdd.context.conf.contains("spark.default.parallelism")) { new HashPartitioner(rdd.context.defaultParallelism) } else { new HashPartitioner(bySize.head.partitions.size)//默认取分区个数最大的RDD作为hashcode } }可见默认情况下,ShuffledRDD的分区确定算法为:
1. 如果依赖的RDD中存在RDD已经设置了RDD.partitioner,则从设置了分区的RDD中则挑选出分区数最大的RDD.partitioner
2. 如果依赖的所有RDD都没有设置RDD.partitioner,但是设置了spark.default.parallelism,那么根据spark.default.parallelism设置创建HashPartitioner,作为ShuffledRDD的分区依据
3. 以上2点都不满足,则从依赖的RDD中,去除分区数最大的RDD的分区个数,创建HashPartitioner,作为ShuffledRDD的分区依据
一个Spark应用可能包含多个Stage,假设Stage1经过Shuffle后生成新的ShuffledRDD1,然后再开始Stage2,但是从Stage2到Stage3的时候,在确定ShuffledRDD2的时候,ShuffledRDD1.partitioner是不是还存在呢?RDD.map是RDD最常调用的方法,RDD.map会创建MapPartitionsRDD,下面看看MapPartitionsRDD的创建:
def map[U: ClassTag](f: T => U): RDD[U] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF)) }
可见MapPartitionsRDD.partitioner为None,由此可见ShuffledRDD1执行完map方法转换之后,ShuffledRDD1的partitioner没有传递到MapPartitionsRDD
为什么呢?
因为进行map操作之后,RDD中的数据已经发生了变化,每个数据如果shuffle的话计算出来的Hash值已经发生了改变,而Hash值确定了RDD中一个元素所在的分区,RDD的每个元素在下一个Stage所在的分区很可能发生了变化,所以默认情况下MapPartitionsRDD.partitioner设置为None,这样即使shuffle前的Stage1和Shuffle后的Stage2的分区个数相同,也需要Shuffle来重新确定RDD中每个元素所在的分区。
假设Shuffle后的RDD依赖多个RDD,比如说CoGroupedRDD,会不会两个RDD都会发生Shuffle呢?下面以CoGroupedRDD为例看一看。CoGroupedRDD.getDependencies的定义如下:override def getDependencies: Seq[Dependency[_]] = {
rdds.map { rdd: RDD[_] =>
if (rdd.partitioner == Some(part)) {
logDebug("Adding one-to-one dependency with " + rdd)
new OneToOneDependency(rdd)
} else {
logDebug("Adding shuffle dependency with " + rdd)
new ShuffleDependency[K, Any, CoGroupCombiner](
rdd.asInstanceOf[RDD[_ <: Product2[K, _]]], part, serializer)
}
}
}
可见如果CoGroupedRDD依赖的某个RDD和CoGroupedRDD分区相同,则这个被依赖的RDD就不会进行Shuffle。
相关文章推荐
- 运动员最佳匹配问题
- Leetcode 第125题 Valid Palindrome
- js中confirm实现执行操作前弹出确认框的方法
- Android中的权限机制
- HTML5+CSS3 表格设计(Table)
- 从ViewRootImpl类分析View绘制的流程
- C实战:项目构建Make,Automake,CMake
- WebX入门指南
- hdu 3342 Legal or Not(判断成环)
- 使用nginx搭建https服务器(转)
- Android开发人员一见钟情的Java编程技巧!
- 我的Android开发半年工作经验总结
- Android 贝塞尔曲线的魅力
- Xcode7 使用NSURLSession发送HTTP请求的问题
- android studio的ndk开发
- AngularJs碎片笔记
- Android 之 ServiceManager与服务管理
- 线程的等待与唤醒
- JVM调优总结 收集器选择-各个代比例
- 如何在自己的网站中设置qq客服