
Lesson 30: Demystifying the Master's Registration Mechanism and State Management (Class Notes)

2017-06-04 22:13

I. How the Master Handles Registration from Other Components
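The components that register with the Master are mainly Drivers, Applications, and Workers. Executors do not register with the Master; they register with the SchedulerBackend inside the Driver.
A Worker registers itself with the Master after it starts. As a result, in production a new Worker can be added to a Spark cluster that is already running and used to increase processing capacity, without restarting the cluster. For example, if a production cluster has 500 machines and another 100 are added, the 100 new machines can join without restarting the whole cluster.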
Worker source:

```scala
private[deploy] class Worker(
    override val rpcEnv: RpcEnv,
    webUiPort: Int,
    cores: Int,
    memory: Int,
    masterRpcAddresses: Array[RpcAddress],
    endpointName: String,
    workDirPath: String = null,
    val conf: SparkConf,
    val securityMgr: SecurityManager)
  extends ThreadSafeRpcEndpoint with Logging {
  // ...
}
```
 
Worker is a message loop: it extends ThreadSafeRpcEndpoint, so it can both receive and send messages. Worker's onStart method:

```scala
override def onStart() {
  // ...
  workerWebUiUrl = s"http://$publicAddress:${webUi.boundPort}"
  registerWithMaster()
  // ...
}
```
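A minimal, single-threaded sketch of what "message loop" means here, assuming a simplified model rather than Spark's RPC layer: onStart runs once before any message is handled, and queued messages are then processed strictly one at a time, which is the ordering guarantee ThreadSafeRpcEndpoint provides. MiniEndpoint, ToyWorker, post and runLoop are hypothetical names.

```scala
import scala.collection.mutable

// Hypothetical, simplified endpoint: NOT Spark's RpcEndpoint, just the same ordering rules.
abstract class MiniEndpoint {
  private val inbox = mutable.Queue.empty[Any]
  private var started = false

  // Called once, before any message is processed.
  def onStart(): Unit = ()

  // Handles a single message; subclasses pattern-match on the message type.
  def receive: PartialFunction[Any, Unit]

  // Enqueue a message; it is handled only when the loop drains the inbox.
  def post(msg: Any): Unit = inbox.enqueue(msg)

  // Run onStart the first time, then process queued messages one at a time, in order.
  def runLoop(): Unit = {
    if (!started) { started = true; onStart() }
    while (inbox.nonEmpty) {
      val msg = inbox.dequeue()
      if (receive.isDefinedAt(msg)) receive(msg)
    }
  }
}

// Hypothetical toy "worker" whose onStart kicks off registration, as Worker.onStart does.
class ToyWorker extends MiniEndpoint {
  val log = mutable.ArrayBuffer.empty[String]
  override def onStart(): Unit = log += "registerWithMaster"
  def receive: PartialFunction[Any, Unit] = {
    case s: String => log += s"got $s"
  }
}
```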
 
onStart calls registerWithMaster():

```scala
private def registerWithMaster() {
  // ...
  registrationRetryTimer match {
    case None =>
      registered = false
      registerMasterFutures = tryRegisterAllMasters()
      // ...
  }
}
```
 
registerWithMaster() calls tryRegisterAllMasters, which registers with every Master:

```scala
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  masterRpcAddresses.map { masterAddress =>
    registerMasterThreadPool.submit(new Runnable {
      override def run(): Unit = {
        try {
          logInfo("Connecting to master " + masterAddress + "...")
          val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
          registerWithMaster(masterEndpoint)
        } catch {
          case ie: InterruptedException => // Cancelled
          case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
        }
      }
    })
  }
}
```
 
Because a cluster may run several Masters (for example, an active Master plus standbys), tryRegisterAllMasters submits one task per Master address to a thread pool. Each task obtains masterEndpoint, an RpcEndpointRef for that Master, and registers through registerWithMaster(masterEndpoint):

```scala
private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
  masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
    workerId, host, port, self, cores, memory, workerWebUiUrl))
    .onComplete {
      // This is a very fast action so we can use "ThreadUtils.sameThread"
      case Success(msg) =>
        Utils.tryLogNonFatalError {
          handleRegisterResponse(msg)
        }
      case Failure(e) =>
        logError(s"Cannot register with master: ${masterEndpoint.address}", e)
        System.exit(1)
    }(ThreadUtils.sameThread)
}
```
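The fan-out in tryRegisterAllMasters can be sketched with a plain java.util.concurrent thread pool: one Runnable per master address, with the returned futures kept so the caller could cancel them later. The tryConnect function is a hypothetical stand-in for rpcEnv.setupEndpointRef plus registerWithMaster, and the failures queue replaces logWarning.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, ExecutorService, Executors, Future => JFuture}
import scala.util.control.NonFatal

// Hypothetical sketch of "one registration attempt per master, submitted to a pool".
object RegisterFanOut {
  def tryRegisterAll(
      masterAddresses: Seq[String],
      tryConnect: String => Unit, // hypothetical: connect to the master and send RegisterWorker
      pool: ExecutorService,
      failures: ConcurrentLinkedQueue[String]): Seq[JFuture[_]] =
    masterAddresses.map { address =>
      pool.submit(new Runnable {
        override def run(): Unit =
          try tryConnect(address)
          catch {
            case _: InterruptedException => // cancelled: nothing to report
            case NonFatal(e) => failures.add(s"Failed to connect to master $address: ${e.getMessage}")
          }
      })
    }
}
```

A failure to reach one master is recorded and does not stop the attempts against the others, which matches the per-task try/catch in the original.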
 
In registerWithMaster, masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(...)) sends a RegisterWorker message. RegisterWorker is a case class carrying id, host, port, worker, cores, memory and so on; here worker is the Worker's own RpcEndpointRef (self), which the Master uses to communicate with the Worker.
RegisterWorker source:

```scala
case class RegisterWorker(
    id: String,
    host: String,
    port: Int,
    worker: RpcEndpointRef,
    cores: Int,
    memory: Int,
    workerWebUiUrl: String)
  extends DeployMessage {
  Utils.checkHost(host, "Required hostname")
  assert (port > 0)
}
```
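The ask(...).onComplete {...}(ThreadUtils.sameThread) pattern is request/response over a Future whose callback runs directly on the thread that completes it. A minimal sketch with scala.concurrent, assuming a hypothetical in-process ToyMaster instead of an RpcEndpointRef; SameThread is a hand-written ExecutionContext in the spirit of ThreadUtils.sameThread, and RegisterRequest repeats RegisterWorker's constructor checks using require.

```scala
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}

// Runs callbacks immediately on whichever thread completes the future.
object SameThread extends ExecutionContext {
  def execute(runnable: Runnable): Unit = runnable.run()
  def reportFailure(cause: Throwable): Unit = ()
}

// Hypothetical messages loosely mirroring RegisterWorker / RegisteredWorker / RegisterWorkerFailed.
sealed trait RegisterResponse
final case class Registered(masterUrl: String) extends RegisterResponse
final case class RegisterFailed(reason: String) extends RegisterResponse
final case class RegisterRequest(id: String, host: String, port: Int, cores: Int, memoryMb: Int) {
  require(host.nonEmpty, "Required hostname") // same spirit as Utils.checkHost
  require(port > 0)                           // same check as assert(port > 0)
}

// Hypothetical in-process "master" that answers an ask with a completed Future.
class ToyMaster(url: String) {
  private val known = scala.collection.mutable.Set.empty[String]
  def ask(req: RegisterRequest): Future[RegisterResponse] = {
    val p = Promise[RegisterResponse]()
    if (known.add(req.id)) p.success(Registered(url))
    else p.success(RegisterFailed("Duplicate worker ID"))
    p.future
  }
}

// Hypothetical worker-side client: handles the reply in onComplete, like registerWithMaster.
class ToyWorkerClient(id: String) {
  @volatile var registered = false
  @volatile var lastError: Option[String] = None
  def registerWith(master: ToyMaster): Unit =
    master.ask(RegisterRequest(id, "host-a", 7078, 4, 8192)).onComplete {
      case Success(Registered(_))          => registered = true
      case Success(RegisterFailed(reason)) => lastError = Some(reason)
      case Failure(e)                      => lastError = Some(e.getMessage)
    }(SameThread)
}
```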
 
Through registerWithMaster, the Worker sends a RegisterWorker message to the Master; when the Master receives the request, it handles it as follows.
Master.scala, receiveAndReply:

```scala
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case RegisterWorker(
      id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) =>
    logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
      workerHost, workerPort, cores, Utils.megabytesToString(memory)))
    if (state == RecoveryState.STANDBY) {
      context.reply(MasterInStandby)
    } else if (idToWorker.contains(id)) {
      context.reply(RegisterWorkerFailed("Duplicate worker ID"))
    } else {
      val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
        workerRef, workerWebUiUrl)
      if (registerWorker(worker)) {
        persistenceEngine.addWorker(worker)
        context.reply(RegisteredWorker(self, masterWebUiUrl))
        schedule()
      } else {
        val workerAddress = worker.endpoint.address
        logWarning("Worker registration failed. Attempted to re-register worker at same " +
          "address: " + workerAddress)
        context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: "
          + workerAddress))
      }
    }
  // ...
}
```
 
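When the Master receives a Worker's registration request, it first checks whether it is in STANDBY mode; if so, it does not register the Worker and replies MasterInStandby. Next it checks its in-memory structure idToWorker, which holds every registered Worker, for an entry with the same ID; if one exists, the Worker is not registered again and the reply is RegisterWorkerFailed("Duplicate worker ID"). idToWorker is a HashMap whose key is the worker ID (a String) and whose value is a WorkerInfo:

```scala
private val idToWorker = new HashMap[String, WorkerInfo]
```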
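The order of the checks at the top of the RegisterWorker case can be isolated as a pure function. A sketch with hypothetical simplified types (MasterRole, Reply); the real handler goes on to build a WorkerInfo, call registerWorker, persist, reply and schedule.

```scala
// Hypothetical simplification of RecoveryState: only the distinction that matters here.
sealed trait MasterRole
case object StandbyRole extends MasterRole
case object ActiveRole extends MasterRole

// Hypothetical replies mirroring MasterInStandby / RegisterWorkerFailed / "go ahead".
sealed trait Reply
case object InStandby extends Reply
final case class Failed(reason: String) extends Reply
case object Accept extends Reply

object RegisterDecision {
  // Same order as Master.receiveAndReply: standby check first, then duplicate ID.
  def decide(role: MasterRole, knownIds: collection.Set[String], id: String): Reply =
    if (role == StandbyRole) InStandby                             // a standby Master never registers workers
    else if (knownIds.contains(id)) Failed("Duplicate worker ID") // ID already present in idToWorker
    else Accept                                                   // next: registerWorker(new WorkerInfo(...))
}
```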
 
WorkerInfo holds the id, host, port, cores, memory, endpoint, and so on:

```scala
private[spark] class WorkerInfo(
    val id: String,
    val host: String,
    val port: Int,
    val cores: Int,
    val memory: Int,
    val endpoint: RpcEndpointRef,
    val webUiAddress: String)
  extends Serializable {
  // ...
}
```
 
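If the Master decides to accept the Worker, it first creates a WorkerInfo object to hold the Worker's information and then calls registerWorker to perform the actual registration. registerWorker first drops any DEAD entries at the same host and port. If a Worker is already registered at the same address and is in the UNKNOWN state, removeWorker is called to clean it up (including the Executors and Drivers on that Worker) and the new registration is accepted; if the existing Worker is in any other state, the registration is rejected. The UNKNOWN state arises during Master failover: the recovering Master marks each recovered Worker as UNKNOWN and notifies it, and only after it receives a correct reply from that Worker does it mark the Worker as ALIVE again.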
registerWorker source:

```scala
private def registerWorker(worker: WorkerInfo): Boolean = {
  // There may be one or more refs to dead workers on this same node (w/ different ID's),
  // remove them.
  workers.filter { w =>
    (w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
  }.foreach { w =>
    workers -= w
  }

  val workerAddress = worker.endpoint.address
  if (addressToWorker.contains(workerAddress)) {
    val oldWorker = addressToWorker(workerAddress)
    if (oldWorker.state == WorkerState.UNKNOWN) {
      // A worker registering from UNKNOWN implies that the worker was restarted during recovery.
      // The old worker must thus be dead, so we will remove it and accept the new worker.
      removeWorker(oldWorker)
    } else {
      logInfo("Attempted to re-register worker at same address: " + workerAddress)
      return false
    }
  }

  workers += worker
  idToWorker(worker.id) = worker
  addressToWorker(workerAddress) = worker
  if (reverseProxy) {
    webUi.addProxyTargets(worker.id, worker.webUiAddress)
  }
  true
}
```
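The bookkeeping in registerWorker can be reproduced with ordinary Scala collections. This sketch assumes hypothetical simplified types (WState, Info, and a "host:port" string as the address); its removeWorker only forgets the old entry, whereas the real Master also cleans up that Worker's Executors and Drivers.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for WorkerState and WorkerInfo.
sealed trait WState
case object WAlive extends WState
case object WDead extends WState
case object WUnknown extends WState

final class Info(val id: String, val host: String, val port: Int, var state: WState = WAlive) {
  def address: String = s"$host:$port"
}

class ToyRegistry {
  val workers = mutable.HashSet.empty[Info]
  val idToWorker = mutable.HashMap.empty[String, Info]
  val addressToWorker = mutable.HashMap.empty[String, Info]

  // Simplified removeWorker: forget the worker everywhere (no executor/driver cleanup here).
  private def removeWorker(w: Info): Unit = {
    workers -= w
    idToWorker -= w.id
    addressToWorker -= w.address
  }

  def registerWorker(worker: Info): Boolean = {
    // Drop stale DEAD entries for the same host and port (they may carry different IDs).
    workers.filter(w => w.host == worker.host && w.port == worker.port && w.state == WDead)
      .foreach(workers -= _)

    addressToWorker.get(worker.address) match {
      case Some(old) if old.state == WUnknown =>
        removeWorker(old) // restarted during recovery: the old instance must be dead
      case Some(_) =>
        return false      // a live worker already owns this address
      case None =>
    }
    workers += worker
    idToWorker(worker.id) = worker
    addressToWorker(worker.address) = worker
    true
  }
}
```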
 
Once registration succeeds, registerWorker adds the Worker to the Master's in-memory data structures:

```scala
val workers = new HashSet[WorkerInfo]
private val idToWorker = new HashMap[String, WorkerInfo]
private val addressToWorker = new HashMap[RpcAddress, WorkerInfo]
// ...

workers += worker
idToWorker(worker.id) = worker
addressToWorker(workerAddress) = worker
```
 
Back in receiveAndReply in Master.scala: after registerWorker succeeds, the Master calls persistenceEngine.addWorker(worker). PersistenceEngine is the persistence engine; in ZooKeeper recovery mode it is the ZooKeeper-backed implementation, which persists the registration data:

```scala
// ...
if (registerWorker(worker)) {
  persistenceEngine.addWorker(worker)
  context.reply(RegisteredWorker(self, masterWebUiUrl))
  schedule()
} else {
  // ...
}
```
 
PersistenceEngine.scala, addWorker:

```scala
final def addWorker(worker: WorkerInfo): Unit = {
  persist("worker_" + worker.id, worker)
}
```

ZooKeeperPersistenceEngine is a concrete subclass of PersistenceEngine; its persist method:

```scala
private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serializer: Serializer)
  extends PersistenceEngine
  // ...

  override def persist(name: String, obj: Object): Unit = {
    serializeIntoFile(WORKING_DIR + "/" + name, obj)
  }

  // ...

  private def serializeIntoFile(path: String, value: AnyRef) {
    val serialized = serializer.newInstance().serialize(value)
    val bytes = new Array[Byte](serialized.remaining())
    serialized.get(bytes)
    zk.create().withMode(CreateMode.PERSISTENT).forPath(path, bytes)
  }
```
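Stripped of ZooKeeper, the persistence step is "encode the object and store the bytes under WORKING_DIR/worker_<id>". A minimal in-memory sketch, assuming a trivial comma-separated encoding in place of Spark's Serializer and a map of paths in place of ZooKeeper nodes; ToyPersistenceEngine, InMemoryPersistenceEngine and WorkerRecord are hypothetical.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import scala.collection.mutable

// Hypothetical record; toBytes stands in for serializer.newInstance().serialize(value).
final case class WorkerRecord(id: String, host: String, port: Int, cores: Int, memoryMb: Int) {
  def toBytes: Array[Byte] = s"$id,$host,$port,$cores,$memoryMb".getBytes(UTF_8)
}

object WorkerRecord {
  def fromBytes(bytes: Array[Byte]): WorkerRecord = {
    val parts = new String(bytes, UTF_8).split(",")
    WorkerRecord(parts(0), parts(1), parts(2).toInt, parts(3).toInt, parts(4).toInt)
  }
}

trait ToyPersistenceEngine {
  def persist(name: String, bytes: Array[Byte]): Unit
  def unpersist(name: String): Unit
  def read(prefix: String): Seq[Array[Byte]]

  // Same naming scheme as PersistenceEngine.addWorker: "worker_" + id.
  final def addWorker(w: WorkerRecord): Unit = persist("worker_" + w.id, w.toBytes)
  final def removeWorker(w: WorkerRecord): Unit = unpersist("worker_" + w.id)
  final def readWorkers(): Seq[WorkerRecord] = read("worker_").map(WorkerRecord.fromBytes)
}

// Paths play the role of ZooKeeper nodes under WORKING_DIR.
class InMemoryPersistenceEngine(workingDir: String) extends ToyPersistenceEngine {
  val nodes = mutable.LinkedHashMap.empty[String, Array[Byte]]
  def persist(name: String, bytes: Array[Byte]): Unit = { nodes(workingDir + "/" + name) = bytes }
  def unpersist(name: String): Unit = { nodes -= (workingDir + "/" + name) }
  def read(prefix: String): Seq[Array[Byte]] =
    nodes.collect { case (path, bytes) if path.startsWith(workingDir + "/" + prefix) => bytes }.toSeq
}
```

Because addWorker lives in the base trait, swapping the in-memory store for another backend would not change how callers persist a Worker, which is the same separation PersistenceEngine and ZooKeeperPersistenceEngine have.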
 
Back in receiveAndReply: once the Worker's data has been persisted, the Master replies RegisteredWorker to the Worker and then calls schedule():

```scala
// ...
context.reply(RegisteredWorker(self, masterWebUiUrl))
schedule()
// ...
```
 
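This completes Worker registration.
Driver registration works the same way: the Driver is submitted to the Master for registration; the Master puts the Driver's information into its in-memory cache, adds it to the queue of Drivers waiting to be scheduled, persists the registration information through the persistence engine (for example, ZooKeeper), and then calls schedule().
Application registration: after the Driver starts, it initializes SparkContext, which creates a StandaloneSchedulerBackend. StandaloneSchedulerBackend contains a StandaloneAppClient, which in turn contains a ClientEndpoint; the ClientEndpoint sends a RegisterApplication message to the Master. The Master puts the Application's information into its in-memory cache, adds the Application to the queue of Applications waiting to be scheduled, persists the registration information through the persistence engine (for example, ZooKeeper), and then calls schedule().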
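Driver and Application registration share one ordering: cache in memory, enqueue, persist, schedule. A toy sketch of that ordering, assuming hypothetical names (ToyScheduler, Submission) and a schedule() that only launches from the head of the queue while free cores allow; Spark's real schedule() is considerably more involved.

```scala
import scala.collection.mutable

// Hypothetical submission: kind is "driver" or "app", used to build the persisted key.
final case class Submission(id: String, kind: String, coresNeeded: Int)

class ToyScheduler(var freeCores: Int) {
  val registered = mutable.LinkedHashMap.empty[String, Submission] // in-memory cache
  val waiting = mutable.Queue.empty[Submission]                    // waiting to be scheduled
  val persisted = mutable.Set.empty[String]                        // stand-in for the persistence engine
  val launched = mutable.ArrayBuffer.empty[String]

  // register -> cache -> enqueue -> persist -> schedule, in that order.
  def register(s: Submission): Unit = {
    registered(s.id) = s
    waiting.enqueue(s)
    persisted += s"${s.kind}_${s.id}"
    schedule()
  }

  // Toy FIFO policy: launch from the head of the queue while resources allow.
  def schedule(): Unit =
    while (waiting.nonEmpty && waiting.head.coresNeeded <= freeCores) {
      val s = waiting.dequeue()
      freeCores -= s.coresNeeded
      launched += s.id
    }
}
```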
 
II. How the Master Handles Driver and Executor State Changes
Driver state changes: when a DriverStateChanged message reports DriverState.ERROR, DriverState.FINISHED, DriverState.KILLED, or DriverState.FAILED, the Master removes the Driver; any other state causes an exception to be thrown.

```scala
override def receive: PartialFunction[Any, Unit] = {
  // ...
  case DriverStateChanged(driverId, state, exception) =>
    state match {
      case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
        removeDriver(driverId, state, exception)
      case _ =>
        throw new Exception(s"Received unexpected state update for driver $driverId: $state")
    }
  // ...
}
```
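The "terminal states remove, anything else throws" rule maps naturally onto a sealed trait and a match. A sketch with hypothetical types named after the DriverState values above; the returned string stands in for the removeDriver call.

```scala
// Hypothetical stand-ins for a few DriverState values.
sealed trait DState
case object DSubmitted extends DState
case object DRunning extends DState
case object DFinished extends DState
case object DKilled extends DState
case object DFailed extends DState
case object DError extends DState

object DriverStates {
  // Terminal states that make the Master call removeDriver.
  def isTerminal(s: DState): Boolean = s match {
    case DError | DFinished | DKilled | DFailed => true
    case _ => false
  }

  // Returns a description of the removal, or throws like the Master does for unexpected states.
  def onDriverStateChanged(driverId: String, state: DState): String = state match {
    case s if isTerminal(s) => s"removeDriver($driverId, $s)"
    case other => throw new Exception(s"Received unexpected state update for driver $driverId: $other")
  }
}
```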
 
removeDriver moves the Driver into completedDrivers (trimming the oldest entries when the list is full), removes it from the persistence engine and from its Worker, and then calls schedule() again. removeDriver source:

```scala
private def removeDriver(
    driverId: String,
    finalState: DriverState,
    exception: Option[Exception]) {
  drivers.find(d => d.id == driverId) match {
    case Some(driver) =>
      logInfo(s"Removing driver: $driverId")
      drivers -= driver
      if (completedDrivers.size >= RETAINED_DRIVERS) {
        val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
        completedDrivers.trimStart(toRemove)
      }
      completedDrivers += driver
      persistenceEngine.removeDriver(driver)
      driver.state = finalState
      driver.exception = exception
      driver.worker.foreach(w => w.removeDriver(driver))
      schedule()
    case None =>
      logWarning(s"Asked to remove unknown driver: $driverId")
  }
}
```
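The retention rule in removeDriver keeps completedDrivers bounded: when the buffer already holds RETAINED_DRIVERS entries, the oldest max(RETAINED_DRIVERS / 10, 1) are dropped before the new one is appended. A standalone sketch (ToyRetention is hypothetical; retained plays the role of RETAINED_DRIVERS), using remove(0, n), which has the same effect as trimStart(n):

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch of the completedDrivers retention arithmetic.
class ToyRetention(retained: Int) {
  val completed = ArrayBuffer.empty[String]

  // Same arithmetic as removeDriver: when full, drop a batch (at least one) from the front.
  def complete(id: String): Unit = {
    if (completed.size >= retained) {
      val toRemove = math.max(retained / 10, 1)
      completed.remove(0, toRemove)
    }
    completed += id
  }
}
```

For example, with retained = 20, each time the buffer is full 2 entries are dropped, so after it first fills up its size alternates between 19 and 20 instead of growing without bound.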
 
Executor state changes are handled in the ExecutorStateChanged case:

```scala
override def receive: PartialFunction[Any, Unit] = {
  // ...
  case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
    val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
    execOption match {
      case Some(exec) =>
        val appInfo = idToApp(appId)
        val oldState = exec.state
        exec.state = state

        if (state == ExecutorState.RUNNING) {
          assert(oldState == ExecutorState.LAUNCHING,
            s"executor $execId state transfer from $oldState to RUNNING is illegal")
          appInfo.resetRetryCount()
        }

        exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, false))

        if (ExecutorState.isFinished(state)) {
          // Remove this executor from the worker and app
          logInfo(s"Removing executor ${exec.fullId} because it is $state")
          // If an application has already finished, preserve its
          // state to display its information properly on the UI
          if (!appInfo.isFinished) {
            appInfo.removeExecutor(exec)
          }
          exec.worker.removeExecutor(exec)

          val normalExit = exitStatus == Some(0)
          // Only retry certain number of times so we don't go into an infinite loop.
          // Important note: this code path is not exercised by tests, so be very careful when
          // changing this `if` condition.
          if (!normalExit
              && appInfo.incrementRetryCount() >= MAX_EXECUTOR_RETRIES
              && MAX_EXECUTOR_RETRIES >= 0) { // < 0 disables this application-killing path
            val execs = appInfo.executors.values
            if (!execs.exists(_.state == ExecutorState.RUNNING)) {
              logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
                s"${appInfo.retryCount} times; removing it")
              removeApplication(appInfo, ApplicationState.FAILED)
            }
          }
        }
        schedule()
      case None =>
        logWarning(s"Got status update for unknown executor $appId/$execId")
    }
  // ...
}
```
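Two rules in this handler can be isolated: an executor may enter RUNNING only from LAUNCHING, and only finished states trigger removal from the Worker and the application. A sketch with hypothetical types; the finished set (KILLED, FAILED, LOST, EXITED) matches what Spark's ExecutorState.isFinished checks, and require replaces the original assert.

```scala
// Hypothetical stand-ins for ExecutorState values.
sealed trait EState
case object ELaunching extends EState
case object ERunning extends EState
case object EKilled extends EState
case object EFailed extends EState
case object ELost extends EState
case object EExited extends EState

object ExecStates {
  // Finished states: the executor is removed from its worker and its application.
  def isFinished(s: EState): Boolean = Set[EState](EKilled, EFailed, ELost, EExited).contains(s)

  // Returns the new state, enforcing the LAUNCHING -> RUNNING rule the Master asserts.
  def transition(execId: Int, oldState: EState, newState: EState): EState = {
    if (newState == ERunning)
      require(oldState == ELaunching, s"executor $execId state transfer from $oldState to RUNNING is illegal")
    newState
  }
}
```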
 
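When an Executor exits abnormally, the Master does not give up on the application immediately: schedule() launches replacement Executors, and each abnormal exit increments the application's retry count (reset to 0 whenever one of its Executors reaches RUNNING). Only when the count reaches MAX_EXECUTOR_RETRIES (set by spark.deploy.maxExecutorRetries, default 10) and none of the application's Executors is RUNNING is the application removed with state FAILED; a negative value disables this check.

```scala
private val MAX_EXECUTOR_RETRIES = conf.getInt("spark.deploy.maxExecutorRetries", 10)
```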
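The retry rule can be captured on its own: abnormal exits increment a counter, an executor reaching RUNNING resets it, and the application fails only when the counter reaches the limit while nothing is RUNNING (a negative limit disables the rule). A sketch with hypothetical names (RetryTracker, onExecutorExit, runningLeft):

```scala
// Hypothetical sketch of the MAX_EXECUTOR_RETRIES rule from the ExecutorStateChanged handler.
class RetryTracker(maxRetries: Int) {
  private var retryCount = 0
  var appFailed = false

  // An executor of the app reached RUNNING: reset, as appInfo.resetRetryCount() does.
  def onExecutorRunning(): Unit = { retryCount = 0 }

  // An executor finished; runningLeft is how many of the app's executors are still RUNNING.
  def onExecutorExit(exitStatus: Option[Int], runningLeft: Int): Unit = {
    val normalExit = exitStatus == Some(0)
    if (!normalExit) {
      retryCount += 1 // like appInfo.incrementRetryCount(), only on abnormal exits
      if (maxRetries >= 0 && retryCount >= maxRetries && runningLeft == 0) appFailed = true
    }
  }
}
```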
 
 
 