Spark 2.2: Source Code Analysis of How Driver and Executor State Changes Are Handled
2017-10-23 22:04
Handling Driver state changes
DriverStateChanged( )
/**
 * leen
 * The Driver's state has changed
 */
case DriverStateChanged(driverId, state, exception) =>
  // If the DriverState is ERROR, FINISHED, KILLED or FAILED, call removeDriver to remove the Driver
  state match {
    case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
      removeDriver(driverId, state, exception)
    case _ =>
      throw new Exception(s"Received unexpected state update for driver $driverId: $state")
  }
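The match above splits driver states into terminal ones (which trigger removal) and everything else (which is rejected). A minimal sketch of that split follows. The enumeration is a local stand-in rather than Spark's own DriverState, SUBMITTED and RUNNING are assumed here only as examples of non-terminal states, and isTerminal is a hypothetical helper name.

```scala
object DriverStateSketch {
  // Local stand-in for DriverState. The four terminal names come from the
  // snippet above; SUBMITTED and RUNNING are assumptions for illustration.
  object DriverState extends Enumeration {
    val SUBMITTED, RUNNING, FINISHED, KILLED, FAILED, ERROR = Value
  }
  import DriverState._

  // Hypothetical helper: true for the states that lead to removeDriver.
  def isTerminal(state: DriverState.Value): Boolean = state match {
    case ERROR | FINISHED | KILLED | FAILED => true
    case _                                  => false
  }

  def main(args: Array[String]): Unit = {
    // Print each state together with whether it would trigger removal.
    DriverState.values.foreach(s => println(s"$s -> ${isTerminal(s)}"))
  }
}
```

Unlike the handler in the source, this sketch returns false for other states instead of throwing, which keeps it easy to call in isolation.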
removeDriver( )
private def removeDriver(
    driverId: String,
    finalState: DriverState,
    exception: Option[Exception]) {
  // leen
  // 1. Use Scala's higher-order function find() to look up the Driver with this driverId
  drivers.find(d => d.id == driverId) match {
    // 2. Some(...) (the Option case class) means a driver was found
    case Some(driver) =>
      logInfo(s"Removing driver: $driverId")
      // 2.1 Remove the driver from the in-memory drivers collection (a HashSet)
      drivers -= driver
      // If the completed-driver history is full, drop the oldest entries first
      if (completedDrivers.size >= RETAINED_DRIVERS) {
        val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
        completedDrivers.trimStart(toRemove)
      }
      // 2.2 Add the driver to completedDrivers
      completedDrivers += driver
      // 2.3 Use the persistence engine to delete the driver's persisted information
      persistenceEngine.removeDriver(driver)
      driver.state = finalState
      driver.exception = exception
      // 2.4 Remove the driver from the Worker it was running on
      driver.worker.foreach(w => w.removeDriver(driver))
      // 2.5 Call schedule() to reschedule
      schedule()
    // 3. If no driver was found, only log a warning
    case None =>
      logWarning(s"Asked to remove unknown driver: $driverId")
  }
}
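The bounded-history step in removeDriver (drop the oldest tenth, at least one entry, once the buffer reaches the retention limit) can be tried in isolation. The following is a minimal sketch under stated assumptions: addBounded and RetainedHistorySketch are hypothetical names, the buffer holds plain strings instead of driver objects, and remove(0, n) is used in place of trimStart(n) to the same effect.

```scala
import scala.collection.mutable.ArrayBuffer

object RetainedHistorySketch {
  // Hypothetical helper: append `item` to `history`, first dropping the
  // oldest entries once the buffer has reached `retained` elements.
  def addBounded[A](history: ArrayBuffer[A], item: A, retained: Int): Unit = {
    require(retained > 0, "retained must be positive")
    if (history.size >= retained) {
      // Drop 10% of the limit, but never fewer than one entry.
      val toRemove = math.max(retained / 10, 1)
      history.remove(0, toRemove) // same effect as trimStart(toRemove)
    }
    history += item
  }

  def main(args: Array[String]): Unit = {
    val history = ArrayBuffer.empty[String]
    (1 to 25).foreach(i => addBounded(history, s"driver-$i", retained = 20))
    println(s"size=${history.size}, oldest=${history.head}")
  }
}
```

Trimming a batch instead of a single element means the trim runs only once every few insertions rather than on every insertion after the limit is reached.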
Handling Executor state changes
/**
 * leen
 * The Executor's state has changed
 */
case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
  // 1. Find the App that owns the Executor, then use the App's internal executors cache
  //    to get the ExecutorDescription, which carries appId, execId, cores and
  //    state [ExecutorState.Value]
  val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
  execOption match {
    // 2. An executor was found
    case Some(exec) =>
      val appInfo = idToApp(appId)
      // 2.1 Update the Executor's state
      val oldState = exec.state
      exec.state = state

      // 2.2 If the Executor's new state is RUNNING
      if (state == ExecutorState.RUNNING) {
        assert(oldState == ExecutorState.LAUNCHING,
          s"executor $execId state transfer from $oldState to RUNNING is illegal")
        appInfo.resetRetryCount()
      }

      // 2.3 Forward the Executor's current state to the Driver
      exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, false))

      // 2.4 If the Executor is in a finished state: KILLED, FAILED, LOST or EXITED
      if (ExecutorState.isFinished(state)) {
        // Remove this Executor from the Worker and the App
        logInfo(s"Removing executor ${exec.fullId} because it is $state")
        // If the Application has already finished, keep its state so that its
        // information is still displayed correctly on the web UI;
        // otherwise remove the Executor from the App's cache
        if (!appInfo.isFinished) {
          appInfo.removeExecutor(exec)
        }
        // Remove the Executor from the cache of the Worker that was running it
        exec.worker.removeExecutor(exec)

        val normalExit = exitStatus == Some(0)
        // Only retry certain number of times so we don't go into an infinite loop.
        // Important note: this code path is not exercised by tests, so be very careful when
        // changing this `if` condition.
        // If the exit was abnormal and the Executor retry count >= MAX_EXECUTOR_RETRIES
        // [10 by default], call removeApplication
        if (!normalExit
            && appInfo.incrementRetryCount() >= MAX_EXECUTOR_RETRIES
            && MAX_EXECUTOR_RETRIES >= 0) { // < 0 disables this application-killing path
          val execs = appInfo.executors.values
          if (!execs.exists(_.state == ExecutorState.RUNNING)) {
            logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
              s"${appInfo.retryCount} times; removing it")
            removeApplication(appInfo, ApplicationState.FAILED)
          }
        }
      }
      // 3. Reschedule
      schedule()
    case None =>
      logWarning(s"Got status update for unknown executor $appId/$execId")
  }
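The retry condition is the subtlest part of this handler, so it may help to see it isolated from the Master. The sketch below is not Spark's code: AppRetryState is a hypothetical stand-in for the retry bookkeeping that ApplicationInfo does in the source, and shouldRemoveApp is an assumed helper name that reproduces the same short-circuit order (abnormal exit, then increment and compare, then the "negative disables" check, then the "no executor still running" check).

```scala
object ExecutorRetrySketch {
  // Hypothetical stand-in for the retry counter kept per application.
  final class AppRetryState {
    private var retries = 0
    def incrementRetryCount(): Int = { retries += 1; retries }
    def resetRetryCount(): Unit = { retries = 0 }
    def retryCount: Int = retries
  }

  // Returns true when the application should be removed as FAILED.
  // The counter is only incremented for abnormal exits, because && short-circuits.
  def shouldRemoveApp(
      app: AppRetryState,
      exitStatus: Option[Int],
      maxRetries: Int,
      anyExecutorRunning: Boolean): Boolean = {
    val normalExit = exitStatus.contains(0)
    !normalExit &&
      app.incrementRetryCount() >= maxRetries &&
      maxRetries >= 0 && // a negative limit disables the removal path
      !anyExecutorRunning
  }

  def main(args: Array[String]): Unit = {
    val app = new AppRetryState
    // Three abnormal exits with a limit of 3 and no executor left running.
    val decisions = (1 to 3).map(_ => shouldRemoveApp(app, Some(1), 3, anyExecutorRunning = false))
    println(decisions.mkString(", "))
  }
}
```

Two details are worth noticing: a missing exit status (None) counts as an abnormal exit, and an application that still has a RUNNING executor is kept even after the limit is reached, since its counter is reset only when an executor transitions to RUNNING.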