您的位置:首页 > 其它

Spark入门——3:Spark的任务调度

2015-05-31 00:37 369 查看
         Spark的核心就是其调度管理逻辑。其中,处于整个程序运作的枢纽的是作业调度管理。

            在这里,我们首先看看DAGScheduler和TaskScheduler各自所负责的功能:DAGScheduler负责将任务拆分成不同阶段具有依赖关系的多批任务,也就是说DAGScheduler实际上是面向stage的调度器,它将任务拆分成不同的stage,我们可以把DAGScheduler看作是负责任务的逻辑调度;而TaskScheduler则是负责每个任务的实际物理调度。TaskScheduler和DAGScheduler通信是通过回调函数来完成的,

   

          首先,我们先列出任务调度涉及的相关概念:

          Task:单个分区数据集上的处理流程单元

          TaskSet:这是一组彼此之间有关联,但是互相不存在shuffle依赖的任务所组成的任务集

          Stage:任务集所对应的调度阶段

          Job:由RDD Action产生的多个stage所组成的一次计算操作

          Application:由多个作业组成的Spark应用程序

              作业调度的顶层逻辑

         spark程序实际上是以RDD为基础的计算操作。而这些操作是延迟执行的,这些操作只会生成对应的RDD关系链,只有当需要返回数据或者向外输出数据的操作才会触发实际的计算工作。

        作业的提交动作实际上是RDD的相关操作中被隐式调用的,在SparkContext内部通过调用DAGScheduler的作业提交接口来完成作业提交。上面已经讲过,DAGScheduler主要是计算作业和任务的依赖关系,属于任务的逻辑调度。两个主要的接口分别是submitJob和runJob。前者调用之后返回一个Jobwaiter对象,用来判断作业完成情况或者取消作业;后者是在其内部调用前者。

        下面我们来看一看这两个接口的源码:

         submitJob:

         

def submitJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
allowLocal: Boolean,
resultHandler: (Int, U) => Unit,
properties: Properties = null): JobWaiter[U] =
{
// 检查发送的任务是在分区上存在的.
val maxPartitions = rdd.partitions.length
partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
throw new IllegalArgumentException(
"Attempting to access a non-existent partition: " + p + ". " +
"Total number of partitions: " + maxPartitions)
}

val jobId = nextJobId.getAndIncrement()
if (partitions.size == 0) {
return new JobWaiter[U](this, jobId, 0, resultHandler)
}

assert(partitions.size > 0)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties))
//返回一个JobWaiter
<span style="color:#ffff33;">waiter</span>
}
       

        runJob:

def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
allowLocal: Boolean,
resultHandler: (Int, U) => Unit,
properties: Properties = null)
{
val start = System.nanoTime
//在runJob内部调用submitJob
val waiter = <span style="background-color: rgb(255, 255, 51);">submitJob</span>(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
//阻塞等待
<span style="background-color: rgb(255, 255, 0);">waiter.awaitResult()</span> match {
case JobSucceeded => {
logInfo("Job %d finished: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
}
case JobFailed(exception: Exception) =>
logInfo("Job %d failed: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
throw exception
}
}

         

       我们会发现,在调用submitJob之后,会返回一个JobWaiter对象,这是用来取消作业/判断作业是否完成,也可以用在异步调用;而runJob则是在自己内部调用了submitJob,然后通过阻塞等待直到任务完成/失败。

         DAGScheduler在SparkContext初始化过程中被创建,SparkContext与DAGScheduler是一对一的关系。DAGScheduler通过Akka
Actor 的消息传递机制构建事件循环逻辑。那么,DAGScheduler中用来处理事件的接口是谁呢?——从源码的接口名中我们不难发现DAGSchedulerEventProcessLoop这个接口,DAGSchedulerEventProcessLoop的Recieve函数给处理事件提供了入口。然而它是一个私有函数,所以DAGScheduler通过公共接口向DAGSchedulerEventProcessLoop发送消息。

 

Spark整体的任务调度过程图如下所示:



       上面我们说了,DAGScheduler是面向stage的,那么他是如何划分stage的呢?它是以ShuffleDependency为依据来划分的。DAGScheduler找到FinalStage后生产作业实例。作业提交的前提是:所有依赖的父调度阶段的结果全部完成可以使用。

       当DAGScheduler事件循环完成后,又会重新扫描等待列表以及失败列表,触发已经就绪的stage。由上图可知,DAGScheduler提交给TaskScheduler的是一个任务集,另外,TaskScheduler自己会创建一个TaskSetManager来管理这个任务集,TaskSetManager的作用就是将具体任务调到相应的Executor节点上运行。

        

    
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: