
Spark Task Resource Allocation in Standalone Mode

2017-06-26 09:20


1. Background

When a task is submitted to a Spark cluster with spark-submit, resource-related parameters control what the task asks for: the total number of cores, the number of cores per executor, the memory per executor, and so on. However, the resources the task actually occupies in the cluster cannot be configured directly; they follow from the combination of these parameters and the actual environment. Below is an analysis of how resources are actually allocated to a Spark task in a cluster running in Standalone mode.

Note that this analysis only covers CPU cores and executors, not memory: the total memory used is simply the number of executors multiplied by the memory per executor, and the second factor is specified explicitly, so it is enough to analyze how executors are allocated.
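For reference, the parameters discussed here correspond to standard Spark configuration keys. Below is a minimal sketch of setting them programmatically; the master URL and all values are made up purely for illustration, not recommendations:

    import org.apache.spark.{SparkConf, SparkContext}

    // Hypothetical values, only to show which knobs control the resource request.
    val conf = new SparkConf()
      .setMaster("spark://master-host:7077")   // hypothetical Standalone master URL
      .setAppName("resource-allocation-demo")
      .set("spark.cores.max", "6")             // total cores requested for the application
      .set("spark.executor.cores", "2")        // cores per executor (same as --executor-cores)
      .set("spark.executor.memory", "2g")      // memory per executor
    val sc = new SparkContext(conf)

The same settings can also be passed on the spark-submit command line via --total-executor-cores, --executor-cores, and --executor-memory.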

2. Source Code Analysis

The analysis below uses Spark 2.0.2 as the example.

After a task is submitted, the driver registers with the master. When the master receives the registration message, it walks over the list of workers it maintains and allocates executor resources on the workers that satisfy certain conditions. The relevant code is as follows.

In the Master class, once the Master receives the application registration message from the driver, it registers the application and then starts allocating resources to it:

    case RegisterApplication(description, driver) =>
      // TODO Prevent repeated registrations from some driver
      if (state == RecoveryState.STANDBY) {
        // ignore, don't send response
      } else {
        logInfo("Registering app " + description.name)
        val app = createApplication(description, driver)
        registerApplication(app)
        logInfo("Registered app " + description.name + " with ID " + app.id)
        persistenceEngine.addApplication(app)
        driver.send(RegisteredApplication(app.id, self))
        schedule()
      }
Entering schedule(): when the master is ALIVE, it first launches any waiting drivers and then calls startExecutorsOnWorkers(), which is where executors start being allocated on Workers. The launchExecutor() method shown after it is what ultimately sends the LaunchExecutor message to a chosen Worker:

    /**
     * Schedule the currently available resources among waiting apps. This method will be called
     * every time a new app joins or resource availability changes.
     */
    private def schedule(): Unit = {
      if (state != RecoveryState.ALIVE) {
        return
      }
      // Drivers take strict precedence over executors
      val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
      val numWorkersAlive = shuffledAliveWorkers.size
      var curPos = 0
      for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
        // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
        // start from the last worker that was assigned a driver, and continue onwards until we have
        // explored all alive workers.
        var launched = false
        var numWorkersVisited = 0
        while (numWorkersVisited < numWorkersAlive && !launched) {
          val worker = shuffledAliveWorkers(curPos)
          numWorkersVisited += 1
          if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
            launchDriver(worker, driver)
            waitingDrivers -= driver
            launched = true
          }
          curPos = (curPos + 1) % numWorkersAlive
        }
      }
      startExecutorsOnWorkers()
    }

    private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
      logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
      worker.addExecutor(exec)
      worker.endpoint.send(LaunchExecutor(masterUrl,
        exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
      exec.application.driver.send(
        ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
    }

All waiting applications are iterated over and given resources in turn. First, the workers with enough free resources for at least one executor are filtered out; then the number of executors and cores to allocate on each worker is decided, mainly inside the scheduleExecutorsOnWorkers function. That function only decides how many resources each worker will receive; the actual allocation is performed afterwards in allocateWorkerResourceToExecutors.

    /**
     * Schedule and launch executors on workers
     */
    private def startExecutorsOnWorkers(): Unit = {
      // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
      // in the queue, then the second app, etc.
      for (app <- waitingApps if app.coresLeft > 0) {
        val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
        // Filter out workers that don't have enough resources to launch an executor
        val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
          .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
            worker.coresFree >= coresPerExecutor.getOrElse(1))
          .sortBy(_.coresFree).reverse
        val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

        // Now that we've decided how many cores to allocate on each worker, let's allocate them
        for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
          allocateWorkerResourceToExecutors(
            app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
        }
      }
    }
Here, the spreadOutApps parameter controls whether resources should be spread over as many workers as possible. It defaults to true, i.e. spread out across as many workers as possible:

    // As a temporary workaround before better ways of configuring memory, we allow users to set
    // a flag that will perform round-robin scheduling across the nodes (spreading out each app
    // among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
    private val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)

Inside scheduleExecutorsOnWorkers, each worker is checked in turn to see whether it can launch an executor; this check is implemented in canLaunchExecutor, which is explained further below. As long as usable workers remain, the while loop keeps scanning the set of workers. Whenever a worker can launch an executor, one executor's worth of cores is subtracted from the cores still to be assigned and added to the corresponding entry in the assigned-cores array, which is aligned with the array of usable workers. If multiple executors per worker are allowed, every allocation round on a worker adds one more executor; otherwise the worker only ever holds a single executor. If spreadOutApps is true, after one allocation on a worker the scheduler moves on to the next worker; otherwise it keeps allocating on the same worker until it can no longer fit an executor there.

Once the number of cores still to be assigned falls below the number of cores per executor, the loop stops; the leftover cores that cannot form an executor are discarded, and the array of cores assigned to each worker is returned.

    private def scheduleExecutorsOnWorkers(
        app: ApplicationInfo,
        usableWorkers: Array[WorkerInfo],
        spreadOutApps: Boolean): Array[Int] = {
      val coresPerExecutor = app.desc.coresPerExecutor
      val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
      val oneExecutorPerWorker = coresPerExecutor.isEmpty
      val memoryPerExecutor = app.desc.memoryPerExecutorMB
      val numUsable = usableWorkers.length
      val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
      val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
      var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)

      /** Return whether the specified worker can launch an executor for this app. */
      def canLaunchExecutor(pos: Int): Boolean = {
        ... // body shown separately below
      }

      // Keep launching executors until no more workers can accommodate any
      // more executors, or if we have reached this application's limits
      var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
      while (freeWorkers.nonEmpty) {
        freeWorkers.foreach { pos =>
          var keepScheduling = true
          while (keepScheduling && canLaunchExecutor(pos)) {
            coresToAssign -= minCoresPerExecutor
            assignedCores(pos) += minCoresPerExecutor

            // If we are launching one executor per worker, then every iteration assigns 1 core
            // to the executor. Otherwise, every iteration assigns cores to a new executor.
            if (oneExecutorPerWorker) {
              assignedExecutors(pos) = 1
            } else {
              assignedExecutors(pos) += 1
            }

            // Spreading out an application means spreading out its executors across as
            // many workers as possible. If we are not spreading out, then we should keep
            // scheduling executors on this worker until we use all of its resources.
            // Otherwise, just move on to the next worker.
            if (spreadOutApps) {
              keepScheduling = false
            }
          }
        }
        freeWorkers = freeWorkers.filter(canLaunchExecutor)
      }
      assignedCores
    }

The canLaunchExecutor function decides whether a given worker can launch an executor for this application. If the scheduler is merely adding cores to an existing executor (one executor per worker, and that worker already has one assigned), it only requires that both the cores still to be assigned and the worker's remaining free cores are at least one executor's worth. If a new executor would be launched (multiple executors per worker are allowed, or the worker has no executor assigned yet), it additionally checks that the worker's remaining memory can hold another executor and that the total number of executors stays below the application's limit; only then does it return true.

    /** Return whether the specified worker can launch an executor for this app. */
    def canLaunchExecutor(pos: Int): Boolean = {
      val keepScheduling = coresToAssign >= minCoresPerExecutor
      val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor

      // If we allow multiple executors per worker, then we can always launch new executors.
      // Otherwise, if there is already an executor on this worker, just give it more cores.
      val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
      if (launchingNewExecutor) {
        val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
        val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
        val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
        keepScheduling && enoughCores && enoughMemory && underLimit
      } else {
        // We're adding cores to an existing executor, so no need
        // to check memory and executor limits
        keepScheduling && enoughCores
      }
    }
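To make the behaviour of this loop easier to follow, here is a simplified, self-contained sketch of the core-assignment logic. It is not the actual Master code: the object name ScheduleSim and the worker core counts are made up for illustration, and memory and the application's executor limit are ignored (every worker is assumed to have enough memory).

    // Simplified sketch of the scheduleExecutorsOnWorkers loop, for illustration only.
    object ScheduleSim {
      def scheduleExecutorsOnWorkers(
          coresRequested: Int,
          coresPerExecutor: Option[Int],
          workerFreeCores: Array[Int],
          spreadOutApps: Boolean): Array[Int] = {
        val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
        val numUsable = workerFreeCores.length
        val assignedCores = new Array[Int](numUsable)
        var coresToAssign = math.min(coresRequested, workerFreeCores.sum)

        // A worker can take one more executor's worth of cores if enough cores
        // remain both globally and on that worker (memory is ignored here).
        def canLaunchExecutor(pos: Int): Boolean =
          coresToAssign >= minCoresPerExecutor &&
            workerFreeCores(pos) - assignedCores(pos) >= minCoresPerExecutor

        var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
        while (freeWorkers.nonEmpty) {
          freeWorkers.foreach { pos =>
            var keepScheduling = true
            while (keepScheduling && canLaunchExecutor(pos)) {
              coresToAssign -= minCoresPerExecutor
              assignedCores(pos) += minCoresPerExecutor
              // When spreading out, move on to the next worker after one allocation.
              if (spreadOutApps) keepScheduling = false
            }
          }
          freeWorkers = freeWorkers.filter(canLaunchExecutor)
        }
        assignedCores
      }

      def main(args: Array[String]): Unit = {
        val workers = Array(4, 4, 4) // three workers with 4 free cores each (hypothetical)
        // Default config, 4 cores requested: prints 2,1,1 -> 4 cores, 3 executors
        println(scheduleExecutorsOnWorkers(4, None, workers, spreadOutApps = true).mkString(","))
        // executor-cores = 2, 5 cores requested: prints 2,2,0 -> 4 cores, 2 executors, 1 core dropped
        println(scheduleExecutorsOnWorkers(5, Some(2), workers, spreadOutApps = true).mkString(","))
      }
    }

Under these assumptions the sketch reproduces the core and executor counts observed for Spark 2.0.2 in the tests below.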

3. Tests

Spark 1.x and Spark 2.x use different resource allocation strategies, so both Spark 1.4.1 and Spark 2.0.2 were tested. The Spark cluster has three Worker nodes. Three scenarios were tested: the default configuration, setting the executor-cores parameter, and setting the spark.deploy.spreadOut parameter.

3.1 Default configuration (no parameters changed)

    Cores requested at submit | Spark 1.4.1                 | Spark 2.0.2
    1                         | 1 core used, 1 executor     | same as Spark 1.4.1
    2                         | 2 cores used, 2 executors   | same as Spark 1.4.1
    3                         | 3 cores used, 3 executors   | same as Spark 1.4.1
    4                         | 4 cores used, 3 executors   | same as Spark 1.4.1
    5                         | 5 cores used, 3 executors   | same as Spark 1.4.1
With no parameters configured, Spark 1.4.1 follows two rules:

(1) Cores are spread across the worker nodes as much as possible: they are handed out round-robin, one core per worker per round, and further rounds follow until the requested cores are allocated.

(2) Each worker hosts at most one executor, and all cores allocated on that worker go to that single executor.

With no parameters configured, Spark 2.0.2 allocates resources exactly as Spark 1.4.1 does.

3.2 Effect of the executor-cores parameter

With executor-cores set to 2, each executor is given 2 cores.

    Cores requested at submit | Spark 1.4.1                 | Spark 2.0.2
    1                         | 0 cores used, 0 executors   | 0 cores used, 0 executors
    2                         | 0 cores used, 0 executors   | 2 cores used, 1 executor
    3                         | 0 cores used, 0 executors   | 2 cores used, 1 executor
    4                         | 2 cores used, 1 executor    | 4 cores used, 2 executors
    5                         | 4 cores used, 2 executors   | 4 cores used, 2 executors

With executor-cores set to 2, the difference between the two Spark versions becomes visible.

In Spark 1.4.1, cores are still handed out round-robin across the workers; whenever the cores accumulated on a worker reach the executor-cores value, they are grouped into an executor, and the process continues. Cores that are never grouped into an executor are discarded.

In Spark 2.0.2, cores are first gathered into complete executors (compare the cases where 2 and 4 cores are requested), and those executors are then distributed round-robin across the cluster's nodes. Cores that cannot form a complete executor are discarded.

When executor-cores is set, a single worker may host more than one executor.
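As a quick cross-check against the Spark 2.0.2 column, the simplified ScheduleSim sketch from section 2 (hypothetical worker sizes, memory ignored) reproduces the row where 3 cores are requested:

    // 3 cores requested, executor-cores = 2, three 4-core workers:
    // prints 2,0,0 -> 2 cores used, 1 executor, 1 requested core dropped
    println(ScheduleSim.scheduleExecutorsOnWorkers(
      3, Some(2), Array(4, 4, 4), spreadOutApps = true).mkString(","))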

3.3 Effect of the spark.deploy.spreadOut parameter

When spark.deploy.spreadOut is true, the SpreadOut allocation strategy is used; when false, the non-SpreadOut strategy is used. The default is true.

1. SpreadOut strategy:

The SpreadOut strategy walks over all available Workers in the cluster in round-robin fashion, allocating Worker resources and launching Executors as it goes. The benefit is that cores are spread over as many nodes as possible, maximizing load balance and parallelism.

2. Non-SpreadOut strategy:

The non-SpreadOut strategy launches Executors by consuming each Worker's remaining resources in turn, so the Executors may end up on only a small fraction of the cluster's machines. This is acceptable for clusters with few nodes, but as the cluster grows, Executor parallelism and load balance across machines can no longer be guaranteed.
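The intended difference can be illustrated with the simplified ScheduleSim sketch from section 2 (again with hypothetical worker sizes and memory ignored); whether this matches a real cluster is subject to the caveat noted just below:

    // 4 cores requested, default executor-cores, three 4-core workers.
    // spreadOut = true:  prints 2,1,1 (cores spread over three workers)
    println(ScheduleSim.scheduleExecutorsOnWorkers(
      4, None, Array(4, 4, 4), spreadOutApps = true).mkString(","))
    // spreadOut = false: prints 4,0,0 (all cores packed onto the first worker)
    println(ScheduleSim.scheduleExecutorsOnWorkers(
      4, None, Array(4, 4, 4), spreadOutApps = false).mkString(","))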

 

The two descriptions above come from material found online; in my actual tests, however, the results were identical whether spark.deploy.spreadOut was set to true or false.

4. Summary

(1) With the default settings (no parameters configured), in both Spark 1.4.1 and Spark 2.0.2 cores are handed out round-robin across the worker nodes, and all cores allocated on a worker node go to the same executor. The task gets at most one executor on each worker node.

(2) With executor-cores configured, Spark 1.4.1 still hands cores out round-robin across the workers; when the cores on a worker reach the executor-cores value, they are grouped into an executor, and the process continues, with any leftover cores discarded. Spark 2.0.2 first gathers cores into complete executors and then distributes those executors round-robin across the cluster's nodes. The task may get multiple executors on a single worker node.

(3) I have not yet observed any effect of the spark.deploy.spreadOut parameter, possibly because my test scenarios were not the right ones. In any case, this parameter only affects how executors are distributed across workers; it does not change the total amount of resources actually used.