您的位置:首页 > 其它

Spark源码阅读笔记:TaskScheduler

2014-10-21 15:34 423 查看


前言

前面两篇分别大概介绍了一下Spark集群中几个角色和driver启动流程。那么本篇将以前面两篇文档中的线索为起点,来分析一下SparkContext中,同时也是driver
program甚至整个Spark应用流程中最为重要的组件之一——TaskSchedular,代码啥的我就尽量不贴了,有需要的同学对着源码看吧,以后其他文档也是这个路数,说一下代码在哪就行,自己找着看更全面也更好玩...

创建及初始化

private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)


本文的入口在SparkContext中创建TaskScheduler步骤。通过SparkContext中的createTaskScheduler工厂方法来创建一个TaskScheduler,进入这个方法,可以看到,TaskScheduler的主要依据是master的前缀来决定具体的构建目标(我这里以Spark的standalone模式为线索,所以只跟踪以spark://为前缀的模式匹配中的创建过程,下同...)。

Standalone模式下的构建代码十分简单:

val scheduler = new TaskSchedulerImpl(sc)

val masterUrls = sparkUrl.split(",").map("spark://" + _)

val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)

scheduler.initialize(backend)

scheduler


先new一个TaskSchedulerImpl对象,然后以master的地址为参数,创建一个SparkDeploySchedulerBackend对象,最后将这个backend对象送入到scheduler中执行初始化动作。进到scheduler的initialize方法,除了将传入的backend对象赋给内部成员以外,还试图构建一个root
pool,这里已经可以体会到Spark中application内部资源调度系统里池(pool)的概念了。

我们来深入了解一下spark中pool这个东西:其实前面讲到application内部资源调度问题的时候,在提到FAIR调度时就已经有说过pool这个概念了,当时有提到,FAIR调度器支持将不同的job按pool进行划分,通过对不同的pool进行不同的配置来决定调度策略。由此可见,spark中application内部的调度体系中最粗粒度的对象就是pool,其次是job,再次是stage,也就是taskset。在TaskScheduler内部,以一棵树的形式将这些对象组织起来:初始化的时候创建rootPool作为树的根,每个pool都是树上的非叶子节点,树的叶子是被称为TaskSetManager的角色(TaskSetManager是Schedulable的示例,pool是Pool类的实例,Pool类继承于Schedulable,所以本质上来说这棵数就是一棵Schedulable树;另外,与其说manager是叶子对象,实际上是被以一个FIFO队列的形式组织在一起,这就是为什么前面讲内部资源调度时提到FAIR模式下,每个pool内部taskset还是以FIFO形式来执行的原因),用于管理Taskset。当创建一个pool时,就意味着增加了一个非叶子节点;而提交一个stage,就意味着增加了一个叶子。

TaskScheduler通过SchedulableBuilder来构建这棵Schedulable树,目前支持FAIR和FIFO两种(即App内部资源调度的两种方式)。FIFO由于不支持pool级的资源调度,所以所有stage直接挂在根节点上,其buildPools方法为空方法;FAIR支持pool级资源调度,所以在初始化阶段会读入pool配置文件,并构建一个default
pool(这两个概念可以参考前面的文档)。

当初始化完这棵树后,TaskScheduler流程宣告完毕。

启动

在讲启动流程之前先介绍一下SchedulerBackend这个组件。字面上理解,这个组件就是TaskScheduler的后台部分,那么这个后台部分是用来干嘛的捏?之前有阐述过Spark中executor相关的一些介绍,大体上来说一个Spark
app启动后,就为其在各个工作节点上创建executor,这些executor为这个app所独占,然后app的各种task以及保存数据都在这些executor上。

显而易见TaskScheduler作为任务调度器,很大程度上一个任务就是把提交上来的任务丢到executor上去,那么backend的一个重要工作就是在job的运行过程中维护和管理executor的连接。更详细的相关介绍穿插在后面的流程中。

TaskScheduler的start只有两步:调用backend的start以及启动周期性的推测执行检查。先来看backend的start方法:

val properties = new ArrayBuffer[(String, String)]

for ((key, value) <- scheduler.sc.conf.getAll) {

if (key.startsWith("spark.")) {

properties += ((key, value))

}

}

// TODO (prashant) send conf instead of properties

driverActor = actorSystem.actorOf(

Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)


依然十分干净的两步:从SparkContext中载入配置信息,然后启动driverActor(由此可以很明白地看到,backend除了一些对外接口以外最为核心的就是这个driverActor,一个运行着的线程)。DriverActor是定义在内部的一个Actor类,从字面上就可以看出driverActor当然是driver
program的核心啦~^_^,里面不急处理很多核心消息,例如executor,资源分配等,同时还管理着executor上可用资源的信息。

推测执行相关内容在后面再作分析。

搞完这些以后,在SparkDeploySchedulerBackend的start方法中,在工作节点上启动executor进程。然后,创建一个AppClient对象,并调用其start方法。这个AppClient对象是与spark集群进行交互的一个接口,提供一组监听方法接口,监听集群事件,比如application的连接、断开,executor的增加、移除等。

任务提交

对于TaskScheduler各种文档中提得最多的就是任务提交这个流程。通过调用action到SparkContext的runjob,然后DAGScheduler计算完stage后交给TaskScheduler,本文只讨论TaskScheduler部分,其他部分以后再说。通过跟踪这个流程的代码,最终DAGScheduler调用TaskScheduler的submitTasks方法提交任务,那么我们这部分的代码分析就从这个入口开始。

submitTasks接收的是经过DAGScheduler预处理过的TaskSet,依照前文提到过的Schedulable树,这里首先为这个taskset创建一个TaskSetManager,然后调用schedulableBuilder的addTaskSetManager方法将这个manager加到调度树上(某个pool下的fifo队列)。之后启动一个周期性检查任务加载是否成功的线程,如果超时就干掉。最后调用backend的reviveOffers方法。

reviveOffers方法本身只是向driverActor传递一个ReviveOffers的消息。driverActer在收到这个消息后,先调用taskscheduler的resourceOffers方法告知资源已经分配。资源分配是cluster
manager的事情,resourceOffers方法被用来通知driver
program资源分配完毕。由于这里只分析standalone方式,所以这个过程十分简单,直接在driverActor将可用executor信息传给taskscheduler。那么都猜到了,resourceOffers这个方法一定是要安排task到底在哪里执行了。

resourceOffers内,首先管理和更新一下内部的executor信息,然后对driverActor传过来的列表做一次shuffle来避免每次task都被丢到相同的executor上。

val shuffledOffers = Random.shuffle(offers).

val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))

val availableCpus = shuffledOffers.map(o => o.cores).toArray

val sortedTaskSets = rootPool.getSortedTaskSetQueue


接着创建一个task数组列表,用来记录executor运行着的task(这里可以看到数组长度为executor可用核数,可以确定task的资源分配是基于core的数量,默认1对1,所以executor上最大任务数不会超过其可用核数)。然后创建一个可用cpu列表,最终获取调度树pool上待执行的taskset集。

在taskScheduler调度任务时也会利用延迟调度算法实现本地性,其中本地性共有五个级别,分别是PROCESS_LOCAL、 NODE_LOCAL 、 NO_PREF 、 RACK_LOCAL、ANY。在初始化完所需要用到的列表后,就开始执行分配任务到executor的流程,在这里代码的本地性执行会作为重要的考虑依据。

之后的操作是将资源分配信息绑到task中的过程,并返回给driverActor。driverActor在得到一组已经完成资源分配的task后,先在内部的executor空闲CPU信息中去掉已经被分配掉的core,然后把经过序列化后的task信息发送给executor,真正开始执行。

任务完成

任务完成后,driverActor收到executor发来的StatusUpdate消息,然后通知taskScheduler,如果任务执行完成,则在内部的空闲cpu集中增加完成任务执行的core。

通知taskScheduler的方法调用堆栈如下:

-TaskSchedulerImpl.statusUpdate

-如果是executor异常,报告DAGScheduler开始错误处理

-如果执行成功:TaskResultGetter.enqueueSuccessfulTask

-TaskSchedulerImpl.handleSuccessfulTask

-TaskSetManager.handleSuccessfulTask

-TaskSetManager.maybeFinishTaskSet

-TaskShedulerImpl.taskSetFinished

-如果执行失败:TaskResultGetter.enqueueFailedTask

-TaskSchedulerImpl.handleFailedTask

-TaskSetManager.handleFailedTask

-TaskSetManager.maybeFinishTaskSet

-TaskShedulerImpl.taskSetFinished

具体就不做分析了,总之任务执行成功是一个资源回收和内部清理的过程;任务执行失败则还要牵扯到一些异常处理逻辑。

差不多就到这里了,虽说标题有“深入”二字,但一些代码细节还是无法清晰地表达。东拉西扯的没什么逻辑性,可见常年不写文档的人脑子有多混沌,这也更加说明了写blog的重要性。本文也算给自己一个mark,项目进行过程中如果遇到一些问题可以再深入看一下这块代码。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: