
Spark Scheduling Series ---- 1. How the Master allocates resources to executors on workers in Spark standalone mode

A Spark application is registered when the Master handles the RegisterApplication message. While processing this message, the Master first registers the application's information, then calls Master.schedule. The relevant code is as follows:

case RegisterApplication(description, driver) => {
  // TODO Prevent repeated registrations from some driver
  if (state == RecoveryState.STANDBY) {
    // ignore, don't send response
  } else {
    logInfo("Registering app " + description.name)
    val app = createApplication(description, driver)
    registerApplication(app)  // register the application with the Master
    logInfo("Registered app " + description.name + " with ID " + app.id)
    persistenceEngine.addApplication(app)
    driver.send(RegisteredApplication(app.id, self))
    schedule()  // launch waiting drivers and executors
  }
}

private def schedule(): Unit = {
  if (state != RecoveryState.ALIVE) { return }
  // Drivers take strict precedence over executors
  val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
  for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
    for (driver <- waitingDrivers) {
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        launchDriver(worker, driver)  // launch the driver first
        waitingDrivers -= driver
      }
    }
  }
  startExecutorsOnWorkers()  // pick the workers that will run executors, allocate resources, and launch the executors
}


private def startExecutorsOnWorkers(): Unit = {
  // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
  // in the queue, then the second app, etc.
  if (spreadOutApps) {
    // Try to spread out each app among all the workers, until it has all its cores
    for (app <- waitingApps if app.coresLeft > 0) {
      val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)  // only workers that are alive
        .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&  // enough free memory for one executor
          worker.coresFree >= app.desc.coresPerExecutor.getOrElse(1))  // enough free cores for one executor; if spark.executor.cores is unset, the requirement defaults to 1 core
        .sortBy(_.coresFree).reverse
      val numUsable = usableWorkers.length
      val assigned = new Array[Int](numUsable) // Number of cores to give on each node
      var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
      var pos = 0
      while (toAssign > 0) {
        if (usableWorkers(pos).coresFree - assigned(pos) > 0) {  // this worker still has a free core, so give the app one core here; round-robin over as many workers as possible
          toAssign -= 1
          assigned(pos) += 1
        }
        pos = (pos + 1) % numUsable  // move on to the next worker
      }
      // Now that we've decided how many cores to give on each node, let's actually give them
      for (pos <- 0 until numUsable if assigned(pos) > 0) {
        allocateWorkerResourceToExecutors(app, assigned(pos), usableWorkers(pos))  // create executors on the chosen worker; one worker may end up with several executors
      }
    }
  } else {
    // Pack each app into as few workers as possible until we've assigned all its cores
    for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
      for (app <- waitingApps if app.coresLeft > 0) {
        allocateWorkerResourceToExecutors(app, app.coresLeft, worker)
      }
    }
  }
}
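To make the spread-out branch easier to follow, here is a minimal, self-contained sketch of its round-robin core assignment. This is not Spark code; the function name and parameters are made up for illustration.

// Sketch only: given each usable worker's free cores and the cores the application still
// needs, hand out one core per worker per round until nothing is left to assign.
def spreadOutAssign(coresFreePerWorker: Array[Int], coresLeftForApp: Int): Array[Int] = {
  val numUsable = coresFreePerWorker.length
  val assigned = new Array[Int](numUsable)
  var toAssign = math.min(coresLeftForApp, coresFreePerWorker.sum)
  var pos = 0
  while (toAssign > 0) {
    if (coresFreePerWorker(pos) - assigned(pos) > 0) {  // this worker still has a free core
      toAssign -= 1
      assigned(pos) += 1
    }
    pos = (pos + 1) % numUsable  // next worker
  }
  assigned
}

// For example, 3 workers with 30 free cores each and an app that still needs 12 cores
// yields Array(4, 4, 4).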
One point worth spelling out: if spark.executor.cores is not set when the application is submitted, the ApplicationDescription built in SparkDeploySchedulerBackend has coresPerExecutor = None. The worker-usability filter above then falls back to requiring only 1 free core (getOrElse(1)), while allocateWorkerResourceToExecutors below falls back to giving all of the cores assigned on a worker to a single executor (getOrElse(coresToAllocate)).
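A tiny illustration of that fallback behaviour, using plain Scala values rather than the real Spark objects:

val coresPerExecutor: Option[Int] = None  // spark.executor.cores was not set

// Worker-usability filter in startExecutorsOnWorkers: the requirement defaults to 1 core
val minCoresNeeded = coresPerExecutor.getOrElse(1)  // 1

// allocateWorkerResourceToExecutors: every core assigned on this worker goes to one executor
val coresToAllocate = 4  // assume 4 cores were assigned on this worker
val coresForOneExecutor = coresPerExecutor.getOrElse(coresToAllocate)  // 4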

private def allocateWorkerResourceToExecutors(
    app: ApplicationInfo,
    coresToAllocate: Int,
    worker: WorkerInfo): Unit = {
  val memoryPerExecutor = app.desc.memoryPerExecutorMB
  val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(coresToAllocate)  // if spark.executor.cores is not set, all cores assigned on this worker go to a single executor
  var coresLeft = coresToAllocate
  while (coresLeft >= coresPerExecutor && worker.memoryFree >= memoryPerExecutor) {  // each executor gets exactly the explicitly requested number of cores
    val exec = app.addExecutor(worker, coresPerExecutor)
    coresLeft -= coresPerExecutor
    launchExecutor(worker, exec)
    app.state = ApplicationState.RUNNING
  }
}
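Stripped of the Master's bookkeeping, the loop above boils down to the following sketch (an assumed helper, not Spark's API), which ignores memory for brevity and just answers how many executors one worker gets and with how many cores each:

// Sketch only: number of executors launched on one worker and the cores each receives.
// Assumes executorCoresSetting, if set, is a positive value.
def executorsOnWorker(coresToAllocate: Int, executorCoresSetting: Option[Int]): (Int, Int) = {
  require(coresToAllocate > 0, "the Master only calls this for workers that were assigned cores")
  val coresPerExecutor = executorCoresSetting.getOrElse(coresToAllocate)
  var coresLeft = coresToAllocate
  var numExecutors = 0
  while (coresLeft >= coresPerExecutor) {
    numExecutors += 1
    coresLeft -= coresPerExecutor
  }
  (numExecutors, coresPerExecutor)
}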


Conclusion 1: In Spark standalone mode, if the per-executor core count (--executor-cores) and the application's total core count are set to incompatible values, it is possible that no executor can be allocated at all. Conversely, a single worker may host several executors of the same application.
Scenario 1: 3 nodes, each with 30 cores.
Command: spark-submit --executor-cores 2 --total-executor-cores 12
When Master.startExecutorsOnWorkers allocates resources, each worker is assigned 4 cores; allocateWorkerResourceToExecutors then hands out 2 cores per executor, so 2 executors are launched on each worker.

Scenario 2: 30 nodes, each with 30 cores.
Command: spark-submit --executor-cores 5 --total-executor-cores 30
When Master.startExecutorsOnWorkers allocates resources, each worker is assigned only 1 core; allocateWorkerResourceToExecutors requires at least 5 cores (the explicit executor-cores value) per executor, which is more than the single core assigned to that worker, so no executor can be launched.
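Plugging the two sketches above into these numbers (assuming all workers are otherwise idle) reproduces both outcomes:

// Scenario 1: 3 workers x 30 cores, --executor-cores 2 --total-executor-cores 12
val assigned1 = spreadOutAssign(Array(30, 30, 30), 12)     // Array(4, 4, 4)
val perWorker1 = executorsOnWorker(assigned1(0), Some(2))  // (2, 2): two 2-core executors per worker

// Scenario 2: 30 workers x 30 cores, --executor-cores 5 --total-executor-cores 30
val assigned2 = spreadOutAssign(Array.fill(30)(30), 30)    // Array(1, 1, ..., 1): one core per worker
val perWorker2 = executorsOnWorker(assigned2(0), Some(5))  // (0, 5): 1 < 5, so no executor is launched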


Conclusion 2: As startExecutorsOnWorkers shows, in spread-out mode cores are handed out one at a time per position in a round-robin loop, which spreads the executors over as many workers as possible.