您的位置:首页 > 其它

Spark源码分析之job提交后转换为Stage

2017-09-12 17:04 549 查看
在上一讲中,我们已经分析了Driver和Executor启动的整个过程,接下来,这个集群可能就需要进行相应的算子的计算了

在这里我们先普及一下相关知识

Job:在rdd的计算中每次遇到一个action操作,才会进行一次提交,这一次提交的就是一个job

stage:在一个job中,相邻的RDD之间如果是宽依赖就会进行划分,宽依赖的前后会构成两个不同的stage

taskSet:每一个stage在提交的时候,会被转换成一个taskSet进行提交,一个taskSet里面有若干个tasks

tasks:对一个分区的数据进行计算,被发送到executor上的最小的计算单元

本篇我们将会解析job的提交过程,以及提交后转换成stage的过程

假设我们现在的执行到了RDD的count方法

def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum


在这里它会调用sparkContext对象的runJob方法,这个runJob方法是一个重载方法,最终调用的方法如下:

def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
if (stopped.get()) {
throw new IllegalStateException("SparkContext has been shutdown")
}
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
if (conf.getBoolean("spark.logLineage", false)) {
logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
}
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
rdd.doCheckpoint()
}


在这里,我们可以看到接着调用了dagScheduler的runJob方法,接着往下走

def runJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): Unit = {
val start = System.nanoTime
//调用submitJobJ继续提交作业
val wa
4000
iter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
//这里会发生阻塞,直到返回作业完成或失败的结果
waiter.awaitResult() match {
case JobSucceeded =>
logInfo("Job %d finished: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
case JobFailed(exception: Exception) =>
logInfo("Job %d failed: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
// SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
val callerStackTrace = Thread.currentThread().getStackTrace.tail
exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
throw exception
}
}


调用submitJob方法提交当前的job,提交后当前这个方法会被阻塞,直到作业运行结束,返回后,根据作业的成功与否打印出不同的日志信息

def submitJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): JobWaiter[U] = {
// Check to make sure we are not launching a task on a partition that does not exist.
val maxPartitions = rdd.partitions.length
partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
throw new IllegalArgumentException(
"Attempting to access a non-existent partition: " + p + ". " +
"Total number of partitions: " + maxPartitions)
}

val jobId = nextJobId.getAndIncrement()
if (partitions.size == 0) {
return new JobWaiter[U](this, jobId, 0, resultHandler)
}

assert(partitions.size > 0)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
//创建一个JobWaiter对象
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
//借助内部消息处理把这个对象发送到DAGScheduler的DAGSchedulerEventLoop进行处理
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
SerializationUtils.clone(properties)))
waiter
}


在这里我们遇到了一个陌生的东西eventProcessLoop,它是DAGSchedulerEventProcessLoop类的一个对象,在这里调用了它的post方法,这个方法其实是其父类EventLoop中的方法

def post(event: E): Unit = {
eventQueue.put(event)
}


在这个方法中将event放入eventQueue中,这实际上是一个BlockingQueue,同时EventLoop中有一个线程

private val eventThread = new Thread(name) {
//设置为守护线程,因为这线程的任务就是循环往复地从eventQueue中获取event,然后调用其消息处理方法
//因此必须设置为守护线程,当所有的用户线程都退出的时候,它也强迫被退出
setDaemon(true)

override def run(): Unit = {
try {
while (!stopped.get) {
//从eventQueue中拿取消息
val event = eventQueue.take()
try {
//调用onReceive方法处理获取到的消息
onReceive(event)
} catch {
case NonFatal(e) => {
try {
onError(e)
} catch {
case NonFatal(e) => logError("Unexpected error in " + name, e)
}
}
}
}
} catch {
case ie: InterruptedException => // exit even if eventQueue is not empty
case NonFatal(e) => logError("Unexpected error in " + name, e)
}
}
}


这个线程的主要任务就是从eventQueue中拿取消息,然后调用onReceive方法处理这个拿到的消息,在DAGSchedulerEventProcessLoop中这个将会实现这个onReceive方法,对发送过来的不同的消息进行不同的处理,在上面的submitJob里面我们看到其提交了一个JobSubmitted消息,下面我们来看一下这个消息的处理

[DAGSchedulerEventProcessLoop]
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
case ...
case ...
}


在接受到这个消息后,执行dagScheduler.handleJobSubmitted方法

private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties) {
var finalStage: ResultStage = null
try {
// New stage creation may throw an exception if, for example, jobs are run on a
// HadoopRDD whose underlying HDFS files have been deleted.

//使用触发job的最后一个RDD,创建finalStage
//创建好对象后,将finalstage对象加入到内存缓存中
finalStage = newResultStage(finalRDD, partitions.length, jobId, callSite)
} catch {
case e: Exception =>
logWarning("Creating new stage failed due to exception - job: " + jobId, e)
listener.jobFailed(e)
return
}
if (finalStage != null) {
//使用finalStage创建一个Job,这个Job的最后一个stage当然就是我们的finalStage
val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)

clearCacheLocs()
logInfo("Got job %s (%s) with %d output partitions".format(
job.jobId, callSite.shortForm, partitions.length))
logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents)
logInfo("Missing parents: " + getMissingParentStages(finalStage))
val jobSubmissionTime = clock.getTimeMillis()

//将job加入到内存缓存中
jobIdToActiveJob(jobId) = job
activeJobs += job
finalStage.resultOfJob = Some(job)
val stageIds = jobIdToStageIds(jobId).toArray
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
listenerBus.post(
SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))

//使用submitStage方法提交finalStage
submitStage(finalStage)
}
submitWaitingStages()
}


通过使用这个job的最后一个rdd创建一个ResultStage,在newResultStage的时候,其实就已经划分好了整个job的stage以及他们之间的依赖关系

private def newResultStage(
rdd: RDD[_],
numTasks: Int,
jobId: Int,
callSite: CallSite): ResultStage = {

//根据最后一个RDD,finalRDD获取parentStage,以及resultStage的id
val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)

val stage: ResultStage = new ResultStage(id, rdd, numTasks, parentStages, jobId, callSite)

stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage)
stage
}


根据这个job的最后一个RDD,计算其parentStage

private def getParentStagesAndId(rdd: RDD[_], firstJobId: Int): (List[Stage], Int) = {
val parentStages = getParentStages(rdd, firstJobId)
val id = nextStageId.getAndIncrement()
(parentStages, id)
}


private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
val parents = new HashSet[Stage]
val visited = new HashSet[RDD[_]]
// We are manually maintaining a stack here to prevent StackOverflowError
// caused by recursively visiting
val waitingForVisit = new Stack[RDD[_]]
def visit(r: RDD[_]) {
if (!visited(r)) {
visited += r
// Kind of ugly: need to register RDDs with the cache here since
// we can't do it in its constructor because # of partitions is unknown
for (dep <- r.dependencies) {
dep match {
case shufDep: ShuffleDependency[_, _, _] =>
//getShuffleMapStage中会调用getAncestorShuffleDependencies(shuffleDep.rdd)
//在getAncestorShuffleDependencies方法中再次寻找当前这个rdd所在的stage的父stage
//循环往复,一直找到所有的父stage
parents += getShuffleMapStage(shufDep, firstJobId)
case _ =>
waitingForVisit.push(dep.rdd)
}
}
}
}
waitingForVisit.push(rdd)
while (waitingForVisit.nonEmpty) {
visit(waitingForVisit.pop())
}

//返回当前的stage的所有的父stage
parents.toList
}


定义一个存取RDD的栈

将作为参数传递过来的RDD压栈

循环地从栈中取出一个RDD,调用visit方法

在visit方法中

a.首先判断该RDD有没有被调用过这个函数,如果调用过就跳出

b.没有被调用过visit方法

i. 遍历这个RDD的依赖

ii.如果是窄依赖,那么就将这个依赖的RDD压入栈中

iii.如果是宽依赖,那么根据这个宽依赖创建一个shuffleMapStage,并将这个stage加入到parents中

总结:我们发现通过这个方法,我们可以将一个RDD所在的stage的所有的父stage都找出来,那么关键怎么找到父satge的父satge呢?下面看方法getShuffleMapStage

private def getShuffleMapStage(
shuffleDep: ShuffleDependency[_, _, _],
firstJobId: Int): ShuffleMapStage = {
shuffleToMapStage.get(shuffleDep.shuffleId) match {
//从数据结构shuffleToMapStage中根据shuffleId获取,如果有,则代表这个父阶段已经被注册过了
//那么就直接返回这个找到的stage就可以了
case Some(stage) => stage
case None =>
// We are going to register ancestor shuffle dependencies
registerShuffleDependencies(shuffleDep, firstJobId)

// Then register current shuffleDep
val stage = newOrUsedShuffleStage(shuffleDep, firstJobId)

//将构建好的shuffleMapStage加入到shuffleToMapStage中
// 如果下一次这个stage又成为了另一个stage的父stage,那么就不需要再次构建了,直接取出来就可以了
shuffleToMapStage(shuffleDep.shuffleId) = stage

stage
}
}


根据依赖和jobId得到shuffleMapStage时,第一步会shuffleToMapStage这个hashMap中根据shuffleId进行查找,如果找到的话,就代表这个ShuffleMapStage之前已经被创建过了,此时直接拿过来就可以了(因为在DAG中一个shuffleMapStage可能是好几个不同的stage的parentStage,这点很容易理解),但是如果没有找到的话,那么就会调用
registerShuffleDependencies(shuffleDep, firstJobId)
找到这个stage的祖先stage

private def registerShuffleDependencies(shuffleDep: ShuffleDependency[_, _, _], firstJobId: Int) {
val parentsWithNoMapStage = getAncestorShuffleDependencies(shuffleDep.rdd)
while (parentsWithNoMapStage.nonEmpty) {
val currentShufDep = parentsWithNoMapStage.pop()
val stage = newOrUsedShuffleStage(currentShufDep, firstJobId)
shuffleToMapStage(currentShufDep.shuffl
bec0
eId) = stage
}
}


调用
getAncestorShuffleDependencies
方法获取其祖先的stage

private def getAncestorShuffleDependencies(rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {
val parents = new Stack[ShuffleDependency[_, _, _]]
val visited = new HashSet[RDD[_]]
// We are manually maintaining a stack here to prevent StackOverflowError
// caused by recursively visiting
val waitingForVisit = new Stack[RDD[_]]
def visit(r: RDD[_]) {
if (!visited(r)) {
visited += r
for (dep <- r.dependencies) {
dep match {
case shufDep: ShuffleDependency[_, _, _] =>
if (!shuffleToMapStage.contains(shufDep.shuffleId)) {
parents.push(shufDep)
}

waitingForVisit.push(shufDep.rdd)
case _ =>
waitingForVisit.push(dep.rdd)
}
}
}
}

waitingForVisit.push(rdd)
while (waitingForVisit.nonEmpty) {
visit(waitingForVisit.pop())
}
parents
}


我们可以看到这个函数的逻辑与
getParentStages
函数的逻辑是一样的,也是获取rdd的父stage,注意这个方法的参数是从getShuffleMapStage(shufDep)的这个shufDep的rdd,也就是说到这里位置,我们其实是像在进行一个递归调用,最终将会完成整个job中的rdd的stage的划分,以及这些stage之间的依赖

现在我们回到
handleJobSubmitted
方法,再创建好resultStage后,我们将会根据这个resultStage床架一个job,然后将构建好的job存到数据结构中,然后我们使用submitStage(finalStage)提交stage

private def submitStage(stage: Stage) {
val jobId = activeJobForStage(stage)
if (jobId.isDefined) {
logDebug("submitStage(" + stage + ")")
if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
//获取当前stage的父stage
val missing = getMissingParentStages(stage).sortBy(_.id)
logDebug("missing: " + missing)

//只有遇到最初的那个stage的时候才会进行提交,其余的stages都在waitingStages里面
//如果当前的stage的前面再也没有stage了,那么就可以提交了,否则还需要划分missing中的stage
if (missing.isEmpty) {
//如果当前的stage没有父stage了,那么就提交当前的stage
logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
submitMissingTasks(stage, jobId.get)
} else {
//否则对于当前的satge还需要递归调用submitStage,用来划分它的最后一个RDD的窄依赖还是宽依赖
//这边相当于一个深度遍历图
for (parent <- missing) {
submitStage(parent)
}
//最后按照倒序的方式(因为这个方法在递归主体的后面),将stage将入到waitingStages中
waitingStages += stage
}
}
} else {
abortStage(stage, "No active job for stage " + stage.id, None)
}
}


先获取当前stage的父stage,其实getMissingParentStages与getParnetStage的逻辑是一样的,这里我们就不赘述了,如果当前stage没有父stage了,也就意味这当前的这个stage是这个job的第一个stage,那么我们就调用submitMissingTasks提交任务,否则就递归submitStage(parent),还会将当前这个stage加入到waitingStages中,最后的结果就是stage按照执行的先后顺序存储带waitingstages中,并且会对于第一个stage调用submitMissingTasks方法,那么waitingStages中的stage何时提交呢?

我们可以看到在handleJobSubmitted这个方法中,submitStage调用后,会调用
submitWaitingStages
方法

private def submitWaitingStages() {
// TODO: We might want to run this less often, when we are sure that something has become
// runnable that wasn't before.
logTrace("Checking for newly runnable parent stages")
logTrace("running: " + runningStages)
logTrace("waiting: " + waitingStages)
logTrace("failed: " + failedStages)
val waitingStagesCopy = waitingStages.toArray
waitingStages.clear()
for (stage <- waitingStagesCopy.sortBy(_.firstJobId)) {
submitStage(stage)
}
}


可以看到在这里我们将会提交后续的stage,并且由于waitingStages中的stage已经是有序的了,此处只要顺序提交就可以了

这里就是我们提交job并转化成stage的全过程,总结如下:

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: