您的位置:首页 > 其它

spark源码分析之master资源调度schedule篇

2018-02-10 22:54 483 查看
原理解释:

1、首先判断master的状态不是alive的话直接返回,也就是说,standby master是不会进行application等资源的调度的

2、对取出workers中所有注册上来上的worker,进行过滤,必须是状态为alive的worker,调用rondom的shuffle方法进行随机的打乱(从第三个worker进行shuffle)

3、遍历活着的worker,启动driver,将driver加入都内存缓冲结构中,并将driver从等待的driver的队列中移除

4、在workers上启动executors,使用是默认SpreadOutApps算法

源码解释:

第一步:调用schedule()方法

源码位置:org.apache.spark.deploy.master.Master

/**
* Schedule the currently available resources among waiting apps. This method will be called
* every time a new app joins or resource availability changes.
*/
/**
* 资源调度算法
*/
private def schedule(): Unit = {
// 首先判断master的状态不是alive的话直接返回
// 也就是说,standby master是不会进行application等资源的调度的
if (state != RecoveryState.ALIVE) { return }

// Drivers take strict precedence over executors
// random shuffle的原理是对传入的集合的元素进行随机的打乱
// 这里是对取出workers中所有注册上来上的worker,进行过滤,必须是状态为alive的worker
// 对状态为alive的worker,调用rondom的shuffle方法进行随机的打乱

// 意思就是从ArrayBuffer的最后一个元素开始到第三个元素,对于每个元素,都会取出该范围内的随机数,
// 比如说buf.length为10,然后next会取0到10的一个随机数,然后就会把buf随机的一个位置和该数字进行交换
val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers

//  遍历活着的worker
for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
// 遍历等待的driver,只有yarn-cluster和standalone的cluster模式提交的时候,才会注册driver,其他方式都是在
// 本地启动driver,而不是来注册driver,更不可能让master来调度driver
for (driver <- waitingDrivers) {
// 判断当前的worker的空闲内存量大于等于driver需要的内存量和判断worker的空闲cpu数大于等于driver需要的cpu数量
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
// 启动driver
launchDriver(worker, driver)
// 将driver从等待的driver的队列中删除
waitingDrivers -= driver
}
}
}
// 在workers上启动和调度executor
startExecutorsOnWorkers()
}


第二步:调用第一步的shuffle方法

源码位置:scala.util.Random

/** Returns a new collection of the same type in a randomly chosen order.
*
*  @return         the shuffled collection
*/
def shuffle[T, CC[X] <: TraversableOnce[X]](xs: CC[T])(implicit bf: CanBuildFrom[CC[T], T, CC[T]]): CC[T] = {
val buf = new ArrayBuffer[T] ++= xs

def swap(i1: Int, i2: Int) {
val tmp = buf(i1)
buf(i1) = buf(i2)
buf(i2) = tmp
}

for (n <- buf.length to 2 by -1) {
val k = nextInt(n)
swap(n - 1, k)
}

(bf(xs) ++= buf).result
}


第三步:调用第一步的launchDriver方法

源码位置:org.apache.spark.deploy.master.Master

/**
* 在某一个worker上启动driver
*/
private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
logInfo("Launching driver " + driver.id + " on worker " + worker.id)
// 将driver加入到worker内部的缓冲结构中
// 将worker中使用的内存和cpu的数量,都加上driver需要的内存和cpu的数量
worker.addDriver(driver)
// 将worker加入到driver的内存缓冲结构中
driver.worker = Some(worker)
// 调用worker的actor,给worker发送注册driver的信息
worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
driver.state = DriverState.RUNNING
}


第四步:调用第一步的startExecutorsOnWorkers方法

源码位置:org.apache.spark.deploy.master.Master

/**
* Schedule and launch executors on workers
*/
/**
* Application的调度机制,默认是SpreadOutApps算法,另外一种是非SpreadOutApps算法
*/
private def startExecutorsOnWorkers(): Unit = {
// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
// in the queue, then the second app, etc.

// 首先遍历waitingApps中ApplicationInfo,并且还需要判断程序中定义的使用cpu的数量-启动执行application上
// worker上的excutor所使用的的cpu的要大于0
for (app <- waitingApps if app.coresLeft > 0) {

val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor

// Filter out workers that don't have enough resources to launch an executor
// 从workers中,过滤出worker的状态为alive的,按照cpu的数量进行倒序排序
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
.filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
worker.coresFree >= coresPerExecutor.getOrElse(1))
.sortBy(_.coresFree).reverse

val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

// SpreadOut算法,会将每一个application要启动的executor都平均分布到各个worker上去
// 比如说20个cpu core,分配到10个worker上,实际会循环两遍worker,每次循环,
// 给每个worker分配一个 core,最后每个worker分配了2个core
// 总体概括:平均分布

// 非SpreadOut算法,将每一个application,尽可能少的分配到worker上去
// 每个applicati
4000
on,都尽可能的分配到尽量少的worker上,比如说10个worker
// 每个worker10个cpu,application要分配20个core,那么其实,只会分配到2个worker上
// 每个worker都占满10个core,其余的app,就只能分配到下一个worker上
// 总体概括:尽可能资源大的分配

// Now that we've decided how many cores to allocate on each worker, let's allocate them
// 给每个worker分配完application要求的cpu core之后,遍历worker,只要判断之前给这个worker分配到了core
for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
// 就在worker上启动executor
allocateWorkerResourceToExecutors(
app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
}
}
}


第五步:调用第四步的scheduleExecutorsOnWorkers方法

源码位置:org.apache.spark.deploy.master.Master

/**
* Schedule executors to be launched on the workers.
* Returns an array containing number of cores assigned to each worker.
*
* There are two modes of launching executors. The first attempts to spread out an application's
* executors on as many workers as possible, while the second does the opposite (i.e. launch them
* on as few workers as possible). The former is usually better for data locality purposes and is
* the default.
*
* The number of cores assigned to each executor is configurable. When this is explicitly set,
* multiple executors from the same application may be launched on the same worker if the worker
* has enough cores and memory. Otherwise, each executor grabs all the cores available on the
* worker by default, in which case only one executor may be launched on each worker.
*
* It is important to allocate coresPerExecutor on each worker at a time (instead of 1 core
* at a time). Consider the following example: cluster has 4 workers with 16 cores each.
* User requests 3 executors (spark.cores.max = 48, spark.executor.cores = 16). If 1 core is
* allocated at a time, 12 cores from each worker would be assigned to each executor.
* Since 12 < 16, no executors would launch [SPARK-8881].
*
* 这里有一个bug问题,集群中有4个worker,每一个worker是16核,我们分配3个executor,要求每一个executor分配16核,总共分配48核
* 实际情况下,我们每一个worker中分配了12核,12核小于16核,报错[SPARK-8881].
*
* 为Application分配要在Worker上启动的Executor返回一个数组,包括每个worker分配到的core数
*/
private def scheduleExecutorsOnWorkers(
app: ApplicationInfo,
usableWorkers: Array[WorkerInfo],
spreadOutApps: Boolean): Array[Int] = {
//  每个Executor的core数
val coresPerExecutor = app.desc.coresPerExecutor
//  每个Executor的最小core数为1
val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
//  每个Worker分配一个Executor
val oneExecutorPerWorker = coresPerExecutor.isEmpty
//  每个Executor的内存
val memoryPerExecutor = app.desc.memoryPerExecutorMB
val numUsable = usableWorkers.length
// 给每个Worker的cores数
val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
// 给每个Worker上新的Executor数
val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)

/** Return whether the specified worker can launch an executor for this app. */
//  判断指定的worker是否可以为这个app启动一个executor
def canLaunchExecutor(pos: Int): Boolean = {
val keepScheduling = coresToAssign >= minCoresPerExecutor
val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor

// If we allow multiple executors per worker, then we can always launch new executors.
// Otherwise, if there is already an executor on this worker, just give it more cores.
//  如果每个worker允许多个executor,就能一直在启动新的的executor
//  如果在这个worker上已经有executor,则给这个executor更多的core
val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
if (launchingNewExecutor) {
val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
keepScheduling && enoughCores && enoughMemory && underLimit
} else {
// We're adding cores to an existing executor, so no need
// to check memory and executor limits
keepScheduling && enoughCores
}
}

// Keep launching executors until no more workers can accommodate any
// more executors, or if we have reached this application's limits
// 不断的启动executor,直到不再有Worker可以容纳任何Executor,或者达到了这个Application的要求
var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
while (freeWorkers.nonEmpty) {
freeWorkers.foreach { pos =>
var keepScheduling = true
while (keepScheduling && canLaunchExecutor(pos)) {
coresToAssign -= minCoresPerExecutor
assignedCores(pos) += minCoresPerExecutor

// If we are launching one executor per worker, then every iteration assigns 1 core
// to the executor. Otherwise, every iteration assigns cores to a new executor.
//  如果我们在每个worker上启动一个executor,每次迭代为每个executor增加一个core
//  否则,每次迭代都会为新的executor分配cores
if (oneExecutorPerWorker) {
assignedExecutors(pos) = 1
} else {
assignedExecutors(pos) += 1
}

// Spreading out an application means spreading out its executors across as
// many workers as possible. If we are not spreading out, then we should keep
// scheduling executors on this worker until we use all of its resources.
// Otherwise, just move on to the next worker.
//  如果不使用Spreading out方法,我们会在这个worker上继续调度executor,直到使用它所有的资源
//  否则,就跳转到下一个worker
if (spreadOutApps) {
keepScheduling = false
}
}
}
freeWorkers = freeWorkers.filter(canLaunchExecutor)
}
assignedCores
}


第六步:调用第四步的allocateWorkerResourceToExecutors方法

源码位置:org.apache.spark.deploy.master.Master

private def allocateWorkerResourceToExecutors(
app: ApplicationInfo,
assignedCores: Int,
coresPerExecutor: Option[Int],
worker: WorkerInfo): Unit = {
// If the number of cores per executor is specified, we divide the cores assigned
// to this worker evenly among the executors with no remainder.
// Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
val coresToAssign = coresPerExecutor.getOrElse(assignedCores)

for (i <- 1 to numExecutors) {
// 首先,在application内部缓存结构中,添加executor
// 并且创建ExecutorDesc对象,其中封装了,给这个executor分配了多少个cpu core
// 基于我们的机制,实际上,最后,executor的实际数量,以及executor所对应的cpu是不一致的
// 我们这里是根据总的机制来分配的,比如说要求启动3个executor,每一个executor3个cpu,9个worker,
// 根据我们的算法来说的话,就是每一个worker启动一个executor,一个executor对应一个cpu core
val exec = app.addExecutor(worker, coresToAssign)
launchExecutor(worker, exec)
// 将app的状态设置为running
app.state = ApplicationState.RUNNING
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark