
Spark 1.3 from Creation to Submission: 5) Executor Startup Source Code Analysis

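2017-01-02 09:20
Continuing from the previous section: once resources have been allocated successfully, the Master launches the executor, whichever scheduling strategy was used: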

launchExecutor(usableWorkers(pos), exec)
Here is the Master.launchExecutor method:
def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  // Record the resources this executor consumes on the worker
  worker.addExecutor(exec)
  // The master sends a LaunchExecutor message to the worker
  worker.actor ! LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
  // The master notifies the driver (AppClient.ClientActor) that an executor has been added
  exec.application.driver ! ExecutorAdded(
    exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
}
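The bookkeeping and message order above can be sketched without Akka. This is a minimal model, not Spark's code: WorkerModel, the trimmed message fields, and the returned Seq are all assumptions made for illustration. The real method sends the messages through actor references instead of returning them.

```scala
// Simplified, hypothetical model of what Master.launchExecutor does.
object LaunchExecutorSketch {
  // Message names mirror the real ones; the fields are trimmed for illustration.
  sealed trait Message
  final case class LaunchExecutor(appId: String, execId: Int, cores: Int, memory: Int)
    extends Message
  final case class ExecutorAdded(execId: Int, workerId: String, cores: Int, memory: Int)
    extends Message

  // Stand-in for WorkerInfo: tracks the resources already handed out on one worker.
  final class WorkerModel(val id: String, val cores: Int, val memory: Int) {
    var coresUsed: Int = 0
    var memoryUsed: Int = 0
    def coresFree: Int = cores - coresUsed
    def memoryFree: Int = memory - memoryUsed
    def addExecutor(execCores: Int, execMemory: Int): Unit = {
      coresUsed += execCores
      memoryUsed += execMemory
    }
  }

  // Records the resources first, then returns the two messages in the order
  // the master sends them: one for the worker, one for the driver.
  def launchExecutor(worker: WorkerModel, appId: String, execId: Int,
                     cores: Int, memory: Int): Seq[Message] = {
    worker.addExecutor(cores, memory)
    Seq(
      LaunchExecutor(appId, execId, cores, memory),
      ExecutorAdded(execId, worker.id, cores, memory)
    )
  }
}
```

Recording the resources before sending anything means the master's view of free cores and memory is already updated when the next scheduling pass runs.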
The LaunchExecutor message goes to the Worker node, where it is handled in Worker.receiveWithLogging:

case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
  if (masterUrl != activeMasterUrl) {
    // Request from a master that is not the active one: it is logged and ignored (elided)
  } else {
    // ...two directories are created here (skipped); the core logic follows
    appDirectories(appId) = appLocalDirs
    val manager = new ExecutorRunner(
      appId, // application ID
      execId, // executor ID
      appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
      cores_,
      memory_,
      self, // context.self, the worker actor's own ActorRef
      workerId,
      host, // the worker's host
      webUi.boundPort,
      publicAddress,
      sparkHome,
      executorDir,
      akkaUrl, // lets the executor talk to the worker (two processes on the same machine)
      conf,
      appLocalDirs, ExecutorState.LOADING)
    // "appId/execId" -> ExecutorRunner
    executors(appId + "/" + execId) = manager
    // Start the executor's Java child process
    manager.start()
    coresUsed += cores_
    memoryUsed += memory_
    // Report the executor's current state to the master
    master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
  }
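At this point the ExecutorRunner holds everything the executor will need to reach the worker (akkaUrl) and the driver (the driver URL carried in the arguments of appDesc.command; appUiUrl only points at the application's web UI). Next, ExecutorRunner.start() launches the executor's Java child process. (Note: the Worker and CoarseGrainedExecutorBackend run on the same machine, but they are separate processes.) Here is ExecutorRunner.start():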
def start() {
  workerThread = new Thread("ExecutorRunner for " + fullId) {
    override def run() { fetchAndRunExecutor() }
  }
  workerThread.start()
  // Shutdown hook that kills actors on shutdown.
  shutdownHook = new Thread() {
    override def run() {
      killProcess(Some("Worker shutting down"))
    }
  }
  Runtime.getRuntime.addShutdownHook(shutdownHook)
}
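The thread-plus-shutdown-hook pattern used by start() can be tried in isolation. The sketch below is an illustration under assumptions, not Spark's implementation: Runner, work, finished, kill, and join are hypothetical names, and the blocking work stands in for fetchAndRunExecutor.

```scala
// Hypothetical stand-alone version of the pattern in ExecutorRunner.start().
object RunnerThreadSketch {
  final class Runner(name: String, work: () => Unit) {
    // Set once work() has returned or has been interrupted.
    @volatile var finished: Boolean = false
    private var workerThread: Thread = null
    private var shutdownHook: Thread = null

    // Returns immediately; the blocking work runs on its own thread.
    def start(): Unit = {
      workerThread = new Thread("Runner for " + name) {
        override def run(): Unit = {
          try work()
          catch { case _: InterruptedException => () }
          finished = true
        }
      }
      workerThread.start()
      // If the JVM shuts down, interrupt the worker thread so it does not linger.
      shutdownHook = new Thread() {
        override def run(): Unit = kill()
      }
      Runtime.getRuntime.addShutdownHook(shutdownHook)
    }

    def kill(): Unit = if (workerThread != null) workerThread.interrupt()

    def join(): Unit = workerThread.join()
  }
}
```

Because start() only creates and starts threads, the caller is free to continue while the work blocks, for example on a child process that has not yet exited.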
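To keep the Worker responsive, the executor process is launched from a separate thread rather than from the Worker's own message loop. The details are in fetchAndRunExecutor: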
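def fetchAndRunExecutor() {
  try {
    // Core code only: launch the process
    val builder = CommandUtils.buildProcessBuilder(appDesc.command, memory,
      sparkHome.getAbsolutePath, substituteVariables)
    val command = builder.command()
    process = builder.start()
    // Block until the executor process exits, then report its exit code to the worker
    val exitCode = process.waitFor()
    state = ExecutorState.EXITED
    val message = "Command exited with code " + exitCode
    worker ! ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode))
  }
  // catch clauses (interruption, launch failure) elided
}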
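The code above uses the CommandUtils helper to launch the executor: it builds a java command line (not jar) that runs the class passed in through appDesc.command, org.apache.spark.executor.CoarseGrainedExecutorBackend. Once builder.start() returns, the executor process has been launched.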
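How a command description becomes a java command line can be sketched with the JDK's ProcessBuilder. This is a rough model under assumptions: the Command case class, the placeholder map, and the exact JVM flags are hypothetical simplifications, and Spark's CommandUtils does considerably more (environment, library path, extra Java options).

```scala
// Hypothetical, simplified take on building the executor's command line.
object CommandSketch {
  // Trimmed-down stand-in for a command description.
  final case class Command(mainClass: String, arguments: Seq[String], classPath: String)

  // Replaces a {{PLACEHOLDER}} argument with a value known only on the worker;
  // any other argument passes through unchanged.
  def substitute(values: Map[String, String], argument: String): String =
    values.getOrElse(argument, argument)

  // Assembles `java -cp <classpath> -Xmx<memory>M <mainClass> <args...>`.
  def buildCommand(cmd: Command, memoryMb: Int, values: Map[String, String]): Seq[String] =
    Seq("java", "-cp", cmd.classPath, s"-Xmx${memoryMb}M", cmd.mainClass) ++
      cmd.arguments.map(arg => substitute(values, arg))

  // Wraps the command in a ProcessBuilder; calling start() on it would launch the child JVM.
  def buildProcessBuilder(cmd: Command, memoryMb: Int,
                          values: Map[String, String]): ProcessBuilder =
    new ProcessBuilder(buildCommand(cmd, memoryMb, values): _*)
}
```

Deferring the substitution to the worker matters because values such as the executor ID or hostname are not known when the driver describes the command.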

Now back to the Master.launchExecutor method from earlier:

// The master notifies the driver (AppClient.ClientActor) that an executor has been added
exec.application.driver ! ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
The message is handled in AppClient.ClientActor.receiveWithLogging:

case ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) =>
  val fullId = appId + "/" + id
  logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, hostPort,
    cores))
  master ! ExecutorStateChanged(appId, id, ExecutorState.RUNNING, None, None)
  listener.executorAdded(fullId, workerId, hostPort, cores, memory)
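Finally, the client sends an ExecutorStateChanged message to the Master, because the Master is responsible for managing the Workers' resources. The Master handles it as follows: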

case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
  val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
  execOption match {
    case Some(exec) => {
      val appInfo = idToApp(appId)
      exec.state = state
      if (state == ExecutorState.RUNNING) { appInfo.resetRetryCount() }
      exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus)
      if (ExecutorState.isFinished(state)) {
        // Remove this executor from the worker and app
        logInfo(s"Removing executor ${exec.fullId} because it is $state")
        appInfo.removeExecutor(exec)
        exec.worker.removeExecutor(exec)

        val normalExit = exitStatus == Some(0)
        // Only retry certain number of times so we don't go into an infinite loop.
        if (!normalExit) {
          if (appInfo.incrementRetryCount() < ApplicationState.MAX_NUM_RETRY) {
            schedule()
          } else {
            val execs = appInfo.executors.values
            if (!execs.exists(_.state == ExecutorState.RUNNING)) {
              logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
                s"${appInfo.retryCount} times; removing it")
              removeApplication(appInfo, ApplicationState.FAILED)
            }
          }
        }
      }
    }
    case None =>
      logWarning(s"Got status update for unknown executor $appId/$execId")
  }
}
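The retry decision in the handler above can be isolated into a small pure function. The following is a sketch under assumptions rather than the Master's actual code: AppModel, Action, and onExecutorFinished are hypothetical names, and maxRetries is a parameter standing in for ApplicationState.MAX_NUM_RETRY.

```scala
// Hypothetical model of the retry decision made when an executor finishes.
object ExecutorRetrySketch {
  sealed trait Action
  case object NoAction extends Action
  case object Reschedule extends Action
  case object RemoveApplication extends Action

  // Stand-in for the retry counter kept per application.
  final class AppModel(val maxRetries: Int) {
    private var retryCount: Int = 0
    def resetRetryCount(): Unit = { retryCount = 0 }
    def incrementRetryCount(): Int = { retryCount += 1; retryCount }
  }

  // A normal exit (status 0) needs no action. An abnormal exit triggers a
  // reschedule while the retry budget lasts; once it is spent, the application
  // is removed only if none of its executors is still running.
  def onExecutorFinished(app: AppModel, exitStatus: Option[Int],
                         anyExecutorRunning: Boolean): Action = {
    val normalExit = exitStatus == Some(0)
    if (normalExit) NoAction
    else if (app.incrementRetryCount() < app.maxRetries) Reschedule
    else if (!anyExecutorRunning) RemoveApplication
    else NoAction
  }
}
```

Calling resetRetryCount() models the branch where an executor reaches RUNNING: a successful start gives the application a fresh retry budget.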
Tags: SPARK Executor