
Spark source code study (2): Master source analysis (2), how the Master handles component state changes

2016-02-09 13:24
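This post looks at what happens inside the Master when the state of one of its components changes. These are my own study notes; corrections are welcome.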

First, let's look at driver state changes:

case DriverStateChanged(driverId, state, exception) => {
  state match {
    // If the driver's state is ERROR, FINISHED, KILLED, or FAILED, remove the driver
    case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
      removeDriver(driverId, state, exception)
    case _ =>
      throw new Exception(s"Received unexpected state update for driver $driverId: $state")
  }
}
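To make the dispatch concrete, here is a minimal, self-contained Scala sketch of the same decision. `DriverStates` is a hypothetical stand-in for Spark's `DriverState` enumeration (its value list is assumed to mirror Spark's, not copied from it), and `onDriverStateChanged` is an illustrative helper that returns `Right(driverId)` where the Master would call `removeDriver`, and `Left(message)` where the real handler throws.

```scala
// Hypothetical stand-in for Spark's DriverState enumeration; the value list is
// an assumption meant to mirror Spark's, not a copy of its source.
object DriverStates extends Enumeration {
  type DriverState = Value
  val SUBMITTED, RUNNING, FINISHED, RELAUNCHING, UNKNOWN, KILLED, FAILED, ERROR = Value
}
import DriverStates._

// Mirrors the DriverStateChanged handler: terminal states lead to removal,
// anything else is reported as an unexpected update (the real handler throws).
def onDriverStateChanged(driverId: String, state: DriverState): Either[String, String] =
  state match {
    case ERROR | FINISHED | KILLED | FAILED => Right(driverId)
    case other => Left(s"Received unexpected state update for driver $driverId: $other")
  }
```

Returning an `Either` instead of throwing only makes the sketch easy to check; in the Master, any non-terminal state reaching this handler is treated as a bug.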


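When the driver enters one of the states above, the Master calls removeDriver. Removal mostly consists of dropping the driver from the Master's in-memory bookkeeping (the set of active drivers and the worker it ran on) and updating its state. The removeDriver code is as follows: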

private def removeDriver(
    driverId: String,
    finalState: DriverState,
    exception: Option[Exception]) {
  // First check whether the driver exists: if not, log a warning; if so, remove it from drivers
  drivers.find(d => d.id == driverId) match {
    case Some(driver) =>
      logInfo(s"Removing driver: $driverId")
      // Remove the driver from the set of active drivers
      drivers -= driver
      // If the completed-driver history is full, drop the oldest 10% (at least one) to make room
      if (completedDrivers.size >= RETAINED_DRIVERS) {
        val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
        completedDrivers.trimStart(toRemove)
      }
      // Record the removed driver in the completed-drivers history
      completedDrivers += driver
      // Remove the driver's persisted state
      persistenceEngine.removeDriver(driver)
      // Update the driver's final state and exception
      driver.state = finalState
      driver.exception = exception
      // Remove this driver from the worker it was running on (if any)
      driver.worker.foreach(w => w.removeDriver(driver))
      // Freed resources may allow waiting drivers or applications to be scheduled
      schedule()
    case None =>
      logWarning(s"Asked to remove unknown driver: $driverId")
  }
}
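The trimming step in removeDriver keeps the completed-driver history bounded. The sketch below isolates that policy, assuming `retained` plays the role of `RETAINED_DRIVERS`; `addCompleted` is a hypothetical helper, and it uses `ArrayBuffer.remove(0, n)` (with the count capped at the buffer size) in place of `trimStart`.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper isolating the retention policy from removeDriver.
// `retained` plays the role of RETAINED_DRIVERS.
def addCompleted[T](completed: ArrayBuffer[T], item: T, retained: Int): Unit = {
  if (completed.size >= retained) {
    // Drop the oldest 10% of entries (at least one), capped at the buffer size.
    val toRemove = math.min(math.max(retained / 10, 1), completed.size)
    completed.remove(0, toRemove)
  }
  completed += item
}
```

With a cap of 20, the driver that arrives when the history is full evicts the two oldest entries at once, so the history drops to 19 rather than being trimmed one entry per insert; presumably this avoids shifting the front of the buffer on every completion.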


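One source of driver state changes is the RequestKillDriver handler in the Master class: when the Master receives a request to kill a driver, that request eventually produces a driver state change.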

case RequestKillDriver(driverId) => {
  if (state != RecoveryState.ALIVE) {
    val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
      s"Can only kill drivers in ALIVE state."
    context.reply(KillDriverResponse(self, driverId, success = false, msg))
  } else {
    logInfo("Asked to kill driver " + driverId)
    val driver = drivers.find(_.id == driverId)
    driver match {
      case Some(d) =>
        if (waitingDrivers.contains(d)) {
          waitingDrivers -= d
          self.send(DriverStateChanged(driverId, DriverState.KILLED, None))
        } else {
          // We just notify the worker to kill the driver here. The final bookkeeping occurs
          // on the return path when the worker submits a state change back to the master
          // to notify it that the driver was successfully killed.
          d.worker.foreach { w =>
            w.endpoint.send(KillDriver(driverId))
          }
        }
        // TODO: It would be nice for this to be a synchronous response
        val msg = s"Kill request for $driverId submitted"
        logInfo(msg)
        context.reply(KillDriverResponse(self, driverId, success = true, msg))
      case None =>
        val msg = s"Driver $driverId has already finished or does not exist"
        logWarning(msg)
        context.reply(KillDriverResponse(self, driverId, success = false, msg))
    }
  }
}
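The kill path has four outcomes. This sketch reduces the handler to a pure decision function so each branch can be checked on its own; `KillAction`, `DriverRec`, and `decideKill` are hypothetical names, and the Master's messaging calls (`context.reply`, `self.send`, `endpoint.send`) are deliberately left out.

```scala
// Hypothetical outcome type: what the Master does for one kill request.
sealed trait KillAction
case object RejectNotAlive extends KillAction             // Master is not ALIVE (e.g. a standby)
case object RemoveWaitingAndMarkKilled extends KillAction // never launched: drop it, self-send KILLED
final case class ForwardToWorker(workerId: Option[String]) extends KillAction // ask the worker to kill it
case object UnknownDriver extends KillAction              // already finished or never existed

// Hypothetical, minimal driver record: just an id and the worker it runs on.
final case class DriverRec(id: String, workerId: Option[String])

def decideKill(
    masterAlive: Boolean,
    drivers: Set[DriverRec],
    waiting: Set[DriverRec],
    driverId: String): KillAction =
  if (!masterAlive) RejectNotAlive
  else drivers.find(_.id == driverId) match {
    case Some(d) if waiting.contains(d) => RemoveWaitingAndMarkKilled
    case Some(d) => ForwardToWorker(d.workerId)
    case None => UnknownDriver
  }
```

Only the waiting-driver branch produces `DriverStateChanged(..., KILLED, ...)` immediately. For a launched driver, the Master relies on the worker reporting the state change back, which then flows into the DriverStateChanged handler and removeDriver.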


Next, let's look at executor state changes:

case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
  // Look up the application by appId, then the executor by execId within it
  val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))

  execOption match {
    // The executor was found
    case Some(exec) => {
      val appInfo = idToApp(appId)
      // Remember the old state and switch to the new one
      val oldState = exec.state
      exec.state = state
      // If the executor is now RUNNING, reset the application's retry count
      if (state == ExecutorState.RUNNING) {
        assert(oldState == ExecutorState.LAUNCHING,
          s"executor $execId state transfer from $oldState to RUNNING is illegal")
        appInfo.resetRetryCount()
      }
      // Forward the executor update to the application's driver
      exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus))
      // If the executor has reached a finished state, remove it from its worker and application
      if (ExecutorState.isFinished(state)) {
        // Remove this executor from the worker and app
        logInfo(s"Removing executor ${exec.fullId} because it is $state")
        // If an application has already finished, preserve its
        // state to display its information properly on the UI
        // i.e. only remove the executor from the app while the app is still running;
        // it is always removed from the worker
        if (!appInfo.isFinished) {
          appInfo.removeExecutor(exec)
        }
        exec.worker.removeExecutor(exec)

        val normalExit = exitStatus == Some(0)
        // Only retry certain number of times so we don't go into an infinite loop.
        // If the executor exited abnormally:
        if (!normalExit) {
          // if the app has not reached the maximum retry count, call schedule() again
          if (appInfo.incrementRetryCount() < ApplicationState.MAX_NUM_RETRY) {
            schedule()
          } else {
            // otherwise, if none of the app's executors is still running, remove the app as FAILED
            val execs = appInfo.executors.values
            if (!execs.exists(_.state == ExecutorState.RUNNING)) {
              logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
                s"${appInfo.retryCount} times; removing it")
              removeApplication(appInfo, ApplicationState.FAILED)
            }
          }
        }
      }
    }
    case None =>
      logWarning(s"Got status update for unknown executor $appId/$execId")
  }
}
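The retry accounting above can be isolated in a small sketch. `ExecStates` is a hypothetical mirror of Spark's executor states, with the finished set assumed to be KILLED, FAILED, LOST, and EXITED as checked by `ExecutorState.isFinished`; `maxRetries` stands in for `ApplicationState.MAX_NUM_RETRY`; `AppState`, `Outcome`, and `onExecutorStateChanged` are hypothetical names. The caller is assumed to pass whether any *other* executor of the application is still running.

```scala
// Hypothetical mirror of Spark's ExecutorState values. The finished set is
// assumed to match ExecutorState.isFinished (KILLED, FAILED, LOST, EXITED).
object ExecStates extends Enumeration {
  type ExecState = Value
  val LAUNCHING, RUNNING, KILLED, FAILED, LOST, EXITED = Value
  def isFinished(s: ExecState): Boolean = Set(KILLED, FAILED, LOST, EXITED).contains(s)
}
import ExecStates.ExecState

// What the Master would do after handling one ExecutorStateChanged message.
sealed trait Outcome
case object NoAction extends Outcome        // nothing beyond bookkeeping
case object Reschedule extends Outcome      // call schedule() to launch a replacement
case object RemoveAppFailed extends Outcome // removeApplication(app, FAILED)

// Hypothetical per-application retry counter, modeled on how
// resetRetryCount / incrementRetryCount are used in the handler above.
final class AppState(val maxRetries: Int) {
  var retryCount: Int = 0
  def resetRetryCount(): Unit = { retryCount = 0 }
  def incrementRetryCount(): Int = { retryCount += 1; retryCount }
}

def onExecutorStateChanged(
    app: AppState,
    oldState: ExecState,
    newState: ExecState,
    exitStatus: Option[Int],
    otherExecutorsRunning: Boolean): Outcome = {
  if (newState == ExecStates.RUNNING) {
    // Same invariant the Master asserts: only LAUNCHING -> RUNNING is legal.
    assert(oldState == ExecStates.LAUNCHING, s"illegal transfer from $oldState to RUNNING")
    app.resetRetryCount()
  }
  if (!ExecStates.isFinished(newState)) NoAction
  else if (exitStatus == Some(0)) NoAction // normal exit: no retry accounting
  else if (app.incrementRetryCount() < app.maxRetries) Reschedule
  else if (!otherExecutorsRunning) RemoveAppFailed
  else NoAction // limit reached, but other executors are still running
}
```

The effect is that the counter tracks abnormal exits since the last successful launch: any executor reaching RUNNING resets it, and the application is removed only once the limit is reached and none of its executors is still running.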