Spark Source Code Analysis: Master State Changes
2018-02-10 22:06
Overview:
Executor state change (ExecutorStateChanged)
1. Find the application the executor belongs to, then look the executor up in the application's internal executors cache.
2. Record the executor's newly reported state (when the new state is RUNNING, the old state must have been LAUNCHING), and forward an ExecutorUpdated message to the driver.
3. If the executor has reached a finished state, remove it from the application and from the worker it was running on.
4. If the executor exited abnormally, retry the application; if all 10 retries fail, scheduling is considered to have failed and the application is removed. If the application recovers within those 10 retries, it is simply rescheduled.
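The retry decision in step 4 can be sketched in isolation. This is a minimal model, not Spark's actual classes: `ApplicationRetry` and `onAbnormalExit` are hypothetical names standing in for `ApplicationInfo`'s retry counter and the Master's branch on `incrementRetryCount() < MAX_NUM_RETRY`.

```scala
// Mirrors ApplicationState.MAX_NUM_RETRY in the Master (10 in this Spark version).
val MaxNumRetry = 10

// Hypothetical stand-in for ApplicationInfo's retry bookkeeping.
final class ApplicationRetry {
  private var retryCount = 0
  def incrementRetryCount(): Int = { retryCount += 1; retryCount }
  def resetRetryCount(): Unit = retryCount = 0
  def count: Int = retryCount
}

// What the Master would do after an abnormal executor exit:
// reschedule while under the limit, otherwise give up on the application.
def onAbnormalExit(app: ApplicationRetry): String =
  if (app.incrementRetryCount() < MaxNumRetry) "reschedule"
  else "removeApplication"
```

Note that a successful transition to RUNNING calls `resetRetryCount()`, which is why only 10 consecutive failures remove the application.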
Source code walkthrough:
Step 1: Executor state change
Source location: org.apache.spark.deploy.master.Master
/** Executor state change */
case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
  // Find the executor's application, then look the executor up
  // in the application's internal executors cache
  val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
  execOption match {
    // If the executor is known
    case Some(exec) => {
      val appInfo = idToApp(appId)
      val oldState = exec.state
      // Record the executor's new state
      exec.state = state
      if (state == ExecutorState.RUNNING) {
        assert(oldState == ExecutorState.LAUNCHING,
          s"executor $execId state transfer from $oldState to RUNNING is illegal")
        appInfo.resetRetryCount()
      }
      // Forward an ExecutorUpdated message to the driver
      exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus))
      // If the executor has reached a finished state
      if (ExecutorState.isFinished(state)) {
        // Remove this executor from the worker and app
        logInfo(s"Removing executor ${exec.fullId} because it is $state")
        // If an application has already finished, preserve its
        // state to display its information properly on the UI
        if (!appInfo.isFinished) {
          // Remove the executor from the application's cache
          appInfo.removeExecutor(exec)
        }
        // Remove the executor from the cache of the worker it ran on
        exec.worker.removeExecutor(exec)
        // Check whether the executor exited abnormally
        val normalExit = exitStatus == Some(0)
        // Only retry certain number of times so we don't go into an infinite loop.
        if (!normalExit) {
          // Has the application reached the retry limit (MAX_NUM_RETRY, i.e. 10)?
          if (appInfo.incrementRetryCount() < ApplicationState.MAX_NUM_RETRY) {
            // Reschedule
            schedule()
          } else {
            // Otherwise call removeApplication: the executor failed on
            // every attempt, so the application is considered failed
            val execs = appInfo.executors.values
            if (!execs.exists(_.state == ExecutorState.RUNNING)) {
              logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
                s"${appInfo.retryCount} times; removing it")
              removeApplication(appInfo, ApplicationState.FAILED)
            }
          }
        }
      }
    }
    case None =>
      logWarning(s"Got status update for unknown executor $appId/$execId")
  }
}
Step 2: the removeExecutor call from step 1
Source location: org.apache.spark.deploy.master.ApplicationInfo
/** Remove the executor from the internal in-memory bookkeeping structures */
private[master] def removeExecutor(exec: ExecutorDesc) {
  if (executors.contains(exec.id)) {
    removedExecutors += executors(exec.id)
    executors -= exec.id
    coresGranted -= exec.cores
  }
}
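The three-part bookkeeping above (archive the descriptor, drop it from the live map, return its cores) can be exercised with a self-contained mirror. `Exec` and `AppBookkeeping` are hypothetical stand-ins for ExecutorDesc and ApplicationInfo, not Spark classes:

```scala
import scala.collection.mutable

// Hypothetical mirror of ApplicationInfo's state: live executors,
// a history of removed ones, and the granted-cores counter.
final case class Exec(id: Int, cores: Int)

final class AppBookkeeping {
  val executors = mutable.HashMap.empty[Int, Exec]
  val removedExecutors = mutable.ArrayBuffer.empty[Exec]
  var coresGranted = 0

  def addExecutor(e: Exec): Unit = { executors(e.id) = e; coresGranted += e.cores }

  // Same shape as ApplicationInfo.removeExecutor: move the descriptor to the
  // removed list, drop it from the live map, and give back its cores.
  def removeExecutor(id: Int): Unit =
    executors.get(id).foreach { e =>
      removedExecutors += e
      executors -= id
      coresGranted -= e.cores
    }
}
```

Keeping `coresGranted` in sync here is what lets the Master's scheduler hand the freed cores to other applications on the next `schedule()` pass.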
Step 3: the removeApplication call from step 1
Source location: org.apache.spark.deploy.master.Master
def removeApplication(app: ApplicationInfo, state: ApplicationState.Value) {
  // Remove the application from the in-memory caches
  if (apps.contains(app)) {
    logInfo("Removing app " + app.id)
    apps -= app
    idToApp -= app.id
    endpointToApp -= app.driver
    addressToApp -= app.driver.address
    if (completedApps.size >= RETAINED_APPLICATIONS) {
      val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
      completedApps.take(toRemove).foreach( a => {
        Option(appIdToUI.remove(a.id)).foreach { ui => webUi.detachSparkUI(ui) }
        applicationMetricsSystem.removeSource(a.appSource)
      })
      completedApps.trimStart(toRemove)
    }
    completedApps += app // Remember it in our history
    waitingApps -= app

    // If application events are logged, use them to rebuild the UI
    asyncRebuildSparkUI(app)

    for (exec <- app.executors.values) {
      // Kill the application's executors
      killExecutor(exec)
    }
    app.markFinished(state)
    if (state != ApplicationState.FINISHED) {
      app.driver.send(ApplicationRemoved(state.toString))
    }
    // Remove the application from the persistence engine
    persistenceEngine.removeApplication(app)
    schedule()

    // Tell all workers that the application has finished, so they can clean up any app state.
    workers.foreach { w =>
      w.endpoint.send(ApplicationFinished(app.id))
    }
  }
}
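The retention logic at the top of removeApplication (cap the completed-apps history, evicting the oldest ~10% when full) can be sketched on its own. The constant 200 below is illustrative only, and `recordCompleted` is a hypothetical helper, not a Master method:

```scala
import scala.collection.mutable

// Illustrative cap; the Master reads RETAINED_APPLICATIONS from configuration.
val RetainedApplications = 200

// Sketch of the policy: once the buffer is full, drop the oldest ~10%
// before appending the newly finished app.
def recordCompleted[A](completed: mutable.ArrayBuffer[A], app: A): Unit = {
  if (completed.size >= RetainedApplications) {
    val toRemove = math.max(RetainedApplications / 10, 1)
    completed.remove(0, toRemove) // equivalent to completedApps.trimStart(toRemove)
  }
  completed += app // remember it in our history
}
```

Evicting a batch rather than one entry at a time keeps the amortized cost of the shift-left on the buffer low.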
Driver state change (DriverStateChanged)
1. If the driver's state is ERROR, FINISHED, KILLED, or FAILED, remove the driver.
2. Removal first adds the driver to the completed-drivers list, then sets the driver's state to its final state, and finally removes the driver from the worker it was running on, releasing its resources.
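The dispatch in step 1 above is a plain pattern match on terminal states. A minimal sketch, with `MiniDriverState` and `shouldRemove` as hypothetical stand-ins (the value names mirror Spark's DriverState enumeration):

```scala
// Hypothetical mirror of Spark's DriverState enumeration.
object MiniDriverState extends Enumeration {
  val SUBMITTED, RUNNING, ERROR, FINISHED, KILLED, FAILED = Value
}

// Mirrors the match in the DriverStateChanged handler: true means the
// Master would call removeDriver; any non-terminal state is unexpected there.
def shouldRemove(s: MiniDriverState.Value): Boolean = s match {
  case MiniDriverState.ERROR | MiniDriverState.FINISHED |
       MiniDriverState.KILLED | MiniDriverState.FAILED => true
  case _ => false
}
```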
Source code walkthrough:
Step 1: Driver state change
Source location: org.apache.spark.deploy.master.Master
/** Driver state change */
case DriverStateChanged(driverId, state, exception) => {
  state match {
    // If the driver's state is ERROR, FINISHED, KILLED or FAILED,
    // remove the driver
    case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
      removeDriver(driverId, state, exception)
    case _ =>
      throw new Exception(s"Received unexpected state update for driver $driverId: $state")
  }
}
Step 2: the removeDriver call from step 1
Source location: org.apache.spark.deploy.master.Master
/** Remove a driver */
private def removeDriver(
    driverId: String,
    finalState: DriverState,
    exception: Option[Exception]) {
  drivers.find(d => d.id == driverId) match {
    case Some(driver) =>
      logInfo(s"Removing driver: $driverId")
      drivers -= driver
      if (completedDrivers.size >= RETAINED_DRIVERS) {
        val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
        completedDrivers.trimStart(toRemove)
      }
      // Add the driver to the completed drivers
      completedDrivers += driver
      // Remove the driver from the persistence engine
      persistenceEngine.removeDriver(driver)
      // Set the driver's state to the final state
      driver.state = finalState
      driver.exception = exception
      // Remove the driver from the worker it was running on
      driver.worker.foreach(w => w.removeDriver(driver))
      schedule()
    case None =>
      logWarning(s"Asked to remove unknown driver: $driverId")
  }
}
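The final-state bookkeeping in removeDriver (mark the state, record the exception, detach from the worker) can be modeled with a small self-contained sketch. `MiniWorker`, `MiniDriver`, and `finishDriver` are hypothetical stand-ins, not Spark classes:

```scala
import scala.collection.mutable

// Hypothetical mini-model of a worker's view of its drivers.
final class MiniWorker(val id: String) {
  val drivers = mutable.Set.empty[String]
}

// Hypothetical mini-model of DriverInfo's mutable fields.
final class MiniDriver(val id: String) {
  var state: String = "RUNNING"
  var exception: Option[Exception] = None
  var worker: Option[MiniWorker] = None
}

// Same shape as the tail of removeDriver: record the final state and
// exception, then release the driver's slot on its worker (if any).
def finishDriver(d: MiniDriver, finalState: String, ex: Option[Exception]): Unit = {
  d.state = finalState
  d.exception = ex
  d.worker.foreach(w => w.drivers -= d.id)
}
```

The `Option` on `worker` matters: a driver that never got placed on a worker (for example, one still waiting to be scheduled) can be finished without a null check.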