您的位置:首页 > 其它

Spark划分Shuffle依赖以及创建Stage的流程

2017-04-08 22:33 225 查看
博客为笔者学习过程中,自我的理解和总结,难免存在错误,如果给您造成困扰请原谅,同时希望指点迷津

上一篇博文介绍了Spark提交作业的流程以及作业是如何被触发在集群中运行的,答案便是:action算子中调用sparkContext.runJob方法触发执行的,当我们执行rdd的action算子时候,这时候就会调用sparkContext的runJob方法,在runJob方法中完成调用dagScheduler的runJob方法触发程序在集群中真正运行!更多细节参见:http://blog.csdn.net/dax1n/article/details/69357716博文。

在介绍划分Shuffle依赖和根据Shuffle依赖创建Stage之前说明几个概念:

算子类型:action和transformation两种,前面可以触发作业执行,后者是结果的一种表示形式

Job:此Job不是指我们开发的jar程序在集群的一次运行,而是程序中的action触发在集群上一次运行的任务为Job,一个jar程序运行可以有多个Job。Jobs是提交给Scheduler顶层的单元。

Stage:一个Job根据Shuffle可以划分多一个或者多个Stage(主要是看该Job中有几个Shuffle依赖决定有几个Stage),同时Stage是Task的集合。

Task: Task是Work的独有的单元,每一个Task都会发送到集群中Worker节点上运行。

1:我们接下来便以dagScheduler的runJob作为Entry Point进行分析:



在runJob中调用submitJob方法进行提交作业,submitJob返回值作用:a JobWaiter object that can be used to block until the job finishes executing or can be used to cancel the job. 

2:接下来org.apache.spark.scheduler.DAGScheduler#submitJob:





3:接下来调用org.apache.spark.scheduler.DAGScheduler#handleJobSubmitted:



4:接下来org.apache.spark.scheduler.DAGScheduler#createResultStage:



5:再接下来org.apache.spark.scheduler.DAGScheduler#getOrCreateParentStages:


这里调用的org.apache.spark.scheduler.DAGScheduler#getShuffleDependencies方法值根据当前RDD寻找其前面的第一个Shuffle依赖(如果有多个的话,只返回前面的第一个),例如图中有两个Shuffle依赖,但是只返回2:Shuffle依赖。



拓展:org.apache.spark.scheduler.DAGScheduler#getShuffleDependencies返回的是List[ShuffleDependency],前面说返回的是最近的Shuffle依赖,那为什么还返回一个List呢?

                                          


当我们对如上图形调用getShuffleDependencies时候返回的就是多个Shuffle依赖,当我们传入RDD3时候,由于RDD3的父依赖是RangeDependency依赖,不是Shuffle依赖,所以会将中间两个RDD存储到 waitingForVisit栈中,对于中间两个RDD再次遍历便找到2个Shuffle依赖。此处不要误解!

6:由于getShuffleDependencies只返回一个前面最近的Shuffle依赖,此时接着执行步骤5中的getOrCreateShuffleMapStage方法,对返回的Shuffle依赖创建Stage:



 getOrCreateShuffleMapStage方法首先根据当前shuffleDep的shuffle依赖id判断是否创建了Stage,如果创建了返回该Stage,如果没有创建的话调用org.apache.spark.scheduler.DAGScheduler#getMissingAncestorShuffleDependencies方法寻找shuffleDep的父RDD前面所有的Shuffle依赖(依赖划分是后向前进行的),最后以栈的形式返回,通过foreach对栈(栈顶是最前面的shuffle依赖)中所有的Shuffle依赖创建Stage(stage创建过程是由前向后创建的)。最后返回一个Stage的列表!

7:接下来我们看一下org.apache.spark.scheduler.DAGScheduler#getMissingAncestorShuffleDependencies根据当前rdd寻找前面所有shuffle依赖的过程:



通过调用org.apache.spark.scheduler.DAGScheduler#getShuffleDependencies方法,没找到一个Shuffle依赖就把该依赖的父RDD放到waitingForVisit栈中,一次遍历

 waitingForVisit找到所有的Shuffle依赖。

8:接下来看看org.apache.spark.scheduler.DAGScheduler#getShuffleDependencies实现:



这个方法看注释就可以了,不在解释!

总结:第一次是根据RDD调用getShuffleDependencies找到第一个shuffle,然后根据该shuffle创建Stage,如果stage存在则直接返回,不存在的话,寻找该shuffle父RDD的所有依赖Shuffle依赖,最后以Stack[ShuffleDependency]数据结构返回,然后遍历该栈依次为ShuffleDependency创建Stage,最后返回List[Stage]。大概流程就是这样!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: