Spark Source Code: Job Execution (1) - Stage Division and Submission
2017-06-12 21:59
1 The Job Execution Flow, Starting from reduce
1.1 The reduce operation
Taking the reduce operation as an example, let's look at how a job executes:

```scala
def reduce(f: (T, T) => T): T = withScope {
  val cleanF = sc.clean(f)
  val reducePartition: Iterator[T] => Option[T] = iter => {
    if (iter.hasNext) {
      Some(iter.reduceLeft(cleanF))
    } else {
      None
    }
  }
  var jobResult: Option[T] = None
  val mergeResult = (index: Int, taskResult: Option[T]) => {
    if (taskResult.isDefined) {
      jobResult = jobResult match {
        case Some(value) => Some(f(value, taskResult.get))
        case None => taskResult
      }
    }
  }
  sc.runJob(this, reducePartition, mergeResult)
  // Get the final result out of our Option, or throw an exception if the RDD was empty
  jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}
```
The roles of the two functions defined inside reduce were summarized earlier.
At the end, reduce calls sc.runJob(this, reducePartition, mergeResult): reducePartition folds each partition locally, and mergeResult combines the per-partition results.
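To see the two callbacks in action, here is a minimal example, assuming a live SparkContext named `sc` (e.g. in spark-shell):

```scala
// Two partitions: reducePartition folds each partition locally,
// then mergeResult combines the per-partition results on the driver.
val rdd = sc.parallelize(Seq(1, 2, 3, 4), 2)
// partition 0: 1 + 2 = 3; partition 1: 3 + 4 = 7; driver: 3 + 7 = 10
val sum = rdd.reduce(_ + _)  // 10
```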
2 runJob
So what does runJob do? Let's step into runJob in SparkContext:
```scala
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}
```
runJob's parameters (a direct-call sketch follows the list):
rdd: the target RDD;
func: the function to run on each partition of the RDD;
partitions: the partitions to compute; some jobs do not compute every partition of the target RDD;
resultHandler: a callback that receives each partition's result.
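This overload can also be called directly. A minimal sketch, again assuming an existing `sc`; here only partition 0 of a 4-partition RDD is computed, and the resultHandler callback collects the result:

```scala
val data = sc.parallelize(1 to 100, 4)
val results = new Array[Int](1)
sc.runJob(
  data,
  (ctx: org.apache.spark.TaskContext, iter: Iterator[Int]) => iter.sum,  // func, run per partition
  Seq(0),                                         // partitions: compute only partition 0
  (index: Int, res: Int) => results(index) = res  // resultHandler callback
)
```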
2.1 Management by the DAGScheduler
From here on, the job is managed by the DAGScheduler: its runJob method submits the job, which in turn enters submitJob. Before going there, let's look at what state the DAGScheduler actually holds.
2.1.1 Stage-related fields of the DAGScheduler
First, the ID counters for the various parts:

```scala
private[scheduler] val nextJobId = new AtomicInteger(0)     // job ids start at 0
private[scheduler] def numTotalJobs: Int = nextJobId.get()  // job count equals the number of ids handed out
private val nextStageId = new AtomicInteger(0)              // stage ids
private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]]  // one job can map to several stages
private[scheduler] val stageIdToStage = new HashMap[Int, Stage]
```
Then several hash tables that track stages:
```scala
private[scheduler] val shuffleIdToMapStage = new HashMap[Int, ShuffleMapStage]  // map-side stages created at shuffle boundaries
private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]           // jobs currently executing
// stages whose parent stages have not finished yet
private[scheduler] val waitingStages = new HashSet[Stage]
// stages currently running
private[scheduler] val runningStages = new HashSet[Stage]
```
2.1.2 submitJob
DAGScheduler.runJob actually calls the submitJob method:

```scala
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
```
submitJob submits this action to the scheduler:
```scala
val jobId = nextJobId.getAndIncrement()  // a new job: increment the id counter
if (partitions.size == 0) {
  // Return immediately if the job is running 0 tasks
  return new JobWaiter[U](this, jobId, 0, resultHandler)
}
assert(partitions.size > 0)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]  // the function the tasks will run
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
eventProcessLoop.post(JobSubmitted(
  jobId, rdd, func2, partitions.toArray, callSite, waiter,
  SerializationUtils.clone(properties)))
waiter
```
2.1.3 DAGSchedulerEventProcessLoop
The program then enters the DAGSchedulerEventProcessLoop. First, look at eventProcessLoop.post:
```scala
def post(event: E): Unit = {
  eventQueue.put(event)
}
```
eventQueue is an event queue; post simply enqueues the event.
Once the event thread takes the event off the queue, doOnReceive handles it. Since the posted object was a JobSubmitted, pattern matching dispatches to handleJobSubmitted:
```scala
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
  // ... the remaining cases are omitted here
}
```
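The queue-plus-thread machinery lives in EventLoop, which DAGSchedulerEventProcessLoop extends. As a rough illustration of the pattern (a simplified sketch, not Spark's actual implementation; the Mini* names are mine), post() enqueues and a daemon thread drains the queue and dispatches:

```scala
import java.util.concurrent.LinkedBlockingDeque

abstract class MiniEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()
  @volatile private var stopped = false

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped) {
          onReceive(eventQueue.take())  // blocks until an event is available
        }
      } catch {
        case _: InterruptedException => // stop() was called: exit the loop
      }
    }
  }

  def start(): Unit = eventThread.start()
  def stop(): Unit = { stopped = true; eventThread.interrupt() }
  def post(event: E): Unit = eventQueue.put(event)  // what submitJob ends up calling

  // the dispatch hook: the DAGScheduler's version pattern-matches the event
  // and routes JobSubmitted to handleJobSubmitted
  protected def onReceive(event: E): Unit
}
```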
So handleJobSubmitted is where the division and submission of stages actually happen.
2.2 handleJobSubmitted
First, the complete handleJobSubmitted:

```scala
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  try {
    // get the final stage
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }
  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)  // a job running in the DAG
  clearCacheLocs()
  logInfo("Got job %s (%s) with %d output partitions".format(
    job.jobId, callSite.shortForm, partitions.length))
  logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
  logInfo("Parents of final stage: " + finalStage.parents)
  logInfo("Missing parents: " + getMissingParentStages(finalStage))
  val jobSubmissionTime = clock.getTimeMillis()
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.setActiveJob(job)
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
  submitStage(finalStage)  // submit the finalStage
}
```
In this method, the finalStage is obtained:

```scala
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
```

Stepping into createResultStage:
```scala
private def createResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  val parents = getOrCreateParentStages(rdd, jobId)  // get the parent stages
  val id = nextStageId.getAndIncrement()
  val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}
```
Here the flow enters a key function:
```scala
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  getShuffleDependencies(rdd).map { shuffleDep =>
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  }.toList
}
```
getOrCreateParentStages takes the current RDD and the job id as parameters and returns a list of parent stages: one ShuffleMapStage per shuffle dependency reachable from the RDD. These are all the stages the current ResultStage directly depends on (the ResultStage's own id is allocated separately via nextStageId).
```scala
private[scheduler] def getShuffleDependencies(
    rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
  val parents = new HashSet[ShuffleDependency[_, _, _]]  // the shuffle (wide) dependencies found so far
  val visited = new HashSet[RDD[_]]                      // RDDs already visited
  val waitingForVisit = new Stack[RDD[_]]                // RDDs still to visit
  waitingForVisit.push(rdd)  // start from the RDD itself
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.pop()
    if (!visited(toVisit)) {
      visited += toVisit
      toVisit.dependencies.foreach {
        case shuffleDep: ShuffleDependency[_, _, _] =>
          parents += shuffleDep  // wide dependency: record it, do not cross it
        case dependency =>
          waitingForVisit.push(dependency.rdd)  // narrow dependency: keep walking up the lineage
      }
    }
  }
  parents
}
```
For each RDD popped off the stack, its dependencies are examined:
if a dependency is narrow, its parent RDD is pushed onto the stack to be visited in turn;
if it is wide (a ShuffleDependency), it is added to parents, and the traversal does not go past it.
Finally parents is returned: the shuffle dependencies directly reachable from the given RDD without crossing another shuffle. These become the stage boundaries.
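A quick spark-shell experiment (assuming a live SparkContext `sc`) shows what this traversal classifies: map yields a narrow dependency, while reduceByKey introduces a ShuffleDependency, i.e. a stage boundary:

```scala
import org.apache.spark.ShuffleDependency

val pairs  = sc.parallelize(1 to 100).map(i => (i % 10, i))
val counts = pairs.reduceByKey(_ + _)

pairs.dependencies.head.isInstanceOf[ShuffleDependency[_, _, _]]   // false: narrow
counts.dependencies.head.isInstanceOf[ShuffleDependency[_, _, _]]  // true: wide
```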
```scala
private def getOrCreateShuffleMapStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
    // if shuffleIdToMapStage already has a ShuffleMapStage for this shuffle, return it directly
    case Some(stage) =>
      stage
    // otherwise, use getMissingAncestorShuffleDependencies to find ancestor wide dependencies first
    case None =>
      getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
        if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
          createShuffleMapStage(dep, firstJobId)
        }
      }
      // Finally, create a stage for the given shuffle dependency.
      createShuffleMapStage(shuffleDep, firstJobId)
  }
}
```
getMissingParentStages traces a stage's ancestry and collects the parent stages that are still missing, i.e. not yet computed. A stage with no missing parents either has no parents at all, or all of its parents have already produced their output:
```scala
private def getMissingParentStages(stage: Stage): List[Stage] = {
  val missing = new HashSet[Stage]   // parent stages not yet computed
  val visited = new HashSet[RDD[_]]  // RDDs already processed
  // RDDs still to process go on this stack
  val waitingForVisit = new Stack[RDD[_]]
  def visit(rdd: RDD[_]) {
    if (!visited(rdd)) {
      visited += rdd
      val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
      if (rddHasUncachedPartitions) {
        for (dep <- rdd.dependencies) {
          dep match {
            // a wide dependency directly yields a parent stage
            case shufDep: ShuffleDependency[_, _, _] =>
              val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
              if (!mapStage.isAvailable) {
                missing += mapStage
              }
            // a narrow dependency stays inside this stage: push its RDD and keep walking
            case narrowDep: NarrowDependency[_] =>
              waitingForVisit.push(narrowDep.rdd)
          }
        }
      }
    }
  }
  // push the final RDD, then recursively traverse the stage's RDDs back to the very first one
  waitingForVisit.push(stage.rdd)
  while (waitingForVisit.nonEmpty) {
    visit(waitingForVisit.pop())
  }
  missing.toList  // return the missing parent stages as a list
}
```
createShuffleMapStage then builds and returns the ShuffleMapStage:
```scala
def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
  val rdd = shuffleDep.rdd
  val numTasks = rdd.partitions.length  // one task per partition
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  val stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep)
  stageIdToStage(id) = stage
  shuffleIdToMapStage(shuffleDep.shuffleId) = stage
  updateJobIdStageIdMaps(jobId, stage)
  if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
    // fetch the serialized MapOutputStatus objects for this shuffleId from the mapOutputTracker
    val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
    val locs = MapOutputTracker.deserializeMapStatuses(serLocs)  // deserialize them
    (0 until locs.length).foreach { i =>
      if (locs(i) ne null) {
        // locs(i) will be null if missing
        stage.addOutputLoc(i, locs(i))
      }
    }
  } else {
    // Kind of ugly: need to register RDDs with the cache and map output tracker here
    // since we can't do it in the RDD constructor because # of partitions is unknown
    logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
    mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
  }
  stage
}
```
With this, getOrCreateParentStages has obtained the list of parent stages arising from the RDD's shuffle dependencies.
createResultStage then returns the ResultStage bound to this jobId, which is assigned to finalStage.
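To picture the structure handleJobSubmitted ends up holding, here is a toy model (the Mini* names are mine, not Spark's): a ResultStage whose parents form a DAG of ShuffleMapStages.

```scala
sealed trait MiniStage { def id: Int; def parents: List[MiniStage] }
case class MiniShuffleMapStage(id: Int, parents: List[MiniStage]) extends MiniStage
case class MiniResultStage(id: Int, parents: List[MiniStage]) extends MiniStage

// two chained shuffles: stage 0 feeds stage 1, which feeds the finalStage
val s0 = MiniShuffleMapStage(0, Nil)
val s1 = MiniShuffleMapStage(1, List(s0))
val finalStage = MiniResultStage(2, List(s1))
```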
After the stages have been divided, the job is activated:
```scala
val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
clearCacheLocs()
```
Then the final stage of the job is submitted:

```scala
submitStage(finalStage)
```
```scala
private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)  // find the job id for this stage
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    // proceed only if the stage is not already waiting, running, or failed
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)  // parents not yet submitted
      logDebug("missing: " + missing)
      if (missing.isEmpty) {
        // all parent stages are done, so this stage's tasks can be submitted
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage, jobId.get)
      } else {
        // otherwise, recursively submit every missing parent stage first
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage  // park this stage until its parents finish
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}
```
Finally, let's analyze submitMissingTasks.
Its main work is to:
1. Determine the ids of the partitions the stage needs to compute;
2. Start the new stage;
3. Build a map from partition id to preferred task locations;
4. Mark a new stage attempt;
5. Serialize the stage and broadcast it;
6. Create a task for each partition of the stage, gathered into a TaskSet;
7. Submit the tasks via taskScheduler.submitTasks().
```scala
private def submitMissingTasks(stage: Stage, jobId: Int) {
  logDebug("submitMissingTasks(" + stage + ")")
  // the ids of the partitions that need to be computed
  val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
  // the properties of the job this stage belongs to
  val properties = jobIdToActiveJob(jobId).properties
  runningStages += stage  // add to the set of running stages
  // depending on the kind of stage, call stageStart to start it
  stage match {
    case s: ShuffleMapStage =>
      outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
    case s: ResultStage =>
      outputCommitCoordinator.stageStart(
        stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
  }
  // preferred locations for every partition of the stage's RDD to be computed
  val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
    stage match {
      case s: ShuffleMapStage =>
        partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id)) }.toMap
      case s: ResultStage =>
        partitionsToCompute.map { id =>
          val p = s.partitions(id)
          (id, getPreferredLocs(stage.rdd, p))
        }.toMap
    }
  } catch {
    case NonFatal(e) =>
      stage.makeNewStageAttempt(partitionsToCompute.size)
      listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
      abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage
      return
  }

  stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)

  // If there are tasks to execute, record the submission time of the stage. Otherwise,
  // post the event without the submission time, which indicates that this stage was
  // skipped.
  if (partitionsToCompute.nonEmpty) {
    stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
  }
  listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

  // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
  // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
  // the serialized copy of the RDD and for each task we will deserialize it, which means each
  // task gets a different copy of the RDD. This provides stronger isolation between tasks that
  // might modify state of objects referenced in their closures. This is necessary in Hadoop
  // where the JobConf/Configuration object is not thread-safe.
  var taskBinary: Broadcast[Array[Byte]] = null
  try {
    // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
    // For ResultTask, serialize and broadcast (rdd, func).
    val taskBinaryBytes: Array[Byte] = stage match {
      case stage: ShuffleMapStage =>
        JavaUtils.bufferToArray(
          closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
      case stage: ResultStage =>
        JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
    }
    taskBinary = sc.broadcast(taskBinaryBytes)
  } catch {
    // In the case of a failure during serialization, abort the stage.
    case e: NotSerializableException =>
      abortStage(stage, "Task not serializable: " + e.toString, Some(e))
      runningStages -= stage
      return  // Abort execution
    case NonFatal(e) =>
      abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage
      return
  }

  // build one task per partition of the stage
  val tasks: Seq[Task[_]] = try {
    val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
    stage match {
      case stage: ShuffleMapStage =>
        stage.pendingPartitions.clear()
        partitionsToCompute.map { id =>
          val locs = taskIdToLocations(id)
          val part = stage.rdd.partitions(id)
          stage.pendingPartitions += id
          new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
            taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
            Option(sc.applicationId), sc.applicationAttemptId)
        }
      case stage: ResultStage =>
        partitionsToCompute.map { id =>
          val p: Int = stage.partitions(id)
          val part = stage.rdd.partitions(p)
          val locs = taskIdToLocations(id)
          new ResultTask(stage.id, stage.latestInfo.attemptId,
            taskBinary, part, locs, id, properties, serializedTaskMetrics,
            Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
        }
    }
  } catch {
    case NonFatal(e) =>
      abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage
      return
  }

  if (tasks.size > 0) {
    // if there are tasks, submit them via taskScheduler.submitTasks(); otherwise mark the stage finished
    logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
      s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
    taskScheduler.submitTasks(new TaskSet(
      tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
  } else {
    // Because we posted SparkListenerStageSubmitted earlier, we should mark
    // the stage as completed here in case there are no tasks to run
    markStageAsFinished(stage, None)
    val debugString = stage match {
      case stage: ShuffleMapStage =>
        s"Stage ${stage} is actually done; " +
          s"(available: ${stage.isAvailable}," +
          s"available outputs: ${stage.numAvailableOutputs}," +
          s"partitions: ${stage.numPartitions})"
      case stage: ResultStage =>
        s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
    }
    logDebug(debugString)
    submitWaitingChildStages(stage)
  }
}
```
3 Summary of Stage Division
First, an action triggers computation and submits a job to the DAGScheduler. On receiving it, the DAGScheduler starts from the RDD at the end of the dependency chain and walks backwards through each RDD's dependencies.
Whenever a shuffle dependency appears among an RDD's dependencies, the lineage is cut there: the shuffle's map-side RDD closes off a new parent stage, whose output becomes the input of the current stage.
This yields a collection of stages; the stage that directly triggers the job becomes the finalStage, and an ActiveJob instance is created for it.
When a stage is submitted, the scheduler first checks whether the results of its parent stages are available. If they are, the stage is submitted; if not, it is placed in waitingStages and its parents are submitted first.
Whenever an intermediate stage finishes, the DAGScheduler checks whether all of its tasks are complete, then rescans the stages in waitingStages and submits every one whose parents have all finished, until no runnable stage remains.
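As a concrete illustration (the input path and the stage numbering are assumptions), a pipeline with two shuffles is cut into three stages:

```scala
val counts = sc.textFile("input.txt")          // assumed input path
  .flatMap(_.split(" "))
  .map(word => (word, 1))                      // narrow: still inside stage 0
  .reduceByKey(_ + _)                          // shuffle #1: ShuffleMapStage 0 ends here
  .map { case (word, n) => (word.length, n) }  // narrow: inside stage 1
  .reduceByKey(_ max _)                        // shuffle #2: ShuffleMapStage 1 ends here
counts.collect()                               // action: ResultStage 2 is the finalStage
```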