Spark 1.3 from Creation to Submission: 8) DAGScheduler.runJob Source Code Analysis
2017-01-04 20:56
Continuing from the previous section, first look at the source of the DAGScheduler class (only the relevant parts are kept):
class DAGScheduler(
    private[scheduler] val sc: SparkContext,
    private[scheduler] val taskScheduler: TaskScheduler,
    listenerBus: LiveListenerBus,
    mapOutputTracker: MapOutputTrackerMaster,
    blockManagerMaster: BlockManagerMaster,
    env: SparkEnv,
    clock: Clock = new SystemClock())
  extends Logging {

  // This event loop is dedicated to handling the various DAG events
  private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)

  // Start the event thread at the end of the constructor (line 1350 of the source)
  eventProcessLoop.start()
}

The primary constructor calls eventProcessLoop.start(). Next, look at the DAGSchedulerEventProcessLoop class:
private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
  extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {

  // The main event loop of the DAG scheduler.
  override def onReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
        listener, properties)

    case StageCancelled(stageId) =>
      dagScheduler.handleStageCancellation(stageId)

    // ... the other event types
  }
}

DAGSchedulerEventProcessLoop does not define a start method itself; start is inherited from its parent class, EventLoop:
private[spark] abstract class EventLoop[E](name: String) extends Logging {

  /*** only the relevant code is kept ***/

  // Blocking queue of pending events
  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

  // Whether the event thread has been stopped
  private val stopped = new AtomicBoolean(false)

  // The thread that keeps running and dispatching events
  private val eventThread = new Thread(name) {
    setDaemon(true)

    override def run(): Unit = {
      try {
        while (!stopped.get) {
          // Take one DAG event off the queue and dispatch it
          val event = eventQueue.take()
          onReceive(event)
        }
      } // catch clauses omitted
    }
  }

  def start(): Unit = {
    if (stopped.get) {
      throw new IllegalStateException(name + " has already been stopped")
    }
    // Call onStart before starting the event thread to make sure it happens before onReceive
    onStart()
    // Start the thread that keeps dispatching DAG events
    eventThread.start()
  }

  // Put the event into the event queue. The event thread will process it later.
  def post(event: E): Unit = {
    eventQueue.put(event)
  }
}

EventLoop creates a single thread that continuously processes the events in a blocking queue, and its subclass DAGSchedulerEventProcessLoop supplies the onReceive implementation (the previous code block). In other words, as soon as a DAGScheduler is created, it sets up a first-in-first-out queue for handling DAG events. The event we care about here is the job submission event.
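The queue-plus-thread pattern described above can be tried out in isolation. The following is a minimal sketch, not Spark's actual class: MiniEventLoop, MiniEventLoopDemo, and the stop method shown here are hypothetical names introduced for illustration, and the error handling is reduced to the one case needed to shut the thread down.

```scala
import java.util.concurrent.{BlockingQueue, CountDownLatch, LinkedBlockingDeque, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ListBuffer

// Hypothetical, simplified stand-in for Spark's EventLoop (not the real class).
abstract class MiniEventLoop[E](name: String) {
  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped.get()) {
          onReceive(eventQueue.take()) // blocks while the queue is empty
        }
      } catch {
        case _: InterruptedException => // stop() interrupts a blocked take()
      }
    }
  }

  def start(): Unit = eventThread.start()

  // Enqueue an event; the event thread handles it later, in FIFO order.
  def post(event: E): Unit = eventQueue.put(event)

  def stop(): Unit = if (stopped.compareAndSet(false, true)) eventThread.interrupt()

  protected def onReceive(event: E): Unit
}

object MiniEventLoopDemo {
  // Posts three events and returns them in the order the loop thread handled them.
  def run(): List[Int] = {
    val handled = ListBuffer.empty[Int]
    val done = new CountDownLatch(3)
    val loop = new MiniEventLoop[Int]("mini-event-loop") {
      override protected def onReceive(event: Int): Unit = {
        handled.synchronized { handled += event }
        done.countDown()
      }
    }
    loop.start()
    Seq(1, 2, 3).foreach(e => loop.post(e))
    done.await(5, TimeUnit.SECONDS)
    loop.stop()
    handled.synchronized { handled.toList }
  }
}
```

Because a single thread drains the queue, MiniEventLoopDemo.run() returns the events in the order they were posted. The caller of post never runs the handler itself, which is the property the DAGScheduler relies on to keep scheduling logic on one thread.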
Returning to the subject of this section, look at the DAGScheduler.runJob method:
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    allowLocal: Boolean,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  val start = System.nanoTime
  val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
  waiter.awaitResult() match {
    case JobSucceeded => {
      logInfo("Job %d finished: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
    }
    case JobFailed(exception: Exception) =>
      logInfo("Job %d failed: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      throw exception
  }
}

runJob calls submitJob, which returns a JobWaiter. Calling awaitResult() on the JobWaiter blocks the current thread until the job has finished or has been cancelled.
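The blocking behaviour of the waiter can be illustrated with plain monitor methods. This is a minimal sketch under stated assumptions: MiniJobWaiter and MiniJobWaiterDemo are hypothetical names, the waiter only counts successful tasks, and failure and cancellation (which the real JobWaiter also deals with) are left out.

```scala
import scala.collection.mutable

// Hypothetical, simplified waiter; Spark's real JobWaiter has more responsibilities.
class MiniJobWaiter[U](totalTasks: Int, resultHandler: (Int, U) => Unit) {
  private var finishedTasks = 0
  // A job with zero tasks is finished as soon as it is created.
  private var jobFinished = totalTasks == 0

  // Called by the scheduler side when one partition's task completes.
  def taskSucceeded(index: Int, result: U): Unit = synchronized {
    resultHandler(index, result)
    finishedTasks += 1
    if (finishedTasks == totalTasks) {
      jobFinished = true
      notifyAll() // wake up any thread blocked in awaitResult()
    }
  }

  // Blocks the caller until every task has reported its result.
  def awaitResult(): Unit = synchronized {
    while (!jobFinished) wait()
  }
}

object MiniJobWaiterDemo {
  // Completes three tasks on a background thread while the caller waits.
  def run(): Map[Int, Int] = {
    val results = mutable.Map.empty[Int, Int]
    val waiter = new MiniJobWaiter[Int](3, (i, r) => results(i) = r)
    val worker = new Thread(new Runnable {
      override def run(): Unit = (0 until 3).foreach(i => waiter.taskSucceeded(i, i * 10))
    })
    worker.setDaemon(true)
    worker.start()
    waiter.awaitResult() // returns only after all three tasks have reported
    results.toMap
  }
}
```

The caller of awaitResult plays the role of the thread inside runJob, and the background thread plays the role of the scheduler reporting finished tasks. The zero-task shortcut mirrors the case in which submitJob is given an empty partition list.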
/**
 * Submit a job to the job scheduler and get a JobWaiter object back. The JobWaiter object
 * can be used to block until the job finishes executing or can be used to cancel the job.
 */
def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    allowLocal: Boolean,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  // Check to make sure we are not launching a task on a partition that does not exist.
  val maxPartitions = rdd.partitions.length
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
        "Total number of partitions: " + maxPartitions)
  }

  val jobId = nextJobId.getAndIncrement()
  if (partitions.size == 0) {
    return new JobWaiter[U](this, jobId, 0, resultHandler)
  }

  assert(partitions.size > 0)
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties))
  waiter
}

In the code above, eventProcessLoop.post(JobSubmitted(...)) adds a JobSubmitted event to the queue inside EventLoop. The eventThread later takes that event off the queue and passes it to onReceive. This brings us back to the DAGSchedulerEventProcessLoop.onReceive method:
override def onReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
      listener, properties)
  // ...
}

The JobSubmitted case runs dagScheduler.handleJobSubmitted, whose main job is to split the job into Stages.
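The dispatch step can be sketched on its own with a sealed event hierarchy and a pattern match. The event types and MiniDispatcher below are hypothetical and carry far fewer fields than the real DAGSchedulerEvent classes; the handlers just return a description instead of doing any scheduling.

```scala
// Hypothetical, simplified event types; the real DAGSchedulerEvent hierarchy has many more fields.
sealed trait MiniEvent
final case class MiniJobSubmitted(jobId: Int, partitions: Seq[Int]) extends MiniEvent
final case class MiniStageCancelled(stageId: Int) extends MiniEvent

object MiniDispatcher {
  // Routes each event to its handler by pattern matching on the event type,
  // in the same spirit as onReceive.
  def onReceive(event: MiniEvent): String = event match {
    case MiniJobSubmitted(jobId, partitions) =>
      s"job $jobId submitted with ${partitions.size} partitions"
    case MiniStageCancelled(stageId) =>
      s"stage $stageId cancelled"
  }
}
```

Declaring the trait as sealed lets the compiler warn when a match does not cover every event type, which is useful when a single onReceive has to handle every kind of scheduler event.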