您的位置:首页 > 其它

spark 笔记 13: 再看DAGScheduler,stage状态更新流程

2015-01-25 22:29 225 查看
当某个task完成后,某个shuffle Stage X可能已完成,那么就可能会一些仅依赖Stage X的Stage现在可以执行了,所以要有响应task完成的状态更新流程。
=======================DAG task完成后的更新流程===================->CoarseGrainedSchedulerBackend::receiveWithLogging --调度器的事件接收器 ->case StatusUpdate(executorId, taskId, state, data) --状态更新事件(来源于CoarseGrainedExecutorBackend) ->scheduler.statusUpdate(taskId, state, data.value) --状态更新 ->taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData) --将成功的时间封装到一个executor排队执行 ->getTaskResultExecutor.execute(new Runnable {override def run(): Unit = Utils.logUncaughtExceptions { ->val result = serializer.get().deserialize[TaskResult[_]](serializedData) match { --反序列化结果 ->scheduler.handleSuccessfulTask(taskSetManager, tid, result) --处理成功的task ->taskSetManager.handleSuccessfulTask(tid, taskResult) -> sched.dagScheduler.taskEnded(tasks(index) ... result.metrics) --另起一段 ->maybeFinishTaskSet() --判断是否taskSet结束了,更新状态。注意:在DAG里,调度的粒度是taskSet。 ->sched.taskSetFinished(this) --如果taskSet结束了,更新DAG的这个调度单元 ->activeTaskSets -= manager.taskSet.id --从active taskSet中删除tid ->manager.parent.removeSchedulable(manager) ->schedulableQueue.remove(schedulable) --从调度队列中删除tid ->schedulableNameToSchedulable.remove(schedulable.name) --删除调度单元。 ->makeOffers(executorId) --将这个executorId分配给其他task使用 ->DAGScheduler::taskEnded --任务结束事件处理流程 ->eventProcessActor ! CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics) ->def receive ->case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) ->dagScheduler.handleTaskCompletion(completion) --Responds to a task finishing. //This is called inside the event loop so it assumes that it can modify the scheduler's internal state ->event.reason match => case Success => --task结果是成功的 ->if (event.accumUpdates != null) --如果是状态更新 ->event.accumUpdates.foreach { case (id, partialValue) --更新状态 ->listenerBus.post(SparkListenerTaskEnd(...)) --通知listener任务结束 ->stage.pendingTasks -= task ->task match { ->case rt: ResultTask[_, _] => --如果是ResultTask ->if (job.numFinished == job.numPartitions) --如果所有的分片数据都完成 ->markStageAsFinished(stage) --那么这个Stage就是结束了 ->runningStages -= stage --从running状态中删除 ->listenerBus.post(SparkListenerStageCompleted(stage.latestInfo)) --通知Stage结束 ->cleanupStateForJobAndIndependentStages(job) --清除依赖关系 ->val registeredStages = jobIdToStageIds.get(job.jobId) --找到这个job对应的所有Stage(job对应多个stage) ->stageIdToStage.filterKeys(stageId => registeredStages.get.contains(stageId)).foreach //查找所有stage,找出注册了依赖于这个job所在stage的。 ->case (stageId, stage) => ->val jobSet = stage.jobIds ->if (!jobSet.contains(job.jobId)) --这些存在依赖的stage中,应该包含这个job的注册 ->logError("Job %d not registered for stage %d even though that stage was registered for the job" .format(job.jobId, stageId)) ->if (jobSet.isEmpty) // no other job needs this stage 没有其他job了,这个依赖的stage也结束了。 -> removeStage(stageId) --删除stage ->listenerBus.post(SparkListenerJobEnd(job.jobId, JobSucceeded)) --通知job结束 ->job.listener.taskSucceeded(rt.outputId, event.result) --通知task成功 ->case smt: ShuffleMapTask => --如果是shuffleMapTask ->if (runningStages.contains(stage) && stage.pendingTasks.isEmpty) --如果stage的所有task都完成 ->markStageAsFinished(stage) --标志stage完成 ->listenerBus.post(SparkListenerStageCompleted(stage.latestInfo)) --通知stage完成 ->logInfo("looking for newly runnable stages") --stage完成了,意味着依赖这个stage的stage可以执行了 ->mapOutputTracker.registerMapOutputs --(?用处不明) ->clearCacheLocs() ->if (stage.outputLocs.exists(_ == Nil)) // Some tasks had failed; let's resubmit this stage ->submitStage(stage) ->else ->val newlyRunnable = new ArrayBuffer[Stage] -> for (stage <- waitingStages if getMissingParentStages(stage) == Nil) 如果一个stage没有依赖其他stage ->newlyRunnable += stage --这个没有依赖的stage就可以执行了 ->waitingStages --= newlyRunnable ->runningStages ++= newlyRunnable ->for {stage <- newlyRunnable.sortBy(_.id); jobId <- activeJobForStage(stage)} ->submitMissingTasks(stage, jobId) --将这些没有依赖的stage的所有active job提交执行 ->submitWaitingStages() --//Check for waiting or failed stages which are now eligible for resubmission. //Ordinarily run on every iteration of the event loop. 每个事件处理都会触发去检查waiting状态的stage是否能够执行了。 ->logTrace("Checking for newly runnable parent stages") ->waitingStages.clear() ->for (stage <- waitingStagesCopy.sortBy(_.jobId)) ->submitStage[/b](stage)
========================end================================

来自为知笔记(Wiz)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: