Big Data IMF Legendary Action Top-Secret Course, Lesson 30: Demystifying the Master's Registration Mechanism and State Management
2016-08-12 00:23
Demystifying the Master's Registration Mechanism and State Management
I. Inside the Master's acceptance of Driver registration
II. Inside the Master's acceptance of Application registration
III. Inside the Master's acceptance of Worker registration
IV. Inside the Master's handling of Driver state changes
V. Inside the Master's handling of Executor state changes
I. How the Master Handles Registration from Other Components
1. The objects that register with the Master are mainly Drivers, Applications, and Workers. Note that Executors do not register with the Master; an Executor registers with the SchedulerBackend inside the Driver.
2. A Worker actively registers with the Master after it starts. Therefore, in a production environment you can add a new Worker to an already running Spark cluster to increase processing capacity without restarting the cluster.
// Excerpt from Master.scala
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case RegisterWorker(
      id, workerHost, workerPort, workerRef, cores, memory, workerUiPort, publicAddress) => {
    logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
      workerHost, workerPort, cores, Utils.megabytesToString(memory)))
    if (state == RecoveryState.STANDBY) {
      context.reply(MasterInStandby)
    } else if (idToWorker.contains(id)) {
      context.reply(RegisterWorkerFailed("Duplicate worker ID"))
    } else {
      val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
        workerRef, workerUiPort, publicAddress)
      if (registerWorker(worker)) {
        persistenceEngine.addWorker(worker)
        context.reply(RegisteredWorker(self, masterWebUiUrl))
        schedule()
      } else {
        val workerAddress = worker.endpoint.address
        logWarning("Worker registration failed. Attempted to re-register worker at same " +
          "address: " + workerAddress)
        context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: "
          + workerAddress))
      }
    }
  }
3. When the Master receives a Worker registration request, it first checks whether it is currently in STANDBY mode; if so, the request is not processed. It then checks whether its in-memory data structure idToWorker already contains that Worker's ID; if it does, the Worker is not registered again.
4. If the Master decides to accept the registering Worker, it first creates a WorkerInfo object to hold the Worker's registration information:
private[spark] class WorkerInfo(
val id: String,
val host: String,
val port: Int,
val cores: Int,
val memory: Int,
val endpoint: RpcEndpointRef,
val webUiPort: Int,
val publicAddress: String)
It then calls registerWorker to carry out the actual registration. An existing Worker in the DEAD state at the same address is simply filtered out, while one in the UNKNOWN state is cleaned up with removeWorker (which also cleans up the Executors and Drivers running on that Worker).
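The filtering described above can be sketched as a small, self-contained model. This is not the real Spark source: MiniWorker, the toy state values, and the simplified maps are stand-ins for the Master's internal WorkerInfo, workers, idToWorker and addressToWorker structures.

```scala
import scala.collection.mutable

// Simplified sketch of the Master's registerWorker logic (toy types, not Spark's).
object RegisterWorkerSketch {
  object WorkerState extends Enumeration {
    val ALIVE, DEAD, UNKNOWN = Value
  }
  // A regular class (not a case class), so set membership uses reference
  // equality and mutating `state` does not corrupt the HashSet.
  class MiniWorker(val id: String, val host: String, val port: Int) {
    var state: WorkerState.Value = WorkerState.ALIVE
  }

  val workers = mutable.HashSet[MiniWorker]()
  val idToWorker = mutable.HashMap[String, MiniWorker]()
  val addressToWorker = mutable.HashMap[(String, Int), MiniWorker]()

  def registerWorker(worker: MiniWorker): Boolean = {
    // A DEAD worker left over at the same address is simply filtered out.
    workers.filter { w =>
      w.host == worker.host && w.port == worker.port && w.state == WorkerState.DEAD
    }.foreach(workers -= _)

    val workerAddress = (worker.host, worker.port)
    if (addressToWorker.contains(workerAddress)) {
      val oldWorker = addressToWorker(workerAddress)
      if (oldWorker.state == WorkerState.UNKNOWN) {
        // An UNKNOWN worker (e.g. during failover recovery) is cleaned up
        // before the fresh registration replaces it.
        removeWorker(oldWorker)
      } else {
        return false // a live duplicate at the same address is rejected
      }
    }
    workers += worker
    idToWorker(worker.id) = worker
    addressToWorker(workerAddress) = worker
    true
  }

  def removeWorker(worker: MiniWorker): Unit = {
    worker.state = WorkerState.DEAD
    idToWorker -= worker.id
    addressToWorker -= ((worker.host, worker.port))
    // The real Master additionally notifies drivers that their executors
    // on this worker are lost, and relaunches or removes its drivers.
  }
}
```

Note that removeWorker marks the worker DEAD but leaves it in the workers set; that is what lets a later re-registration at the same address find and filter out the stale entry.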
5. At registration time, the Driver is registered first, and then the Application.
II. How the Master Handles Driver and Executor State Changes
1. Handling Driver state changes
// Excerpt from Master.scala
case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
  removeDriver(driverId, state, exception)

private def removeDriver(
    driverId: String,
    finalState: DriverState,
    exception: Option[Exception]) {
  drivers.find(d => d.id == driverId) match {
    case Some(driver) =>
      logInfo(s"Removing driver: $driverId")
      drivers -= driver
      if (completedDrivers.size >= RETAINED_DRIVERS) {
        val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
        completedDrivers.trimStart(toRemove)
      }
      completedDrivers += driver
      persistenceEngine.removeDriver(driver)
      driver.state = finalState
      driver.exception = exception
      driver.worker.foreach(w => w.removeDriver(driver))
      schedule()
    case None =>
      logWarning(s"Asked to remove unknown driver: $driverId")
  }
}
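One detail worth isolating is the retention policy on completedDrivers: once the buffer reaches RETAINED_DRIVERS, the oldest roughly 10% of entries are trimmed before the new one is appended, so finished-driver history stays bounded. A minimal sketch of just that trimming, using a toy buffer of driver IDs (RETAINED_DRIVERS is configurable via spark.deploy.retainedDrivers):

```scala
import scala.collection.mutable.ArrayBuffer

// Toy model of the completedDrivers retention policy in Master.removeDriver.
object RetentionSketch {
  val RETAINED_DRIVERS = 200 // default value of spark.deploy.retainedDrivers

  val completedDrivers = ArrayBuffer[String]()

  def recordCompleted(driverId: String): Unit = {
    if (completedDrivers.size >= RETAINED_DRIVERS) {
      // Drop the oldest 10% (at least 1) so the buffer cannot grow unbounded.
      val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
      completedDrivers.trimStart(toRemove)
    }
    completedDrivers += driverId
  }
}
```

Trimming in batches of 10% rather than one-at-a-time keeps the amortized cost low: most completions are a plain append, and the buffer oscillates between about 90% and 100% of the cap.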
2. When an Executor exits abnormally, the system retries launching it a limited number of times (at most 10, per ApplicationState.MAX_NUM_RETRY); if the application then has no running Executor left, it is removed as FAILED.
// Excerpt from Master.scala
case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
  val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
  execOption match {
    case Some(exec) => {
      val appInfo = idToApp(appId)
      val oldState = exec.state
      exec.state = state

      if (state == ExecutorState.RUNNING) {
        assert(oldState == ExecutorState.LAUNCHING,
          s"executor $execId state transfer from $oldState to RUNNING is illegal")
        appInfo.resetRetryCount()
      }

      exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus))

      if (ExecutorState.isFinished(state)) {
        // Remove this executor from the worker and app
        logInfo(s"Removing executor ${exec.fullId} because it is $state")
        // If an application has already finished, preserve its
        // state to display its information properly on the UI
        if (!appInfo.isFinished) {
          appInfo.removeExecutor(exec)
        }
        exec.worker.removeExecutor(exec)

        val normalExit = exitStatus == Some(0)
        // Only retry certain number of times so we don't go into an infinite loop.
        if (!normalExit) {
          if (appInfo.incrementRetryCount() < ApplicationState.MAX_NUM_RETRY) {
            schedule()
          } else {
            val execs = appInfo.executors.values
            if (!execs.exists(_.state == ExecutorState.RUNNING)) {
              logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
                s"${appInfo.retryCount} times; removing it")
              removeApplication(appInfo, ApplicationState.FAILED)
            }
          }
        }
      }
    }
    case None =>
      logWarning(s"Got status update for unknown executor $appId/$execId")
  }
}
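The retry bookkeeping in the handler above can be isolated into a toy model (the AppRetry name and its methods are hypothetical, chosen for this sketch). It shows why a successful transition to RUNNING resets the counter: only consecutive abnormal exits count toward the MAX_NUM_RETRY limit.

```scala
// Toy model of the per-application executor retry accounting used by the Master.
class AppRetry(maxNumRetry: Int = 10) { // ApplicationState.MAX_NUM_RETRY is 10
  private var retryCount = 0

  // Called when an executor reaches RUNNING: the consecutive-failure streak ends.
  def onExecutorRunning(): Unit = { retryCount = 0 }

  // Called when an executor exits abnormally; returns true if the Master
  // should reschedule, false if the application should be removed as FAILED.
  def onAbnormalExit(): Boolean = {
    retryCount += 1
    retryCount < maxNumRetry
  }
}
```

Under this model, ten consecutive abnormal exits with no executor ever reaching RUNNING exhaust the budget, which corresponds to the branch in the real handler that calls removeApplication(appInfo, ApplicationState.FAILED).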