Spark Scheduling: 5) Task Scheduling
2016-11-18 16:59
After the DAG scheduler has finished dividing the job into stages and handed the resulting task sets to the task scheduler, the next step is to look inside the task scheduler and observe how task sets, and the tasks within them, are scheduled and executed.
1. Task Types and Execution
In Spark, tasks are further divided into ResultTask and ShuffleMapTask. As the names suggest, a ResultTask is a task the scheduler carves out of a ResultStage, while a ShuffleMapTask is one carved out of a ShuffleMapStage. In the source code, each parallel task corresponds to an implementation of the runTask method of the abstract class Task. The code of ResultTask:
```scala
override def runTask(context: TaskContext): U = {
  // Deserialize the RDD and the func using the broadcast variables.
  val deserializeStartTime = System.currentTimeMillis()
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

  metrics = Some(context.taskMetrics)
  func(context, rdd.iterator(partition, context))
}
```

The code of ShuffleMapTask:
```scala
override def runTask(context: TaskContext): MapStatus = {
  // Deserialize the RDD using the broadcast variable.
  val deserializeStartTime = System.currentTimeMillis()
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

  metrics = Some(context.taskMetrics)
  var writer: ShuffleWriter[Any, Any] = null
  try {
    val manager = SparkEnv.get.shuffleManager
    writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
    writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    writer.stop(success = true).get
  } catch {
    // omitted: error handling code...
  }
}
```

As the code shows, both ResultTask and ShuffleMapTask call the RDD's iterator method to obtain the data of the stage's final RDD (see the earlier post "Spark Core RDD: the compute function"). For a ResultTask, the partition data is passed to func for processing; for a ShuffleMapTask, the partition data is handed over to the shuffle-related modules, a process covered in a later chapter.
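The division of labor between the two task types can be captured in a tiny Spark-free model. All names below (ToyTask, ToyResultTask, ToyShuffleMapTask, ToyMapStatus) are illustrative inventions, not Spark's actual classes; the point is only that both kinds pull the partition's data through an iterator and differ solely in what they do with it:

```scala
// A toy model of Spark's two task flavors (illustrative names, not Spark's API).
trait ToyTask[T] { def runTask(data: Iterator[Int]): T }

// Result-style task: applies the job's function to the partition's iterator
// and returns the computed value directly to the caller.
class ToyResultTask[U](func: Iterator[Int] => U) extends ToyTask[U] {
  def runTask(data: Iterator[Int]): U = func(data)
}

// ShuffleMap-style task: writes the partition's data out for the next stage
// and returns only a small status describing the output.
case class ToyMapStatus(recordsWritten: Int)
class ToyShuffleMapTask(output: scala.collection.mutable.ArrayBuffer[Int])
    extends ToyTask[ToyMapStatus] {
  def runTask(data: Iterator[Int]): ToyMapStatus = {
    val records = data.toList
    output ++= records          // stand-in for ShuffleWriter.write
    ToyMapStatus(records.size)  // stand-in for the returned MapStatus
  }
}
```

Note how the shuffle task's return value carries no data, only metadata about where the data went; the real MapStatus plays the same role.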
2. Task Creation and Submission
Part of DAGScheduler.submitMissingTasks, mentioned in the earlier post "Spark Scheduling: 3) DAG Scheduling", reads as follows. Each kind of Stage provides its own findMissingPartitions:

```scala
// Each Stage subtype implements its own findMissingPartitions
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

/** ResultStage: returns the sequence of partition ids that are missing (i.e. need to be computed). */
override def findMissingPartitions(): Seq[Int] = {
  val job = activeJob.get
  (0 until job.numPartitions).filter(id => !job.finished(id))
}

/** ShuffleMapStage */
override def findMissingPartitions(): Seq[Int] = {
  val missing = (0 until numPartitions).filter(id => outputLocs(id).isEmpty)
  assert(missing.size == numPartitions - _numAvailableOutputs,
    s"${missing.size} missing, expected ${numPartitions - _numAvailableOutputs}")
  missing
}
```

submitMissingTasks thus determines the number of tasks: whether for a Result stage or a ShuffleMap stage, the number of tasks generated equals the number of partitions of the stage's final RDD (when some tasks have failed and are being resubmitted, the partitions that were already computed successfully are excluded).
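Both findMissingPartitions variants above boil down to simple filters over the partition ids. A standalone sketch, with illustrative function names and with the job's finished flags and the stage's output locations passed in as plain arrays:

```scala
// Sketch of the two findMissingPartitions strategies (illustrative, not Spark code).

// ResultStage-style: a partition is missing until the job marks it finished.
def resultMissing(finished: Array[Boolean]): Seq[Int] =
  (0 until finished.length).filter(id => !finished(id))

// ShuffleMapStage-style: a partition is missing until it has a registered
// map output location.
def shuffleMissing(outputLocs: Array[List[String]]): Seq[Int] =
  (0 until outputLocs.length).filter(id => outputLocs(id).isEmpty)
```

On a fresh run every partition is missing, so the task count equals the partition count; on a retry, only the unfinished partitions survive the filter, matching the "subtract already-computed partitions" behavior described above.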
submitMissingTasks serializes the stage's final RDD and then wraps the bytes in a broadcast variable for every subsequent task to use, which keeps the tasks isolated from one another:
```scala
var taskBinary: Broadcast[Array[Byte]] = null
try {
  // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
  // For ResultTask, serialize and broadcast (rdd, func).
  val taskBinaryBytes: Array[Byte] = stage match {
    case stage: ShuffleMapStage =>
      closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array()
    case stage: ResultStage =>
      closureSerializer.serialize((stage.rdd, stage.func): AnyRef).array()
  }
  taskBinary = sc.broadcast(taskBinaryBytes)
} catch {
  // .....
}
```
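The isolation this buys can be reproduced with plain Java serialization, which is not Spark's closureSerializer but illustrates the same mechanism: the value is serialized to bytes once, and each task deserializes its own private copy, so mutations in one task can never leak into another:

```scala
import java.io._

// Serialize a value to bytes once (playing the role of taskBinary).
def serialize(obj: AnyRef): Array[Byte] = {
  val bos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bos)
  oos.writeObject(obj)
  oos.close()
  bos.toByteArray
}

// Deserialize a fresh, independent copy per "task".
def deserialize[T](bytes: Array[Byte]): T = {
  val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
  ois.readObject().asInstanceOf[T]
}

val taskBinary = serialize(scala.collection.mutable.ArrayBuffer(1, 2, 3))
val copy1 = deserialize[scala.collection.mutable.ArrayBuffer[Int]](taskBinary)
val copy2 = deserialize[scala.collection.mutable.ArrayBuffer[Int]](taskBinary)
copy1 += 4  // mutating one task's copy leaves the other untouched
```

In the real scheduler the bytes additionally travel through the broadcast machinery so they are fetched once per executor rather than once per task, but the per-task deserialization step is what guarantees the isolation.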
Skipping some details, here is the part where the tasks are created:
```scala
val tasks: Seq[Task[_]] = try {
  stage match {
    case stage: ShuffleMapStage =>
      partitionsToCompute.map { id =>
        val locs = taskIdToLocations(id)
        val part = stage.rdd.partitions(id)
        new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
          taskBinary, part, locs, stage.internalAccumulators)
      }
    case stage: ResultStage =>
      val job = stage.activeJob.get
      partitionsToCompute.map { id =>
        val p: Int = stage.partitions(id)
        val part = stage.rdd.partitions(p)
        val locs = taskIdToLocations(id)
        new ResultTask(stage.id, stage.latestInfo.attemptId,
          taskBinary, part, locs, id, stage.internalAccumulators)
      }
  }
} catch {
  // .....
}

if (tasks.size > 0) {
  // ....
  taskScheduler.submitTasks(new TaskSet(
    tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
  stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
} else {
  // Because we posted SparkListenerStageSubmitted earlier, we should mark
  // the stage as completed here in case there are no tasks to run
  markStageAsFinished(stage, None)
  // ........
}
```

As the code shows, depending on the stage type, and preferring to place each task on the same node as its data (via taskIdToLocations), the scheduler ends up generating a series of ResultTasks or ShuffleMapTasks. The tasks of a single stage are packaged into a TaskSet, and TaskScheduler.submitTasks is called to hand them over to the TaskScheduler for scheduling and execution.
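The final step, one task per missing partition bundled into a single TaskSet per stage attempt, can be sketched without Spark. ToyTaskSet and buildTaskSet below are illustrative names, not Spark's classes:

```scala
// Toy TaskSet: one stage attempt's tasks plus the scheduling metadata the
// task scheduler needs (illustrative, not Spark's TaskSet class).
case class ToyTaskSet(taskPartitions: Array[Int], stageId: Int,
                      attemptId: Int, jobId: Int)

// Build one task per missing partition, then bundle them for submission,
// mirroring partitionsToCompute.map { ... } followed by submitTasks; an
// empty partition list mirrors the markStageAsFinished branch above.
def buildTaskSet(partitionsToCompute: Seq[Int], stageId: Int,
                 attemptId: Int, jobId: Int): Option[ToyTaskSet] =
  if (partitionsToCompute.nonEmpty)
    Some(ToyTaskSet(partitionsToCompute.toArray, stageId, attemptId, jobId))
  else
    None
```

The stage/attempt/job ids travel with the set so that the task scheduler can later report successes and failures back to the right stage attempt.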