Spark源码分析之job提交后转换为Stage
2017-09-12 17:04
549 查看
在上一讲中,我们已经分析了Driver和Executor启动的整个过程,接下来,这个集群可能就需要进行相应的算子的计算了
在这里我们先普及一下相关知识
Job:在rdd的计算中每次遇到一个action操作,才会进行一次提交,这一次提交的就是一个job
stage:在一个job中,相邻的RDD之间如果是宽依赖就会进行划分,宽依赖的前后会构成两个不同的stage
taskSet:每一个stage在提交的时候,会被转换成一个taskSet进行提交,一个taskSet里面有若干个tasks
tasks:对一个分区的数据进行计算,被发送到executor上的最小的计算单元
本篇我们将会解析job的提交过程,以及提交后转换成stage的过程
假设我们现在的执行到了RDD的count方法
在这里它会调用sparkContext对象的runJob方法,这个runJob方法是一个重载方法,最终调用的方法如下:
在这里,我们可以看到接着调用了dagScheduler的runJob方法,接着往下走
调用submitJob方法提交当前的job,提交后当前这个方法会被阻塞,直到作业运行结束,返回后,根据作业的成功与否打印出不同的日志信息
在这里我们遇到了一个陌生的东西eventProcessLoop,它是DAGSchedulerEventProcessLoop类的一个对象,在这里调用了它的post方法,这个方法其实是其父类EventLoop中的方法
在这个方法中将event放入eventQueue中,这实际上是一个BlockingQueue,同时EventLoop中有一个线程
这个线程的主要任务就是从eventQueue中拿取消息,然后调用onReceive方法处理这个拿到的消息,在DAGSchedulerEventProcessLoop中这个将会实现这个onReceive方法,对发送过来的不同的消息进行不同的处理,在上面的submitJob里面我们看到其提交了一个JobSubmitted消息,下面我们来看一下这个消息的处理
在接受到这个消息后,执行dagScheduler.handleJobSubmitted方法
通过使用这个job的最后一个rdd创建一个ResultStage,在newResultStage的时候,其实就已经划分好了整个job的stage以及他们之间的依赖关系
根据这个job的最后一个RDD,计算其parentStage
定义一个存取RDD的栈
将作为参数传递过来的RDD压栈
循环地从栈中取出一个RDD,调用visit方法
在visit方法中
a.首先判断该RDD有没有被调用过这个函数,如果调用过就跳出
b.没有被调用过visit方法
i. 遍历这个RDD的依赖
ii.如果是窄依赖,那么就将这个依赖的RDD压入栈中
iii.如果是宽依赖,那么根据这个宽依赖创建一个shuffleMapStage,并将这个stage加入到parents中
总结:我们发现通过这个方法,我们可以将一个RDD所在的stage的所有的父stage都找出来,那么关键怎么找到父satge的父satge呢?下面看方法getShuffleMapStage
根据依赖和jobId得到shuffleMapStage时,第一步会shuffleToMapStage这个hashMap中根据shuffleId进行查找,如果找到的话,就代表这个ShuffleMapStage之前已经被创建过了,此时直接拿过来就可以了(因为在DAG中一个shuffleMapStage可能是好几个不同的stage的parentStage,这点很容易理解),但是如果没有找到的话,那么就会调用
调用
我们可以看到这个函数的逻辑与
现在我们回到
先获取当前stage的父stage,其实getMissingParentStages与getParnetStage的逻辑是一样的,这里我们就不赘述了,如果当前stage没有父stage了,也就意味这当前的这个stage是这个job的第一个stage,那么我们就调用submitMissingTasks提交任务,否则就递归submitStage(parent),还会将当前这个stage加入到waitingStages中,最后的结果就是stage按照执行的先后顺序存储带waitingstages中,并且会对于第一个stage调用submitMissingTasks方法,那么waitingStages中的stage何时提交呢?
我们可以看到在handleJobSubmitted这个方法中,submitStage调用后,会调用
可以看到在这里我们将会提交后续的stage,并且由于waitingStages中的stage已经是有序的了,此处只要顺序提交就可以了
这里就是我们提交job并转化成stage的全过程,总结如下:
在这里我们先普及一下相关知识
Job:在rdd的计算中每次遇到一个action操作,才会进行一次提交,这一次提交的就是一个job
stage:在一个job中,相邻的RDD之间如果是宽依赖就会进行划分,宽依赖的前后会构成两个不同的stage
taskSet:每一个stage在提交的时候,会被转换成一个taskSet进行提交,一个taskSet里面有若干个tasks
tasks:对一个分区的数据进行计算,被发送到executor上的最小的计算单元
本篇我们将会解析job的提交过程,以及提交后转换成stage的过程
假设我们现在的执行到了RDD的count方法
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
在这里它会调用sparkContext对象的runJob方法,这个runJob方法是一个重载方法,最终调用的方法如下:
def runJob[T, U: ClassTag]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], resultHandler: (Int, U) => Unit): Unit = { if (stopped.get()) { throw new IllegalStateException("SparkContext has been shutdown") } val callSite = getCallSite val cleanedFunc = clean(func) logInfo("Starting job: " + callSite.shortForm) if (conf.getBoolean("spark.logLineage", false)) { logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString) } dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get) progressBar.foreach(_.finishAll()) rdd.doCheckpoint() }
在这里,我们可以看到接着调用了dagScheduler的runJob方法,接着往下走
def runJob[T, U]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], callSite: CallSite, resultHandler: (Int, U) => Unit, properties: Properties): Unit = { val start = System.nanoTime //调用submitJobJ继续提交作业 val wa 4000 iter = submitJob(rdd, func, partitions, callSite, resultHandler, properties) //这里会发生阻塞,直到返回作业完成或失败的结果 waiter.awaitResult() match { case JobSucceeded => logInfo("Job %d finished: %s, took %f s".format (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9)) case JobFailed(exception: Exception) => logInfo("Job %d failed: %s, took %f s".format (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9)) // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler. val callerStackTrace = Thread.currentThread().getStackTrace.tail exception.setStackTrace(exception.getStackTrace ++ callerStackTrace) throw exception } }
调用submitJob方法提交当前的job,提交后当前这个方法会被阻塞,直到作业运行结束,返回后,根据作业的成功与否打印出不同的日志信息
def submitJob[T, U]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], callSite: CallSite, resultHandler: (Int, U) => Unit, properties: Properties): JobWaiter[U] = { // Check to make sure we are not launching a task on a partition that does not exist. val maxPartitions = rdd.partitions.length partitions.find(p => p >= maxPartitions || p < 0).foreach { p => throw new IllegalArgumentException( "Attempting to access a non-existent partition: " + p + ". " + "Total number of partitions: " + maxPartitions) } val jobId = nextJobId.getAndIncrement() if (partitions.size == 0) { return new JobWaiter[U](this, jobId, 0, resultHandler) } assert(partitions.size > 0) val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _] //创建一个JobWaiter对象 val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler) //借助内部消息处理把这个对象发送到DAGScheduler的DAGSchedulerEventLoop进行处理 eventProcessLoop.post(JobSubmitted( jobId, rdd, func2, partitions.toArray, callSite, waiter, SerializationUtils.clone(properties))) waiter }
在这里我们遇到了一个陌生的东西eventProcessLoop,它是DAGSchedulerEventProcessLoop类的一个对象,在这里调用了它的post方法,这个方法其实是其父类EventLoop中的方法
def post(event: E): Unit = { eventQueue.put(event) }
在这个方法中将event放入eventQueue中,这实际上是一个BlockingQueue,同时EventLoop中有一个线程
private val eventThread = new Thread(name) { //设置为守护线程,因为这线程的任务就是循环往复地从eventQueue中获取event,然后调用其消息处理方法 //因此必须设置为守护线程,当所有的用户线程都退出的时候,它也强迫被退出 setDaemon(true) override def run(): Unit = { try { while (!stopped.get) { //从eventQueue中拿取消息 val event = eventQueue.take() try { //调用onReceive方法处理获取到的消息 onReceive(event) } catch { case NonFatal(e) => { try { onError(e) } catch { case NonFatal(e) => logError("Unexpected error in " + name, e) } } } } } catch { case ie: InterruptedException => // exit even if eventQueue is not empty case NonFatal(e) => logError("Unexpected error in " + name, e) } } }
这个线程的主要任务就是从eventQueue中拿取消息,然后调用onReceive方法处理这个拿到的消息,在DAGSchedulerEventProcessLoop中这个将会实现这个onReceive方法,对发送过来的不同的消息进行不同的处理,在上面的submitJob里面我们看到其提交了一个JobSubmitted消息,下面我们来看一下这个消息的处理
[DAGSchedulerEventProcessLoop] private def doOnReceive(event: DAGSchedulerEvent): Unit = event match { case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) => dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) case ... case ... }
在接受到这个消息后,执行dagScheduler.handleJobSubmitted方法
private[scheduler] def handleJobSubmitted(jobId: Int, finalRDD: RDD[_], func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], callSite: CallSite, listener: JobListener, properties: Properties) { var finalStage: ResultStage = null try { // New stage creation may throw an exception if, for example, jobs are run on a // HadoopRDD whose underlying HDFS files have been deleted. //使用触发job的最后一个RDD,创建finalStage //创建好对象后,将finalstage对象加入到内存缓存中 finalStage = newResultStage(finalRDD, partitions.length, jobId, callSite) } catch { case e: Exception => logWarning("Creating new stage failed due to exception - job: " + jobId, e) listener.jobFailed(e) return } if (finalStage != null) { //使用finalStage创建一个Job,这个Job的最后一个stage当然就是我们的finalStage val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties) clearCacheLocs() logInfo("Got job %s (%s) with %d output partitions".format( job.jobId, callSite.shortForm, partitions.length)) logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")") logInfo("Parents of final stage: " + finalStage.parents) logInfo("Missing parents: " + getMissingParentStages(finalStage)) val jobSubmissionTime = clock.getTimeMillis() //将job加入到内存缓存中 jobIdToActiveJob(jobId) = job activeJobs += job finalStage.resultOfJob = Some(job) val stageIds = jobIdToStageIds(jobId).toArray val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo)) listenerBus.post( SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties)) //使用submitStage方法提交finalStage submitStage(finalStage) } submitWaitingStages() }
通过使用这个job的最后一个rdd创建一个ResultStage,在newResultStage的时候,其实就已经划分好了整个job的stage以及他们之间的依赖关系
private def newResultStage( rdd: RDD[_], numTasks: Int, jobId: Int, callSite: CallSite): ResultStage = { //根据最后一个RDD,finalRDD获取parentStage,以及resultStage的id val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId) val stage: ResultStage = new ResultStage(id, rdd, numTasks, parentStages, jobId, callSite) stageIdToStage(id) = stage updateJobIdStageIdMaps(jobId, stage) stage }
根据这个job的最后一个RDD,计算其parentStage
private def getParentStagesAndId(rdd: RDD[_], firstJobId: Int): (List[Stage], Int) = { val parentStages = getParentStages(rdd, firstJobId) val id = nextStageId.getAndIncrement() (parentStages, id) }
private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = { val parents = new HashSet[Stage] val visited = new HashSet[RDD[_]] // We are manually maintaining a stack here to prevent StackOverflowError // caused by recursively visiting val waitingForVisit = new Stack[RDD[_]] def visit(r: RDD[_]) { if (!visited(r)) { visited += r // Kind of ugly: need to register RDDs with the cache here since // we can't do it in its constructor because # of partitions is unknown for (dep <- r.dependencies) { dep match { case shufDep: ShuffleDependency[_, _, _] => //getShuffleMapStage中会调用getAncestorShuffleDependencies(shuffleDep.rdd) //在getAncestorShuffleDependencies方法中再次寻找当前这个rdd所在的stage的父stage //循环往复,一直找到所有的父stage parents += getShuffleMapStage(shufDep, firstJobId) case _ => waitingForVisit.push(dep.rdd) } } } } waitingForVisit.push(rdd) while (waitingForVisit.nonEmpty) { visit(waitingForVisit.pop()) } //返回当前的stage的所有的父stage parents.toList }
定义一个存取RDD的栈
将作为参数传递过来的RDD压栈
循环地从栈中取出一个RDD,调用visit方法
在visit方法中
a.首先判断该RDD有没有被调用过这个函数,如果调用过就跳出
b.没有被调用过visit方法
i. 遍历这个RDD的依赖
ii.如果是窄依赖,那么就将这个依赖的RDD压入栈中
iii.如果是宽依赖,那么根据这个宽依赖创建一个shuffleMapStage,并将这个stage加入到parents中
总结:我们发现通过这个方法,我们可以将一个RDD所在的stage的所有的父stage都找出来,那么关键怎么找到父satge的父satge呢?下面看方法getShuffleMapStage
private def getShuffleMapStage( shuffleDep: ShuffleDependency[_, _, _], firstJobId: Int): ShuffleMapStage = { shuffleToMapStage.get(shuffleDep.shuffleId) match { //从数据结构shuffleToMapStage中根据shuffleId获取,如果有,则代表这个父阶段已经被注册过了 //那么就直接返回这个找到的stage就可以了 case Some(stage) => stage case None => // We are going to register ancestor shuffle dependencies registerShuffleDependencies(shuffleDep, firstJobId) // Then register current shuffleDep val stage = newOrUsedShuffleStage(shuffleDep, firstJobId) //将构建好的shuffleMapStage加入到shuffleToMapStage中 // 如果下一次这个stage又成为了另一个stage的父stage,那么就不需要再次构建了,直接取出来就可以了 shuffleToMapStage(shuffleDep.shuffleId) = stage stage } }
根据依赖和jobId得到shuffleMapStage时,第一步会shuffleToMapStage这个hashMap中根据shuffleId进行查找,如果找到的话,就代表这个ShuffleMapStage之前已经被创建过了,此时直接拿过来就可以了(因为在DAG中一个shuffleMapStage可能是好几个不同的stage的parentStage,这点很容易理解),但是如果没有找到的话,那么就会调用
registerShuffleDependencies(shuffleDep, firstJobId)找到这个stage的祖先stage
private def registerShuffleDependencies(shuffleDep: ShuffleDependency[_, _, _], firstJobId: Int) { val parentsWithNoMapStage = getAncestorShuffleDependencies(shuffleDep.rdd) while (parentsWithNoMapStage.nonEmpty) { val currentShufDep = parentsWithNoMapStage.pop() val stage = newOrUsedShuffleStage(currentShufDep, firstJobId) shuffleToMapStage(currentShufDep.shuffl bec0 eId) = stage } }
调用
getAncestorShuffleDependencies方法获取其祖先的stage
private def getAncestorShuffleDependencies(rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = { val parents = new Stack[ShuffleDependency[_, _, _]] val visited = new HashSet[RDD[_]] // We are manually maintaining a stack here to prevent StackOverflowError // caused by recursively visiting val waitingForVisit = new Stack[RDD[_]] def visit(r: RDD[_]) { if (!visited(r)) { visited += r for (dep <- r.dependencies) { dep match { case shufDep: ShuffleDependency[_, _, _] => if (!shuffleToMapStage.contains(shufDep.shuffleId)) { parents.push(shufDep) } waitingForVisit.push(shufDep.rdd) case _ => waitingForVisit.push(dep.rdd) } } } } waitingForVisit.push(rdd) while (waitingForVisit.nonEmpty) { visit(waitingForVisit.pop()) } parents }
我们可以看到这个函数的逻辑与
getParentStages函数的逻辑是一样的,也是获取rdd的父stage,注意这个方法的参数是从getShuffleMapStage(shufDep)的这个shufDep的rdd,也就是说到这里位置,我们其实是像在进行一个递归调用,最终将会完成整个job中的rdd的stage的划分,以及这些stage之间的依赖
现在我们回到
handleJobSubmitted方法,再创建好resultStage后,我们将会根据这个resultStage床架一个job,然后将构建好的job存到数据结构中,然后我们使用submitStage(finalStage)提交stage
private def submitStage(stage: Stage) { val jobId = activeJobForStage(stage) if (jobId.isDefined) { logDebug("submitStage(" + stage + ")") if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) { //获取当前stage的父stage val missing = getMissingParentStages(stage).sortBy(_.id) logDebug("missing: " + missing) //只有遇到最初的那个stage的时候才会进行提交,其余的stages都在waitingStages里面 //如果当前的stage的前面再也没有stage了,那么就可以提交了,否则还需要划分missing中的stage if (missing.isEmpty) { //如果当前的stage没有父stage了,那么就提交当前的stage logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents") submitMissingTasks(stage, jobId.get) } else { //否则对于当前的satge还需要递归调用submitStage,用来划分它的最后一个RDD的窄依赖还是宽依赖 //这边相当于一个深度遍历图 for (parent <- missing) { submitStage(parent) } //最后按照倒序的方式(因为这个方法在递归主体的后面),将stage将入到waitingStages中 waitingStages += stage } } } else { abortStage(stage, "No active job for stage " + stage.id, None) } }
先获取当前stage的父stage,其实getMissingParentStages与getParnetStage的逻辑是一样的,这里我们就不赘述了,如果当前stage没有父stage了,也就意味这当前的这个stage是这个job的第一个stage,那么我们就调用submitMissingTasks提交任务,否则就递归submitStage(parent),还会将当前这个stage加入到waitingStages中,最后的结果就是stage按照执行的先后顺序存储带waitingstages中,并且会对于第一个stage调用submitMissingTasks方法,那么waitingStages中的stage何时提交呢?
我们可以看到在handleJobSubmitted这个方法中,submitStage调用后,会调用
submitWaitingStages方法
private def submitWaitingStages() { // TODO: We might want to run this less often, when we are sure that something has become // runnable that wasn't before. logTrace("Checking for newly runnable parent stages") logTrace("running: " + runningStages) logTrace("waiting: " + waitingStages) logTrace("failed: " + failedStages) val waitingStagesCopy = waitingStages.toArray waitingStages.clear() for (stage <- waitingStagesCopy.sortBy(_.firstJobId)) { submitStage(stage) } }
可以看到在这里我们将会提交后续的stage,并且由于waitingStages中的stage已经是有序的了,此处只要顺序提交就可以了
这里就是我们提交job并转化成stage的全过程,总结如下:
相关文章推荐
- spark源码分析之DAGScheduler提交作业(job)过程、stage阶段说明
- spark源码之Job执行(1)stage划分与提交
- spark源码之Job执行(1)stage划分与提交
- Spark源码分析之四:Stage提交
- spark源码之Job执行(1)stage划分与提交
- spark源码之Job执行(1)stage划分与提交
- spark源码之Job执行(1)stage划分与提交
- Spark技术内幕:Stage划分及提交源码分析
- spark源码之Job执行(1)stage划分与提交
- Spark1.3从创建到提交:9)Stage的划分和提交源码分析
- Spark技术内幕:Stage划分及提交源码分析
- Spark中job、stage、task的划分+源码执行过程分析
- spark源码之Job执行(1)stage划分与提交
- Spark1.3从创建到提交:8)DAGScheduler.runJob源码分析
- spark源码之Job执行(1)stage划分与提交
- Spark源码分析之Job提交运行总流程概述
- spark源码之Job执行(1)stage划分与提交
- spark源码学习(三):job的提交以及runJob函数的分析
- Spark技术内幕:Stage划分及提交源码分析
- Spark技术内幕:Stage划分及提交源码分析