您的位置:首页 > 其它

Spark算子[02]:coalesce,repartition

2017-11-28 16:11 483 查看

概述

coalesce与repartition都是RDD的分区进行重新划分,repartition只是coalesce接口中shuffle为true的简易实现;

当spark程序中,存在过多的小任务的时候,可以通过 RDD.coalesce方法,收缩合并分区,减少分区的个数,减小任务调度成本,避免Shuffle导致,这比使用RDD.repartition效率提高不少。

源码解析

coalesce源码

/**
* 返回一个新的RDD,它将分区个数减少到“numPartitions”个分区。
* 这是一个窄依赖操作;
* 如果从1000个分区合并成100各分区,将不会有Shuffle操作,100个新分区中的每一个将占据当前分区的10个。
* 如果要求更多的分区个数,将保持为当前的分区个数。
*
* 但是,当我们进行一个剧烈的合并,设置numPartitions = 1
* 这可能导致你的计算比你想要的节点少,当numPartitions = 1时,只会在一个节点上合并;
* 为了避免这种情况的发生,可以设置shuffle = true,这样将会添加一个shuffle操作,
* 意味着当前的上游partitions将并行执行,无论当前的分区是几个
*
* @note
* 当shuffle = true,实际上,你可以合并到更多的分区。
* 如果您有少量的分区,比如100个,可能有几个分区异常大,这个时候这种方法很有用。
* 调用 coalesce(1000,shuffle = true) 将会生成1000个分区,数据分布使用散列分区(hash partitioner)。
* 可选分区合并器必须是可序列化的。
*/

def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T] = withScope {
require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
//1. 如果shuffle=true
if (shuffle) {
// 将元素均匀地分布在输出分区上,从一个随机分区开始。
val distributePartition = (index: Int, items: Iterator[T]) => {
var position = (new Random(index)).nextInt(numPartitions)
items.map { t =>
// Note that the hash code of the key will just be the key itself.
// The HashPartitioner will mod it with the number of total partitions.
position = position + 1
(position, t)
}
} : Iterator[(Int, T)]
// 包含一个shuffle步骤,这样我们的上游tasks仍然是分布式的
new CoalescedRDD(
new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
new HashPartitioner(numPartitions)),
numPartitions,
partitionCoalescer).values
} else {
//2. 如果shuffle=false
new CoalescedRDD(this, numPartitions, partitionCoalescer)
}
}


repartition源码

/**
* 可以在此RDD中增加或降低并行度。在内部,这使用一个shuffle来重新分配数据。
* 如果您正在减少这个RDD中的分区数量,考虑使用`coalesce`,这可以避免进行shuffle。
*/

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}


实例

object Coalesce1 {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("coalesce1").setMaster("local")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(List(1,2,3,4,5,6,7,8,9),8)
val num = rdd.partitions.length //num=8

val rdd1 = rdd.coalesce(10,false,None);
val num1 = rdd1.partitions.length //num1=8

val rdd2 = rdd.coalesce(10,true,None);
val num2 = rdd2.partitions.length //num2=10

val rdd3 = rdd.coalesce(2,false,None);
val num3 = rdd3.partitions.length //num3=2
}
}


总结

假设RDD有N个分区,需要重新划分成M个分区:

N < M。一般情况下N个分区有数据分布不均匀的状况,利用HashPartitioner函数将数据重新分区为M个,这时需要将shuffle设置为true。

N > M并且N和M相差不多,(假如N是1000,M是100)那么就可以将N个分区中的若干个分区合并成一个新的分区,最终合并为M个分区,这时可以将shuff设置为false;

在shuffl为false的情况下,如果M>N时,coalesce为无效的,不进行shuffle过程,父RDD和子RDD之间是窄依赖关系。

N > M并且两者相差悬殊,这时如果将shuffle设置为false,父子RDD是窄依赖关系,他们同处在一个Stage中,就可能造成spark程序的并行度不够,从而影响性能,如果在M为1的时候,为了使coalesce之前的操作有更好的并行度,可以将shuffle设置为true。

总之:如果shuff为false时,如果传入的参数大于现有的分区数目,RDD的分区数不变,也就是说不经过shuffle,是无法将RDD的分区数变多的。

Spark优化涉及点

(1) 使用filter之后进行coalesce操作

通常对一个RDD执行filter算子过滤掉RDD中较多数据后(比如30%以上的数据),建议使用coalesce算子,手动减少RDD的partition数量,将RDD中的数据压缩到更少的partition中去。因为filter之后,RDD的每个partition中都会有很多数据被过滤掉,此时如果照常进行后续的计算,其实每个task处理的partition中的数据量并不是很多,有一点资源浪费,而且此时处理的task越多,可能速度反而越慢。因此用coalesce减少partition数量,将RDD中的数据压缩到更少的partition之后,只要使用更少的task即可处理完所有的partition。在某些场景下,对于性能的提升会有一定的帮助。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: