
Spark 1.3 from Creation to Submission: 8) DAGScheduler.runJob Source Code Analysis

2017-01-04 20:56
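Continuing from the previous part, let's first look at the source code of the DAGScheduler class (keeping only the parts we care about):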

```scala
class DAGScheduler(
    private[scheduler] val sc: SparkContext,
    private[scheduler] val taskScheduler: TaskScheduler,
    listenerBus: LiveListenerBus,
    mapOutputTracker: MapOutputTrackerMaster,
    blockManagerMaster: BlockManagerMaster,
    env: SparkEnv,
    clock: Clock = new SystemClock())
  extends Logging {

  // ... (other members omitted)

  // DAGSchedulerEventProcessLoop is dedicated to handling all kinds of DAG scheduler events
  private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)

  // Start the event thread at the end of the constructor (line 1350 in the source)
  eventProcessLoop.start()
}
```
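In DAGScheduler, `eventProcessLoop.start()` sits directly in the class body. In Scala, such statements are part of the primary constructor, so they run once, in order, during `new`. The tiny sketch below uses hypothetical classes `ToyLoop` and `ToyScheduler` (not Spark code) only to show that ordering:

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for DAGSchedulerEventProcessLoop: start() only records that it ran.
class ToyLoop(trace: ListBuffer[String]) {
  def start(): Unit = trace += "event loop started"
}

// Hypothetical stand-in for DAGScheduler. Every statement in the class body
// belongs to the primary constructor, so it runs during `new`.
class ToyScheduler(trace: ListBuffer[String]) {
  trace += "constructor body entered"
  private val eventProcessLoop = new ToyLoop(trace)
  // Last statement of the body, the same position DAGScheduler uses
  eventProcessLoop.start()
}

val trace = ListBuffer[String]()
new ToyScheduler(trace)
println(trace.mkString(" -> ")) // constructor body entered -> event loop started
```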
The primary constructor calls eventProcessLoop.start(). Next, look at the DAGSchedulerEventProcessLoop class:

```scala
private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
  extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {

  // The main event loop of the DAG scheduler.
  override def onReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
        listener, properties)

    case StageCancelled(stageId) =>
      dagScheduler.handleStageCancellation(stageId)

    // ... various other events
  }
}
```
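onReceive is a single pattern match: it destructures each event case class and forwards the fields to the matching DAGScheduler handler. A reduced sketch of that dispatch follows; the event types `ToyJobSubmitted`/`ToyStageCancelled` and the `ToyHandler` class are hypothetical, and the real events carry many more fields:

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical, heavily simplified events modeled on the two cases shown above.
sealed trait ToyEvent
final case class ToyJobSubmitted(jobId: Int, partitions: Array[Int]) extends ToyEvent
final case class ToyStageCancelled(stageId: Int) extends ToyEvent

// Hypothetical handler standing in for DAGScheduler's handleXxx methods.
class ToyHandler {
  val handled = ListBuffer[String]()
  def handleJobSubmitted(jobId: Int, partitions: Array[Int]): Unit =
    handled += s"job $jobId on ${partitions.length} partition(s)"
  def handleStageCancellation(stageId: Int): Unit =
    handled += s"cancel stage $stageId"
}

// Destructure the event and forward its fields; the sealed trait lets the
// compiler check that every event type is handled.
def onReceive(h: ToyHandler)(event: ToyEvent): Unit = event match {
  case ToyJobSubmitted(jobId, partitions) => h.handleJobSubmitted(jobId, partitions)
  case ToyStageCancelled(stageId)         => h.handleStageCancellation(stageId)
}

val h = new ToyHandler
onReceive(h)(ToyJobSubmitted(0, Array(0, 1, 2)))
onReceive(h)(ToyStageCancelled(3))
println(h.handled.mkString("; ")) // job 0 on 3 partition(s); cancel stage 3
```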
DAGSchedulerEventProcessLoop itself has no start method; start is defined in its parent class EventLoop:

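```scala
private[spark] abstract class EventLoop[E](name: String) extends Logging {
  /*** Only the relevant code is kept ***/
  // Blocking queue that holds pending events
  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()
  // Whether the event loop has been stopped
  private val stopped = new AtomicBoolean(false)
  // Thread that keeps processing events
  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped.get) {
          // Take a DAG event from the queue (blocks while it is empty) and dispatch it
          val event = eventQueue.take()
          onReceive(event)
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        // (per-event error handling via onError is omitted here)
      }
    }
  }

  def start(): Unit = {
    if (stopped.get) {
      throw new IllegalStateException(name + " has already been stopped")
    }
    // Call onStart before starting the event thread to make sure it happens before onReceive
    onStart()
    // Start the thread that keeps dispatching DAG events
    eventThread.start()
  }

  // Put the event into the event queue. The event thread will process it later.
  def post(event: E): Unit = {
    eventQueue.put(event)
  }

  // Hooks implemented by subclasses such as DAGSchedulerEventProcessLoop
  protected def onStart(): Unit = {}
  protected def onReceive(event: E): Unit
}
```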
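EventLoop mainly creates a daemon thread that keeps taking DAG events from a blocking queue and processing them, and its subclass DAGSchedulerEventProcessLoop implements onReceive (shown earlier). In other words, as soon as DAGScheduler is created, it sets up a first-in, first-out event queue plus a thread that processes the DAG events in it. The event we care about here is the job submission event, JobSubmitted.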
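The same mechanism can be reproduced in a few lines. This is a minimal sketch of the pattern, not Spark's EventLoop: the class name `MiniEventLoop` and its `stop()` method are assumptions added so the demo can shut the thread down cleanly.

```scala
import java.util.concurrent.{BlockingQueue, CopyOnWriteArrayList, CountDownLatch, LinkedBlockingDeque}
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical MiniEventLoop: one daemon thread takes events from a FIFO
// blocking queue and passes each one to onReceive.
abstract class MiniEventLoop[E](name: String) {
  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take() // blocks while the queue is empty
          onReceive(event)
        }
      } catch {
        case _: InterruptedException => // stop() interrupts a blocked take(); just exit
      }
    }
  }

  def start(): Unit = {
    if (stopped.get) throw new IllegalStateException(name + " has already been stopped")
    eventThread.start()
  }

  // Assumed helper for this demo: mark stopped, wake the thread, wait for it to exit.
  def stop(): Unit = {
    if (stopped.compareAndSet(false, true)) {
      eventThread.interrupt()
      eventThread.join()
    }
  }

  // put() appends at the tail and take() removes from the head, so events are FIFO.
  def post(event: E): Unit = eventQueue.put(event)

  protected def onReceive(event: E): Unit
}

// Usage: record events in the order the event thread handles them.
val seen = new CopyOnWriteArrayList[String]()
val handled = new CountDownLatch(3)
val loop = new MiniEventLoop[String]("demo-event-loop") {
  override protected def onReceive(event: String): Unit = {
    seen.add(event)
    handled.countDown()
  }
}
loop.start()
Seq("JobSubmitted", "StageCancelled", "JobCancelled").foreach(loop.post)
handled.await() // wait until all three events have been processed
loop.stop()
println(seen) // [JobSubmitted, StageCancelled, JobCancelled]
```

Because only the event thread consumes the queue, events are handled strictly one at a time in posting order, no matter which threads call post().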

Now back to where this section started, the DAGScheduler.runJob method:

```scala
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    allowLocal: Boolean,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  val start = System.nanoTime
  val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
  waiter.awaitResult() match {
    case JobSucceeded => {
      logInfo("Job %d finished: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
    }
    case JobFailed(exception: Exception) =>
      logInfo("Job %d failed: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      throw exception
  }
}
```
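runJob calls submitJob, which returns a JobWaiter, a listener that is notified as tasks finish. Calling awaitResult() on it blocks the current thread until the job finishes or is cancelled.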
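How a waiter can block the caller is easiest to see in a simplified version. The sketch below is a stand-in rather than Spark's class: `MiniJobWaiter`, `MiniJobSucceeded` and `MiniJobFailed` are hypothetical names, and the caller blocks with the standard `wait`/`notifyAll` monitor pattern while another thread reports task completions.

```scala
// Hypothetical result types mirroring the JobSucceeded / JobFailed cases matched in runJob.
sealed trait MiniJobResult
case object MiniJobSucceeded extends MiniJobResult
final case class MiniJobFailed(exception: Exception) extends MiniJobResult

// The event-processing side calls taskSucceeded / jobFailed; the submitting
// thread blocks in awaitResult() until the job is done.
class MiniJobWaiter[T](val jobId: Int, totalTasks: Int, resultHandler: (Int, T) => Unit) {
  private var finishedTasks = 0
  private var jobFinished = totalTasks == 0 // a zero-task job counts as finished at once
  private var jobResult: MiniJobResult = if (jobFinished) MiniJobSucceeded else null

  def taskSucceeded(index: Int, result: T): Unit = synchronized {
    resultHandler(index, result)
    finishedTasks += 1
    if (finishedTasks == totalTasks) {
      jobFinished = true
      jobResult = MiniJobSucceeded
      notifyAll() // wake the thread blocked in awaitResult()
    }
  }

  def jobFailed(exception: Exception): Unit = synchronized {
    jobFinished = true
    jobResult = MiniJobFailed(exception)
    notifyAll()
  }

  def awaitResult(): MiniJobResult = synchronized {
    while (!jobFinished) wait() // the loop guards against spurious wake-ups
    jobResult
  }
}

// Usage: another thread "finishes" two tasks while the caller blocks in awaitResult().
val results = new Array[Int](2)
val waiter = new MiniJobWaiter[Int](0, 2, (index, value) => results(index) = value)
new Thread(new Runnable {
  def run(): Unit = { waiter.taskSucceeded(0, 10); waiter.taskSucceeded(1, 20) }
}).start()
val outcome = waiter.awaitResult()
println(s"$outcome ${results.mkString(",")}") // MiniJobSucceeded 10,20
```

A waiter created with zero tasks starts out finished, which corresponds to the empty-partitions case that submitJob handles by returning a JobWaiter without posting any event.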

```scala
/**
 * Submit a job to the job scheduler and get a JobWaiter object back. The JobWaiter object
 * can be used to block until the job finishes executing or can be used to cancel the job.
 */
def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    allowLocal: Boolean,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  // Check to make sure we are not launching a task on a partition that does not exist.
  val maxPartitions = rdd.partitions.length
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
        "Total number of partitions: " + maxPartitions)
  }

  val jobId = nextJobId.getAndIncrement()
  if (partitions.size == 0) {
    return new JobWaiter[U](this, jobId, 0, resultHandler)
  }

  assert(partitions.size > 0)
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties))
  waiter
}
```
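The bookkeeping submitJob performs before posting the event (reject out-of-range partition indices, allocate a job id, skip the event for an empty job) can be exercised in isolation. The helper `prepareSubmit` and the local `nextJobId` counter below are hypothetical; only the validation logic and its message text mirror the source.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for DAGScheduler.nextJobId.
val nextJobId = new AtomicInteger(0)

// Returns (jobId, whether a JobSubmitted event would be posted).
def prepareSubmit(partitions: Seq[Int], maxPartitions: Int): (Int, Boolean) = {
  // Every requested index must lie in [0, maxPartitions).
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
        "Total number of partitions: " + maxPartitions)
  }
  val jobId = nextJobId.getAndIncrement() // ids are allocated even for empty jobs
  (jobId, partitions.nonEmpty)            // an empty job never reaches the event queue
}

val first = prepareSubmit(Seq(0, 3), maxPartitions = 4) // (0,true)
val empty = prepareSubmit(Seq.empty, maxPartitions = 4) // (1,false)
val rejected =
  try { prepareSubmit(Seq(1, 4), maxPartitions = 4); None }
  catch { case e: IllegalArgumentException => Some(e.getMessage) }
```

The rejected call throws before a job id is taken, so the counter only advances for jobs that pass validation.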
In submitJob, eventProcessLoop.post(JobSubmitted(...)) adds a JobSubmitted event to EventLoop's queue; eventThread then takes the event out and handles it through onReceive. That brings us back to DAGSchedulerEventProcessLoop.onReceive:

```scala
override def onReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
      listener, properties)
  // ...
}
```
dagScheduler.handleJobSubmitted is then executed; its main job is to split the job into stages.
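Putting the pieces together, the hand-off from runJob to the event thread can be sketched end to end. Everything here is a simplified assumption: `ToyJobEvent`, `ToyWaiter` and the toy `handleJobSubmitted` are hypothetical, the waiter uses a `CountDownLatch` instead of wait/notify, and the handler computes results directly instead of splitting the job into stages and submitting tasks as the real method does.

```scala
import java.util.concurrent.{CountDownLatch, LinkedBlockingDeque}

// Simplified waiter: counts finished tasks with a latch and collects their results.
final class ToyWaiter(totalTasks: Int) {
  private val latch = new CountDownLatch(totalTasks)
  val results = new Array[Int](totalTasks)
  def taskSucceeded(index: Int, value: Int): Unit = { results(index) = value; latch.countDown() }
  def awaitResult(): Array[Int] = { latch.await(); results }
}

// Hypothetical event that, like JobSubmitted, carries the waiter to notify.
final case class ToyJobEvent(jobId: Int, partitions: Array[Int], waiter: ToyWaiter)

// Toy handler: no stage splitting; each "task" squares its partition index.
def handleJobSubmitted(e: ToyJobEvent): Unit =
  e.partitions.zipWithIndex.foreach { case (p, i) => e.waiter.taskSucceeded(i, p * p) }

// Event thread: the only consumer of the queue.
val queue = new LinkedBlockingDeque[ToyJobEvent]()
val eventThread = new Thread("toy-dag-scheduler-event-loop") {
  setDaemon(true)
  override def run(): Unit =
    try { while (true) handleJobSubmitted(queue.take()) }
    catch { case _: InterruptedException => () }
}
eventThread.start()

// "submitJob" side: post the event and hand back the waiter.
def submitJob(jobId: Int, partitions: Array[Int]): ToyWaiter = {
  val waiter = new ToyWaiter(partitions.length)
  queue.put(ToyJobEvent(jobId, partitions, waiter))
  waiter
}

// "runJob" side: block the calling thread until the event thread has handled the job.
val squares = submitJob(0, Array(1, 2, 3)).awaitResult()
println(squares.mkString(",")) // 1,4,9
```

In this sketch the calling thread never runs the handler itself; it only posts an event and waits, which mirrors the division of labor between runJob/submitJob and the DAG scheduler's event thread described above.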