
Spark Source Code Analysis: Master State Changes

2018-02-10 22:06


Overview:

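Executor state change (ExecutorStateChanged)

1. Look up the application the executor belongs to, then fetch the executor's info from that application's internal executors cache.

2. Record the newly reported state on the executor. If the new state is RUNNING, the previous state must have been LAUNCHING, and the application's retry count is reset. The master then sends an ExecutorUpdated message to the driver.

3. If the executor has reached a finished (terminal) state, as checked by ExecutorState.isFinished, remove it from the application (unless the application has already finished, in which case it is kept for the UI) and from the worker it was running on.

4. If the executor did not exit normally (any exit status other than 0), increment the application's retry count. While the count stays below ApplicationState.MAX_NUM_RETRY (10), call schedule() to reschedule. Once the limit is reached and none of the application's executors is RUNNING, the application is considered failed and is removed. Because an executor reaching RUNNING resets the count, the limit only applies to failures that occur without any executor of the application becoming RUNNING in between.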
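The retry rule in steps 2 to 4 can be modeled in isolation. This is a minimal Scala sketch under stated assumptions: `RetryModel`, `AppRetryState`, `onExecutorFinished` and the `Decision` values are hypothetical names, not Spark API; only the limit of 10 and the comparisons are taken from the Master code quoted below.

```scala
// Hypothetical model of the master's retry bookkeeping for one application.
// MaxNumRetry mirrors ApplicationState.MAX_NUM_RETRY (10) from the quoted code;
// the names in this object are illustrative, not Spark's.
object RetryModel {
  val MaxNumRetry = 10

  sealed trait Decision
  case object NoAction extends Decision   // normal exit (status 0): nothing to retry
  case object Reschedule extends Decision // below the limit: call schedule()
  case object KeepApp extends Decision    // limit hit, but some executor is still RUNNING
  case object RemoveApp extends Decision  // limit hit and nothing is RUNNING: app FAILED

  final class AppRetryState {
    private var retryCount = 0
    def count: Int = retryCount
    def incrementRetryCount(): Int = { retryCount += 1; retryCount }
    // Called when an executor reaches RUNNING
    def resetRetryCount(): Unit = retryCount = 0
  }

  /** What the master does after an executor of `app` reaches a finished state. */
  def onExecutorFinished(app: AppRetryState, exitStatus: Option[Int], anyRunning: Boolean): Decision =
    if (exitStatus == Some(0)) NoAction
    else if (app.incrementRetryCount() < MaxNumRetry) Reschedule
    else if (anyRunning) KeepApp
    else RemoveApp
}
```

Feeding ten consecutive abnormal exits into `onExecutorFinished` with no RUNNING executor yields `Reschedule` nine times and then `RemoveApp`; calling `resetRetryCount()`, which is what the master does when an executor reaches RUNNING, starts the count over.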

Source walkthrough:

Step 1: Executor state change

Source location: org.apache.spark.deploy.master.Master

/**
 * Executor state changed
 */
case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {

  // Find the executor's application, then get the executor info from the app's internal executors cache
  val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))

  execOption match {
    // The executor is known
    case Some(exec) => {
      // Record the newly reported state on the executor
      val appInfo = idToApp(appId)
      val oldState = exec.state
      exec.state = state

      // RUNNING is only legal after LAUNCHING; reaching it resets the app's retry count
      if (state == ExecutorState.RUNNING) {
        assert(oldState == ExecutorState.LAUNCHING,
          s"executor $execId state transfer from $oldState to RUNNING is illegal")
        appInfo.resetRetryCount()
      }

      // Notify the driver with an ExecutorUpdated message (one-way send)
      exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus))

      // If the executor has reached a finished (terminal) state
      if (ExecutorState.isFinished(state)) {
        // Remove this executor from the worker and app
        logInfo(s"Removing executor ${exec.fullId} because it is $state")
        // If an application has already finished, preserve its
        // state to display its information properly on the UI
        if (!appInfo.isFinished) {
          // Remove the executor from the app's cache
          appInfo.removeExecutor(exec)
        }
        // Remove the executor from the cache of the worker it ran on
        exec.worker.removeExecutor(exec)

        // Any exit status other than 0 counts as abnormal
        val normalExit = exitStatus == Some(0)
        // Only retry certain number of times so we don't go into an infinite loop.
        if (!normalExit) {
          // Check whether the app's retry count has reached the maximum (MAX_NUM_RETRY = 10)
          if (appInfo.incrementRetryCount() < ApplicationState.MAX_NUM_RETRY) {
            // Reschedule
            schedule()
          } else {
            // Otherwise remove the application: its executors keep failing,
            // so it is considered failed, but only if none of them is still RUNNING
            val execs = appInfo.executors.values
            if (!execs.exists(_.state == ExecutorState.RUNNING)) {
              logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
                s"${appInfo.retryCount} times; removing it")
              removeApplication(appInfo, ApplicationState.FAILED)
            }
          }
        }
      }
    }
    case None =>
      logWarning(s"Got status update for unknown executor $appId/$execId")
  }
}
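The handler's branching depends on three small checks: whether a state is terminal, whether a transition to RUNNING is legal, and whether an exit was normal. The self-contained model below isolates them. `ExecutorStateModel` and its helpers are hypothetical; the state names and the terminal set are written from memory of `org.apache.spark.deploy.ExecutorState` in Spark 2.x and may differ in other versions.

```scala
// Hypothetical model of the executor states the handler switches on.
// Assumption: mirrors Spark 2.x ExecutorState (LAUNCHING, RUNNING, KILLED,
// FAILED, LOST, EXITED); verify against the Spark version you are reading.
object ExecutorStateModel extends Enumeration {
  type State = Value
  val LAUNCHING, RUNNING, KILLED, FAILED, LOST, EXITED = Value

  // Terminal states: once reached, the master removes the executor
  def isFinished(state: State): Boolean =
    Seq(KILLED, FAILED, LOST, EXITED).contains(state)

  // The assert in the handler: RUNNING may only follow LAUNCHING
  def legalRunningTransition(oldState: State): Boolean = oldState == LAUNCHING

  // Only exit status 0 is normal; a missing status (None) counts as abnormal
  def isNormalExit(exitStatus: Option[Int]): Boolean = exitStatus == Some(0)
}
```

Because `isNormalExit(None)` is false, an executor that finishes without reporting an exit status still increments the application's retry count.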


Step 2: ApplicationInfo.removeExecutor, called in Step 1

Source location: org.apache.spark.deploy.master.ApplicationInfo

/**
 * Remove the executor from the app's in-memory caches
 */
private[master] def removeExecutor(exec: ExecutorDesc) {
  if (executors.contains(exec.id)) {
    removedExecutors += executors(exec.id)
    executors -= exec.id
    coresGranted -= exec.cores
  }
}
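The `contains` guard makes `removeExecutor` idempotent: removing the same executor twice does not give its cores back twice. A reduced sketch shows the effect; `ExecDesc` and `AppBook` are hypothetical stand-ins for `ExecutorDesc` and `ApplicationInfo` that keep only the fields this method touches.

```scala
import scala.collection.mutable

// Hypothetical stand-in for ExecutorDesc: just an id and a core count
final case class ExecDesc(id: Int, cores: Int)

// Hypothetical stand-in for ApplicationInfo's executor bookkeeping
final class AppBook(initial: Seq[ExecDesc]) {
  val executors = mutable.HashMap.empty[Int, ExecDesc] ++= initial.map(e => e.id -> e)
  val removedExecutors = mutable.ArrayBuffer.empty[ExecDesc]
  var coresGranted: Int = initial.map(_.cores).sum

  // Same steps as ApplicationInfo.removeExecutor: record it as removed,
  // drop it from the live map, and release its cores. Unknown ids are ignored.
  def removeExecutor(exec: ExecDesc): Unit =
    if (executors.contains(exec.id)) {
      removedExecutors += executors(exec.id)
      executors -= exec.id
      coresGranted -= exec.cores
    }
}
```

The `removedExecutors` list is what lets the UI still show executors that are gone from the live map.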


Step 3: removeApplication, called in Step 1

Source location: org.apache.spark.deploy.master

def removeApplication(app: ApplicationInfo, state: ApplicationState.Value) {
  // Remove the app from the in-memory caches
  if (apps.contains(app)) {
    logInfo("Removing app " + app.id)
    apps -= app
    idToApp -= app.id
    endpointToApp -= app.driver
    addressToApp -= app.driver.address
    if (completedApps.size >= RETAINED_APPLICATIONS) {
      val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
      completedApps.take(toRemove).foreach( a => {
        Option(appIdToUI.remove(a.id)).foreach { ui => webUi.detachSparkUI(ui) }
        applicationMetricsSystem.removeSource(a.appSource)
      })
      completedApps.trimStart(toRemove)
    }
    completedApps += app // Remember it in our history
    waitingApps -= app

    // If application events are logged, use them to rebuild the UI
    asyncRebuildSparkUI(app)

    for (exec <- app.executors.values) {
      // Kill each of the app's executors
      killExecutor(exec)
    }
    app.markFinished(state)
    if (state != ApplicationState.FINISHED) {
      app.driver.send(ApplicationRemoved(state.toString))
    }

    // Remove the application from the persistence engine
    persistenceEngine.removeApplication(app)
    schedule()

    // Tell all workers that the application has finished, so they can clean up any app state.
    workers.foreach { w =>
      w.endpoint.send(ApplicationFinished(app.id))
    }
  }
}
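The trimming at the top of `removeApplication` keeps the completed-application history bounded: once it holds RETAINED_APPLICATIONS entries, the oldest max(RETAINED_APPLICATIONS / 10, 1) are dropped (the real code also detaches their UIs and metrics sources) before the new one is appended. In Spark this limit comes from the `spark.deploy.retainedApplications` setting. The generic helper below, `appendWithRetention`, is a hypothetical name used only to isolate the rule.

```scala
import scala.collection.mutable.ArrayBuffer

object RetentionModel {
  /**
   * Hypothetical helper: append `item` to `history`, first dropping the oldest
   * max(retained / 10, 1) entries if the history is already at `retained`.
   * Returns the dropped entries (the real code detaches their UIs here).
   */
  def appendWithRetention[A](history: ArrayBuffer[A], item: A, retained: Int): List[A] = {
    val dropped =
      if (history.size >= retained) {
        val toRemove = math.max(retained / 10, 1) // the floor of 1 matters for small limits
        val oldest = history.take(toRemove).toList
        history.trimStart(toRemove)
        oldest
      } else Nil
    history += item
    dropped
  }
}
```

Dropping a tenth of the history at once, rather than one entry per removal, means the trim (and the UI cleanup attached to it) runs only occasionally.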


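Driver state change (DriverStateChanged)

1. If the driver's state is ERROR, FINISHED, KILLED, or FAILED, remove the driver; any other state update is unexpected and throws an exception.

2. Removal takes the driver out of the active drivers and adds it to completedDrivers (trimming the oldest entries once RETAINED_DRIVERS is reached), removes it from the persistence engine, records its final state and exception, removes it from the worker it ran on to release that worker's resources, and finally calls schedule().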
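The dispatch in rule 1 is a plain pattern match over the reported state. In this sketch the match returns an `Either` instead of throwing, so both outcomes can be checked without exceptions. `DriverStateModel`, `handle`, and the state objects are hypothetical; the list contains the four terminal states the Master matches on plus two non-terminal examples, and is not a copy of Spark's `DriverState` enumeration.

```scala
// Hypothetical model of the DriverStateChanged dispatch
object DriverStateModel {
  sealed trait DriverState
  case object SUBMITTED extends DriverState // non-terminal example
  case object RUNNING extends DriverState   // non-terminal example
  case object FINISHED extends DriverState
  case object ERROR extends DriverState
  case object KILLED extends DriverState
  case object FAILED extends DriverState

  // Right = the driver would be removed; Left = the message the Master would throw
  def handle(driverId: String, state: DriverState): Either[String, String] = state match {
    case ERROR | FINISHED | KILLED | FAILED => Right(s"remove $driverId as $state")
    case other => Left(s"Received unexpected state update for driver $driverId: $other")
  }
}
```

Since `DriverState` is sealed here, the compiler can check that every state is covered by one of the two cases.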

Source walkthrough:

Step 1: Driver state change

Source location: org.apache.spark.deploy.master.Master

/**
 * Driver state changed
 */
case DriverStateChanged(driverId, state, exception) => {
  state match {
    // If the driver's state is ERROR, FINISHED, KILLED, or FAILED,
    // remove the driver
    case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
      removeDriver(driverId, state, exception)
    case _ =>
      throw new Exception(s"Received unexpected state update for driver $driverId: $state")
  }
}


Step 2: removeDriver, called in Step 1

Source location: org.apache.spark.deploy.master.Master

/**
 * Remove a driver
 */
private def removeDriver(
    driverId: String,
    finalState: DriverState,
    exception: Option[Exception]) {
  drivers.find(d => d.id == driverId) match {
    case Some(driver) =>
      logInfo(s"Removing driver: $driverId")
      drivers -= driver
      if (completedDrivers.size >= RETAINED_DRIVERS) {
        val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
        completedDrivers.trimStart(toRemove)
      }
      // Add the driver to the completed drivers
      completedDrivers += driver
      // Remove the driver from the persistence engine
      persistenceEngine.removeDriver(driver)
      // Record the driver's final state
      driver.state = finalState
      driver.exception = exception
      // Remove the driver from the worker it ran on
      driver.worker.foreach(w => w.removeDriver(driver))
      schedule()
    case None =>
      logWarning(s"Asked to remove unknown driver: $driverId")
  }
}
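The bookkeeping in `removeDriver` can be checked with reduced stand-ins. `MasterModel`, `DriverModel`, and `WorkerModel` are hypothetical, states are plain strings, and the persistence engine, history trimming, and `schedule()` call are left out; the sketch only shows the driver leaving the active set, landing in the completed list with its final state, and being detached from its worker.

```scala
import scala.collection.mutable

// Hypothetical stand-in for WorkerInfo: tracks the drivers it hosts by id
final class WorkerModel(val id: String) {
  val drivers = mutable.HashMap.empty[String, DriverModel]
  def removeDriver(d: DriverModel): Unit = drivers -= d.id
}

// Hypothetical stand-in for DriverInfo (state simplified to a String)
final class DriverModel(val id: String, val worker: Option[WorkerModel]) {
  var state: String = "RUNNING"
  var exception: Option[Exception] = None
}

final class MasterModel {
  val drivers = mutable.HashSet.empty[DriverModel]
  val completedDrivers = mutable.ArrayBuffer.empty[DriverModel]

  // Returns false for an unknown id (the real code logs a warning instead)
  def removeDriver(driverId: String, finalState: String, exception: Option[Exception]): Boolean =
    drivers.find(_.id == driverId) match {
      case Some(driver) =>
        drivers -= driver
        completedDrivers += driver                    // keep it for the UI history
        driver.state = finalState                     // record the terminal state
        driver.exception = exception
        driver.worker.foreach(_.removeDriver(driver)) // free the worker's slot
        true
      case None => false
    }
}
```

A second call with the same id returns false, matching the "unknown driver" branch of the real method.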