Spark Scheduler模块源码分析之DAGScheduler
2016-07-13 23:47
351 查看
本文主要结合Spark-1.6.0的源码,对Spark中任务调度模块的执行过程进行分析。Spark Application在遇到Action操作时才会真正的提交任务并进行计算。这时Spark会根据Action操作之前一系列Transform操作的关联关系,生成一个DAG,在后续的操作中,对DAG进行Stage划分,生成Task并最终运行。整个过程如下图所示,DAGScheduler用于对Application进行分析,然后根据各RDD之间的依赖关系划分Stage,根据这些划分好的Stage,对应每个Stage会生成一组Task,将Task Set提交到TaskScheduler后,会由TaskScheduler启动Executor进行任务的计算。
![](http://img.blog.csdn.net/20160713213743444)
在任务调度模块中最重要的三个类是:
1. org.apache.spark.scheduler.DAGScheduler
2. org.apache.spark.scheduler.SchedulerBackend
3. org.apache.spark.scheduler.TaskScheduler
这里面SchedulerBackend主要起到的作用是为Task分配计算资源。
接下来对这分成三篇博客对这三个主要的类进行分析,本文分析DAGScheduler的执行过程。
在生成_dagScheduler之前,已经生成了_schedulerBackend和_taskScheduler对象。这两个对象会在接下来第二和第三部分中介绍。之所以taskScheduler对象在dagScheduler对象构造之前先生成,是由于在生成DAGScheduler的构造方法中会从传入的SparkContext中获取到taskScheduler对象
看一下DAGScheduler对象的主构造方法,
其中有关LiveListenerBus会在Spark-1.6.0之Application运行信息记录器JobProgressListener中有具体介绍。MapOutputTrackerMaster,BlockManagerMaster后续也会写博客进行分析。
DAGScheduler构造完成,并初始化一个eventProcessLoop实例后,会调用其
这里调用
![](http://img.blog.csdn.net/20160713223349567)
调用DAGScheduler.submitJob方法后会得到一个JobWaiter实例来监听Job的执行情况。针对Job的Succeeded状态和Failed状态,在接下来代码中都有不同的处理方式。
![](http://img.blog.csdn.net/20160713223740762)
![](http://img.blog.csdn.net/20160713233132444)
比如上图中,在RDD G处调用了Action操作,在划分Stage时,会从G开始逆向分析,G依赖于B和F,其中对B是窄依赖,对F是宽依赖,所以F和G不能算在同一个Stage中,即在F和G之间会有一个Stage分界线。上图中还有一处宽依赖在A和B之间,所以这里还会分出一个Stage。最终形成了3个Stage,由于Stage1和Stage2是相互独立的,所以可以并发执行,等Stage1和Stage2准备就绪后,Stage3才能开始执行。
在任务调度模块中最重要的三个类是:
1. org.apache.spark.scheduler.DAGScheduler
2. org.apache.spark.scheduler.SchedulerBackend
3. org.apache.spark.scheduler.TaskScheduler
这里面SchedulerBackend主要起到的作用是为Task分配计算资源。
接下来对这分成三篇博客对这三个主要的类进行分析,本文分析DAGScheduler的执行过程。
一、DAGScheduler的构建
Spark在构造SparkContext时就会生成DAGScheduler的实例。val (sched, ts) = SparkContext.createTaskScheduler(this, master) _schedulerBackend = sched//生成schedulerBackend _taskScheduler = ts//生成taskScheduler _dagScheduler = new DAGScheduler(this)//生成dagScheduler,传入当前sparkContext对象。
在生成_dagScheduler之前,已经生成了_schedulerBackend和_taskScheduler对象。这两个对象会在接下来第二和第三部分中介绍。之所以taskScheduler对象在dagScheduler对象构造之前先生成,是由于在生成DAGScheduler的构造方法中会从传入的SparkContext中获取到taskScheduler对象
def this(sc: SparkContext) = this(sc, sc.taskScheduler)。
看一下DAGScheduler对象的主构造方法,
class DAGScheduler( private[scheduler] val sc: SparkContext, // 获得当前SparkContext对象 private[scheduler] val taskScheduler: TaskScheduler, // 获得当前saprkContext内置的taskScheduler listenerBus: LiveListenerBus, // 异步处理事件的对象,从sc中获取 mapOutputTracker: MapOutputTrackerMaster, //运行在Driver端管理shuffle map task的输出,从sc中获取 blockManagerMaster: BlockManagerMaster, //运行在driver端,管理整个Job的Block信息,从sc中获取 env: SparkEnv, // 从sc中获取 clock: Clock = new SystemClock())
其中有关LiveListenerBus会在Spark-1.6.0之Application运行信息记录器JobProgressListener中有具体介绍。MapOutputTrackerMaster,BlockManagerMaster后续也会写博客进行分析。
DAGScheduler的数据结构
在DAGScheduler的源代码中,定义了很多变量,在刚构造出来时,仅仅只是初始化这些变量,具体使用是在后面Job提交的过程中了。private[scheduler] val nextJobId = new AtomicInteger(0) // 生成JobId private[scheduler] def numTotalJobs: Int = nextJobId.get() // 总的Job数 private val nextStageId = new AtomicInteger(0) // 下一个StageId private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]] // 记录某个job对应的包含的所有stage private[scheduler] val stageIdToStage = new HashMap[Int, Stage] // 记录StageId对应的Stage private[scheduler] val shuffleToMapStage = new HashMap[Int, ShuffleMapStage] // 记录每一个shuffle对应的ShuffleMapStage,key为shuffleId private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob] // // Stages we need to run whose parents aren't done private[scheduler] val waitingStages = new HashSet[Stage] // Stages we are running right now private[scheduler] val runningStages = new HashSet[Stage] // Stages that must be resubmitted due to fetch failures private[scheduler] val failedStages = new HashSet[Stage] private[scheduler] val activeJobs = new HashSet[ActiveJob] private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
DAGScheduler构造完成,并初始化一个eventProcessLoop实例后,会调用其
eventProcessLoop.start()方法,启动一个多线程,然后把各种event都提交到eventProcessLoop中。这个eventProcessLoop比较重要,在后面也会提到。
二、Job的提交
一个Job实际上是从RDD调用一个Action操作开始的,该Action操作最终会进入到org.apache.spark.SparkContext.runJob()方法中,在SparkContext中有多个重载的runJob方法,最终入口是下面这个:
// SparkContext.runJob def runJob[T, U: ClassTag]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], resultHandler: (Int, U) => Unit): Unit = { if (stopped.get()) { throw new IllegalStateException("SparkContext has been shutdown") } val callSite = getCallSite val cleanedFunc = clean(func) logInfo("Starting job: " + callSite.shortForm) if (conf.getBoolean("spark.logLineage", false)) { logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString) } dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get) progressBar.foreach(_.finishAll()) rdd.doCheckpoint() }
这里调用
dagScheduler.runJob()方法后,正式进入之前构造的DAGScheduler对象中。在这个方法中,后续一系列的过程以此为:
1. DAGScheduler#runJob
执行过程中各变量的内容如下图所示调用DAGScheduler.submitJob方法后会得到一个JobWaiter实例来监听Job的执行情况。针对Job的Succeeded状态和Failed状态,在接下来代码中都有不同的处理方式。
2. DAGScheduler#submitJob
进入submitJob方法,首先会去检查rdd的分区信息,在确保rdd分区信息正确的情况下,给当前job生成一个jobId,nexJobId在刚构造出来时是从0开始编号的,在同一个SparkContext中,jobId会逐渐顺延。然后构造出一个JobWaiter对象返回给上一级调用函数。通过上面提到的eventProcessLoop提交该任务,最终会调用到DAGScheduler.handleJobSubmitted来处理这次提交的Job。handleJobSubmitted在下面的Stage划分部分会有提到。
3. DAGSchedulerEventProcessLoop#post
在前面的方法中,调用post方法传入的是一个JobSubmitted实例。DAGSchedulerEventProcessLoop类继承自EventLoop类,其中的post方法也是在EventLoop中定义的。在EventLoop中维持了一个LinkedBlockingDeque类型的事件队列,将该Job提交事件存入该队列后,事件线程会从队列中取出事件并进行处理。private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]() // 事件队列 def post(event: E): Unit = { eventQueue.put(event) // 将JobSubmitted,Job提交事件存入该队列中 }
4、EventLoop#run
该方法从eventQueue队列中顺序取出event,调用onReceive方法处理事件val event = eventQueue.take() try { onReceive(event) }
5、DAGSchedulerEventProcessLoop#onReceive
在onReceive方法中,进一步调用doOnReceive方法override def onReceive(event: DAGSchedulerEvent): Unit = { val timerContext = timer.time() try { doOnReceive(event) } finally { timerContext.stop() } }
6、DAGSchedulerEventProcessLoop#doOnReceive
在该方法中,根据事件类别分别匹配不同的方法进一步处理。本次传入的是JobSubmitted方法,那么进一步调用的方法是DAGScheduler.handleJobSubmitted。这部分的逻辑,以及还可以处理的其他事件,都在下面的源代码中。private def doOnReceive(event: DAGSchedulerEvent): Unit = event match { // 处理Job提交事件 case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) => dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) // 处理Map Stage提交事件 case MapStageSubmitted(jobId, dependency, callSite, listener, properties) => dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties) // 处理Stage取消事件 case StageCancelled(stageId) => dagScheduler.handleStageCancellation(stageId) // 处理Job取消事件 case JobCancelled(jobId) => dagScheduler.handleJobCancellation(jobId) // 处理Job组取消事件 case JobGroupCancelled(groupId) => dagScheduler.handleJobGroupCancelled(groupId) // 处理所以Job取消事件 case AllJobsCancelled => dagScheduler.doCancelAllJobs() // 处理Executor分配事件 case ExecutorAdded(execId, host) => dagScheduler.handleExecutorAdded(execId, host) // 处理Executor丢失事件 case ExecutorLost(execId) => dagScheduler.handleExecutorLost(execId, fetchFailed = false) case BeginEvent(task, taskInfo) => dagScheduler.handleBeginEvent(task, taskInfo) case GettingResultEvent(taskInfo) => dagScheduler.handleGetTaskResult(taskInfo) // 处理完成事件 case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) => dagScheduler.handleTaskCompletion(completion) // 处理task集失败事件 case TaskSetFailed(taskSet, reason, exception) => dagScheduler.handleTaskSetFailed(taskSet, reason, exception) // 处理重新提交失败Stage事件 case ResubmitFailedStages => dagScheduler.resubmitFailedStages() }
7、DAGScheduler#handleJobSubmitted
当Job提交后,JobSubmitted事件会被eventProcessLoop捕获到,然后进入本方法中。开始处理Job,并执行Stage的划分。这一部分会衔接下一节,所以这个方法的源码以及Stage如何划分会在下一节中详细描述。三、Stage的划分
Stage的划分过程中,会涉及到宽依赖和窄依赖的概念,宽依赖是Stage的分界线,连续的窄依赖都属于同一Stage。比如上图中,在RDD G处调用了Action操作,在划分Stage时,会从G开始逆向分析,G依赖于B和F,其中对B是窄依赖,对F是宽依赖,所以F和G不能算在同一个Stage中,即在F和G之间会有一个Stage分界线。上图中还有一处宽依赖在A和B之间,所以这里还会分出一个Stage。最终形成了3个Stage,由于Stage1和Stage2是相互独立的,所以可以并发执行,等Stage1和Stage2准备就绪后,Stage3才能开始执行。
1、DAGScheduler#handleJobSubmitted
这个方法的具体代码如下所示,前面提到了Stage的划分是从最后一个Stage开始逆推的,每遇到一个宽依赖处,就分裂成另外一个Stage,依此类推直到Stage划分完毕为止。并且,只有最后一个Stage的类型是ResultStage类型。private[scheduler] def handleJobSubmitted(jobId: Int, finalRDD: RDD[_], func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], callSite: CallSite, listener: JobListener, properties: Properties) { var finalStage: ResultStage = null try { // New stage creation may throw an exception if, for example, jobs are run on a // HadoopRDD whose underlying HDFS files have been deleted. finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)// Stage划分过程是从最后一个Stage开始往前执行的,最后一个Stage的类型是ResultStage } catch { case e: Exception => logWarning("Creating new stage failed due to exception - job: " + jobId, e) listener.jobFailed(e) return } //为该Job生成一个ActiveJob对象,并准备计算这个finalStage val job = new ActiveJob(jobId, finalStage, callSite, listener, properties) clearCacheLocs() logInfo("Got job %s (%s) with %d output partitions".format( job.jobId, callSite.shortForm, partitions.length)) logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")") logInfo("Parents of final stage: " + finalStage.parents) logInfo("Missing parents: " + getMissingParentStages(finalStage)) val jobSubmissionTime = clock.getTimeMillis() jobIdToActiveJob(jobId) = job // 该job进入active状态 activeJobs += job finalStage.setActiveJob(job) val stageIds = jobIdToStageIds(jobId).toArray val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo)) listenerBus.post( SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties)) submitStage(finalStage) //提交当前Stage submitWaitingStages() }
2、DAGScheduler#newResultStage
在这个方法中,会根据最后调用Action的那个RDD,以及方法调用过程callSite,生成的jobId,partitions等信息生成最后那个Stage。private def newResultStage( rdd: RDD[_], func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], jobId: Int, callSite: CallSite): ResultStage = { val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)// 获取当前Stage的parent Stage,这个方法是划分Stage的核心实现 val stage = new ResultStage(id, rdd, func, partitions, parentStages, jobId, callSite)// 创建当前最后的ResultStage stageIdToStage(id) = stage // 将ResultStage与stageId相关联 updateJobIdStageIdMaps(jobId, stage) // 更新该job中包含的stage stage }
3、DAGScheduler#getParentStagesAndId
四、任务的生成
相关文章推荐
- Java 集合 JDK1.7的LinkedList
- android.support.v7.internal.widget.ActionBarOverlayLayout Couldn't Be Initialized
- iOS 图片转NSData-b
- Clone Graph
- 并查集模板
- Python 字符串格式化
- 详解:数据库名、实例名、ORACLE_SID、数据库域名、全局数据库名、服务名及手工脚本创建oracle数据库
- ecshop和ecstouch的insert标签的用法
- 神经网络,逻辑回归,矩阵求导
- Shopxp-v10.85 CRSF攻击远程添加管理漏洞
- IOS 消息转发机制
- Minimum Window Substring
- R Programming: Part 3 - Code Correctly and Efficiently
- PHP Error Handling API方法归纳总结.md
- bfs_poj_3669_Meteor Shower
- stm32寄存器版学习笔记08 DMA
- linux中vim编译器的使用
- 交通的常识
- Subsets
- echo -n -e参数详解