
Spark source code: Job execution (1) stage division and submission


1 Tracing Job execution from reduce

1.1 The reduce operation

Taking the reduce operation as an example, let's look at how a job is executed.

def reduce(f: (T, T) => T): T = withScope {
  val cleanF = sc.clean(f)
  val reducePartition: Iterator[T] => Option[T] = iter => {
    if (iter.hasNext) {
      Some(iter.reduceLeft(cleanF))
    } else {
      None
    }
  }
  var jobResult: Option[T] = None
  val mergeResult = (index: Int, taskResult: Option[T]) => {
    if (taskResult.isDefined) {
      jobResult = jobResult match {
        case Some(value) => Some(f(value, taskResult.get))
        case None => taskResult
      }
    }
  }
  sc.runJob(this, reducePartition, mergeResult)
  // Get the final result out of our Option, or throw an exception if the RDD was empty
  jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}


The roles of the two functions defined inside reduce, reducePartition and mergeResult, were summarized earlier.

At the end, reduce calls sc.runJob(this, reducePartition, mergeResult): reducePartition is applied to each partition, and mergeResult combines the per-partition results on the driver.
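For instance (a minimal usage sketch, assuming an existing SparkContext named sc), calling reduce on a small RDD is exactly what kicks off the job flow traced below:

val rdd = sc.parallelize(1 to 100, 4)
// reducePartition folds each of the 4 partitions locally on the executors;
// mergeResult then combines the 4 partial sums on the driver, yielding 5050.
val total = rdd.reduce(_ + _)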

So what does runJob actually do?

Let's step into runJob in SparkContext:

def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}


runJob's parameters:

rdd: the target RDD;

func: the function to run on each partition of the RDD;

partitions: the partitions to compute; some jobs do not need to compute every partition of the target RDD;

resultHandler: a callback that receives each partition's result.
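As a hypothetical illustration of this overload (not code from Spark itself; it assumes an existing SparkContext sc and an import of org.apache.spark.TaskContext), an action could drive runJob directly like this:

val rdd = sc.parallelize(1 to 100, 4)
val partialSums = new Array[Int](rdd.partitions.length)
sc.runJob(
  rdd,
  (ctx: TaskContext, iter: Iterator[Int]) => iter.sum,         // func: executed once per partition
  0 until rdd.partitions.length,                                // partitions: here, all of them
  (index: Int, partSum: Int) => partialSums(index) = partSum)   // resultHandler: receives each partition's result
println(partialSums.sum)  // 5050, the same as rdd.reduce(_ + _)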

2.1 Management by the DAGScheduler

From here on the job is managed by the DAGScheduler: its runJob method submits the job and in turn calls submitJob.

Before going there, let's see what fields the DAGScheduler holds.

2.1.1 Stage-related fields of the DAGScheduler

First, the ID counters:

private[scheduler] val nextJobId = new AtomicInteger(0)  // job ids start at 0
private[scheduler] def numTotalJobs: Int = nextJobId.get()  // the next id also equals the number of jobs issued so far
private val nextStageId = new AtomicInteger(0)  // stage id counter

private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]]  // one job can map to several stages
private[scheduler] val stageIdToStage = new HashMap[Int, Stage]


Next, several hash maps and sets that keep track of stages:

private[scheduler] val shuffleIdToMapStage = new HashMap[Int, ShuffleMapStage]  // ShuffleMapStages produced for shuffle dependencies, keyed by shuffle id
private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]  // jobs currently active

// stages whose parent stages have not finished yet
private[scheduler] val waitingStages = new HashSet[Stage]

// stages currently running
private[scheduler] val runningStages = new HashSet[Stage]


2.1.2 submitJob

DAGScheduler.runJob actually delegates to submitJob:

val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)


submitJob submits this action to the scheduler:

val jobId = nextJobId.getAndIncrement()  // allocate an id for the new job
if (partitions.size == 0) {
  // Return immediately if the job is running 0 tasks
  return new JobWaiter[U](this, jobId, 0, resultHandler)
}

assert(partitions.size > 0)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]  // the function that each task will run
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
eventProcessLoop.post(JobSubmitted(
  jobId, rdd, func2, partitions.toArray, callSite, waiter,
  SerializationUtils.clone(properties)))
waiter
}


2.1.3 DAGSchedulerEventProcessLoop

The flow then enters the DAGSchedulerEventProcessLoop.

First, look at eventProcessLoop.post:

def post(event: E): Unit = {
  eventQueue.put(event)
}


eventQueue is an event queue; post simply enqueues the event.
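The queue is drained by a dedicated event thread. The following is only a minimal sketch of that producer/consumer pattern (a simplified stand-in, not Spark's actual EventLoop class):

import java.util.concurrent.LinkedBlockingDeque

abstract class SimpleEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()
  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      while (true) {
        val event = eventQueue.take()  // blocks until someone posts an event
        onReceive(event)               // dispatch, e.g. via a doOnReceive-style pattern match
      }
    }
  }
  def start(): Unit = eventThread.start()
  def post(event: E): Unit = eventQueue.put(event)  // producer side: enqueue and return immediately
  protected def onReceive(event: E): Unit
}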

Once an event is dequeued by the event thread, doOnReceive handles it. Since the event posted earlier was a JobSubmitted object, the pattern match dispatches to handleJobSubmitted:

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

  // ... the remaining event cases are omitted here


So handleJobSubmitted is where stages are actually divided and submitted.

2.2 handleJobSubmitted

First, the complete handleJobSubmitted:

private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  try {
    // create the final stage
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }

  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)  // represents a job running in the DAG
  clearCacheLocs()
  logInfo("Got job %s (%s) with %d output partitions".format(
    job.jobId, callSite.shortForm, partitions.length))
  logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
  logInfo("Parents of final stage: " + finalStage.parents)
  logInfo("Missing parents: " + getMissingParentStages(finalStage))

  val jobSubmissionTime = clock.getTimeMillis()
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.setActiveJob(job)
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
  submitStage(finalStage)  // submit the finalStage
}


In this method, the finalStage is obtained:

finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)



Step into createResultStage:

private def createResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  val parents = getOrCreateParentStages(rdd, jobId)  // get the parent stages
  val id = nextStageId.getAndIncrement()
  val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}


The flow now enters a key function:

private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  getShuffleDependencies(rdd).map { shuffleDep =>
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  }.toList
}


getOrCreateParentStages takes the current RDD and the job id, collects the RDD's direct shuffle dependencies via getShuffleDependencies, and resolves each of them to a ShuffleMapStage; the resulting List[Stage] contains every parent stage the ResultStage directly depends on.

private[scheduler] def getShuffleDependencies(
    rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
  val parents = new HashSet[ShuffleDependency[_, _, _]]  // shuffle dependencies found so far
  val visited = new HashSet[RDD[_]]  // RDDs already visited
  val waitingForVisit = new Stack[RDD[_]]  // RDDs still to visit
  waitingForVisit.push(rdd)  // start the traversal from the given RDD
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.pop()
    if (!visited(toVisit)) {
      visited += toVisit
      toVisit.dependencies.foreach {
        case shuffleDep: ShuffleDependency[_, _, _] =>
          parents += shuffleDep
        case dependency =>
          waitingForVisit.push(dependency.rdd)  // narrow dependency: keep walking up the lineage
      }
    }
  }
  parents
}


The traversal pops RDDs off the stack and inspects their dependencies:

for a narrow dependency, the parent RDD is pushed onto the stack so the walk continues up the lineage;

for a shuffle (wide) dependency, the dependency itself is added to parents. The returned parents set therefore contains only the direct shuffle dependencies of the given RDD.
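For example (a hypothetical word count, not taken from the Spark sources), only the shuffle introduced by reduceByKey becomes a stage boundary:

val counts = sc.textFile("input.txt")
  .flatMap(_.split(" "))     // narrow dependency, stays in the same stage
  .map(word => (word, 1))    // narrow dependency, stays in the same stage
  .reduceByKey(_ + _)        // ShuffleDependency: this is where a stage boundary appears
// getShuffleDependencies(counts) would return only the ShuffleDependency created by
// reduceByKey; the narrow flatMap/map dependencies are traversed but not collected.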

Each shuffle dependency is then resolved to a ShuffleMapStage by getOrCreateShuffleMapStage:

private def getOrCreateShuffleMapStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
    // if shuffleIdToMapStage already holds a ShuffleMapStage for this shuffle, return it directly
    case Some(stage) =>
      stage

    // otherwise, find the ancestor shuffle dependencies that have no stage yet and create stages for them
    case None =>
      getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
        if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
          createShuffleMapStage(dep, firstJobId)
        }
      }
      // Finally, create a stage for the given shuffle dependency.
      createShuffleMapStage(shuffleDep, firstJobId)
  }
}


getMissingParentStages walks back through a stage's RDD lineage and collects the parent stages whose output is not yet available, i.e. the stages that still have to be computed before this one can run:

private def getMissingParentStages(stage: Stage): List[Stage] = {
  val missing = new HashSet[Stage]  // parent stages that are not yet available
  val visited = new HashSet[RDD[_]]  // RDDs already processed
  // RDDs waiting to be processed are kept on this stack
  val waitingForVisit = new Stack[RDD[_]]
  def visit(rdd: RDD[_]) {
    if (!visited(rdd)) {
      visited += rdd
      val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
      if (rddHasUncachedPartitions) {
        for (dep <- rdd.dependencies) {
          dep match {
            // a shuffle dependency marks a stage boundary: resolve its ShuffleMapStage
            case shufDep: ShuffleDependency[_, _, _] =>
              val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
              if (!mapStage.isAvailable) {
                missing += mapStage
              }
            // a narrow dependency stays in the same stage: keep traversing its parent RDD
            case narrowDep: NarrowDependency[_] =>
              waitingForVisit.push(narrowDep.rdd)
          }
        }
      }
    }
  }
  waitingForVisit.push(stage.rdd)  // start from the stage's own RDD and walk the lineage back to its sources
  while (waitingForVisit.nonEmpty) {
    visit(waitingForVisit.pop())
  }
  missing.toList  // return the missing parent stages as a list
}


createShuffleMapStage finally constructs and registers the ShuffleMapStage:

def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
  val rdd = shuffleDep.rdd
  val numTasks = rdd.partitions.length  // one task per partition
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  val stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep)

  stageIdToStage(id) = stage
  shuffleIdToMapStage(shuffleDep.shuffleId) = stage
  updateJobIdStageIdMaps(jobId, stage)

  if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
    // fetch the serialized MapOutputStatus objects for this shuffleId from the mapOutputTracker
    val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
    val locs = MapOutputTracker.deserializeMapStatuses(serLocs)  // deserialize them
    (0 until locs.length).foreach { i =>
      if (locs(i) ne null) {
        // locs(i) will be null if missing
        stage.addOutputLoc(i, locs(i))
      }
    }
  } else {
    // Kind of ugly: need to register RDDs with the cache and map output tracker here
    // since we can't do it in the RDD constructor because # of partitions is unknown
    logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
    mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
  }
  stage
}


In this way getOrCreateParentStages obtains the list of parent stages the RDD depends on.

createResultStage then returns the ResultStage bound to this jobId, which is assigned to finalStage.

After that, an ActiveJob is created for the divided stages:

val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
clearCacheLocs()


Then the job's final stage is submitted:

submitStage(finalStage)


private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)  // find the job this stage belongs to
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    // only proceed if the stage is not already waiting, running, or failed
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)  // parent stages not yet available
      logDebug("missing: " + missing)
      if (missing.isEmpty) {  // all parents are available
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage, jobId.get)  // every parent stage is done, so this stage can be turned into tasks
      } else {
        for (parent <- missing) {
          submitStage(parent)  // otherwise recursively submit the missing parent stages first
        }
        waitingStages += stage  // and park this stage in waitingStages until its parents finish
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}
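To make the recursion concrete, here is a hypothetical trace for a job whose stages form the chain ShuffleMapStage 0 -> ShuffleMapStage 1 -> ResultStage 2:

// submitStage(ResultStage 2)
//   missing = List(ShuffleMapStage 1)   -> submitStage(ShuffleMapStage 1); park stage 2 in waitingStages
//     missing = List(ShuffleMapStage 0) -> submitStage(ShuffleMapStage 0); park stage 1 in waitingStages
//       missing = Nil                   -> submitMissingTasks(ShuffleMapStage 0)
// When stage 0 finishes, submitWaitingChildStages resubmits stage 1, and later stage 2,
// so the stages end up running in dependency order.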


Finally, let's analyze submitMissingTasks.

Its main work is to:

1. determine the ids of the partitions the stage needs to compute;

2. start the new stage;

3. build a map from partition id to preferred task locations;

4. mark a new stage attempt;

5. serialize the stage's task binary and broadcast it;

6. create one task per partition and gather them into a TaskSet;

7. submit the tasks via taskScheduler.submitTasks().

private def submitMissingTasks(stage: Stage, jobId: Int) {
  logDebug("submitMissingTasks(" + stage + ")")

  // ids of the partitions that still need to be computed
  val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

  // properties of the job this stage belongs to
  val properties = jobIdToActiveJob(jobId).properties

  runningStages += stage  // mark the stage as running

  stage match {  // notify the OutputCommitCoordinator that the stage is starting, depending on its type
    case s: ShuffleMapStage =>
      outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
    case s: ResultStage =>
      outputCommitCoordinator.stageStart(
        stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
  }
  // compute the preferred locations of every partition that needs to run
  val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
    stage match {
      case s: ShuffleMapStage =>
        partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
      case s: ResultStage =>
        partitionsToCompute.map { id =>
          val p = s.partitions(id)
          (id, getPreferredLocs(stage.rdd, p))
        }.toMap
    }
  } catch {
    case NonFatal(e) =>
      stage.makeNewStageAttempt(partitionsToCompute.size)
      listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
      abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage
      return
  }

  stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)

  // If there are tasks to execute, record the submission time of the stage. Otherwise,
  // post the event without the submission time, which indicates that this stage was
  // skipped.
  if (partitionsToCompute.nonEmpty) {
    stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
  }
  listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

  // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
  // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
  // the serialized copy of the RDD and for each task we will deserialize it, which means each
  // task gets a different copy of the RDD. This provides stronger isolation between tasks that
  // might modify state of objects referenced in their closures. This is necessary in Hadoop
  // where the JobConf/Configuration object is not thread-safe.
  var taskBinary: Broadcast[Array[Byte]] = null
  try {
    // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
    // For ResultTask, serialize and broadcast (rdd, func).
    val taskBinaryBytes: Array[Byte] = stage match {
      case stage: ShuffleMapStage =>
        JavaUtils.bufferToArray(
          closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
      case stage: ResultStage =>
        JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
    }

    taskBinary = sc.broadcast(taskBinaryBytes)
  } catch {
    // In the case of a failure during serialization, abort the stage.
    case e: NotSerializableException =>
      abortStage(stage, "Task not serializable: " + e.toString, Some(e))
      runningStages -= stage

      // Abort execution
      return
    case NonFatal(e) =>
      abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage
      return
  }
  // build one task for each partition of the stage
  val tasks: Seq[Task[_]] = try {
    val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
    stage match {
      case stage: ShuffleMapStage =>
        stage.pendingPartitions.clear()
        partitionsToCompute.map { id =>
          val locs = taskIdToLocations(id)
          val part = stage.rdd.partitions(id)
          stage.pendingPartitions += id
          new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
            taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
            Option(sc.applicationId), sc.applicationAttemptId)
        }

      case stage: ResultStage =>
        partitionsToCompute.map { id =>
          val p: Int = stage.partitions(id)
          val part = stage.rdd.partitions(p)
          val locs = taskIdToLocations(id)
          new ResultTask(stage.id, stage.latestInfo.attemptId,
            taskBinary, part, locs, id, properties, serializedTaskMetrics,
            Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
        }
    }
  } catch {
    case NonFatal(e) =>
      abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage
      return
  }

  if (tasks.size > 0) {  // if there are tasks, hand them to taskScheduler.submitTasks(); otherwise mark the stage as finished
    logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
      s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
    taskScheduler.submitTasks(new TaskSet(
      tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
  } else {
    // Because we posted SparkListenerStageSubmitted earlier, we should mark
    // the stage as completed here in case there are no tasks to run
    markStageAsFinished(stage, None)

    val debugString = stage match {
      case stage: ShuffleMapStage =>
        s"Stage ${stage} is actually done; " +
          s"(available: ${stage.isAvailable}," +
          s"available outputs: ${stage.numAvailableOutputs}," +
          s"partitions: ${stage.numPartitions})"
      case stage: ResultStage =>
        s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
    }
    logDebug(debugString)

    submitWaitingChildStages(stage)
  }
}


3 Summary of stage division

First, an action triggers computation and submits a job to the DAGScheduler.

On receiving the job, the DAGScheduler starts from the RDD at the end of the dependency chain and walks through the dependencies of every RDD in the lineage.

Whenever a shuffle dependency appears among an RDD's dependencies, that RDD becomes the input boundary of the current stage, and a new parent stage is built from it.

This yields a set of stages; the stage that directly triggers the job becomes the finalStage (a ResultStage), and an ActiveJob instance is created for it.

When a stage is submitted, the scheduler first checks whether its parent stages' results are available: if they are, the stage is submitted; if not, it is placed in waitingStages.

When an intermediate stage's tasks all complete, the DAGScheduler rescans waitingStages and submits every stage whose parents are now all finished, until no waiting stages remain.
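As a hypothetical example (again not from the sources, assuming an existing SparkContext sc), a job with two shuffles is divided into three stages:

val pairs  = sc.textFile("logs.txt").map(line => (line.split(",")(0), 1))
val counts = pairs.reduceByKey(_ + _)   // shuffle 1 -> ShuffleMapStage 0
val sorted = counts.sortByKey()         // shuffle 2 -> ShuffleMapStage 1 (parent: stage 0)
sorted.collect()                        // action    -> ResultStage 2 (finalStage, parent: stage 1)
// Walking back from `sorted`, every ShuffleDependency closes the current stage and creates a
// parent ShuffleMapStage; the stage of the action itself becomes the ResultStage submitted last.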