您的位置:首页 > 其它

spark2.2.0中stage划分的源码解析

2019-01-18 21:30 363 查看

这里主要讲解的是stage的划分。stage的划分工作是在DAGScheduler中完成的,在DAGScheduler中会将一个job根据宽窄依赖划分为多个stage。下面会详细介绍stage的划分过程。

1.首先你需要有一个spark2.X源码,因为你可以在源码的基础上进行注释,方便你后期的学习。

双击shift->输入RDD

2.进入到RDD的源码,你会发现我们调用的spark算子都在这里,算子主要分为两大类:action和transformation算子。只有当spark作业遇到action算子的时候才会想spark集群提交代码,如果程序中没有action算子,那么代码是不会提交到集群中执行的。所以这里任意挑选一个action算子,这里我挑选的是take算子,这个算子的意思取执行结束结果的前n行。

 

3.因为执行action算子的话,会提交任务到集群中,所以每个action算子底层都会调用一个runJob方法,来提交任务。

4.按住ctrl+鼠标左键,进入这个runJob方法,你会发现它底层调用的气势是SparkContext(spark上下)的runJob方法,这个方法需要传入三个参数

第一个参数:rdd

第二个参数:processPartition(在RDD的每个分区上运行的函数),例如调用tabke(2)这个2就相当于一个函数了。

5.点击runJob进入到DAGScheduler中的runJob方法,这里也会传入很多参数,这里就不一一介绍了。当进入到DAGScheduler中,这里会调用那个submitJob方法,去提交一个Job任务,然后会返回一个阻塞线程等待job的完成。

6.点击submitJob进入到这个方法中。在submitJob方法中,首先会检查传入的rdd分区是否存在,然后会为当前的这个job任务创建一个jobID,因为一个spark集群中可能会有多个job任务同时运行。如果发现partitions的长度为0了,也就是说不存在任务了,这里会就返回一个阻塞线程,使得runjob中的阻塞线程得以释放掉。

7.当分区的长度大于0的话,也就是说这个job中还存在任务可以执行,首先会创建一个阻塞的线程,eventProcessLoop是一个DAGScheduler的事件队列,因为一个spark集群中是可以存在多个job任务同时运行的,所以这里采用了FIFO先进先出的资源调度队列(还有一个任务调度队列是FAIR公平调度队列,它不是先进先出,而是根据集群的资源利用和job本身的情况进行一个友好的调度,这里可以自行百度查找),

[code]8.点击eventProcessLoop

9.点击DAGSchedulerEventProcessLoop进入到这个类中,scala中有一个强大的模式匹配功能。当进入到这个类的时候会调用doOnReceive方法,使用模式匹配,确定调用那个类,方法,任务的提交的话是进入箭头所指的这个方法中

 

9.1.在介绍handleJobSubmitted之前,我想介绍一个什么是ResultStage和ShuffleMapStage,因为刚开始的时候我也是有点懵的,不懂这两个到底是个什么东东。

Stage分为两种,

shuffle map stage, in which case its tasks' results are input for another stage
其实就是,非最终stage, 后面还有其他的stage, 所以它的输出一定是需要shuffle并作为后续的输入
result stage, in which case its tasks directly compute the action that initiated a job (e.g. count(), save(), etc)
最终的stage, 没有输出, 而是直接产生结果或存储

下午是一个job任务在进行stage划分,一个可以看到一个划分成了3个stage,从图中很容易可以看出有一个ResultStage和两个shuffleMapStage

10.当进入到handleJobSubmitted中,可以看到这里会创建一个finalStage这样的一个ResultStage,可能你会很好奇的问为什么起了这个名字,如果了解过stage划分原理的小伙伴应该都知道,气势stage的划分是从后往前推的,也就是说这里会先找到最后一个stage,然后通过依赖关系继续往前寻找stage(stage划分的一句是shuffle,如果就shuffle的话就会产生stage),这里stage划分为shuffleMapStage和ResultStage两种,这里每个job是由1个ResultStage和0+个ShuffleMapStage组成。

11.点击createResultStage,进入看看这里的resultStage是如何创建的,这个方法主要是用来创建ResultStage。getOrCreateParentStages这个方法一会进去看看,他主要做的是通过shuffle之间的依赖关系去寻找是否有shuffle操作,如果有的话就会创建shuffleMapStage,如果遍历一遍都没有发现,就会返回一个list空的集合。这里的parents是一个list集合,里面存放都是ShuffleMapStage。

 

12.点击getorCreateParentStages这个方法,来一探究竟,父stage是怎么生成的。这里做的工作主要是抽取当前rdd的Shuffle的依赖,这里调用那个getOrcreateShuffleMapStage方法来创建shuffleMapStage,这里将传入两个参数,shuffleDep:是shuffle的依赖关系,firstJobId:job的id。

13.点击getOrCreateShuffleMapStage,这里会通过传递过来的shuffleDep,提取到shuffleId从而获取shuffleMapStage,如果能获取到shuffleMapStage的话,就直接返回stage,如果没有的就会循环遍历所有的shuffleDep,构建shuffleMapStage,具体请看createShuffleMapStage方法。

 

14.点击createShuffleMapStage,这里有有一段代码val numTasks = rdd.partitions.lenth,可以看出这里task的数量和partition(分区的数量是相同的)。

到这里,stage的划分工作就全部都完毕。

 

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: