Spark Source Code Analysis: Master Resource Scheduling (schedule())
Main flow

**The resource-scheduling method schedule() is invoked whenever a driver, worker, or application registers or changes state.**
```scala
private def schedule(): Unit = {
  if (state != RecoveryState.ALIVE) {
    return
  }
  // Shuffle the ALIVE workers so driver placement is spread randomly
  val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  val numWorkersAlive = shuffledAliveWorkers.size
  var curPos = 0
  for (driver <- waitingDrivers.toList) {
    // Note: drivers are added to waitingDrivers only in cluster deploy mode
    // (spark-submit --deploy-mode cluster), where the driver itself runs on a worker
    var launched = false
    var numWorkersVisited = 0
    while (numWorkersVisited < numWorkersAlive && !launched) {
      val worker = shuffledAliveWorkers(curPos)
      numWorkersVisited += 1
      // If the worker's free memory and free cores both cover what the driver needs
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        // Launch the driver on this worker
        launchDriver(worker, driver)
        // Remove the driver from the waiting queue
        waitingDrivers -= driver
        launched = true
      }
      curPos = (curPos + 1) % numWorkersAlive
    }
  }
  startExecutorsOnWorkers()
}
```
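The rotating pointer `curPos` keeps successive drivers from all landing on the first worker that fits. Below is a minimal, self-contained sketch of the same placement loop; `WorkerSlot` and `DriverReq` are hypothetical stand-ins for Spark's `WorkerInfo` and `DriverInfo`, and unlike the real code the sketch reserves resources inline rather than via `launchDriver`:

```scala
import scala.util.Random

// Hypothetical stand-ins for Spark's WorkerInfo and DriverInfo
case class WorkerSlot(id: String, var memoryFree: Int, var coresFree: Int)
case class DriverReq(id: String, mem: Int, cores: Int)

def placeDrivers(workers: Seq[WorkerSlot], drivers: Seq[DriverReq]): Map[String, String] = {
  val shuffled = Random.shuffle(workers)
  val n = shuffled.size
  var curPos = 0
  var placements = Map.empty[String, String]
  for (driver <- drivers) {
    var launched = false
    var visited = 0
    // Visit each worker at most once, starting from the rotating pointer
    while (visited < n && !launched) {
      val w = shuffled(curPos)
      visited += 1
      if (w.memoryFree >= driver.mem && w.coresFree >= driver.cores) {
        w.memoryFree -= driver.mem // reserve the resources (the real code does
        w.coresFree -= driver.cores // this inside launchDriver/addDriver)
        placements += (driver.id -> w.id)
        launched = true
      }
      curPos = (curPos + 1) % n
    }
  }
  placements
}

// Example: two drivers spread across three identical workers
val ws = Seq(WorkerSlot("w1", 1024, 4), WorkerSlot("w2", 1024, 4), WorkerSlot("w3", 1024, 4))
println(placeDrivers(ws, Seq(DriverReq("d1", 512, 2), DriverReq("d2", 512, 2))))
```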
Launching the driver
```scala
private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  // Associate the worker and driver with each other, mainly so that
  // used memory and CPU can be recomputed
  worker.addDriver(driver)
  driver.worker = Some(worker)
  // Send an RPC message to the chosen worker, whose endpoint handles LaunchDriver
  worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
  driver.state = DriverState.RUNNING
}
```
Switching to the Worker class
```scala
case LaunchDriver(driverId, driverDesc) =>
  logInfo(s"Asked to launch driver $driverId")
  // Build a DriverRunner for this driver
  val driver = new DriverRunner(
    conf,
    driverId,
    workDir,
    sparkHome,
    driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
    self,
    workerUri,
    securityMgr)
  drivers(driverId) = driver
  // Start the DriverRunner
  driver.start()
  coresUsed += driverDesc.cores
  memoryUsed += driverDesc.mem
```
Inside start(), a dedicated thread is spawned that runs the driver as a command-line process:
```scala
private[worker] def start() = {
  new Thread("DriverRunner for " + driverId) {
    override def run() {
      var shutdownHook: AnyRef = null
      try {
        // Kill the driver process if the worker itself shuts down
        shutdownHook = ShutdownHookManager.addShutdownHook { () =>
          logInfo(s"Worker shutting down, killing driver $driverId")
          kill()
        }
        // Download the jar, then launch the driver and wait for it to exit
        val exitCode = prepareAndRunDriver()
        // Map the process exit code to a final driver state
        finalState = if (exitCode == 0) {
          Some(DriverState.FINISHED)
        } else if (killed) {
          Some(DriverState.KILLED)
        } else {
          Some(DriverState.FAILED)
        }
      } catch {
        case e: Exception =>
          kill()
          finalState = Some(DriverState.ERROR)
          finalException = Some(e)
      } finally {
        if (shutdownHook != null) {
          ShutdownHookManager.removeShutdownHook(shutdownHook)
        }
      }
      // Report the final state back to the worker
      worker.send(DriverStateChanged(driverId, finalState.get, finalException))
    }
  }.start()
}
```
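The pattern here is generic: run a child process on its own thread, map its exit code to a terminal state, and register a shutdown hook so the child dies with the JVM. A minimal sketch of that pattern, independent of Spark's classes (`prepareAndRunDriver`, `DriverState`, and the reporting channel are replaced by hypothetical stand-ins):

```scala
object RunnerSketch {
  sealed trait FinalState
  case object Finished extends FinalState
  case object Failed   extends FinalState
  case object Errored  extends FinalState

  def start(command: Seq[String])(report: FinalState => Unit): Unit = {
    new Thread("runner-sketch") {
      override def run(): Unit = {
        val state =
          try {
            // Launch the child process and block until it exits
            val proc = new ProcessBuilder(command: _*).inheritIO().start()
            // Kill the child if the JVM shuts down while it is still running
            val hook = new Thread(new Runnable { def run(): Unit = proc.destroy() })
            Runtime.getRuntime.addShutdownHook(hook)
            val exitCode = proc.waitFor()
            Runtime.getRuntime.removeShutdownHook(hook)
            if (exitCode == 0) Finished else Failed
          } catch {
            case _: Exception => Errored
          }
        report(state) // analogous to worker.send(DriverStateChanged(...))
      }
    }.start()
  }

  def main(args: Array[String]): Unit =
    start(Seq("echo", "hello"))(s => println(s"final state: $s"))
}
```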
With that, the driver is started.

Next comes the other main call in schedule(): startExecutorsOnWorkers(), the heart of Spark's resource allocation. The spreadOutApps flag, passed as the last argument to scheduleExecutorsOnWorkers below, controls the allocation strategy: if spreadOutApps is true, executors are spread as evenly as possible across many workers; if false, as many executors as possible are packed onto one worker before moving to the next (a worked example follows the scheduleExecutorsOnWorkers listing below).
```scala
private def startExecutorsOnWorkers(): Unit = {
  // Iterate over the waiting ApplicationInfos
  for (app <- waitingApps) {
    val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
    // Only consider apps that still have cores left to schedule
    if (app.coresLeft >= coresPerExecutor) {
      // Usable workers: ALIVE, with free memory and free cores covering at least
      // one executor, sorted by free cores in descending order
      val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
        .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
          worker.coresFree >= coresPerExecutor)
        .sortBy(_.coresFree).reverse
      // Compute how many cores each usable worker gets
      val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
      for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
        // Launch executors on the worker
        allocateWorkerResourceToExecutors(
          app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos))
      }
    }
  }
}
```
Computing the core assignment
```scala
private def scheduleExecutorsOnWorkers(
    app: ApplicationInfo,
    usableWorkers: Array[WorkerInfo],
    spreadOutApps: Boolean): Array[Int] = {
  val coresPerExecutor = app.desc.coresPerExecutor // cores per executor, if configured
  val minCoresPerExecutor = coresPerExecutor.getOrElse(1) // at least 1 core if unconfigured
  val oneExecutorPerWorker = coresPerExecutor.isEmpty // one executor per worker if unset
  val memoryPerExecutor = app.desc.memoryPerExecutorMB // MB of memory per executor
  val numUsable = usableWorkers.length // number of workers with allocatable resources
  val assignedCores = new Array[Int](numUsable) // number of cores to give to each worker
  val assignedExecutors = new Array[Int](numUsable) // number of new executors on each worker
  // The smaller of: cores the app still needs, and total free cores across usable workers
  var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)

  // Can this worker launch (or grow) an executor for the app?
  def canLaunchExecutor(pos: Int): Boolean = {
    // The app still needs at least one executor's worth of cores
    val keepScheduling = coresToAssign >= minCoresPerExecutor
    // The worker's free cores, minus what this round already assigned it,
    // still cover one executor's minimum
    val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor
    // A new executor may be launched if we are not limited to one executor per worker
    // (i.e. an executor core count was configured), or if this worker has not yet
    // been assigned an executor for this app in this round
    val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
    if (launchingNewExecutor) {
      // Memory already earmarked on this worker in this round
      val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
      // Enough memory if free memory minus what's earmarked covers one more executor
      val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
      val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
      keepScheduling && enoughCores && enoughMemory && underLimit
    } else {
      // Adding cores to an existing executor needs cores only, no new memory
      keepScheduling && enoughCores
    }
  }

  // Workers that can currently take an executor
  var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
  while (freeWorkers.nonEmpty) {
    freeWorkers.foreach { pos => // iterate over the workers that can take resources
      var keepScheduling = true
      while (keepScheduling && canLaunchExecutor(pos)) {
        // Move one executor's worth of cores from the pool to this worker
        coresToAssign -= minCoresPerExecutor
        assignedCores(pos) += minCoresPerExecutor
        // With one executor per worker, cores accumulate on a single executor;
        // otherwise each step adds a new executor
        if (oneExecutorPerWorker) {
          assignedExecutors(pos) = 1
        } else {
          assignedExecutors(pos) += 1
        }
        // With spreadOutApps, break the inner loop after one step and move on to
        // the next eligible worker, spreading cores round-robin
        if (spreadOutApps) {
          keepScheduling = false
        }
      }
    }
    freeWorkers = freeWorkers.filter(canLaunchExecutor)
  }
  assignedCores
}
```
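To make the two strategies concrete: suppose an app still needs 12 cores, coresPerExecutor is 2, and there are three usable workers with 8 free cores each. With spreadOutApps = true the loop hands out 2 cores per worker per round, ending with assignedCores = [4, 4, 4]; with false it saturates the first worker before moving on, ending with [8, 4, 0]. A stripped-down sketch of just the core-distribution loop (hypothetical inputs, ignoring memory and the executor limit):

```scala
// freeCores: free cores per worker; needed: cores the app still wants;
// step: minCoresPerExecutor
def distribute(freeCores: Array[Int], needed: Int, step: Int, spreadOut: Boolean): Array[Int] = {
  val assigned = new Array[Int](freeCores.length)
  var toAssign = math.min(needed, freeCores.sum)
  def canLaunch(pos: Int): Boolean =
    toAssign >= step && freeCores(pos) - assigned(pos) >= step
  var free = freeCores.indices.filter(canLaunch)
  while (free.nonEmpty) {
    free.foreach { pos =>
      var keep = true
      while (keep && canLaunch(pos)) {
        toAssign -= step
        assigned(pos) += step
        if (spreadOut) keep = false // one step per worker per round
      }
    }
    free = free.filter(canLaunch)
  }
  assigned
}

println(distribute(Array(8, 8, 8), 12, 2, spreadOut = true).mkString(","))  // 4,4,4
println(distribute(Array(8, 8, 8), 12, 2, spreadOut = false).mkString(",")) // 8,4,0
```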
Launching executors on the worker
```scala
private def allocateWorkerResourceToExecutors(
    app: ApplicationInfo,
    assignedCores: Int,
    coresPerExecutor: Option[Int],
    worker: WorkerInfo): Unit = {
  // Number of executors = assigned cores divided (integer division) by cores per
  // executor; if coresPerExecutor is unset, a single executor takes all the cores
  val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
  val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
  for (i <- 1 to numExecutors) {
    val exec = app.addExecutor(worker, coresToAssign)
    // Send an RPC message to the worker to launch the executor
    launchExecutor(worker, exec)
    app.state = ApplicationState.RUNNING
  }
}
```
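The Option arithmetic is worth spelling out: with coresPerExecutor set, the assigned cores are split into that many equal executors (and since the previous step hands out cores in multiples of coresPerExecutor, the division is exact); with it unset, one executor absorbs everything. A tiny sketch of just that calculation:

```scala
// Returns (number of executors, cores per executor)
def executorsAndCores(assignedCores: Int, coresPerExecutor: Option[Int]): (Int, Int) = {
  val numExecutors = coresPerExecutor.map(assignedCores / _).getOrElse(1)
  val coresEach    = coresPerExecutor.getOrElse(assignedCores)
  (numExecutors, coresEach)
}

println(executorsAndCores(8, Some(2))) // (4,2): four 2-core executors
println(executorsAndCores(8, None))    // (1,8): one executor takes all 8 cores
```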
Sending the RPC request to the worker
```scala
private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  worker.addExecutor(exec)
  // Tell the worker to launch the executor process
  worker.endpoint.send(LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
  // Tell the application's driver that an executor has been added
  exec.application.driver.send(
    ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}
```
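Note the fan-out: one message goes to the worker (launch the process) and one to the application's driver (an executor is on its way). A simplified picture of that flow, with abbreviated field lists standing in for the actual Spark message definitions:

```scala
// Hypothetical, abbreviated versions of the two messages (not the real definitions)
case class LaunchExecutor(masterUrl: String, appId: String, execId: Int, cores: Int, memory: Int)
case class ExecutorAdded(execId: Int, workerId: String, hostPort: String, cores: Int, memory: Int)

// Each receiving endpoint pattern-matches on the message,
// much like the LaunchDriver handler shown earlier
def receiveSketch(msg: Any): Unit = msg match {
  case LaunchExecutor(_, appId, execId, cores, memory) =>
    println(s"worker: launch executor $appId/$execId with $cores cores, $memory MB")
  case ExecutorAdded(execId, workerId, _, _, _) =>
    println(s"driver: executor $execId registered on worker $workerId")
}
```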