Spark source code analysis: Master resource scheduling (schedule)
2018-02-10 22:54
How it works:
1. First, if the Master's state is not ALIVE, return immediately. In other words, a standby Master never schedules applications or any other resources.
2. Take all registered workers, keep only those whose state is ALIVE, and randomly reorder them with Random.shuffle (a Fisher-Yates pass that settles positions from the last element down to the second).
3. Iterate over the alive workers; for each waiting driver that fits on a worker, launch the driver, record it in the in-memory bookkeeping structures, and remove it from the waiting-drivers queue.
4. Launch executors on the workers, using the spread-out algorithm (spreadOutApps) by default.
Source code walkthrough:
Step 1: the schedule() method
Source location: org.apache.spark.deploy.master.Master
Step 2: the shuffle method called in step 1
Source location: scala.util.Random
Step 3: the launchDriver method called in step 1
Source location: org.apache.spark.deploy.master.Master
Step 4: the startExecutorsOnWorkers method called in step 1
Source location: org.apache.spark.deploy.master.Master
Step 5: the scheduleExecutorsOnWorkers method called in step 4
Source location: org.apache.spark.deploy.master.Master
Step 6: the allocateWorkerResourceToExecutors method called in step 4
Source location: org.apache.spark.deploy.master.Master
Step 1: the schedule() method
Source location: org.apache.spark.deploy.master.Master
/**
 * Schedule the currently available resources among waiting apps. This method will be called
 * every time a new app joins or resource availability changes.
 */
private def schedule(): Unit = {
  // If the Master's state is not ALIVE, return immediately.
  // In other words, a standby Master never schedules applications or other resources.
  if (state != RecoveryState.ALIVE) { return }
  // Drivers take strict precedence over executors
  // Random.shuffle randomly reorders the elements of the collection passed to it.
  // It walks an ArrayBuffer from the last element down to the second: for each position n it
  // draws a random index k in [0, n) and swaps buf(n - 1) with buf(k). For example, with
  // buf.length == 10 the first iteration draws k in 0 to 9 and swaps the last element with buf(k).
  val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
  // Iterate over the shuffled workers, keeping only those whose state is ALIVE
  for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
    // Iterate over the waiting drivers. Only cluster-mode submissions (standalone cluster mode
    // or yarn-cluster) register a driver with the Master; in client mode the driver runs
    // locally, so there is nothing for the Master to schedule here.
    for (driver <- waitingDrivers) {
      // Check that the worker's free memory and free cores both cover what the driver requires
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        // Launch the driver on this worker
        launchDriver(worker, driver)
        // Remove the driver from the waiting-drivers queue
        waitingDrivers -= driver
      }
    }
  }
  // Launch and schedule executors on the workers
  startExecutorsOnWorkers()
}
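To see the driver-placement policy in isolation: each waiting driver goes to the first shuffled worker whose free memory and cores cover its requirements. Below is a minimal, self-contained sketch of that first-fit loop (SimpleWorker and SimpleDriver are hypothetical stand-ins for Spark's WorkerInfo and DriverInfo, not Spark classes):

import scala.util.Random
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins for Spark's WorkerInfo / DriverInfo bookkeeping
case class SimpleWorker(id: String, var memoryFree: Int, var coresFree: Int)
case class SimpleDriver(id: String, mem: Int, cores: Int)

object DriverPlacementDemo extends App {
  val workers = ArrayBuffer(
    SimpleWorker("w1", memoryFree = 4096, coresFree = 4),
    SimpleWorker("w2", memoryFree = 2048, coresFree = 2),
    SimpleWorker("w3", memoryFree = 8192, coresFree = 8))
  val waitingDrivers = ArrayBuffer(
    SimpleDriver("d1", mem = 1024, cores = 1),
    SimpleDriver("d2", mem = 4096, cores = 4))

  // Shuffle the workers, then place each waiting driver on the first worker that fits
  for (worker <- Random.shuffle(workers)) {
    for (driver <- waitingDrivers.toList) { // iterate over a copy so removal is safe
      if (worker.memoryFree >= driver.mem && worker.coresFree >= driver.cores) {
        worker.memoryFree -= driver.mem // reserve the driver's resources
        worker.coresFree -= driver.cores
        waitingDrivers -= driver
        println(s"launched ${driver.id} on ${worker.id}")
      }
    }
  }
}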
Step 2: the shuffle method called in step 1
Source location: scala.util.Random
/** Returns a new collection of the same type in a randomly chosen order.
 *
 *  @return the shuffled collection
 */
def shuffle[T, CC[X] <: TraversableOnce[X]](xs: CC[T])(implicit bf: CanBuildFrom[CC[T], T, CC[T]]): CC[T] = {
  val buf = new ArrayBuffer[T] ++= xs

  def swap(i1: Int, i2: Int) {
    val tmp = buf(i1)
    buf(i1) = buf(i2)
    buf(i2) = tmp
  }

  for (n <- buf.length to 2 by -1) {
    val k = nextInt(n)
    swap(n - 1, k)
  }

  (bf(xs) ++= buf).result
}
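The loop above is a Fisher-Yates shuffle: positions are settled from the back of the buffer down to index 1, each time swapping with a uniformly random position at or before it. A minimal standalone sketch of the same swap pattern:

import scala.util.Random
import scala.collection.mutable.ArrayBuffer

object FisherYatesDemo extends App {
  val buf = ArrayBuffer("w1", "w2", "w3", "w4", "w5")
  val rnd = new Random()
  // n runs from buf.length down to 2; buf(n - 1) is swapped with a random buf(k), k in [0, n)
  for (n <- buf.length to 2 by -1) {
    val k = rnd.nextInt(n)
    val tmp = buf(n - 1); buf(n - 1) = buf(k); buf(k) = tmp
  }
  println(buf.mkString(", ")) // every permutation is equally likely
}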
Step 3: the launchDriver method called in step 1
Source location: org.apache.spark.deploy.master.Master
// Launch a driver on a given worker
private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  // Record the driver in the worker's internal bookkeeping:
  // the driver's memory and cores are added to the worker's used memory and used cores
  worker.addDriver(driver)
  // Record the worker on the driver's side as well
  driver.worker = Some(worker)
  // Send a LaunchDriver message to the worker's RPC endpoint so the worker starts the driver
  worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
  driver.state = DriverState.RUNNING
}
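worker.addDriver is plain accounting: the driver's memory and cores are added to the worker's used totals, which is what shrinks memoryFree and coresFree for later scheduling decisions. A minimal sketch of that bookkeeping (MiniWorkerInfo is a simplified, hypothetical stand-in for Spark's WorkerInfo):

class MiniWorkerInfo(val memory: Int, val cores: Int) {
  var memoryUsed = 0
  var coresUsed = 0
  def memoryFree: Int = memory - memoryUsed
  def coresFree: Int = cores - coresUsed
  // Mirrors the effect of WorkerInfo.addDriver: reserve the driver's resources on this worker
  def addDriver(mem: Int, driverCores: Int): Unit = {
    memoryUsed += mem
    coresUsed += driverCores
  }
}

object BookkeepingDemo extends App {
  val w = new MiniWorkerInfo(memory = 8192, cores = 8)
  w.addDriver(mem = 1024, driverCores = 1)
  println(s"free after driver: ${w.memoryFree} MB, ${w.coresFree} cores") // 7168 MB, 7 cores
}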
Step 4: the startExecutorsOnWorkers method called in step 1
Source location: org.apache.spark.deploy.master.Master
/**
 * Schedule and launch executors on workers
 */
// Application scheduling. The default is the spread-out algorithm;
// the alternative is the non-spread-out (consolidating) algorithm.
private def startExecutorsOnWorkers(): Unit = {
  // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
  // in the queue, then the second app, etc.
  // Iterate over the ApplicationInfos in waitingApps, keeping only apps that still need cores,
  // i.e. the cores the app requested minus the cores already granted to its executors is > 0
  for (app <- waitingApps if app.coresLeft > 0) {
    val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
    // Filter out workers that don't have enough resources to launch an executor:
    // keep the ALIVE workers with enough free memory and cores for at least one executor,
    // then sort them by free cores in descending order
    val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
      .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
        worker.coresFree >= coresPerExecutor.getOrElse(1))
      .sortBy(_.coresFree).reverse
    // The spread-out algorithm spreads each application's executors evenly across workers.
    // For example, 20 cores over 10 workers: the loop passes over the workers twice,
    // giving each worker one core per pass, so every worker ends up with 2 cores.
    // In short: even distribution.
    // The non-spread-out algorithm packs each application onto as few workers as possible.
    // For example, with 10 workers of 10 cores each and an application that needs 20 cores,
    // only 2 workers are used, each filled with all 10 of its cores; subsequent apps start
    // on the next worker. In short: consolidate onto the fewest workers.
    val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

    // Now that we've decided how many cores to allocate on each worker, let's allocate them
    // Iterate over the usable workers and, wherever cores were assigned above,
    for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
      // launch executors on that worker
      allocateWorkerResourceToExecutors(
        app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
    }
  }
}
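To make the two placement policies concrete, here is a small self-contained simulation (not Spark code; allocation is reduced to one core per step) that places 20 cores on 10 workers with 10 free cores each, first spread out, then consolidated:

object PlacementDemo extends App {
  // Assign `cores` one core at a time across workers with the given free cores,
  // either round-robin (spread out) or filling each worker before moving on.
  def assign(free: Array[Int], cores: Int, spreadOut: Boolean): Array[Int] = {
    val assigned = Array.fill(free.length)(0)
    var left = cores
    var pos = 0
    while (left > 0) {
      if (assigned(pos) < free(pos)) {
        assigned(pos) += 1
        left -= 1
        if (spreadOut) pos = (pos + 1) % free.length // move on after a single core
      } else {
        pos = (pos + 1) % free.length // this worker is full, move on
      }
    }
    assigned
  }

  val free = Array.fill(10)(10) // 10 workers, 10 free cores each
  println(assign(free, 20, spreadOut = true).mkString(" "))  // 2 2 2 2 2 2 2 2 2 2
  println(assign(free, 20, spreadOut = false).mkString(" ")) // 10 10 0 0 0 0 0 0 0 0
}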
Step 5: the scheduleExecutorsOnWorkers method called in step 4
Source location: org.apache.spark.deploy.master.Master
/**
 * Schedule executors to be launched on the workers.
 * Returns an array containing number of cores assigned to each worker.
 *
 * There are two modes of launching executors. The first attempts to spread out an application's
 * executors on as many workers as possible, while the second does the opposite (i.e. launch them
 * on as few workers as possible). The former is usually better for data locality purposes and is
 * the default.
 *
 * The number of cores assigned to each executor is configurable. When this is explicitly set,
 * multiple executors from the same application may be launched on the same worker if the worker
 * has enough cores and memory. Otherwise, each executor grabs all the cores available on the
 * worker by default, in which case only one executor may be launched on each worker.
 *
 * It is important to allocate coresPerExecutor on each worker at a time (instead of 1 core
 * at a time). Consider the following example: cluster has 4 workers with 16 cores each.
 * User requests 3 executors (spark.cores.max = 48, spark.executor.cores = 16). If 1 core is
 * allocated at a time, 12 cores from each worker would be assigned to each executor.
 * Since 12 < 16, no executors would launch [SPARK-8881].
 */
// This guards against a real bug: with 4 workers of 16 cores each and a request for 3 executors
// of 16 cores (48 cores total), allocating one core at a time would leave each worker with only
// 12 assigned cores; since 12 < 16, no executor could launch [SPARK-8881].
// For the given application, the method returns an array with the number of cores assigned
// on each usable worker.
private def scheduleExecutorsOnWorkers(
    app: ApplicationInfo,
    usableWorkers: Array[WorkerInfo],
    spreadOutApps: Boolean): Array[Int] = {
  // Cores per executor, if explicitly configured
  val coresPerExecutor = app.desc.coresPerExecutor
  // The minimum number of cores per executor is 1
  val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
  // If cores per executor is not set, launch exactly one executor per worker
  val oneExecutorPerWorker = coresPerExecutor.isEmpty
  // Memory per executor
  val memoryPerExecutor = app.desc.memoryPerExecutorMB
  val numUsable = usableWorkers.length
  val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
  val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
  var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)

  /** Return whether the specified worker can launch an executor for this app. */
  def canLaunchExecutor(pos: Int): Boolean = {
    val keepScheduling = coresToAssign >= minCoresPerExecutor
    val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor

    // If we allow multiple executors per worker, then we can always launch new executors.
    // Otherwise, if there is already an executor on this worker, just give it more cores.
    val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
    if (launchingNewExecutor) {
      val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
      val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
      val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
      keepScheduling && enoughCores && enoughMemory && underLimit
    } else {
      // We're adding cores to an existing executor, so no need
      // to check memory and executor limits
      keepScheduling && enoughCores
    }
  }

  // Keep launching executors until no more workers can accommodate any
  // more executors, or if we have reached this application's limits
  var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
  while (freeWorkers.nonEmpty) {
    freeWorkers.foreach { pos =>
      var keepScheduling = true
      while (keepScheduling && canLaunchExecutor(pos)) {
        coresToAssign -= minCoresPerExecutor
        assignedCores(pos) += minCoresPerExecutor

        // If we are launching one executor per worker, then every iteration adds one core
        // to that single executor. Otherwise, every iteration assigns cores to a new executor.
        if (oneExecutorPerWorker) {
          assignedExecutors(pos) = 1
        } else {
          assignedExecutors(pos) += 1
        }

        // Spreading out an application means spreading out its executors across as
        // many workers as possible. If we are not spreading out, then we should keep
        // scheduling executors on this worker until we use all of its resources.
        // Otherwise, just move on to the next worker.
        if (spreadOutApps) {
          keepScheduling = false
        }
      }
    }
    freeWorkers = freeWorkers.filter(canLaunchExecutor)
  }
  assignedCores
}
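The SPARK-8881 scenario from the doc comment can be checked with a short, self-contained simulation (not Spark code): 4 workers with 16 free cores each, 48 cores requested, 16 cores per executor. Round-robin allocation in 1-core steps strands 12 cores on every worker, while 16-core steps yield three complete executors:

object Spark8881Demo extends App {
  // Round-robin `total` cores across workers in steps of `step`,
  // never exceeding a worker's free cores.
  def allocate(workers: Int, freeCores: Int, total: Int, step: Int): Array[Int] = {
    val assigned = Array.fill(workers)(0)
    var left = total
    var progress = true
    while (left >= step && progress) {
      progress = false
      for (pos <- 0 until workers if left >= step && assigned(pos) + step <= freeCores) {
        assigned(pos) += step
        left -= step
        progress = true
      }
    }
    assigned
  }

  val oneAtATime = allocate(workers = 4, freeCores = 16, total = 48, step = 1)
  val perExecutor = allocate(workers = 4, freeCores = 16, total = 48, step = 16)
  println(oneAtATime.mkString(" "))  // 12 12 12 12 -> no worker reaches 16, zero executors launch
  println(perExecutor.mkString(" ")) // 16 16 16 0  -> three 16-core executors launch
}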
Step 6: the allocateWorkerResourceToExecutors method called in step 4
Source location: org.apache.spark.deploy.master.Master
private def allocateWorkerResourceToExecutors(
    app: ApplicationInfo,
    assignedCores: Int,
    coresPerExecutor: Option[Int],
    worker: WorkerInfo): Unit = {
  // If the number of cores per executor is specified, we divide the cores assigned
  // to this worker evenly among the executors with no remainder.
  // Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
  val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
  val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
  for (i <- 1 to numExecutors) {
    // Record the executor in the application's internal bookkeeping, creating an
    // ExecutorDesc that records how many cores this executor was granted.
    // Note that the executor count and per-executor cores that actually materialize are
    // derived from the per-worker core assignment, so they need not match a naive reading
    // of the request: for instance, if coresPerExecutor is unset, this worker gets a single
    // executor that grabs all of its assigned cores.
    val exec = app.addExecutor(worker, coresToAssign)
    launchExecutor(worker, exec)
    // Mark the application as RUNNING
    app.state = ApplicationState.RUNNING
  }
}
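The split arithmetic in isolation (a hedged sketch; the assignedCores values below are made up for illustration):

object SplitDemo extends App {
  // Mirrors the two lines above: executors for this worker, and cores for each of them
  def split(assignedCores: Int, coresPerExecutor: Option[Int]): (Int, Int) = {
    val numExecutors = coresPerExecutor.map(assignedCores / _).getOrElse(1)
    val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
    (numExecutors, coresToAssign)
  }

  println(split(8, Some(2))) // (4,2): four 2-core executors on this worker
  println(split(7, Some(2))) // (3,2): three 2-core executors, 1 assigned core left unused
  println(split(8, None))    // (1,8): one executor grabbing all 8 assigned cores
}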