Lesson 30: Demystifying the Master's Registration Mechanism and State Management (Class Notes)
2017-06-04 22:13
Part 1: How the Master handles registration from other components
The components that register with the Master are the Driver, the Application, and the Worker. Executors do not register with the Master; an Executor registers with the SchedulerBackend inside the Driver.
A Worker actively registers with the Master after it starts up, so in production you can add new Workers to a running Spark cluster without restarting the cluster, and the new Workers immediately add processing capacity. For example, if a production cluster has 500 machines and 100 more are added, the 100 new machines can join the cluster without restarting it.
Worker source code:

```scala
private[deploy] class Worker(
    override val rpcEnv: RpcEnv,
    webUiPort: Int,
    cores: Int,
    memory: Int,
    masterRpcAddresses: Array[RpcAddress],
    endpointName: String,
    workDirPath: String = null,
    val conf: SparkConf,
    val securityMgr: SecurityManager)
  extends ThreadSafeRpcEndpoint with Logging {
```
Worker is a message loop: it extends ThreadSafeRpcEndpoint, so it can both receive and send messages. Its onStart method is:

```scala
override def onStart() {
  ......
  workerWebUiUrl = s"http://$publicAddress:${webUi.boundPort}"
  registerWithMaster()
  ......
}
```
onStart calls registerWithMaster():

```scala
private def registerWithMaster() {
  ......
  registrationRetryTimer match {
    case None =>
      registered = false
      registerMasterFutures = tryRegisterAllMasters()
      ......
```
registerWithMaster() calls tryRegisterAllMasters to register with every Master:

```scala
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  masterRpcAddresses.map { masterAddress =>
    registerMasterThreadPool.submit(new Runnable {
      override def run(): Unit = {
        try {
          logInfo("Connecting to master " + masterAddress + "...")
          val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
          registerWithMaster(masterEndpoint)
        } catch {
          case ie: InterruptedException => // Cancelled
          case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
        }
      }
    })
  }
}
```
In tryRegisterAllMasters, because a real deployment may run multiple Masters (for high availability), each registration is submitted on a thread from a thread pool. Each task obtains a masterEndpoint, which is an RpcEndpointRef, and registers through registerWithMaster(masterEndpoint):

```scala
private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
  masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
    workerId, host, port, self, cores, memory, workerWebUiUrl))
    .onComplete {
      // This is a very fast action so we can use "ThreadUtils.sameThread"
      case Success(msg) =>
        Utils.tryLogNonFatalError {
          handleRegisterResponse(msg)
        }
      case Failure(e) =>
        logError(s"Cannot register with master: ${masterEndpoint.address}", e)
        System.exit(1)
    }(ThreadUtils.sameThread)
}
```
In registerWithMaster, masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(...)) sends a RegisterWorker message. RegisterWorker is a case class carrying id, host, port, worker, cores, memory, and so on; the worker field is the Worker's own RpcEndpointRef, through which the Master communicates back with the Worker.
RegisterWorker source code:

```scala
case class RegisterWorker(
    id: String,
    host: String,
    port: Int,
    worker: RpcEndpointRef,
    cores: Int,
    memory: Int,
    workerWebUiUrl: String)
  extends DeployMessage {
  Utils.checkHost(host, "Required hostname")
  assert(port > 0)
}
```
The Worker sends the RegisterWorker message to the Master via registerWithMaster; when the Master receives the request, it handles it as follows.
The receiveAndReply source in Master.scala:

```scala
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case RegisterWorker(
      id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) =>
    logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
      workerHost, workerPort, cores, Utils.megabytesToString(memory)))
    if (state == RecoveryState.STANDBY) {
      context.reply(MasterInStandby)
    } else if (idToWorker.contains(id)) {
      context.reply(RegisterWorkerFailed("Duplicate worker ID"))
    } else {
      val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
        workerRef, workerWebUiUrl)
      if (registerWorker(worker)) {
        persistenceEngine.addWorker(worker)
        context.reply(RegisteredWorker(self, masterWebUiUrl))
        schedule()
      } else {
        val workerAddress = worker.endpoint.address
        logWarning("Worker registration failed. Attempted to re-register worker at same " +
          "address: " + workerAddress)
        context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: "
          + workerAddress))
      }
    }
```
Handling RegisterWorker: when the Master receives a Worker's registration request, it first checks whether it is currently in STANDBY mode; if so, it does not process the request. The Master's idToWorker holds the information of every registered Worker, so it next checks whether this Worker already appears in idToWorker; if it does, the Worker is not registered again. idToWorker is a HashMap whose key is a String identifying the worker and whose value is a WorkerInfo:

```scala
private val idToWorker = new HashMap[String, WorkerInfo]
```
WorkerInfo contains id, host, port, cores, memory, endpoint, and so on:

```scala
private[spark] class WorkerInfo(
    val id: String,
    val host: String,
    val port: Int,
    val cores: Int,
    val memory: Int,
    val endpoint: RpcEndpointRef,
    val webUiAddress: String)
  extends Serializable {
```
If the Master decides to accept the registering Worker, it first creates a WorkerInfo object to hold the Worker's information and then calls registerWorker to carry out the actual registration. Any existing entry in the DEAD state is simply filtered out; an entry in the UNKNOWN state is cleaned up via removeWorker (including the Executors and Drivers on that Worker). The UNKNOWN case arises during Master failover: the new Master first marks each Worker as UNKNOWN, and only after receiving a proper reply from a Worker does it mark that Worker's state as healthy again.
The registerWorker source:

```scala
private def registerWorker(worker: WorkerInfo): Boolean = {
  // There may be one or more refs to dead workers on this same node (w/ different ID's),
  // remove them.
  workers.filter { w =>
    (w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
  }.foreach { w =>
    workers -= w
  }

  val workerAddress = worker.endpoint.address
  if (addressToWorker.contains(workerAddress)) {
    val oldWorker = addressToWorker(workerAddress)
    if (oldWorker.state == WorkerState.UNKNOWN) {
      // A worker registering from UNKNOWN implies that the worker was restarted during recovery.
      // The old worker must thus be dead, so we will remove it and accept the new worker.
      removeWorker(oldWorker)
    } else {
      logInfo("Attempted to re-register worker at same address: " + workerAddress)
      return false
    }
  }

  workers += worker
  idToWorker(worker.id) = worker
  addressToWorker(workerAddress) = worker
  if (reverseProxy) {
    webUi.addProxyTargets(worker.id, worker.webUiAddress)
  }
  true
}
```
In registerWorker, once registration succeeds, the Worker is added to the Master's in-memory data structures:

```scala
val workers = new HashSet[WorkerInfo]
private val idToWorker = new HashMap[String, WorkerInfo]
private val addressToWorker = new HashMap[RpcAddress, WorkerInfo]
......

workers += worker
idToWorker(worker.id) = worker
addressToWorker(workerAddress) = worker
```
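To make the bookkeeping concrete, here is a minimal, self-contained sketch (not Spark code; all type and field names below are simplified stand-ins for WorkerInfo and RpcAddress) of how the three structures stay in sync, including the duplicate-address rejection and the UNKNOWN-eviction path of registerWorker:

```scala
import scala.collection.mutable

case class Addr(host: String, port: Int)
// A plain class (reference equality) so mutating `state` is safe inside a HashSet.
class Info(val id: String, val addr: Addr, var state: String = "ALIVE")

object RegistryDemo {
  val workers = new mutable.HashSet[Info]
  val idToWorker = new mutable.HashMap[String, Info]
  val addressToWorker = new mutable.HashMap[Addr, Info]

  // Mirrors the accept path of registerWorker: reject a duplicate address
  // unless the old entry is UNKNOWN (a stale entry left over from recovery).
  def register(w: Info): Boolean = {
    addressToWorker.get(w.addr) match {
      case Some(old) if old.state != "UNKNOWN" => return false
      case Some(old) => // stale worker from recovery: evict it first
        workers -= old; idToWorker -= old.id; addressToWorker -= old.addr
      case None =>
    }
    workers += w
    idToWorker(w.id) = w
    addressToWorker(w.addr) = w
    true
  }

  def main(args: Array[String]): Unit = {
    val a = new Info("worker-1", Addr("host1", 7078))
    println(register(a))                                  // true: fresh registration
    println(register(new Info("worker-2", a.addr)))       // false: same address, old one alive
    a.state = "UNKNOWN"
    println(register(new Info("worker-3", a.addr)))       // true: stale UNKNOWN entry replaced
  }
}
```

The key invariant, as in the real Master, is that all three structures are always updated together, so a Worker is either fully registered or not registered at all.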
Back in Master.scala's receiveAndReply: after the Worker is registered, the Master calls persistenceEngine.addWorker(worker). PersistenceEngine is the persistence engine; in a ZooKeeper deployment it is the ZooKeeper-based engine, and it persists the registration data:

```scala
......
if (registerWorker(worker)) {
  persistenceEngine.addWorker(worker)
  context.reply(RegisteredWorker(self, masterWebUiUrl))
  schedule()
} else {
```
The addWorker method in PersistenceEngine.scala:

```scala
final def addWorker(worker: WorkerInfo): Unit = {
  persist("worker_" + worker.id, worker)
}
```
ZooKeeperPersistenceEngine is a concrete subclass of PersistenceEngine; its persist method is:

```scala
private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serializer: Serializer)
  extends PersistenceEngine {
  ......
  override def persist(name: String, obj: Object): Unit = {
    serializeIntoFile(WORKING_DIR + "/" + name, obj)
  }
  ......
  private def serializeIntoFile(path: String, value: AnyRef) {
    val serialized = serializer.newInstance().serialize(value)
    val bytes = new Array[Byte](serialized.remaining())
    serialized.get(bytes)
    zk.create().withMode(CreateMode.PERSISTENT).forPath(path, bytes)
  }
```
Back in receiveAndReply: once the registered Worker's data has been persisted, the Master calls schedule():

```scala
......
context.reply(RegisteredWorker(self, masterWebUiUrl))
schedule()
......
```
At this point, the Worker's registration is complete.
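The request/reply protocol just walked through can be sketched in plain Scala without any RPC machinery. The reply names below follow the real DeployMessage hierarchy; everything else (the simplified RegisterWorker fields, the MasterSketch object) is an assumed stand-in:

```scala
sealed trait Reply
case object MasterInStandby extends Reply
case class RegisterWorkerFailed(reason: String) extends Reply
case class RegisteredWorker(masterUrl: String) extends Reply

case class RegisterWorker(id: String, host: String, port: Int, cores: Int, memory: Int)

object MasterSketch {
  var standby = false
  val idToWorker = scala.collection.mutable.HashMap[String, RegisterWorker]()

  // Mirrors the three branches of receiveAndReply for RegisterWorker:
  // standby -> MasterInStandby, duplicate id -> failure, otherwise accept.
  def receive(msg: RegisterWorker): Reply =
    if (standby) MasterInStandby
    else if (idToWorker.contains(msg.id)) RegisterWorkerFailed("Duplicate worker ID")
    else { idToWorker(msg.id) = msg; RegisteredWorker("spark://master:7077") }

  def main(args: Array[String]): Unit = {
    val req = RegisterWorker("worker-1", "host1", 7078, cores = 8, memory = 16384)
    println(receive(req))   // accepted: RegisteredWorker(...)
    println(receive(req))   // rejected: duplicate worker ID
    standby = true
    println(receive(RegisterWorker("worker-2", "host2", 7078, 8, 16384))) // MasterInStandby
  }
}
```

This also shows why a STANDBY Master replies MasterInStandby rather than failing the request: the Worker keeps the registration pending and retries against whichever Master becomes active.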
Driver registration works the same way: the Driver is submitted to the Master for registration, the Master puts the Driver's information into its in-memory cache, adds the Driver to the queue of Drivers waiting to be scheduled, persists the registration through the persistence engine (for example ZooKeeper), and then calls schedule().
Application registration: when the Driver starts, it initializes the SparkContext, which creates a StandaloneSchedulerBackend. The StandaloneSchedulerBackend contains a StandaloneAppClient, which in turn holds a ClientEndpoint, and the ClientEndpoint sends the RegisterApplication message to the Master. The Master puts the Application's information into its in-memory cache, adds the Application to the queue of Applications waiting to be scheduled, persists the registration through the persistence engine (for example ZooKeeper), and then calls schedule().
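Worker, Driver, and Application registration all share the same four-step pipeline: cache in memory, enqueue for scheduling, persist, then schedule(). A sketch under assumed names (nothing below is Spark API; the in-memory collections stand in for the Master's real structures and the persistence engine):

```scala
import scala.collection.mutable

object RegistrationPipeline {
  val cache = mutable.Map[String, String]()       // in-memory metadata cache
  val waitingQueue = mutable.Queue[String]()      // entities waiting to be scheduled
  val persisted = mutable.ListBuffer[String]()    // stands in for ZooKeeper persistence
  var scheduleCalls = 0

  def register(id: String, info: String): Unit = {
    cache(id) = info        // 1. put info into the in-memory cache
    waitingQueue += id      // 2. add to the waiting queue
    persisted += id         // 3. persist via the persistence engine
    scheduleCalls += 1      // 4. schedule() runs after every registration
  }

  def main(args: Array[String]): Unit = {
    register("driver-20170604", "driver description")
    register("app-20170604", "application description")
    println(waitingQueue.toList)
    println(scheduleCalls)
  }
}
```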
Part 2: How the Master handles Driver and Executor state changes
Handling Driver state changes: if the Driver's state is DriverState.ERROR, DriverState.FINISHED, DriverState.KILLED, or DriverState.FAILED, the Driver is cleaned up; any other state throws an exception:

```scala
override def receive: PartialFunction[Any, Unit] = {
  ......
  case DriverStateChanged(driverId, state, exception) =>
    state match {
      case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
        removeDriver(driverId, state, exception)
      case _ =>
        throw new Exception(s"Received unexpected state update for driver $driverId: $state")
    }
```
After removeDriver cleans up the Driver, schedule() is called again. The removeDriver source:

```scala
private def removeDriver(
    driverId: String,
    finalState: DriverState,
    exception: Option[Exception]) {
  drivers.find(d => d.id == driverId) match {
    case Some(driver) =>
      logInfo(s"Removing driver: $driverId")
      drivers -= driver
      if (completedDrivers.size >= RETAINED_DRIVERS) {
        val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
        completedDrivers.trimStart(toRemove)
      }
      completedDrivers += driver
      persistenceEngine.removeDriver(driver)
      driver.state = finalState
      driver.exception = exception
      driver.worker.foreach(w => w.removeDriver(driver))
      schedule()
    case None =>
      logWarning(s"Asked to remove unknown driver: $driverId")
  }
}
```
Handling Executor state changes; the ExecutorStateChanged handler:

```scala
override def receive: PartialFunction[Any, Unit] = {
  ......
  case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
    val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
    execOption match {
      case Some(exec) =>
        val appInfo = idToApp(appId)
        val oldState = exec.state
        exec.state = state

        if (state == ExecutorState.RUNNING) {
          assert(oldState == ExecutorState.LAUNCHING,
            s"executor $execId state transfer from $oldState to RUNNING is illegal")
          appInfo.resetRetryCount()
        }

        exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, false))

        if (ExecutorState.isFinished(state)) {
          // Remove this executor from the worker and app
          logInfo(s"Removing executor ${exec.fullId} because it is $state")
          // If an application has already finished, preserve its
          // state to display its information properly on the UI
          if (!appInfo.isFinished) {
            appInfo.removeExecutor(exec)
          }
          exec.worker.removeExecutor(exec)

          val normalExit = exitStatus == Some(0)
          // Only retry certain number of times so we don't go into an infinite loop.
          // Important note: this code path is not exercised by tests, so be very careful when
          // changing this `if` condition.
          if (!normalExit
              && appInfo.incrementRetryCount() >= MAX_EXECUTOR_RETRIES
              && MAX_EXECUTOR_RETRIES >= 0) { // < 0 disables this application-killing path
            val execs = appInfo.executors.values
            if (!execs.exists(_.state == ExecutorState.RUNNING)) {
              logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
                s"${appInfo.retryCount} times; removing it")
              removeApplication(appInfo, ApplicationState.FAILED)
            }
          }
        }
        schedule()
      case None =>
        logWarning(s"Got status update for unknown executor $appId/$execId")
    }
```
When an Executor exits abnormally, the system retries a limited number of times before removing the application (10 by default, configurable via spark.deploy.maxExecutorRetries; a negative value disables the check):

```scala
private val MAX_EXECUTOR_RETRIES = conf.getInt("spark.deploy.maxExecutorRetries", 10)
```
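The application-killing condition above can be isolated into a small sketch. The helper below is not a Spark API; it only evaluates the same boolean that guards removeApplication in the handler: a non-zero exit, the retry count at the limit, and the limit not disabled by a negative spark.deploy.maxExecutorRetries:

```scala
object RetryPolicy {
  // Same predicate as the `if` guarding removeApplication in ExecutorStateChanged.
  def shouldRemoveApp(exitStatus: Option[Int], retryCount: Int, maxRetries: Int): Boolean = {
    val normalExit = exitStatus == Some(0)
    !normalExit && retryCount >= maxRetries && maxRetries >= 0 // < 0 disables removal
  }

  def main(args: Array[String]): Unit = {
    println(shouldRemoveApp(Some(0), 10, 10))  // false: a clean exit never counts as a failure
    println(shouldRemoveApp(Some(1), 9, 10))   // false: still under the retry limit
    println(shouldRemoveApp(Some(1), 10, 10))  // true: limit reached, application removed
    println(shouldRemoveApp(None, 100, -1))    // false: negative limit disables the check
  }
}
```

Note also that any executor reaching RUNNING resets the retry count (appInfo.resetRetryCount() in the handler), so only consecutive failures with no running executor ever remove the application.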