您的位置:首页 > 大数据

大数据IMF传奇行动绝密课程第30课:Master的注册机制和状态管理解密

2016-08-12 00:23 489 查看

Master的注册机制和状态管理解密

一、Master接受Driver注册内幕

二、Master接受Application注册内幕

三、Master接受Worker注册内幕

四、Master处理Driver状态变化内幕

五、Master处理Executor状态变化内幕

一、Master对其它组建注册的处理

1、Master接受注册的对象主要是:Driver、Application、Worker;需要补充说明的是Executor不会注册个Master,Executor是注册给Driver中的SchedulerBackend的;

2、Worker是在启动后主动向Master注册的,所以如果在生产环境下加入新的Worker到已经正在运行的Spark集群上,此时不需要重新启动Spark集群就能够使用新加入的Worker以提升处理能力;

//Master.scala中部分代码
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case RegisterWorker(
id, workerHost, workerPort, workerRef, cores, memory, workerUiPort, publicAddress) => {
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
workerHost, workerPort, cores, Utils.megabytesToString(memory)))
if (state == RecoveryState.STANDBY) {
context.reply(MasterInStandby)
} else if (idToWorker.contains(id)) {
context.reply(RegisterWorkerFailed("Duplicate worker ID"))
} else {
val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
workerRef, workerUiPort, publicAddress)
if (registerWorker(worker)) {
persistenceEngine.addWorker(worker)
context.reply(RegisteredWorker(self, masterWebUiUrl))
schedule()
} else {
val workerAddress = worker.endpoint.address
logWarning("Worker registration failed. Attempted to re-register worker at same " +
"address: " + workerAddress)
context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: "
+ workerAddress))
}
}
}


3、Master在接收到Worker注册的请求后,首先会判断一下当前的Master是否是Standby的模式,如果是的话就不处理;然后会判断当前Master的内存数据结构idToWorker中是否已经有该Worker的注册信息,如果有的话此时不会重复注册;

4、Master如果决定接收注册的Worker,首先会创建WorkerInfo对象来保存注册的Worker的信息;

private[spark] class WorkerInfo(

val id: String,

val host: String,

val port: Int,

val cores: Int,

val memory: Int,

val endpoint: RpcEndpointRef,

val webUiPort: Int,

val publicAddress: String)

然后调用registerWorker来执行具体的注册的过程,如果Worker的状态是DEAD,则直接过滤掉,对于UNKNOWN状态的内容调用removeWorker进行清理(包括清理该Worker下的Executors和Drivers)

5、注册时是先注册Driver,然后再注册Application

二、Master对Driver和Executor状态变化的处理

1、对Driver状态变化的处理

//Master.scala中部分代码
case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
removeDriver(driverId, state, exception)

private def removeDriver(
driverId: String,
finalState: DriverState,
exception: Option[Exception]) {
drivers.find(d => d.id == driverId) match {
case Some(driver) =>
logInfo(s"Removing driver: $driverId")
drivers -= driver
if (completedDrivers.size >= RETAINED_DRIVERS) {
val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
completedDrivers.trimStart(toRemove)
}
completedDrivers += driver
persistenceEngine.removeDriver(driver)
driver.state = finalState
driver.exception = exception
driver.worker.foreach(w => w.removeDriver(driver))
schedule()
case None =>
logWarning(s"Asked to remove unknown driver: $driverId")
}
}


2、Executor挂掉的时候系统会尝试一定次数的重启(最多重试10次重启)

//Master.scala中部分代码
case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
execOption match {
case Some(exec) => {
val appInfo = idToApp(appId)
val oldState = exec.state
exec.state = state

if (state == ExecutorState.RUNNING) {
assert(oldState == ExecutorState.LAUNCHING,
s"executor $execId state transfer from $oldState to RUNNING is illegal")
appInfo.resetRetryCount()
}

exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus))

if (ExecutorState.isFinished(state)) {
// Remove this executor from the worker and app
logInfo(s"Removing executor ${exec.fullId} because it is $state")
// If an application has already finished, preserve its
// state to display its information properly on the UI
if (!appInfo.isFinished) {
appInfo.removeExecutor(exec)
}
exec.worker.removeExecutor(exec)

val normalExit = exitStatus == Some(0)
// Only retry certain number of times so we don't go into an infinite loop.
if (!normalExit) {
if (appInfo.incrementRetryCount() < ApplicationState.MAX_NUM_RETRY) {
schedule()
} else {
val execs = appInfo.executors.values
if (!execs.exists(_.state == ExecutorState.RUNNING)) {
logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
s"${appInfo.retryCount} times; removing it")
removeApplication(appInfo, ApplicationState.FAILED)
}
}
}
}
}
case None =>
logWarning(s"Got status update for unknown executor $appId/$execId")
}
}


内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark
相关文章推荐