
Spark Scheduling Mechanism: 5) Task Scheduling

2016-11-18 16:59
After the DAG scheduler has finished dividing the job into stages and handed the resulting task set to the task scheduler, we now step inside the task scheduler to look at how task sets and individual tasks are scheduled and executed.

1. Task Types and Execution

In Spark, tasks are further divided into ResultTasks and ShuffleMapTasks. As the names suggest, a ResultTask is a task the scheduler creates inside a ResultStage, while a ShuffleMapTask is a task created inside a ShuffleMapStage. In the source code, each parallel task corresponds to an implementation of the runTask method of the abstract class Task.
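To make that relationship concrete, here is a minimal sketch of the shared abstraction, simplified from the Spark 1.x source (the real Task class carries additional fields such as stageAttemptId and accumulators). The scheduler only works against Task[T]; the two subclasses differ in what runTask returns.

// Simplified sketch, not the full Spark source: the scheduler only needs runTask.
// ResultTask produces the action's result type U, while ShuffleMapTask produces a
// MapStatus describing where its shuffle output lives.
private[spark] abstract class Task[T](val stageId: Int, val partitionId: Int) {
  def runTask(context: TaskContext): T
}
// class ResultTask[T, U](...)  extends Task[U]         { ... }
// class ShuffleMapTask(...)    extends Task[MapStatus] { ... }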

The ResultTask code:

override def runTask(context: TaskContext): U = {
  // Deserialize the RDD and the func using the broadcast variables.
  val deserializeStartTime = System.currentTimeMillis()
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

  metrics = Some(context.taskMetrics)
  func(context, rdd.iterator(partition, context))
}
The ShuffleMapTask code:

override def runTask(context: TaskContext): MapStatus = {
  // Deserialize the RDD using the broadcast variable.
  val deserializeStartTime = System.currentTimeMillis()
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

  metrics = Some(context.taskMetrics)
  var writer: ShuffleWriter[Any, Any] = null
  try {
    val manager = SparkEnv.get.shuffleManager
    writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
    writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    writer.stop(success = true).get
  } catch {
    // omission: some other code...
  }
}
As you can see, in both cases ResultTask and ShuffleMapTask call the iterator function of the stage's final RDD to obtain its partition data (see: Spark Core RDD: Compute Functions). For a ResultTask, the partition data is then passed to func for processing; for a ShuffleMapTask, the partition data is handed over to the shuffle-related modules through the ShuffleWriter, a process described in later chapters.
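As a hedged illustration of where func comes from (a hypothetical job, not code from this post): an action such as count() ends up calling SparkContext.runJob with a per-partition function, and it is exactly this function that each ResultTask applies to its partition's iterator.

// Hypothetical example: the per-partition function passed to runJob becomes the
// "func" that every ResultTask of the ResultStage executes.
val rdd = sc.parallelize(1 to 100, numSlices = 4)

// count() is conceptually a runJob whose func measures each partition's iterator;
// one ResultTask runs this func per partition, and the driver sums the results.
val perPartitionCounts: Array[Long] = sc.runJob(rdd, (it: Iterator[Int]) => it.size.toLong)
val total = perPartitionCounts.sum   // == rdd.count()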

2. Task Creation and Submission

DAGScheduler.submitMissingTasks, already mentioned in the earlier post Spark Scheduling Mechanism: 3) DAG Scheduling, contains in part the following code:
// Each kind of Stage provides its own findMissingPartitions
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

/** ResultStage: Returns the sequence of partition ids that are missing (i.e. need to be computed). */
override def findMissingPartitions(): Seq[Int] = {
  val job = activeJob.get
  (0 until job.numPartitions).filter(id => !job.finished(id))
}

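/** ShuffleMapStage: a partition is "missing" if no shuffle output location has been registered for it yet. */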
override def findMissingPartitions(): Seq[Int] = {
  val missing = (0 until numPartitions).filter(id => outputLocs(id).isEmpty)
  assert(missing.size == numPartitions - _numAvailableOutputs,
    s"${missing.size} missing, expected ${numPartitions - _numAvailableOutputs}")
  missing
}
submitMissingTasks determines how many tasks to create. Whether the stage is a ResultStage or a ShuffleMapStage, the number of tasks generated equals the number of partitions of the stage's final RDD (when some tasks are re-run after a failure, the count excludes the partitions that have already been computed successfully).
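A small standalone sketch of this counting logic (hypothetical numbers, mirroring the filter in findMissingPartitions above):

// A stage with 8 partitions in which partitions 2 and 5 already have output:
// only the remaining 6 partitions need tasks.
val numPartitions = 8
val alreadyComputed = Set(2, 5)
val partitionsToCompute = (0 until numPartitions).filterNot(alreadyComputed.contains)
// partitionsToCompute == Vector(0, 1, 3, 4, 6, 7)  => 6 tasks are created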

submitMissingTasks then serializes the RDD at the end of the stage and wraps the bytes into a broadcast variable for every subsequent task to use, thereby keeping the tasks independent of one another:

var taskBinary: Broadcast[Array[Byte]] = null
try {
  // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
  // For ResultTask, serialize and broadcast (rdd, func).
  val taskBinaryBytes: Array[Byte] = stage match {
    case stage: ShuffleMapStage =>
      closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array()
    case stage: ResultStage =>
      closureSerializer.serialize((stage.rdd, stage.func): AnyRef).array()
  }

  taskBinary = sc.broadcast(taskBinaryBytes)
} catch {
  //.....
}


Skipping some details, here is the part where the tasks are created:

val tasks: Seq[Task[_]] = try {
  stage match {
    case stage: ShuffleMapStage =>
      partitionsToCompute.map { id =>
        val locs = taskIdToLocations(id)
        val part = stage.rdd.partitions(id)
        new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
          taskBinary, part, locs, stage.internalAccumulators)
      }

    case stage: ResultStage =>
      val job = stage.activeJob.get
      partitionsToCompute.map { id =>
        val p: Int = stage.partitions(id)
        val part = stage.rdd.partitions(p)
        val locs = taskIdToLocations(id)
        new ResultTask(stage.id, stage.latestInfo.attemptId,
          taskBinary, part, locs, id, stage.internalAccumulators)
      }
  }
} catch {
  //.....
}

if (tasks.size > 0) {
  //....
  taskScheduler.submitTasks(new TaskSet(
    tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
  stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
} else {
  // Because we posted SparkListenerStageSubmitted earlier, we should mark
  // the stage as completed here in case there are no tasks to run
  markStageAsFinished(stage, None)
  //........
}
As the code shows, depending on the type of the stage, and taking into account that each task should run as close to its data as possible (the taskIdToLocations map captures these preferred locations), a series of ResultTasks or ShuffleMapTasks is generated. The tasks of a stage are wrapped into a TaskSet, and TaskScheduler.submitTasks is called to hand them over to the TaskScheduler for scheduling and execution.
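To tie the pieces together, here is a hedged end-to-end example (a hypothetical two-stage job, not taken from this post): the shuffle introduced by reduceByKey splits the job into a ShuffleMapStage and a ResultStage, and each stage's tasks become one TaskSet handed to the TaskScheduler.

// Hypothetical job: reduceByKey creates a ShuffleDependency, so the DAG scheduler
// builds a ShuffleMapStage followed by a ResultStage.
val pairs  = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 2)
val summed = pairs.reduceByKey(_ + _)   // ShuffleMapStage -> ShuffleMapTasks (one per parent partition)
val result = summed.collect()           // ResultStage     -> ResultTasks (one per result partition)
// For each stage, submitMissingTasks wraps its tasks into a TaskSet and calls
// taskScheduler.submitTasks, as described above.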