spark 笔记 13: 再看DAGScheduler,stage状态更新流程
2015-01-25 22:29
225 查看
当某个task完成后,某个shuffle Stage X可能已完成,那么就可能会一些仅依赖Stage X的Stage现在可以执行了,所以要有响应task完成的状态更新流程。
=======================DAG task完成后的更新流程===================->CoarseGrainedSchedulerBackend::receiveWithLogging --调度器的事件接收器 ->case StatusUpdate(executorId, taskId, state, data) --状态更新事件(来源于CoarseGrainedExecutorBackend) ->scheduler.statusUpdate(taskId, state, data.value) --状态更新 ->taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData) --将成功的时间封装到一个executor排队执行 ->getTaskResultExecutor.execute(new Runnable {override def run(): Unit = Utils.logUncaughtExceptions { ->val result = serializer.get().deserialize[TaskResult[_]](serializedData) match { --反序列化结果 ->scheduler.handleSuccessfulTask(taskSetManager, tid, result) --处理成功的task ->taskSetManager.handleSuccessfulTask(tid, taskResult) -> sched.dagScheduler.taskEnded(tasks(index) ... result.metrics) --另起一段 ->maybeFinishTaskSet() --判断是否taskSet结束了,更新状态。注意:在DAG里,调度的粒度是taskSet。 ->sched.taskSetFinished(this) --如果taskSet结束了,更新DAG的这个调度单元 ->activeTaskSets -= manager.taskSet.id --从active taskSet中删除tid ->manager.parent.removeSchedulable(manager) ->schedulableQueue.remove(schedulable) --从调度队列中删除tid ->schedulableNameToSchedulable.remove(schedulable.name) --删除调度单元。 ->makeOffers(executorId) --将这个executorId分配给其他task使用 ->DAGScheduler::taskEnded --任务结束事件处理流程 ->eventProcessActor ! CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics) ->def receive ->case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) ->dagScheduler.handleTaskCompletion(completion) --Responds to a task finishing. //This is called inside the event loop so it assumes that it can modify the scheduler's internal state ->event.reason match => case Success => --task结果是成功的 ->if (event.accumUpdates != null) --如果是状态更新 ->event.accumUpdates.foreach { case (id, partialValue) --更新状态 ->listenerBus.post(SparkListenerTaskEnd(...)) --通知listener任务结束 ->stage.pendingTasks -= task ->task match { ->case rt: ResultTask[_, _] => --如果是ResultTask ->if (job.numFinished == job.numPartitions) --如果所有的分片数据都完成 ->markStageAsFinished(stage) --那么这个Stage就是结束了 ->runningStages -= stage --从running状态中删除 ->listenerBus.post(SparkListenerStageCompleted(stage.latestInfo)) --通知Stage结束 ->cleanupStateForJobAndIndependentStages(job) --清除依赖关系 ->val registeredStages = jobIdToStageIds.get(job.jobId) --找到这个job对应的所有Stage(job对应多个stage) ->stageIdToStage.filterKeys(stageId => registeredStages.get.contains(stageId)).foreach //查找所有stage,找出注册了依赖于这个job所在stage的。 ->case (stageId, stage) => ->val jobSet = stage.jobIds ->if (!jobSet.contains(job.jobId)) --这些存在依赖的stage中,应该包含这个job的注册 ->logError("Job %d not registered for stage %d even though that stage was registered for the job" .format(job.jobId, stageId)) ->if (jobSet.isEmpty) // no other job needs this stage 没有其他job了,这个依赖的stage也结束了。 -> removeStage(stageId) --删除stage ->listenerBus.post(SparkListenerJobEnd(job.jobId, JobSucceeded)) --通知job结束 ->job.listener.taskSucceeded(rt.outputId, event.result) --通知task成功 ->case smt: ShuffleMapTask => --如果是shuffleMapTask ->if (runningStages.contains(stage) && stage.pendingTasks.isEmpty) --如果stage的所有task都完成 ->markStageAsFinished(stage) --标志stage完成 ->listenerBus.post(SparkListenerStageCompleted(stage.latestInfo)) --通知stage完成 ->logInfo("looking for newly runnable stages") --stage完成了,意味着依赖这个stage的stage可以执行了 ->mapOutputTracker.registerMapOutputs --(?用处不明) ->clearCacheLocs() ->if (stage.outputLocs.exists(_ == Nil)) // Some tasks had failed; let's resubmit this stage ->submitStage(stage) ->else ->val newlyRunnable = new ArrayBuffer[Stage] -> for (stage <- waitingStages if getMissingParentStages(stage) == Nil) 如果一个stage没有依赖其他stage ->newlyRunnable += stage --这个没有依赖的stage就可以执行了 ->waitingStages --= newlyRunnable ->runningStages ++= newlyRunnable ->for {stage <- newlyRunnable.sortBy(_.id); jobId <- activeJobForStage(stage)} ->submitMissingTasks(stage, jobId) --将这些没有依赖的stage的所有active job提交执行 ->submitWaitingStages() --//Check for waiting or failed stages which are now eligible for resubmission. //Ordinarily run on every iteration of the event loop. 每个事件处理都会触发去检查waiting状态的stage是否能够执行了。 ->logTrace("Checking for newly runnable parent stages") ->waitingStages.clear() ->for (stage <- waitingStagesCopy.sortBy(_.jobId)) ->submitStage[/b](stage)
========================end================================
来自为知笔记(Wiz)
=======================DAG task完成后的更新流程===================->CoarseGrainedSchedulerBackend::receiveWithLogging --调度器的事件接收器 ->case StatusUpdate(executorId, taskId, state, data) --状态更新事件(来源于CoarseGrainedExecutorBackend) ->scheduler.statusUpdate(taskId, state, data.value) --状态更新 ->taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData) --将成功的时间封装到一个executor排队执行 ->getTaskResultExecutor.execute(new Runnable {override def run(): Unit = Utils.logUncaughtExceptions { ->val result = serializer.get().deserialize[TaskResult[_]](serializedData) match { --反序列化结果 ->scheduler.handleSuccessfulTask(taskSetManager, tid, result) --处理成功的task ->taskSetManager.handleSuccessfulTask(tid, taskResult) -> sched.dagScheduler.taskEnded(tasks(index) ... result.metrics) --另起一段 ->maybeFinishTaskSet() --判断是否taskSet结束了,更新状态。注意:在DAG里,调度的粒度是taskSet。 ->sched.taskSetFinished(this) --如果taskSet结束了,更新DAG的这个调度单元 ->activeTaskSets -= manager.taskSet.id --从active taskSet中删除tid ->manager.parent.removeSchedulable(manager) ->schedulableQueue.remove(schedulable) --从调度队列中删除tid ->schedulableNameToSchedulable.remove(schedulable.name) --删除调度单元。 ->makeOffers(executorId) --将这个executorId分配给其他task使用 ->DAGScheduler::taskEnded --任务结束事件处理流程 ->eventProcessActor ! CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics) ->def receive ->case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) ->dagScheduler.handleTaskCompletion(completion) --Responds to a task finishing. //This is called inside the event loop so it assumes that it can modify the scheduler's internal state ->event.reason match => case Success => --task结果是成功的 ->if (event.accumUpdates != null) --如果是状态更新 ->event.accumUpdates.foreach { case (id, partialValue) --更新状态 ->listenerBus.post(SparkListenerTaskEnd(...)) --通知listener任务结束 ->stage.pendingTasks -= task ->task match { ->case rt: ResultTask[_, _] => --如果是ResultTask ->if (job.numFinished == job.numPartitions) --如果所有的分片数据都完成 ->markStageAsFinished(stage) --那么这个Stage就是结束了 ->runningStages -= stage --从running状态中删除 ->listenerBus.post(SparkListenerStageCompleted(stage.latestInfo)) --通知Stage结束 ->cleanupStateForJobAndIndependentStages(job) --清除依赖关系 ->val registeredStages = jobIdToStageIds.get(job.jobId) --找到这个job对应的所有Stage(job对应多个stage) ->stageIdToStage.filterKeys(stageId => registeredStages.get.contains(stageId)).foreach //查找所有stage,找出注册了依赖于这个job所在stage的。 ->case (stageId, stage) => ->val jobSet = stage.jobIds ->if (!jobSet.contains(job.jobId)) --这些存在依赖的stage中,应该包含这个job的注册 ->logError("Job %d not registered for stage %d even though that stage was registered for the job" .format(job.jobId, stageId)) ->if (jobSet.isEmpty) // no other job needs this stage 没有其他job了,这个依赖的stage也结束了。 -> removeStage(stageId) --删除stage ->listenerBus.post(SparkListenerJobEnd(job.jobId, JobSucceeded)) --通知job结束 ->job.listener.taskSucceeded(rt.outputId, event.result) --通知task成功 ->case smt: ShuffleMapTask => --如果是shuffleMapTask ->if (runningStages.contains(stage) && stage.pendingTasks.isEmpty) --如果stage的所有task都完成 ->markStageAsFinished(stage) --标志stage完成 ->listenerBus.post(SparkListenerStageCompleted(stage.latestInfo)) --通知stage完成 ->logInfo("looking for newly runnable stages") --stage完成了,意味着依赖这个stage的stage可以执行了 ->mapOutputTracker.registerMapOutputs --(?用处不明) ->clearCacheLocs() ->if (stage.outputLocs.exists(_ == Nil)) // Some tasks had failed; let's resubmit this stage ->submitStage(stage) ->else ->val newlyRunnable = new ArrayBuffer[Stage] -> for (stage <- waitingStages if getMissingParentStages(stage) == Nil) 如果一个stage没有依赖其他stage ->newlyRunnable += stage --这个没有依赖的stage就可以执行了 ->waitingStages --= newlyRunnable ->runningStages ++= newlyRunnable ->for {stage <- newlyRunnable.sortBy(_.id); jobId <- activeJobForStage(stage)} ->submitMissingTasks(stage, jobId) --将这些没有依赖的stage的所有active job提交执行 ->submitWaitingStages() --//Check for waiting or failed stages which are now eligible for resubmission. //Ordinarily run on every iteration of the event loop. 每个事件处理都会触发去检查waiting状态的stage是否能够执行了。 ->logTrace("Checking for newly runnable parent stages") ->waitingStages.clear() ->for (stage <- waitingStagesCopy.sortBy(_.jobId)) ->submitStage[/b](stage)
========================end================================
来自为知笔记(Wiz)
相关文章推荐
- Spark研究笔记13:Swing 组件类
- Android4.4 Telephony流程分析——通话状态更新
- Spark的job触发流程原理与stage划分算法分析
- Spark任务流程笔记
- (学习笔记)android 5.0 系统去电流程状态判断(上)
- [笔记]TrueCrypt7.0a代码编译流程 (已更新 TrueCrypt 7.2代码在Win8.1 64位下编译流程)
- Android4.4 Systemui状态栏状态图标更新流程分析
- (学习笔记)android 5.0 系统去电流程状态判断(下)
- 我的内核学习笔记13:x86平台linux系统重启流程跟踪
- Spark源码阅读笔记:DAGScheduler
- spark 笔记 15: ShuffleManager,shuffle map两端的stage/task的桥梁
- spark 笔记 8: Stage
- Android系统应用---SystemUI之二:Statusbar显示流程以及系统状态图标更新分析
- OpenGL笔记13 状态机
- 游戏开发流程与思想学习笔记(持续更新)
- spark 笔记 7: DAGScheduler
- Spark划分Shuffle依赖以及创建Stage的流程
- [Spark源码剖析] DAGScheduler划分stage
- healthd & batteryservice 电量相关状态更新流程
- sparkStreaming带状态更新(scala)