您的位置:首页 > 其它

Spark2.2 Driver和Executor状态改变处理机制源码分析

2017-10-23 22:04 507 查看

Driver状态发生改变处理机制

DriverStateChanged( )

/**
 * Handles a DriverStateChanged message.
 *
 * Only the states ERROR, FINISHED, KILLED and FAILED are accepted; for those the
 * driver is removed via removeDriver. Any other state is treated as unexpected
 * and causes an Exception to be thrown.
 */
case DriverStateChanged(driverId, state, exception) =>
  // Classify the reported state first, then act on the classification.
  val isRemovableState = state match {
    case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
      true
    case _ =>
      false
  }
  // Guard clause: reject every state that is not one of the four handled above.
  if (!isRemovableState) {
    throw new Exception(s"Received unexpected state update for driver $driverId: $state")
  }
  removeDriver(driverId, state, exception)


removeDriver( )

/**
 * Removes the driver identified by `driverId` from the active `drivers` collection,
 * records it in `completedDrivers` with its final state, and triggers rescheduling.
 *
 * If no driver with that id is found, a warning is logged and nothing else happens.
 *
 * @param driverId   id of the driver to remove
 * @param finalState state to store on the driver once it has been removed
 * @param exception  optional exception to store on the driver
 */
private def removeDriver(
    driverId: String,
    finalState: DriverState,
    exception: Option[Exception]): Unit = {
  // 1. Look up the driver whose id matches driverId.
  drivers.find(d => d.id == driverId) match {
    // 2. A matching driver exists.
    case Some(driver) =>
      logInfo(s"Removing driver: $driverId")
      // 2.1 Take it out of the active drivers collection.
      drivers -= driver
      // Keep completedDrivers bounded: once it holds RETAINED_DRIVERS entries,
      // drop a tenth of that limit (at least one entry) from the front.
      if (completedDrivers.size >= RETAINED_DRIVERS) {
        val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
        completedDrivers.trimStart(toRemove)
      }
      // 2.2 Record the driver as completed.
      completedDrivers += driver
      // 2.3 Remove the driver from the persistence engine.
      persistenceEngine.removeDriver(driver)
      driver.state = finalState
      driver.exception = exception
      // 2.4 If the driver has a worker assigned, remove the driver from that worker too.
      driver.worker.foreach(w => w.removeDriver(driver))
      // 2.5 Re-run scheduling.
      schedule()
    // 3. No matching driver: only log a warning.
    case None =>
      logWarning(s"Asked to remove unknown driver: $driverId")
  }
}


Executor状态发生改变处理机制

/**
* Handles an ExecutorStateChanged message: updates the stored state of the executor,
* forwards the update to the application's driver, cleans up finished executors,
* and may remove the application after repeated abnormal executor exits.
* A status update for an unknown app/executor pair is only logged as a warning.
*/
case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
// 1. Look up the application by appId, then look up the executor by execId in that
//    application's executors map. The result is empty if either lookup fails.
val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
execOption match {
// 2. The executor is known.
case Some(exec) =>
val appInfo = idToApp(appId)
// 2.1 Remember the previous state, then store the newly reported one.
val oldState = exec.state
exec.state = state
// 2.2 A transition to RUNNING must come from LAUNCHING; it also resets the
//     application's retry counter.
if (state == ExecutorState.RUNNING) {
assert(oldState == ExecutorState.LAUNCHING,
s"executor $execId state transfer from $oldState to RUNNING is illegal")
appInfo.resetRetryCount()
}
// 2.3 Forward the executor's current state to the application's driver.
exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, false))

// 2.4 The executor is in a finished state (per the original author's note these are
//     KILLED, FAILED, LOST, EXITED -- confirm against ExecutorState.isFinished).
if (ExecutorState.isFinished(state)) {
// Remove this executor from the worker and the application.
logInfo(s"Removing executor ${exec.fullId} because it is $state")
// Skip the application-side removal when the application is already finished
// (original author's note: its information is kept so it can be shown in the UI).
if (!appInfo.isFinished) {
appInfo.removeExecutor(exec)
}
// Remove the executor from the worker that was running it.
exec.worker.removeExecutor(exec)

val normalExit = exitStatus == Some(0)
// Only retry certain number of times so we don't go into an infinite loop.
// Important note: this code path is not exercised by tests, so be very careful when
// changing this `if` condition.
// The condition relies on && short-circuiting: incrementRetryCount() is only called
// for abnormal exits, and the application is removed only when the incremented count
// reaches MAX_EXECUTOR_RETRIES and that limit is non-negative.
if (!normalExit
&& appInfo.incrementRetryCount() >= MAX_EXECUTOR_RETRIES
&& MAX_EXECUTOR_RETRIES >= 0) {
// < 0 disables this application-killing path
val execs = appInfo.executors.values
// Do not remove the application while any of its executors is still RUNNING.
if (!execs.exists(_.state == ExecutorState.RUNNING)) {
logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
s"${appInfo.retryCount} times; removing it")
removeApplication(appInfo, ApplicationState.FAILED)
}
}
}
// 3. Re-run scheduling.
schedule()
case None =>
logWarning(s"Got status update for unknown executor $appId/$execId")
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: