您的位置:首页 > 其它

spark源码解析-Master资源调度schedule()

2019-05-06 11:44 337 查看

主流程

** 每当driver,worker,application注册和状态改变的时候都会调用资源调度方法**

private def schedule(): Unit = {
if (state != RecoveryState.ALIVE) {
return
}

val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
val numWorkersAlive = shuffledAliveWorkers.size
var curPos = 0
for (driver <- waitingDrivers.toList) { //注意只有yarn-cluster才会注册driver到waitingDrivers

var launched = false
var numWorkersVisited = 0
while (numWorkersVisited < numWorkersAlive && !launched) {
val worker = shuffledAliveWorkers(curPos)
numWorkersVisited += 1
//如果work的空闲内存大于driver需要的内存,work的空闲core数大于driver需要的core数
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
//启动driver
launchDriver(worker, driver)
//从waitingDrivers 移除driver
waitingDrivers -= driver
launched = true
}
curPos = (curPos + 1) % numWorkersAlive
}
}
startExecutorsOnWorkers()
}

启动driver

private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
logInfo("Launching driver " + driver.id + " on worker " + worker.id)
worker.addDriver(driver)//关联上work和driver,主要是为了重新计算使用的内存和cpu
driver.worker = Some(worker)//同理
//向对应的worker(yarn-cluster下其实叫nodemanager)发送rpc请求,执行LaunchDriver方法。
worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
driver.state = DriverState.RUNNING
}

切换到worker类

case LaunchDriver(driverId, driverDesc) =>
logInfo(s"Asked to launch driver $driverId")

//构建DriverRunner对象
val driver = new DriverRunner(
conf,
driverId,
workDir,
sparkHome,
driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
self,
workerUri,
securityMgr)
drivers(driverId) = driver
//执行driverRunner的start方法
driver.start()

coresUsed += driverDesc.cores
memoryUsed += driverDesc.mem

在这个方法中,单起一个线程使用命令行方式执行driver

private[worker] def start() = {
new Thread("DriverRunner for " + driverId) {
override def run() {
var shutdownHook: AnyRef = null
try {
shutdownHook = ShutdownHookManager.addShutdownHook { () =>
logInfo(s"Worker shutting down, killing driver $driverId")
kill()
}

//准备好jar包,启动driver
val exitCode = prepareAndRunDriver()

finalState = if (exitCode == 0) {
Some(DriverState.FINISHED)
} else if (killed) {
Some(DriverState.KILLED)
} else {
Some(DriverState.FAILED)
}
} catch {
case e: Exception =>
kill()
finalState = Some(DriverState.ERROR)
finalException = Some(e)
} finally {
if (shutdownHook != null) {
ShutdownHookManager.removeShutdownHook(shutdownHook)
}
}

worker.send(DriverStateChanged(driverId, finalState.get, finalException))
}
}.start()
}

这样driver就启动了。
接着分析schedule()的下一个主要代码startExecutorsOnWorkers(),这是spark分配资源的核心代码。方法的最后一个参数spreadOutApps,影响了资源分配的方式。如果spreadOutApps是true,那么尽可能的在多个worker上平均分配executor.如果是false,那么先在一个worker上尽可能多的分配executor

private def startExecutorsOnWorkers(): Unit = {

for (app <- waitingApps) {  //遍历applicationinfo
val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)

if (app.coresLeft >= coresPerExecutor) {//还未调度core的application
//状态alive的worker,worker的剩余内存大于app需要的内存,剩余core数大于app设置的core数,按剩余core数量倒叙排列
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
.filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
worker.coresFree >= coresPerExecutor)
.sortBy(_.coresFree).reverse
//获取分配的cores
val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
//在worker上启动executor
allocateWorkerResourceToExecutors(
app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos))
}
}
}
}

获取分配的cores

private def scheduleExecutorsOnWorkers(
app: ApplicationInfo,
usableWorkers: Array[WorkerInfo],
spreadOutApps: Boolean): Array[Int] = {
val coresPerExecutor = app.desc.coresPerExecutor//每个exectuor分配多少核数
val minCoresPerExecutor = coresPerExecutor.getOrElse(1)//没有配置,则exe最少1个核
val oneExecutorPerWorker = coresPerExecutor.isEmpty//是否每个Worker分配一个Executor
val memoryPerExecutor = app.desc.memoryPerExecutorMB//每个exe分配多少MB内存
val numUsable = usableWorkers.length//可分配资源的Worker数,
val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)//app还需多少core资源和可分配资源Worker所有可分配核数之和的最小值

//判断是否能够执行 Executor
def canLaunchExecutor(pos: Int): Boolean = {
//如果app还需要核数大于每个exe最少核数,也就是需在分配executor
val keepScheduling = coresToAssign >= minCoresPerExecutor
//当前Worker总共可分配的核数减去已经分配的核数大于分配一个exe的最小个数,说明可以分配
val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor
//如果并不是worker只能分配一个executor,也就是没有指定executor核数的情况,或者当前worker针对
//当前app此次schedule还没有分配一个Executor,则可以分配新的executor
val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
if (launchingNewExecutor) {
//  计算要分配的内存
val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
//如果worker free memory减去已经分配的memory大于一个exe需要的memory,则内存足够
val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
keepScheduling && enoughCores && enoughMemory && underLimit
} else {
keepScheduling && enoughCores
}
}

var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)//可以分配资源的Worker
while (freeWorkers.nonEmpty) {
freeWorkers.foreach { pos =>//遍历可以分配资源的Worker
var keepScheduling = true
while (keepScheduling && canLaunchExecutor(pos)) {
// 可以分配的core数 减去 当前worker要分配的core数
coresToAssign -= minCoresPerExecutor
// 已经分配的core数 加上  当前worker要分配的core数
assignedCores(pos) += minCoresPerExecutor
// 移动指针到下一个Executor
if (oneExecutorPerWorker) {
assignedExecutors(pos) = 1
} else {
assignedExecutors(pos) += 1
}
//如果spreadOutApps是true,那么里层的while循环中断,去下一个符合条件的worker上分配资源
if (spreadOutApps) {
keepScheduling = false
}
}
}
freeWorkers = freeWorkers.filter(canLaunchExecutor)
}
assignedCores
}

在worker上启动executor

private def allocateWorkerResourceToExecutors(
app: ApplicationInfo,
assignedCores: Int,
coresPerExecutor: Option[Int],
worker: WorkerInfo): Unit = {
//可分配的核数整除每个executor需要的核数,得到需要的executor核
1f470
数
val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
for (i <- 1 to numExecutors) {
val exec = app.addExecutor(worker, coresToAssign)
launchExecutor(worker, exec) //发送rpc请求给worker
app.state = ApplicationState.RUNNING
}
}

发送rpc请求给worker

private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
worker.addExecutor(exec)
worker.endpoint.send(LaunchExecutor(masterUrl,
exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
exec.application.driver.send(
ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐