Spark 1.3 from Creation to Submission: 5) Executor Launch Source Code Analysis
2017-01-02 09:20
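Once resource allocation from the previous part has succeeded, executors are launched no matter which allocation strategy was used.
Let's return to the Master.launchExecutor method we saw earlier.
It is invoked as launchExecutor(usableWorkers(pos), exec). Here is Master.launchExecutor: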
```scala
def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  // Record the resources this executor uses on the worker
  worker.addExecutor(exec)
  // The master sends the worker a LaunchExecutor message
  worker.actor ! LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
  // The master tells the driver (AppClient.ClientActor) that the executor has been added
  exec.application.driver ! ExecutorAdded(
    exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
}
```

The master sends the worker node a message to launch the executor, and the worker handles it in Worker.receiveWithLogging.
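To make the bookkeeping in Master.launchExecutor concrete, here is a toy model in plain Scala, without Akka. WorkerInfo, ExecutorDesc, LaunchExecutor and ExecutorAdded below are simplified, hypothetical stand-ins that only borrow the Spark names, and the actor sends (`!`) are replaced by appending to in-memory outboxes. It assumes, as a simplification, that addExecutor just charges the executor's cores and memory against the worker's capacity.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins for Spark's ExecutorDesc, WorkerInfo and messages
case class ExecutorDesc(appId: String, id: Int, cores: Int, memory: Int) {
  def fullId: String = appId + "/" + id
}

sealed trait Msg
case class LaunchExecutor(masterUrl: String, appId: String, execId: Int,
                          cores: Int, memory: Int) extends Msg
case class ExecutorAdded(id: Int, workerId: String, hostPort: String,
                         cores: Int, memory: Int) extends Msg

class WorkerInfo(val id: String, val hostPort: String, val cores: Int, val memory: Int) {
  val executors = mutable.HashMap[String, ExecutorDesc]()
  var coresUsed = 0
  var memoryUsed = 0
  def coresFree: Int = cores - coresUsed
  def memoryFree: Int = memory - memoryUsed

  // Like worker.addExecutor(exec): remember the executor and charge its resources
  def addExecutor(exec: ExecutorDesc): Unit = {
    executors(exec.fullId) = exec
    coresUsed += exec.cores
    memoryUsed += exec.memory
  }
}

object MasterSketch {
  // Instead of `actor ! msg`, messages are appended to outboxes so they can be inspected
  def launchExecutor(masterUrl: String, worker: WorkerInfo, exec: ExecutorDesc,
                     toWorker: mutable.Buffer[Msg], toDriver: mutable.Buffer[Msg]): Unit = {
    worker.addExecutor(exec)
    toWorker += LaunchExecutor(masterUrl, exec.appId, exec.id, exec.cores, exec.memory)
    toDriver += ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
  }
}
```

For example, launching a 2-core, 1024 MB executor on an 8-core, 4096 MB worker leaves 6 cores and 3072 MB free, and the worker and the driver each receive exactly one message.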
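```scala
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
  if (masterUrl != activeMasterUrl) {
    logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
  } else {
    // ... creates two directories, executorDir and appLocalDirs (skipped); the core logic follows
    appDirectories(appId) = appLocalDirs
    val manager = new ExecutorRunner(
      appId,      // application ID
      execId,     // executor ID
      appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
      cores_,
      memory_,
      self,       // context.self, the ActorRef of this worker actor
      workerId,
      host,       // the worker's host
      webUi.boundPort,
      publicAddress,
      sparkHome,
      executorDir,
      akkaUrl,    // lets the executor talk to the worker (two processes on the same machine)
      conf,
      appLocalDirs,
      ExecutorState.LOADING)
    // "appId/execId" -> ExecutorRunner
    executors(appId + "/" + execId) = manager
    // Start the executor as a Java child process
    manager.start()
    coresUsed += cores_
    memoryUsed += memory_
    // Report the executor's current state (still LOADING) to the master
    master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
  }
```

At this point the ExecutorRunner holds everything the executor will need to communicate with the worker (through akkaUrl) and with the driver (through the driver URL passed as a command-line argument in appDesc.command). Next, ExecutorRunner.start() launches the executor as a Java child process. (Note: although the Worker and CoarseGrainedExecutorBackend run on the same machine, they are separate processes.)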
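The two rules enforced by this handler, ignoring launch requests from a master that is not the active one and charging the executor's cores and memory to the worker, can be sketched as follows. WorkerSketch, its string states and the simplified ExecutorStateChanged message are hypothetical; the ExecutorRunner and the real process launch are left out.

```scala
import scala.collection.mutable

// Hypothetical simplified message; Spark's version also carries a message and an exit status
case class ExecutorStateChanged(appId: String, execId: Int, state: String)

class WorkerSketch(activeMasterUrl: String) {
  // "appId/execId" -> state, standing in for the map of ExecutorRunners
  val executors = mutable.HashMap[String, String]()
  var coresUsed = 0
  var memoryUsed = 0
  val toMaster = mutable.Buffer[ExecutorStateChanged]()

  // Returns false when the request comes from a master that is not the active one
  def onLaunchExecutor(masterUrl: String, appId: String, execId: Int,
                       cores: Int, memory: Int): Boolean = {
    if (masterUrl != activeMasterUrl) {
      false // Spark only logs a warning here and launches nothing
    } else {
      executors(appId + "/" + execId) = "LOADING"
      // (the real Worker creates an ExecutorRunner here and calls manager.start())
      coresUsed += cores
      memoryUsed += memory
      toMaster += ExecutorStateChanged(appId, execId, "LOADING")
      true
    }
  }
}
```

Checking the master URL first matters after a master failover: a stale master must not be able to consume the worker's resources.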
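```scala
def start() {
  workerThread = new Thread("ExecutorRunner for " + fullId) {
    override def run() { fetchAndRunExecutor() }
  }
  workerThread.start()
  // Shutdown hook that kills actors on shutdown.
  shutdownHook = new Thread() {
    override def run() { killProcess(Some("Worker shutting down")) }
  }
  Runtime.getRuntime.addShutdownHook(shutdownHook)
}
```

The worker launches the executor process from a dedicated thread rather than from the actor itself: fetchAndRunExecutor blocks until the child process exits, so running it on its own thread keeps the Worker actor free to handle other messages. The shutdown hook kills the child process if the worker JVM shuts down. The process itself is launched in fetchAndRunExecutor.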
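The start() pattern, a named thread for the blocking launch plus a JVM shutdown hook for cleanup, can be reproduced with the plain JDK Thread API. RunnerSketch, runBlocking and cleanup are hypothetical names standing in for ExecutorRunner, fetchAndRunExecutor and killProcess.

```scala
// runBlocking and cleanup are hypothetical placeholders for fetchAndRunExecutor and killProcess
class RunnerSketch(fullId: String, runBlocking: () => Unit, cleanup: () => Unit) {
  @volatile private var workerThread: Thread = null
  @volatile private var shutdownHook: Thread = null

  def start(): Unit = {
    // The blocking work runs on its own named thread, so start() returns at once
    workerThread = new Thread("ExecutorRunner for " + fullId) {
      override def run(): Unit = runBlocking()
    }
    workerThread.start()
    // If the JVM shuts down first, the hook gets a chance to clean up
    shutdownHook = new Thread() {
      override def run(): Unit = cleanup()
    }
    Runtime.getRuntime.addShutdownHook(shutdownHook)
  }

  def threadName: String = workerThread.getName
  def join(): Unit = workerThread.join()
  // Unregisters the hook, e.g. once the executor has been stopped deliberately
  def removeHook(): Boolean = Runtime.getRuntime.removeShutdownHook(shutdownHook)
}
```

Because the body can block for the executor's whole lifetime, start() must not wait for it; a caller only waits if it explicitly joins the thread.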
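```scala
def fetchAndRunExecutor() {
  try {
    // Core code only. Launch the process
    val builder = CommandUtils.buildProcessBuilder(appDesc.command, memory,
      sparkHome.getAbsolutePath, substituteVariables)
    val command = builder.command()
    process = builder.start()
    // Wait for the executor process to exit
    val exitCode = process.waitFor()
    state = ExecutorState.EXITED
    val message = "Command exited with code " + exitCode
    worker ! ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode))
  } catch {
    case e: InterruptedException => state = ExecutorState.KILLED; killProcess(None)
    case e: Exception => state = ExecutorState.FAILED; killProcess(Some(e.toString))
  }
}
```

CommandUtils builds a ProcessBuilder that uses the java command to run the class passed in through the command, org.apache.spark.executor.CoarseGrainedExecutorBackend. Once builder.start() returns, the executor process is up and running. The runner thread then blocks in process.waitFor(), so the ExecutorStateChanged message to the worker is only sent after the executor process has exited.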
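The argument handling can be sketched as follows, assuming the command's arguments carry placeholder tokens such as {{EXECUTOR_ID}} and {{CORES}} that the substituteVariables callback replaces with this executor's concrete values. LaunchSketch, the placeholder set and the simplified command layout (java, heap flags, main class, arguments) are assumptions for illustration; the real CommandUtils also deals with the classpath, Java options and environment variables.

```scala
object LaunchSketch {
  // Hypothetical stand-in for the substituteVariables callback:
  // placeholder arguments become concrete values, everything else passes through
  def substituteVariables(execId: Int, host: String, cores: Int, appId: String,
                          workerUrl: String)(arg: String): String = arg match {
    case "{{WORKER_URL}}"  => workerUrl
    case "{{EXECUTOR_ID}}" => execId.toString
    case "{{HOSTNAME}}"    => host
    case "{{CORES}}"       => cores.toString
    case "{{APP_ID}}"      => appId
    case other             => other
  }

  // Assumed, simplified layout: java <heap flags> <main class> <substituted args>
  def buildCommand(mainClass: String, args: Seq[String], memoryMb: Int,
                   subst: String => String): Seq[String] =
    Seq("java", s"-Xms${memoryMb}M", s"-Xmx${memoryMb}M", mainClass) ++ args.map(subst)

  // Starts the child process and blocks until it exits, returning its exit code;
  // this blocking wait is why the launch runs on its own thread
  def runAndWait(command: Seq[String]): Int = {
    val process = new ProcessBuilder(command: _*).inheritIO().start()
    process.waitFor()
  }
}
```

Keeping the substitution as a callback lets the generic command-building code stay unaware of which executor it is launching; only the runner knows the concrete IDs.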
Now back to Master.launchExecutor once more, this time to its last statement:

```scala
// The master tells the driver (AppClient.ClientActor) that the executor has been added
exec.application.driver ! ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
```

Let's look at AppClient.ClientActor.receiveWithLogging:
```scala
case ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) =>
  val fullId = appId + "/" + id
  logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, hostPort,
    cores))
  master ! ExecutorStateChanged(appId, id, ExecutorState.RUNNING, None, None)
  listener.executorAdded(fullId, workerId, hostPort, cores, memory)
```

Finally, the client sends an ExecutorStateChanged message with state RUNNING to the master, because the master is responsible for managing the workers' resources. The master handles this message in Master.receiveWithLogging.
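The client's side of this exchange is short: derive the full executor ID, report RUNNING to the master, then inform a listener. In the sketch below, ClientSketch, ExecutorListener and the string state are hypothetical simplifications of the Spark types.

```scala
import scala.collection.mutable

// Hypothetical listener trait, modeled on the listener.executorAdded call above
trait ExecutorListener {
  def executorAdded(fullId: String, workerId: String, hostPort: String,
                    cores: Int, memory: Int): Unit
}
case class ExecutorStateChanged(appId: String, execId: Int, state: String)

class ClientSketch(appId: String, listener: ExecutorListener) {
  val toMaster = mutable.Buffer[ExecutorStateChanged]()

  def onExecutorAdded(id: Int, workerId: String, hostPort: String,
                      cores: Int, memory: Int): Unit = {
    val fullId = appId + "/" + id // e.g. "app-1/0"
    toMaster += ExecutorStateChanged(appId, id, "RUNNING")
    listener.executorAdded(fullId, workerId, hostPort, cores, memory)
  }
}
```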
```scala
case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
  val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
  execOption match {
    case Some(exec) => {
      val appInfo = idToApp(appId)
      exec.state = state
      if (state == ExecutorState.RUNNING) { appInfo.resetRetryCount() }
      exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus)
      if (ExecutorState.isFinished(state)) {
        // Remove this executor from the worker and app
        logInfo(s"Removing executor ${exec.fullId} because it is $state")
        appInfo.removeExecutor(exec)
        exec.worker.removeExecutor(exec)

        val normalExit = exitStatus == Some(0)
        // Only retry certain number of times so we don't go into an infinite loop.
        if (!normalExit) {
          if (appInfo.incrementRetryCount() < ApplicationState.MAX_NUM_RETRY) {
            schedule()
          } else {
            val execs = appInfo.executors.values
            if (!execs.exists(_.state == ExecutorState.RUNNING)) {
              logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
                s"${appInfo.retryCount} times; removing it")
              removeApplication(appInfo, ApplicationState.FAILED)
            }
          }
        }
      }
    }
    case None =>
      logWarning(s"Got status update for unknown executor $appId/$execId")
  }
}
```
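The retry policy in this handler can be isolated as a pure function, which makes its edge cases easier to see. RetrySketch, the Decision values and the string states are hypothetical; maxRetries plays the role of ApplicationState.MAX_NUM_RETRY, and remainingRunning stands for the check on the application's remaining executors.

```scala
object RetrySketch {
  sealed trait Decision
  case object NoAction extends Decision   // only bookkeeping, no scheduling change
  case object Reschedule extends Decision // abnormal exit with retries left: schedule()
  case object RemoveApp extends Decision  // out of retries and nothing RUNNING: app FAILED

  // Mirrors ExecutorState.isFinished in the handler above
  private val finishedStates = Set("KILLED", "FAILED", "LOST", "EXITED")

  /** Returns the decision and the application's new retry count.
    * remainingRunning: whether any remaining executor of the app is still RUNNING. */
  def handle(state: String, exitStatus: Option[Int], retryCount: Int,
             maxRetries: Int, remainingRunning: Boolean): (Decision, Int) = {
    if (state == "RUNNING") (NoAction, 0)                  // resetRetryCount()
    else if (!finishedStates.contains(state)) (NoAction, retryCount)
    else if (exitStatus == Some(0)) (NoAction, retryCount) // normal exit: no retry
    else {
      val newCount = retryCount + 1                        // incrementRetryCount()
      if (newCount < maxRetries) (Reschedule, newCount)
      else if (remainingRunning) (NoAction, newCount)
      else (RemoveApp, newCount)
    }
  }
}
```

Because normalExit only holds for Some(0), a finished executor reported with exitStatus None also counts against the retry budget. Once the budget is exhausted, the application is removed only if none of its remaining executors is still RUNNING.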