Spark Source Code Analysis: Worker Startup and Communication
The Worker is Spark's work node. Its main responsibilities are: receiving instructions from the Master to launch or kill Executors and Drivers; reporting Driver and Executor state to the Master; and sending heartbeats to the Master.
1 Key Properties
RpcEnv rpcEnv: registers and maintains RpcEndpoints and RpcEndpointRefs
Int webUiPort: web UI port
Int cores: number of CPU cores allocated to this worker
Int coresUsed: number of CPU cores this worker currently has in use
Int coresFree = cores - coresUsed: remaining CPU cores
Int memory: amount of memory allocated to this worker
Int memoryUsed: amount of memory this worker currently has in use
Int memoryFree = memory - memoryUsed: remaining memory
Array[RpcAddress] masterRpcAddresses: array of master RpcAddresses
String endpointName: name of the worker's RPC endpoint
String workDirPath: work directory path
forwordMessageScheduler: a background scheduler thread that sends messages at fixed intervals
cleanupThreadExecutor: a background thread that cleans up the work directory
Option[RpcEndpointRef] master: the master endpoint
String activeMasterUrl: URL of the currently active master
String activeMasterWebUiUrl: web UI URL of the currently active master
String workerWebUiUrl: the worker's web UI URL
String workerUri: the worker's URL
boolean registered: whether this worker has registered with the master
boolean connected: whether this worker is connected to the master
String workerId: the worker's id
HashMap[String, DriverRunner] drivers: the worker's map of driver id -> DriverRunner
HashMap[String, ExecutorRunner] executors: the worker's map of executor id -> ExecutorRunner
LinkedHashMap[String, DriverRunner] finishedDrivers: the worker's map of finished driver id -> DriverRunner
HashMap[String, Seq[String]] appDirectories: the worker's map of application id -> app directories
HashSet[String] finishedApps: applications that have finished on this worker
HEARTBEAT_MILLIS: interval at which heartbeats are sent to the Master
INITIAL_REGISTRATION_RETRIES: initial number of registration retries, 6 by default
TOTAL_REGISTRATION_RETRIES: total number of registration attempts
INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS: initial registration retry interval
PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS: prolonged registration retry interval
CLEANUP_ENABLED: whether the cleanup feature is enabled
CLEANUP_INTERVAL_MILLIS: cleanup interval
APP_DATA_RETENTION_SECONDS: how long application data is retained
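The `coresUsed`/`memoryUsed` fields above are simple counters that the Worker increments when it launches an executor or driver and decrements when one finishes. A minimal sketch of that accounting (illustrative names, not Spark's actual code):

```scala
// Sketch of the Worker's resource accounting: cores/memory are fixed totals,
// and the *Used counters move as executors or drivers start and finish.
class ResourceTracker(val cores: Int, val memory: Int) {
  var coresUsed: Int = 0
  var memoryUsed: Int = 0

  def coresFree: Int = cores - coresUsed
  def memoryFree: Int = memory - memoryUsed

  // Called when an executor/driver is launched
  def launch(execCores: Int, execMemory: Int): Unit = {
    coresUsed += execCores
    memoryUsed += execMemory
  }

  // Called when an executor/driver finishes
  def finish(execCores: Int, execMemory: Int): Unit = {
    coresUsed -= execCores
    memoryUsed -= execMemory
  }
}
```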
2 Core Methods
2.1 The main method

```scala
def main(argStrings: Array[String]) {
  Utils.initDaemon(log)
  val conf = new SparkConf
  // Parse the launch arguments
  val args = new WorkerArguments(argStrings, conf)
  // Start the RPC environment and the worker endpoint
  val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,
    args.memory, args.masters, args.workDir, conf = conf)
  rpcEnv.awaitTermination()
}
```
2.2 onStart: start the worker
# Create the work directory
# Create the web UI and bind it
# Register with the Master

```scala
override def onStart() {
  assert(!registered)
  logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
    host, port, cores, Utils.megabytesToString(memory)))
  logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
  logInfo("Spark home: " + sparkHome)
  // Create the work directory
  createWorkDir()
  // If the external shuffle service is enabled, start it
  shuffleService.startIfEnabled()
  // Create the worker's web UI and bind it
  webUi = new WorkerWebUI(this, workDir, webUiPort)
  webUi.bind()
  workerWebUiUrl = s"http://$publicAddress:${webUi.boundPort}"
  // Register with the Master
  registerWithMaster()

  metricsSystem.registerSource(workerSource)
  metricsSystem.start()
  // Attach the worker metrics servlet handler to the web ui after the metrics system is started.
  metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
}
```
2.3 createWorkDir: create the work directory

```scala
/**
 * Create the directory where the worker stores per-application data, e.g.
 *   app-20170613113959-0000
 *   app-20170613114457-0001
 *   app-20170613114710-0002
 */
private def createWorkDir() {
  // Resolve the work directory (defaults to $SPARK_HOME/work)
  workDir = Option(workDirPath).map(new File(_)).getOrElse(new File(sparkHome, "work"))
  try {
    // Create the directory
    workDir.mkdirs()
    // Exit if the directory does not exist or is not a directory
    if (!workDir.exists() || !workDir.isDirectory) {
      logError("Failed to create work directory " + workDir)
      System.exit(1)
    }
    assert(workDir.isDirectory)
  } catch {
    case e: Exception =>
      logError("Failed to create work directory " + workDir, e)
      System.exit(1)
  }
}
```
2.4 registerWithMaster(): register with the master

```scala
private def registerWithMaster() {
  registrationRetryTimer match {
    // No retry timer yet: the worker has not registered, so start registering
    case None =>
      // Initial registration state is false
      registered = false
      // Try to register with every master
      registerMasterFutures = tryRegisterAllMasters()
      // Reset the connection attempt counter
      connectionAttemptCount = 0
      // Schedule a background task that periodically sends ReregisterWithMaster;
      // if a previous attempt already succeeded, the next run is a no-op
      registrationRetryTimer = Some(forwordMessageScheduler.scheduleAtFixedRate(
        new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
            Option(self).foreach(_.send(ReregisterWithMaster))
          }
        },
        INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
        INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
        TimeUnit.SECONDS))
    // A registrationRetryTimer already exists: nothing to do
    case Some(_) =>
  }
}
```
2.5 tryRegisterAllMasters: try to register with every master in the cluster

```scala
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  masterRpcAddresses.map { masterAddress =>
    registerMasterThreadPool.submit(new Runnable {
      override def run(): Unit = {
        try {
          logInfo("Connecting to master " + masterAddress + "...")
          // Build a master RpcEndpointRef for sending messages and requests to the master
          val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
          // Register with this master
          registerWithMaster(masterEndpoint)
        } catch {
          case ie: InterruptedException => // Cancelled
          case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
        }
      }
    })
  }
}
```
2.6 registerWithMaster(masterEndpoint: RpcEndpointRef): register with a specific master

```scala
private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
  // Send a RegisterWorker request to the master
  masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
    workerId, host, port, self, cores, memory, workerWebUiUrl))
    .onComplete {
      // On success, handle the response
      case Success(msg) =>
        Utils.tryLogNonFatalError {
          handleRegisterResponse(msg)
        }
      // On failure, exit
      case Failure(e) =>
        logError(s"Cannot register with master: ${masterEndpoint.address}", e)
        System.exit(1)
    }(ThreadUtils.sameThread)
}
```
2.7 handleRegisterResponse: handle the registration response

```scala
private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {
  msg match {
    // RegisteredWorker: registration succeeded
    case RegisteredWorker(masterRef, masterWebUiUrl) =>
      logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
      registered = true // update the registered flag
      changeMaster(masterRef, masterWebUiUrl)
      // Schedule a background task that periodically sends heartbeats to the master
      forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = Utils.tryLogNonFatalError {
          self.send(SendHeartbeat)
        }
      }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
      // If cleanup is enabled, schedule a background task that periodically
      // sends WorkDirCleanup to clean old application directories
      if (CLEANUP_ENABLED) {
        logInfo(
          s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
        forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
            self.send(WorkDirCleanup)
          }
        }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
      }
      // Build an ExecutorDescription for each executor this worker holds
      val execs = executors.values.map { e =>
        new ExecutorDescription(e.appId, e.execId, e.cores, e.state)
      }
      // Send the worker's latest state to the master
      masterRef.send(WorkerLatestState(workerId, execs.toList, drivers.keys.toSeq))
    // RegisterWorkerFailed: registration failed
    case RegisterWorkerFailed(message) =>
      // If the worker never registered successfully, exit
      if (!registered) {
        logError("Worker registration failed: " + message)
        System.exit(1)
      }
    // MasterInStandby: the master is not ready yet, so do nothing
    case MasterInStandby => // Ignore. Master not yet ready.
  }
}
```
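The heartbeat loop above is just `scheduleAtFixedRate` on a single-threaded scheduler, where each run sends `SendHeartbeat` back to the worker's own endpoint. The pattern can be sketched in isolation like this (illustrative names, not Spark's code; a `CountDownLatch` stands in for the RPC send):

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

// Sketch of the fixed-rate heartbeat pattern: a scheduler fires a task at a
// fixed period; in the Worker each firing would be self.send(SendHeartbeat).
object HeartbeatSketch {
  // Returns true once `n` beats have fired within a 5-second timeout
  def runHeartbeats(n: Int, periodMs: Long): Boolean = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val beats = new CountDownLatch(n)
    scheduler.scheduleAtFixedRate(new Runnable {
      // Stands in for self.send(SendHeartbeat)
      override def run(): Unit = beats.countDown()
    }, 0, periodMs, TimeUnit.MILLISECONDS)
    val ok = beats.await(5, TimeUnit.SECONDS)
    scheduler.shutdownNow()
    ok
  }
}
```

Note that the worker only sends a heartbeat if `connected` is true, so a scheduled firing between master failovers is simply dropped.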
2.8 receive: handle messages that do not require a reply

```scala
override def receive: PartialFunction[Any, Unit] = synchronized {
  // SendHeartbeat: send a heartbeat to the master
  case SendHeartbeat =>
    if (connected) { sendToMaster(Heartbeat(workerId, self)) }

  // WorkDirCleanup: clean up the work directory
  case WorkDirCleanup =>
    // Collect the app ids of all currently running executors
    val appIds = executors.values.map(_.appId).toSet
    // Find finished application directories and delete them recursively,
    // wrapping the work in a Future on the cleanup thread
    val cleanupFuture = concurrent.Future {
      // List all entries in the work directory
      val appDirs = workDir.listFiles()
      if (appDirs == null) {
        throw new IOException("ERROR: Failed to list files in " + appDirs)
      }
      appDirs.filter { dir =>
        val appIdFromDir = dir.getName // the directory name is the app id
        val isAppStillRunning = appIds.contains(appIdFromDir) // is this app still running?
        // Keep directories whose app has finished and that contain no new files
        dir.isDirectory && !isAppStillRunning &&
          !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECONDS)
      }.foreach { dir =>
        logInfo(s"Removing directory: ${dir.getPath}")
        Utils.deleteRecursively(dir)
      }
    }(cleanupThreadExecutor)
    cleanupFuture.onFailure {
      case e: Throwable =>
        logError("App dir cleanup failed: " + e.getMessage, e)
    }(cleanupThreadExecutor)

  // MasterChanged: the master has changed
  case MasterChanged(masterRef, masterWebUiUrl) =>
    logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
    // Record the new master's url and ref, mark the worker as connected,
    // and cancel any pending re-registration attempts
    changeMaster(masterRef, masterWebUiUrl)
    // Build an ExecutorDescription for each executor on this node
    val execs = executors.values.
      map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
    // Send WorkerSchedulerStateResponse to the new master
    masterRef.send(WorkerSchedulerStateResponse(workerId, execs.toList, drivers.keys.toSeq))

  // ReconnectWorker: the worker was disconnected and must reconnect
  case ReconnectWorker(masterUrl) =>
    logInfo(s"Master with url $masterUrl requested this worker to reconnect.")
    // Re-register with the master after the disconnect
    registerWithMaster()

  // LaunchExecutor: launch an executor
  case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
    // Check that the request comes from the active master
    if (masterUrl != activeMasterUrl) {
      logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
    } else {
      try {
        logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
        // Create the executor directory: workDir/appId/execId
        val executorDir = new File(workDir, appId + "/" + execId)
        if (!executorDir.mkdirs()) {
          throw new IOException("Failed to create directory " + executorDir)
        }
        // Get the application's local directories, creating them if absent;
        // they are deleted when the application finishes
        val appLocalDirs = appDirectories.getOrElse(appId,
          Utils.getOrCreateLocalRootDirs(conf).map { dir =>
            val appDir = Utils.createDirectory(dir, namePrefix = "executor")
            Utils.chmod700(appDir)
            appDir.getAbsolutePath()
          }.toSeq)
        appDirectories(appId) = appLocalDirs
        // Create an ExecutorRunner, which manages the executor process
        val manager = new ExecutorRunner(
          appId,
          execId,
          appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
          cores_,
          memory_,
          self,
          workerId,
          host,
          webUi.boundPort,
          publicAddress,
          sparkHome,
          executorDir,
          workerUri,
          conf,
          appLocalDirs,
          ExecutorState.RUNNING)
        // Add the new ExecutorRunner to the worker's executor id -> ExecutorRunner map
        executors(appId + "/" + execId) = manager
        // Start the ExecutorRunner
        manager.start()
        // Update the used CPU cores and memory
        coresUsed += cores_
        memoryUsed += memory_
        // Notify the master that the executor state changed
        sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
      } catch {
        case e: Exception =>
          logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
          if (executors.contains(appId + "/" + execId)) {
            executors(appId + "/" + execId).kill()
            executors -= appId + "/" + execId
          }
          sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
            Some(e.toString), None))
      }
    }

  // ExecutorStateChanged: an executor's state changed
  case executorStateChanged @ ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
    handleExecutorStateChanged(executorStateChanged)

  // KillExecutor: kill the given executor process
  case KillExecutor(masterUrl, appId, execId) =>
    if (masterUrl != activeMasterUrl) {
      logWarning("Invalid Master (" + masterUrl + ") attempted to kill executor " + execId)
    } else {
      val fullId = appId + "/" + execId
      executors.get(fullId) match {
        case Some(executor) =>
          logInfo("Asked to kill executor " + fullId)
          executor.kill()
        case None =>
          logInfo("Asked to kill unknown executor " + fullId)
      }
    }

  // LaunchDriver: launch a driver
  case LaunchDriver(driverId, driverDesc) =>
    logInfo(s"Asked to launch driver $driverId")
    // Create a DriverRunner and allocate resources
    val driver = new DriverRunner(
      conf,
      driverId,
      workDir,
      sparkHome,
      driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
      self,
      workerUri,
      securityMgr)
    // Track it in the drivers map
    drivers(driverId) = driver
    // Start the driver
    driver.start()
    // Update the used CPU cores and memory
    coresUsed += driverDesc.cores
    memoryUsed += driverDesc.mem

  // KillDriver: kill the given driver
  case KillDriver(driverId) =>
    logInfo(s"Asked to kill driver $driverId")
    drivers.get(driverId) match {
      case Some(runner) => runner.kill()
      case None => logError(s"Asked to kill unknown driver $driverId")
    }

  // DriverStateChanged: a driver's state changed
  case driverStateChanged @ DriverStateChanged(driverId, state, exception) =>
    handleDriverStateChanged(driverStateChanged)

  // ReregisterWithMaster: re-register with the master
  case ReregisterWithMaster =>
    reregisterWithMaster()

  // ApplicationFinished: the application has finished
  case ApplicationFinished(id) =>
    finishedApps += id
    // The application's directories may now be eligible for cleanup
    maybeCleanupApplication(id)
}
```
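The `WorkDirCleanup` branch applies a two-part filter: a directory is deletable only when its application is no longer running AND nothing in it has been modified within the retention window. The rule can be sketched with file-system access replaced by plain values (the names here are hypothetical, not Spark's code):

```scala
// Sketch of the WorkDirCleanup filtering rule: an AppDir stands in for one
// entry under the work directory, with the app id as its name.
case class AppDir(name: String, lastModifiedMs: Long)

object CleanupSketch {
  // Returns the directories that may be deleted: the app has finished and the
  // directory is older than the retention window.
  def removableDirs(dirs: Seq[AppDir], runningApps: Set[String],
                    nowMs: Long, retentionMs: Long): Seq[AppDir] =
    dirs.filter { d =>
      !runningApps.contains(d.name) && (nowMs - d.lastModifiedMs) > retentionMs
    }
}
```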
2.9 receiveAndReply: handle messages that require a reply

```scala
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  // RequestWorkerState: reply with the worker's current state
  case RequestWorkerState =>
    context.reply(WorkerStateResponse(host, port, workerId, executors.values.toList,
      finishedExecutors.values.toList, drivers.values.toList,
      finishedDrivers.values.toList, activeMasterUrl, cores, memory,
      coresUsed, memoryUsed, activeMasterWebUiUrl))
}
```
2.10 changeMaster

Record the new master's url and ref, and cancel any pending re-registration attempts, since a new master has been found:

```scala
private def changeMaster(masterRef: RpcEndpointRef, uiUrl: String) {
  // Record the new master's url and ref
  activeMasterUrl = masterRef.address.toSparkURL
  activeMasterWebUiUrl = uiUrl
  master = Some(masterRef)
  connected = true // mark the worker as connected
  if (conf.getBoolean("spark.ui.reverseProxy", false)) {
    logInfo(s"WorkerWebUI is available at $activeMasterWebUiUrl/proxy/$workerId")
  }
  // Cancel pending re-registration attempts, since a new master has been found
  cancelLastRegistrationRetry()
}
```
2.11 handleExecutorStateChanged: handle an executor state change

```scala
private[worker] def handleExecutorStateChanged(
    executorStateChanged: ExecutorStateChanged): Unit = {
  // First forward the ExecutorStateChanged message to the master
  sendToMaster(executorStateChanged)
  // Get the executor's state
  val state = executorStateChanged.state
  // If the executor has finished
  if (ExecutorState.isFinished(state)) {
    // Get the application id this executor belongs to
    val appId = executorStateChanged.appId
    // appId/execId
    val fullId = appId + "/" + executorStateChanged.execId
    val message = executorStateChanged.message
    val exitStatus = executorStateChanged.exitStatus
    // Look up the ExecutorRunner in the worker's executor id -> ExecutorRunner map
    executors.get(fullId) match {
      case Some(executor) =>
        logInfo("Executor " + fullId + " finished with state " + state +
          message.map(" message " + _).getOrElse("") +
          exitStatus.map(" exitStatus " + _).getOrElse(""))
        // Remove the ExecutorRunner from the executors map
        executors -= fullId
        // Move it into the finishedExecutors map
        finishedExecutors(fullId) = executor
        // Trim some finished executors if necessary
        trimFinishedExecutorsIfNecessary()
        // Release CPU and memory
        coresUsed -= executor.cores
        memoryUsed -= executor.memory
      case None =>
        logInfo("Unknown Executor " + fullId + " finished with state " + state +
          message.map(" message " + _).getOrElse("") +
          exitStatus.map(" exitStatus " + _).getOrElse(""))
    }
    // The application's work directory may now be eligible for cleanup
    maybeCleanupApplication(appId)
  }
}
```
2.12 handleDriverStateChanged: handle a driver state change

```scala
private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = {
  // Get the driver id
  val driverId = driverStateChanged.driverId
  val exception = driverStateChanged.exception
  // Get the driver's state
  val state = driverStateChanged.state
  state match {
    case DriverState.ERROR =>
      logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
    case DriverState.FAILED =>
      logWarning(s"Driver $driverId exited with failure")
    case DriverState.FINISHED =>
      logInfo(s"Driver $driverId exited successfully")
    case DriverState.KILLED =>
      logInfo(s"Driver $driverId was killed by user")
    case _ =>
      logDebug(s"Driver $driverId changed state to $state")
  }
  // Forward the DriverStateChanged message to the master
  sendToMaster(driverStateChanged)
  // Remove the driver from the drivers map and add it to finishedDrivers
  val driver = drivers.remove(driverId).get
  finishedDrivers(driverId) = driver
  // Trim some finished drivers if necessary
  trimFinishedDriversIfNecessary()
  // Release CPU and memory
  memoryUsed -= driver.driverDesc.mem
  coresUsed -= driver.driverDesc.cores
}
```
2.13 reregisterWithMaster: re-register with the master

When a network error occurs or the master fails, the worker must re-register with the master; if registration fails more than the allowed number of times, the worker exits:

```scala
private def reregisterWithMaster(): Unit = {
  Utils.tryOrExit {
    // Increment the connection attempt counter
    connectionAttemptCount += 1
    // If already registered, cancel the pending re-registration attempts
    if (registered) {
      cancelLastRegistrationRetry()
    } else if (connectionAttemptCount <= TOTAL_REGISTRATION_RETRIES) {
      // Still under the total retry limit, so try again; otherwise exit below
      logInfo(s"Retrying connection to master (attempt # $connectionAttemptCount)")
      // Re-register with the active master if there is one. If there is none,
      // the worker is still bootstrapping and has not yet connected to a master;
      // in that case, re-register with all masters.
      master match {
        // A master ref exists but registered is false: the connection to the
        // master was lost, so rebuild the master RpcEndpointRef
        case Some(masterRef) =>
          if (registerMasterFutures != null) {
            registerMasterFutures.foreach(_.cancel(true))
          }
          val masterAddress = masterRef.address
          registerMasterFutures = Array(registerMasterThreadPool.submit(new Runnable {
            override def run(): Unit = {
              try {
                logInfo("Connecting to master " + masterAddress + "...")
                // Rebuild the master endpoint ref
                val masterEndpoint =
                  rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
                // Then send the registration message to the new master
                registerWithMaster(masterEndpoint)
              } catch {
                case ie: InterruptedException => // Cancelled
                case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
              }
            }
          }))
        // No master ref: register with all masters, otherwise
        // duplicate-worker errors can occur
        case None =>
          if (registerMasterFutures != null) {
            registerMasterFutures.foreach(_.cancel(true))
          }
          registerMasterFutures = tryRegisterAllMasters()
      }
      // Once the attempt count reaches the initial-retry threshold, switch the
      // retry timer to the longer interval
      if (connectionAttemptCount == INITIAL_REGISTRATION_RETRIES) {
        registrationRetryTimer.foreach(_.cancel(true))
        registrationRetryTimer = Some(
          forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
            override def run(): Unit = Utils.tryLogNonFatalError {
              self.send(ReregisterWithMaster)
            }
          }, PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS,
            PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS, TimeUnit.SECONDS))
      }
    } else {
      logError("All masters are unresponsive! Giving up.")
      System.exit(1)
    }
  }
}
```
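The retry logic above amounts to a two-phase policy: the first `INITIAL_REGISTRATION_RETRIES` attempts fire at the short initial interval, later attempts at the prolonged interval, and past `TOTAL_REGISTRATION_RETRIES` the worker gives up. A minimal sketch of just the decision (the numeric defaults here are placeholders, not the exact values from the Spark source):

```scala
// Sketch of the two-phase registration retry policy.
sealed trait RetryDecision
case class RetryAfter(seconds: Int) extends RetryDecision
case object GiveUp extends RetryDecision

object RetryPolicySketch {
  def nextRetry(attempt: Int,
                initialRetries: Int = 6,
                totalRetries: Int = 16,
                initialIntervalSec: Int = 10,
                prolongedIntervalSec: Int = 60): RetryDecision =
    if (attempt > totalRetries) GiveUp                        // exhausted: worker exits
    else if (attempt <= initialRetries) RetryAfter(initialIntervalSec) // short interval
    else RetryAfter(prolongedIntervalSec)                     // prolonged interval
}
```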