Spark算子[02]:coalesce,repartition
2017-11-28 16:11
483 查看
概述
coalesce与repartition都是RDD的分区进行重新划分,repartition只是coalesce接口中shuffle为true的简易实现;当spark程序中,存在过多的小任务的时候,可以通过 RDD.coalesce方法,收缩合并分区,减少分区的个数,减小任务调度成本,避免Shuffle导致,这比使用RDD.repartition效率提高不少。
源码解析
coalesce源码
/** * 返回一个新的RDD,它将分区个数减少到“numPartitions”个分区。 * 这是一个窄依赖操作; * 如果从1000个分区合并成100各分区,将不会有Shuffle操作,100个新分区中的每一个将占据当前分区的10个。 * 如果要求更多的分区个数,将保持为当前的分区个数。 * * 但是,当我们进行一个剧烈的合并,设置numPartitions = 1 * 这可能导致你的计算比你想要的节点少,当numPartitions = 1时,只会在一个节点上合并; * 为了避免这种情况的发生,可以设置shuffle = true,这样将会添加一个shuffle操作, * 意味着当前的上游partitions将并行执行,无论当前的分区是几个 * * @note * 当shuffle = true,实际上,你可以合并到更多的分区。 * 如果您有少量的分区,比如100个,可能有几个分区异常大,这个时候这种方法很有用。 * 调用 coalesce(1000,shuffle = true) 将会生成1000个分区,数据分布使用散列分区(hash partitioner)。 * 可选分区合并器必须是可序列化的。 */ def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty) (implicit ord: Ordering[T] = null) : RDD[T] = withScope { require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.") //1. 如果shuffle=true if (shuffle) { // 将元素均匀地分布在输出分区上,从一个随机分区开始。 val distributePartition = (index: Int, items: Iterator[T]) => { var position = (new Random(index)).nextInt(numPartitions) items.map { t => // Note that the hash code of the key will just be the key itself. // The HashPartitioner will mod it with the number of total partitions. position = position + 1 (position, t) } } : Iterator[(Int, T)] // 包含一个shuffle步骤,这样我们的上游tasks仍然是分布式的 new CoalescedRDD( new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition), new HashPartitioner(numPartitions)), numPartitions, partitionCoalescer).values } else { //2. 如果shuffle=false new CoalescedRDD(this, numPartitions, partitionCoalescer) } }
repartition源码
/** * 可以在此RDD中增加或降低并行度。在内部,这使用一个shuffle来重新分配数据。 * 如果您正在减少这个RDD中的分区数量,考虑使用`coalesce`,这可以避免进行shuffle。 */ def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { coalesce(numPartitions, shuffle = true) }
实例
object Coalesce1 { def main(args: Array[String]) { val conf = new SparkConf().setAppName("coalesce1").setMaster("local") val sc = new SparkContext(conf) val rdd = sc.parallelize(List(1,2,3,4,5,6,7,8,9),8) val num = rdd.partitions.length //num=8 val rdd1 = rdd.coalesce(10,false,None); val num1 = rdd1.partitions.length //num1=8 val rdd2 = rdd.coalesce(10,true,None); val num2 = rdd2.partitions.length //num2=10 val rdd3 = rdd.coalesce(2,false,None); val num3 = rdd3.partitions.length //num3=2 } }
总结
假设RDD有N个分区,需要重新划分成M个分区:N < M。一般情况下N个分区有数据分布不均匀的状况,利用HashPartitioner函数将数据重新分区为M个,这时需要将shuffle设置为true。
N > M并且N和M相差不多,(假如N是1000,M是100)那么就可以将N个分区中的若干个分区合并成一个新的分区,最终合并为M个分区,这时可以将shuff设置为false;
在shuffl为false的情况下,如果M>N时,coalesce为无效的,不进行shuffle过程,父RDD和子RDD之间是窄依赖关系。
N > M并且两者相差悬殊,这时如果将shuffle设置为false,父子RDD是窄依赖关系,他们同处在一个Stage中,就可能造成spark程序的并行度不够,从而影响性能,如果在M为1的时候,为了使coalesce之前的操作有更好的并行度,可以将shuffle设置为true。
总之:如果shuff为false时,如果传入的参数大于现有的分区数目,RDD的分区数不变,也就是说不经过shuffle,是无法将RDD的分区数变多的。
Spark优化涉及点
(1) 使用filter之后进行coalesce操作通常对一个RDD执行filter算子过滤掉RDD中较多数据后(比如30%以上的数据),建议使用coalesce算子,手动减少RDD的partition数量,将RDD中的数据压缩到更少的partition中去。因为filter之后,RDD的每个partition中都会有很多数据被过滤掉,此时如果照常进行后续的计算,其实每个task处理的partition中的数据量并不是很多,有一点资源浪费,而且此时处理的task越多,可能速度反而越慢。因此用coalesce减少partition数量,将RDD中的数据压缩到更少的partition之后,只要使用更少的task即可处理完所有的partition。在某些场景下,对于性能的提升会有一定的帮助。
相关文章推荐
- Spark算子:RDD基本转换操作(2)–coalesce、repartition
- spark算子(repartition和coalesce)
- Spark编程之基本的RDD算子coalesce, repartition, checkpoint
- Spark算子:RDD基本转换操作(2)–coalesce、repartition
- Spark算子:RDD基本转换操作(2)–coalesce、repartition
- Spark算子:RDD基本转换操作(2)–coalesce、repartition
- Spark算子:RDD基本转换操作(2)–coalesce、repartition
- Spark算子:RDD基本转换操作(2)–coalesce、repartition
- Spark算子--coalesce和repartition
- spark coalesce和repartition的区别
- 【Spark Java API】Transformation(4)—coalesce、repartition
- spark coalesce和repartition的区别
- Spark中repartition和coalesce的用法
- 【Spark】Spark中repartition和coalesce的区别
- spark coalesce和repartition的区别
- spark coalesce和repartition的区别
- spark coalesce和repartition的区别
- spark coalesce和repartition的区别
- spark coalesce和repartition的区别
- Spark中repartition和coalesce的用法