您的位置:首页 > 其它

Spark Scheduler模块源码分析之DAGScheduler

2016-07-13 23:47 351 查看
  本文主要结合Spark-1.6.0的源码,对Spark中任务调度模块的执行过程进行分析。Spark Application在遇到Action操作时才会真正的提交任务并进行计算。这时Spark会根据Action操作之前一系列Transform操作的关联关系,生成一个DAG,在后续的操作中,对DAG进行Stage划分,生成Task并最终运行。整个过程如下图所示,DAGScheduler用于对Application进行分析,然后根据各RDD之间的依赖关系划分Stage,根据这些划分好的Stage,对应每个Stage会生成一组Task,将Task Set提交到TaskScheduler后,会由TaskScheduler启动Executor进行任务的计算。

  


  在任务调度模块中最重要的三个类是:

1. org.apache.spark.scheduler.DAGScheduler

2. org.apache.spark.scheduler.SchedulerBackend

3. org.apache.spark.scheduler.TaskScheduler

这里面SchedulerBackend主要起到的作用是为Task分配计算资源。

  接下来对这分成三篇博客对这三个主要的类进行分析,本文分析DAGScheduler的执行过程。

一、DAGScheduler的构建

  Spark在构造SparkContext时就会生成DAGScheduler的实例。

val (sched, ts) = SparkContext.createTaskScheduler(this, master)
_schedulerBackend = sched//生成schedulerBackend
_taskScheduler = ts//生成taskScheduler
_dagScheduler = new DAGScheduler(this)//生成dagScheduler,传入当前sparkContext对象。


  在生成_dagScheduler之前,已经生成了_schedulerBackend和_taskScheduler对象。这两个对象会在接下来第二和第三部分中介绍。之所以taskScheduler对象在dagScheduler对象构造之前先生成,是由于在生成DAGScheduler的构造方法中会从传入的SparkContext中获取到taskScheduler对象
def this(sc: SparkContext) = this(sc, sc.taskScheduler)


  看一下DAGScheduler对象的主构造方法,

class DAGScheduler(
private[scheduler] val sc: SparkContext, // 获得当前SparkContext对象
private[scheduler] val taskScheduler: TaskScheduler,  // 获得当前saprkContext内置的taskScheduler
listenerBus: LiveListenerBus,     // 异步处理事件的对象,从sc中获取
mapOutputTracker: MapOutputTrackerMaster, //运行在Driver端管理shuffle map task的输出,从sc中获取
blockManagerMaster: BlockManagerMaster, //运行在driver端,管理整个Job的Block信息,从sc中获取
env: SparkEnv, // 从sc中获取
clock: Clock = new SystemClock())


  其中有关LiveListenerBus会在Spark-1.6.0之Application运行信息记录器JobProgressListener中有具体介绍。MapOutputTrackerMaster,BlockManagerMaster后续也会写博客进行分析。

DAGScheduler的数据结构

  在DAGScheduler的源代码中,定义了很多变量,在刚构造出来时,仅仅只是初始化这些变量,具体使用是在后面Job提交的过程中了。

private[scheduler] val nextJobId = new AtomicInteger(0) // 生成JobId
private[scheduler] def numTotalJobs: Int = nextJobId.get() // 总的Job数
private val nextStageId = new AtomicInteger(0) // 下一个StageId

private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]] // 记录某个job对应的包含的所有stage
private[scheduler] val stageIdToStage = new HashMap[Int, Stage] // 记录StageId对应的Stage
private[scheduler] val shuffleToMapStage = new HashMap[Int, ShuffleMapStage] // 记录每一个shuffle对应的ShuffleMapStage,key为shuffleId
private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob] //

// Stages we need to run whose parents aren't done
private[scheduler] val waitingStages = new HashSet[Stage]

// Stages we are running right now
private[scheduler] val runningStages = new HashSet[Stage]

// Stages that must be resubmitted due to fetch failures
private[scheduler] val failedStages = new HashSet[Stage]

private[scheduler] val activeJobs = new HashSet[ActiveJob]

private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)


  DAGScheduler构造完成,并初始化一个eventProcessLoop实例后,会调用其
eventProcessLoop.start()
方法,启动一个多线程,然后把各种event都提交到eventProcessLoop中。这个eventProcessLoop比较重要,在后面也会提到。

二、Job的提交

  一个Job实际上是从RDD调用一个Action操作开始的,该Action操作最终会进入到
org.apache.spark.SparkContext.runJob()
方法中,在SparkContext中有多个重载的runJob方法,最终入口是下面这个:

// SparkContext.runJob
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
if (stopped.get()) {
throw new IllegalStateException("SparkContext has been shutdown")
}
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
if (conf.getBoolean("spark.logLineage", false)) {
logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
}
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
rdd.doCheckpoint()
}


  这里调用
dagScheduler.runJob()
方法后,正式进入之前构造的DAGScheduler对象中。在这个方法中,后续一系列的过程以此为:

1. DAGScheduler#runJob

  执行过程中各变量的内容如下图所示

  


  调用DAGScheduler.submitJob方法后会得到一个JobWaiter实例来监听Job的执行情况。针对Job的Succeeded状态和Failed状态,在接下来代码中都有不同的处理方式。

  

2. DAGScheduler#submitJob

  进入submitJob方法,首先会去检查rdd的分区信息,在确保rdd分区信息正确的情况下,给当前job生成一个jobId,nexJobId在刚构造出来时是从0开始编号的,在同一个SparkContext中,jobId会逐渐顺延。然后构造出一个JobWaiter对象返回给上一级调用函数。通过上面提到的eventProcessLoop提交该任务,最终会调用到
DAGScheduler.handleJobSubmitted
来处理这次提交的Job。handleJobSubmitted在下面的Stage划分部分会有提到。

  


  

3. DAGSchedulerEventProcessLoop#post

  在前面的方法中,调用post方法传入的是一个JobSubmitted实例。DAGSchedulerEventProcessLoop类继承自EventLoop类,其中的post方法也是在EventLoop中定义的。在EventLoop中维持了一个LinkedBlockingDeque类型的事件队列,将该Job提交事件存入该队列后,事件线程会从队列中取出事件并进行处理。

private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]() // 事件队列
def post(event: E): Unit = {
eventQueue.put(event) // 将JobSubmitted,Job提交事件存入该队列中
}


4、EventLoop#run

  该方法从eventQueue队列中顺序取出event,调用onReceive方法处理事件

val event = eventQueue.take()
try {
onReceive(event)
}


5、DAGSchedulerEventProcessLoop#onReceive

  在onReceive方法中,进一步调用doOnReceive方法

override def onReceive(event: DAGSchedulerEvent): Unit = {
val timerContext = timer.time()
try {
doOnReceive(event)
} finally {
timerContext.stop()
}
}


6、DAGSchedulerEventProcessLoop#doOnReceive

  在该方法中,根据事件类别分别匹配不同的方法进一步处理。本次传入的是JobSubmitted方法,那么进一步调用的方法是DAGScheduler.handleJobSubmitted。这部分的逻辑,以及还可以处理的其他事件,都在下面的源代码中。

  

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
// 处理Job提交事件
case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
// 处理Map Stage提交事件
case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
// 处理Stage取消事件
case StageCancelled(stageId) =>
dagScheduler.handleStageCancellation(stageId)
// 处理Job取消事件
case JobCancelled(jobId) =>
dagScheduler.handleJobCancellation(jobId)
// 处理Job组取消事件
case JobGroupCancelled(groupId) =>
dagScheduler.handleJobGroupCancelled(groupId)
// 处理所以Job取消事件
case AllJobsCancelled =>
dagScheduler.doCancelAllJobs()
// 处理Executor分配事件
case ExecutorAdded(execId, host) =>
dagScheduler.handleExecutorAdded(execId, host)
// 处理Executor丢失事件
case ExecutorLost(execId) =>
dagScheduler.handleExecutorLost(execId, fetchFailed = false)

case BeginEvent(task, taskInfo) =>
dagScheduler.handleBeginEvent(task, taskInfo)

case GettingResultEvent(taskInfo) =>
dagScheduler.handleGetTaskResult(taskInfo)
// 处理完成事件
case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
dagScheduler.handleTaskCompletion(completion)
// 处理task集失败事件
case TaskSetFailed(taskSet, reason, exception) =>
dagScheduler.handleTaskSetFailed(taskSet, reason, exception)
// 处理重新提交失败Stage事件
case ResubmitFailedStages =>
dagScheduler.resubmitFailedStages()
}


7、DAGScheduler#handleJobSubmitted

  当Job提交后,JobSubmitted事件会被eventProcessLoop捕获到,然后进入本方法中。开始处理Job,并执行Stage的划分。这一部分会衔接下一节,所以这个方法的源码以及Stage如何划分会在下一节中详细描述。

  

三、Stage的划分

  Stage的划分过程中,会涉及到宽依赖和窄依赖的概念,宽依赖是Stage的分界线,连续的窄依赖都属于同一Stage。

  


  比如上图中,在RDD G处调用了Action操作,在划分Stage时,会从G开始逆向分析,G依赖于B和F,其中对B是窄依赖,对F是宽依赖,所以F和G不能算在同一个Stage中,即在F和G之间会有一个Stage分界线。上图中还有一处宽依赖在A和B之间,所以这里还会分出一个Stage。最终形成了3个Stage,由于Stage1和Stage2是相互独立的,所以可以并发执行,等Stage1和Stage2准备就绪后,Stage3才能开始执行。

1、DAGScheduler#handleJobSubmitted

  这个方法的具体代码如下所示,前面提到了Stage的划分是从最后一个Stage开始逆推的,每遇到一个宽依赖处,就分裂成另外一个Stage,依此类推直到Stage划分完毕为止。并且,只有最后一个Stage的类型是ResultStage类型。

private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties) {
var finalStage: ResultStage = null
try {
// New stage creation may throw an exception if, for example, jobs are run on a
// HadoopRDD whose underlying HDFS files have been deleted.
finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)// Stage划分过程是从最后一个Stage开始往前执行的,最后一个Stage的类型是ResultStage
} catch {
case e: Exception =>
logWarning("Creating new stage failed due to exception - job: " + jobId, e)
listener.jobFailed(e)
return
}
//为该Job生成一个ActiveJob对象,并准备计算这个finalStage
val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
clearCacheLocs()
logInfo("Got job %s (%s) with %d output partitions".format(
job.jobId, callSite.shortForm, partitions.length))
logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents)
logInfo("Missing parents: " + getMissingParentStages(finalStage))

val jobSubmissionTime = clock.getTimeMillis()
jobIdToActiveJob(jobId) = job // 该job进入active状态
activeJobs += job
finalStage.setActiveJob(job)
val stageIds = jobIdToStageIds(jobId).toArray
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
listenerBus.post(
SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
submitStage(finalStage)   //提交当前Stage

submitWaitingStages()
}


2、DAGScheduler#newResultStage

  在这个方法中,会根据最后调用Action的那个RDD,以及方法调用过程callSite,生成的jobId,partitions等信息生成最后那个Stage。

private def newResultStage(
rdd: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
jobId: Int,
callSite: CallSite): ResultStage = {
val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)// 获取当前Stage的parent Stage,这个方法是划分Stage的核心实现
val stage = new ResultStage(id, rdd, func, partitions, parentStages, jobId, callSite)// 创建当前最后的ResultStage
stageIdToStage(id) = stage // 将ResultStage与stageId相关联
updateJobIdStageIdMaps(jobId, stage) // 更新该job中包含的stage
stage
}


3、DAGScheduler#getParentStagesAndId

四、任务的生成

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: