
[spark] Tracing the entire task scheduling flow, starting from spark-submit

This article is based on Spark 2.1 running in Standalone Cluster mode.

Overview

A Spark application can be launched in Client mode or Cluster mode. The difference is that in Client mode the Driver is started on the node where the spark-submit command is executed, whereas in Cluster mode the Master picks a Worker at random and starts the Driver there through DriverWrapper.
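
For reference, a Cluster-mode submission against a standalone master looks roughly like this (the master host, class name, jar path, and resource numbers are placeholders, not values from this article):

spark-submit \
  --master spark://master-host:7077 \
  --deploy-mode cluster \
  --class com.example.MyApp \
  --executor-memory 2g \
  --total-executor-cores 4 \
  /path/to/my-app.jar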

The overall flow, roughly:

spark-submit invokes the SparkSubmit class. SparkSubmit invokes Client via reflection, and Client communicates with the Master to submit the Driver; once a success reply is received, the JVM exits (the SparkSubmit process exits).

After receiving the driver submission, the Master randomly picks a Worker that can satisfy the driver's resource requirements and sends it a message to launch the driver. The Worker assembles the driver's description into a Linux command that starts DriverWrapper, which in turn starts the Driver, and finally reports the Driver's execution state back to the Master.

Once the driver is up, the next step is registering the application: while SparkContext is starting, an AppClient is created that communicates with the Master to request application registration.

After receiving that message, the Master schedules the application: a scheduling algorithm decides on which Workers executors should be started, and the Master then sends those Workers a message to launch Executors.

Each such Worker assembles a Linux command and starts a CoarseGrainedExecutorBackend process, which then registers its Executor with the Driver; after a successful registration, an Executor object is created inside CoarseGrainedExecutorBackend.

What follows is the execution of jobs; see the earlier articles…

Submit Driver

When you submit your own application with the spark-submit shell command, the class that is ultimately invoked via java -cp is:

org.apache.spark.deploy.SparkSubmit


In this class's main method, when running in Cluster mode without the REST gateway, the Client class is invoked via reflection:

org.apache.spark.deploy.Client


In Client's main method, an RpcEndpointRef for communicating with the Master is obtained and a ClientEndpoint named Client is created. In its onStart lifecycle method, a DriverDescription object is built to describe the Driver, including the mainClass that will ultimately launch it:

org.apache.spark.deploy.worker.DriverWrapper


Client then sends a RequestSubmitDriver message to the Master. The Master persists the DriverInfo to its persistence engine, calls schedule() to schedule it, and replies to the Client with a SubmitDriverResponse message. After receiving a successful-submission reply, the Client sends the Master a RequestDriverStatus message asking for the driver's status; if the DriverStatusResponse reply confirms the driver exists on the Master, the Client exits the JVM (the SparkSubmit process exits).
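
A minimal sketch of this exchange, with simplified stand-ins for the messages (the real ones live in org.apache.spark.deploy.DeployMessages and carry more fields; MasterRef here is an assumed abstraction, not Spark's RpcEndpointRef):

// Simplified stand-ins for the driver-submission handshake between ClientEndpoint and Master.
object DriverSubmissionSketch {
  case class RequestSubmitDriver(driverDescription: String)
  case class SubmitDriverResponse(success: Boolean, driverId: Option[String], message: String)
  case class RequestDriverStatus(driverId: String)
  case class DriverStatusResponse(found: Boolean, state: Option[String])

  // Stand-in for the Master's endpoint reference: it just answers the two requests.
  trait MasterRef {
    def askSubmit(req: RequestSubmitDriver): SubmitDriverResponse
    def askStatus(req: RequestDriverStatus): DriverStatusResponse
  }

  // Client-side flow: submit the driver, check its status once, then the process can exit.
  def submitAndExit(master: MasterRef, desc: String): Unit = {
    val resp = master.askSubmit(RequestSubmitDriver(desc))
    if (resp.success) {
      val status = master.askStatus(RequestDriverStatus(resp.driverId.get))
      if (status.found) {
        // In the real Client this is the point where the SparkSubmit JVM exits.
        println(s"Driver ${resp.driverId.get} submitted, state = ${status.state}")
      }
    }
  }
}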

Flow diagram:



Master LaunchDriver

As mentioned above, after receiving the driver-submission message the Master calls the schedule() method:

private def schedule(): Unit = {
  val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  val numWorkersAlive = shuffledAliveWorkers.size
  var curPos = 0
  for (driver <- waitingDrivers.toList) {
    var launched = false
    var numWorkersVisited = 0
    while (numWorkersVisited < numWorkersAlive && !launched) {
      val worker = shuffledAliveWorkers(curPos)
      numWorkersVisited += 1
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        launchDriver(worker, driver)
        waitingDrivers -= driver
        launched = true
      }
      curPos = (curPos + 1) % numWorkersAlive
    }
  }
  startExecutorsOnWorkers()
}


The method first shuffles the alive Workers so that Drivers do not all end up on the same Worker; when a Worker's free memory and cores satisfy what the driver needs, launchDriver is called.

launchDriver sends a LaunchDriver message to the chosen Worker; on receiving it, the Worker builds a DriverRunner from the driver's description and calls its start method.

In start, the driver's parameters are assembled into a Linux command that runs the DriverWrapper class mentioned above via java -cp, rather than starting the Driver directly. This makes the Driver share fate with the Worker that launched it (the source comments call this "share fate"): if the Worker dies, the corresponding Driver is stopped too.

Finally, the Driver's execution state is reported back to the Master.

Flow diagram:



Register APP

Once the Driver is up, the application has to register itself with the Master. When the SparkContext is created, a SchedulerBackend and a TaskScheduler are created:

val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
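
For context, everything in this section is triggered simply by constructing a SparkContext in the user program. A minimal sketch (the app name is a placeholder; the master URL and deploy mode normally come from spark-submit rather than from the code):

import org.apache.spark.{SparkConf, SparkContext}

object MinimalApp {
  def main(args: Array[String]): Unit = {
    // Constructing the SparkContext is what kicks off createTaskScheduler and,
    // in standalone mode, the StandaloneSchedulerBackend / StandaloneAppClient below.
    val conf = new SparkConf().setAppName("minimal-app")
    val sc = new SparkContext(conf)
    sc.parallelize(1 to 10).count()
    sc.stop()
  }
}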


The TaskScheduler's (TaskSchedulerImpl's) start method is then called, which in turn calls the SchedulerBackend's start method (StandaloneSchedulerBackend in standalone mode):

override def start() {
  super.start()
  ...
  val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
    appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
  client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
  client.start()
  ...
}


super.start() creates the driverEndpoint. An ApplicationDescription is then built from the application's parameters, a StandaloneAppClient is created and its start method is called; that start method creates an endpoint named AppClient, whose onStart lifecycle method sends a RegisterApplication message to the Master to register the app.

When the Master receives RegisterApplication, it creates an ApplicationInfo describing the application, persists it to the persistence engine, replies to the AppClient with a RegisteredApplication message, and then calls schedule() to schedule the application.

Flow diagram:



Launch Executor

The first half of this method was analyzed above under Master LaunchDriver; it showed how the Driver is scheduled onto a Worker and launched:

private def schedule(): Unit = {
  val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  val numWorkersAlive = shuffledAliveWorkers.size
  var curPos = 0
  for (driver <- waitingDrivers.toList) {
    var launched = false
    var numWorkersVisited = 0
    while (numWorkersVisited < numWorkersAlive && !launched) {
      val worker = shuffledAliveWorkers(curPos)
      numWorkersVisited += 1
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        launchDriver(worker, driver)
        waitingDrivers -= driver
        launched = true
      }
      curPos = (curPos + 1) % numWorkersAlive
    }
  }
  startExecutorsOnWorkers()
}


Now for the second half, startExecutorsOnWorkers(), and how it launches Executors on Workers:

private def startExecutorsOnWorkers(): Unit = {
  // Iterate over the applications waiting to be scheduled, in FIFO order.
  for (app <- waitingApps if app.coresLeft > 0) {
    val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
    // Filter out the Workers whose free resources can satisfy one Executor of this app.
    val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
      .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
        worker.coresFree >= coresPerExecutor.getOrElse(1))
      .sortBy(_.coresFree).reverse
    // Schedule the Executors: decide how many cores each Worker is assigned.
    val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

    // Based on that assignment, launch Executors on the corresponding Workers.
    for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
      allocateWorkerResourceToExecutors(
        app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
    }
  }
}


First the Workers that can satisfy the application's resource requirements for a single Executor are filtered out; Executors are then scheduled onto them using one of two strategies:

spread out (the spreadOutApps strategy): distribute the Executors across as many Worker nodes as possible;

consolidate: concentrate the Executors on as few Worker nodes as possible.

Which strategy is used is controlled by the spark.deploy.spreadOut configuration, which defaults to true (spread out). scheduleExecutorsOnWorkers returns the number of cores assigned to each Worker.
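
Below is a minimal sketch of the idea behind the two strategies. It is only an illustration, not the real scheduleExecutorsOnWorkers, which additionally assigns cores in coresPerExecutor-sized chunks and checks per-executor memory and executor-count limits:

object ExecutorPlacementSketch {
  // Assign `coresWanted` cores across workers with the given free cores, one core at a time,
  // either spreading across workers or filling each worker up in turn.
  def assignCores(freeCores: Array[Int], coresWanted: Int, spreadOut: Boolean): Array[Int] = {
    val assigned = Array.fill(freeCores.length)(0)
    var toAssign = coresWanted
    var pos = 0
    while (toAssign > 0 && assigned.zip(freeCores).exists { case (a, f) => a < f }) {
      if (assigned(pos) < freeCores(pos)) {
        assigned(pos) += 1
        toAssign -= 1
      }
      // Spread out: move to the next worker after every core.
      // Consolidate: stay on the same worker until it has no free cores left.
      if (spreadOut || assigned(pos) == freeCores(pos)) pos = (pos + 1) % freeCores.length
    }
    assigned
  }
}

For example, assignCores(Array(4, 4, 4), 6, spreadOut = true) yields Array(2, 2, 2), whereas spreadOut = false yields Array(4, 2, 0).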

allocateWorkerResourceToExecutors then works out how many Executors need to be started on each such Worker:

private def allocateWorkerResourceToExecutors(
    app: ApplicationInfo,
    assignedCores: Int,
    coresPerExecutor: Option[Int],
    worker: WorkerInfo): Unit = {
  // Number of Executors to start on this Worker: assigned cores / cores per Executor.
  // If cores per Executor was not specified, give all assigned cores to a single Executor.
  val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
  val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
  for (i <- 1 to numExecutors) {
    val exec = app.addExecutor(worker, coresToAssign)
    launchExecutor(worker, exec)
    app.state = ApplicationState.RUNNING
  }
}


Having computed how many Executors the Worker should start, launchExecutor is called, which sends a LaunchExecutor message to that Worker.
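
A quick worked example of that arithmetic, with assumed numbers:

// Assumed numbers, purely to illustrate the arithmetic above.
val assignedCores = 8
val coresPerExecutor: Option[Int] = Some(2)

val numExecutors  = coresPerExecutor.map(assignedCores / _).getOrElse(1)   // 4 executors
val coresToAssign = coresPerExecutor.getOrElse(assignedCores)              // 2 cores each
// With coresPerExecutor = None, this becomes 1 executor holding all 8 cores.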

Flow diagram:



When the Worker receives the message, it wraps the received information in an ExecutorRunner object and calls its start method:

case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
  ...
  val manager = new ExecutorRunner(
    appId,
    execId,
    appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
    cores_,
    memory_,
    self,
    workerId,
    host,
    webUi.boundPort,
    publicAddress,
    sparkHome,
    executorDir,
    workerUri,
    conf,
    appLocalDirs, ExecutorState.RUNNING)
  executors(appId + "/" + execId) = manager
  manager.start()
  ...


The manager's start method calls fetchAndRunExecutor:

private def fetchAndRunExecutor() {
  try {
    val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
      memory, sparkHome.getAbsolutePath, substituteVariables)
    ...
    process = builder.start()
    ...
}


Similar to how the Driver was launched, the received information is assembled into a Linux command, and java -cp is used to start the CoarseGrainedExecutorBackend process.

Flow diagram:



CoarseGrainedExecutorBackend's main method creates an endpoint named Executor, whose onStart() lifecycle method sends a RegisterExecutor message to the Driver.

When the Driver receives the message, it creates an ExecutorData object from the executor's information, adds it to the executorDataMap collection, and then returns a RegisteredExecutor message to CoarseGrainedExecutorBackend.
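
A trimmed sketch of the driver-side bookkeeping, with simplified stand-in types (the real handling lives in CoarseGrainedSchedulerBackend's DriverEndpoint and tracks much more state than this):

object ExecutorRegistrationSketch {
  // Trimmed stand-ins for Spark's messages and per-executor bookkeeping.
  case class RegisterExecutor(executorId: String, hostname: String, cores: Int)
  case object RegisteredExecutor
  case class ExecutorData(hostname: String, totalCores: Int, var freeCores: Int)

  final class DriverEndpointSketch {
    private val executorDataMap = scala.collection.mutable.Map[String, ExecutorData]()

    // Roughly: record the executor, then acknowledge so that the backend
    // goes on to create its Executor object.
    def receive(msg: RegisterExecutor): RegisteredExecutor.type = {
      executorDataMap(msg.executorId) = ExecutorData(msg.hostname, msg.cores, msg.cores)
      RegisteredExecutor
    }
  }
}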

When CoarseGrainedExecutorBackend receives RegisteredExecutor:

case RegisteredExecutor =>
  logInfo("Successfully registered with driver")
  try {
    executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
  } catch {
    case NonFatal(e) =>
      exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
  }


it creates an Executor object; this object is what will execute the Tasks assigned by the Driver.

Flow diagram:



From here, the DAGScheduler and TaskScheduler take over: stages are divided, Tasks are scheduled and executed, and their results are eventually returned to the Driver. For details, see the earlier articles:

DAGScheduler stage division: source code analysis

DAGScheduler stage submission: source code analysis

TaskScheduler task submission and scheduling: source code analysis

Task execution flow

Handling the result of a successfully executed Task

References

Spark task scheduling overview