Summary of the Spark Resource Scheduling Flow
2017-01-10 23:32
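While first learning how Spark schedules resources in Standalone mode, I found that reading the source code is the foundation for understanding any of Spark's mechanisms. These notes organize what I learned from the relevant Spark 2.1.0 source.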
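I. How the Master controls Driver and Executor launch and allocates resources when an application is submitted
Open Master.scala and look at the Master class. The walk through the resource scheduling flow starts at case RegisterApplication in the receive method, which handles registration when a job is submitted.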

    case RegisterApplication(description, driver) =>
      // TODO Prevent repeated registrations from some driver
      if (state == RecoveryState.STANDBY) {
        // 1. A STANDBY Master ignores application registration.
        // ignore, don't send response
      } else {
        logInfo("Registering app " + description.name)
        val app = createApplication(description, driver)
        registerApplication(app)
        logInfo("Registered app " + description.name + " with ID " + app.id)
        // 2. persistenceEngine is the persistence engine.
        persistenceEngine.addApplication(app)
        // 3. Author's note: I was unsure about this send, and will come back to it.
        //    As far as I can tell, `driver` here is the RpcEndpointRef of the endpoint
        //    that sent RegisterApplication, so the reply goes back to that endpoint.
        driver.send(RegisteredApplication(app.id, self))
        // 4. The core of resource scheduling: schedule()
        schedule()
      }

Next, the source of schedule(). The first thing it shows is how the Master controls Driver launch: this method decides on which Worker each Driver is started.

    /**
     * Schedule the currently available resources among waiting apps. This method will be called
     * every time a new app joins or resource availability changes.
     *
     * When schedule() runs: every time a new application joins or cluster resources change.
     */
    private def schedule(): Unit = {
      // 1. The Master must be ALIVE to schedule resources (a standby Master does not schedule).
      if (state != RecoveryState.ALIVE) {
        return
      }
      // Drivers take strict precedence over executors
      val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
      val numWorkersAlive = shuffledAliveWorkers.size
      var curPos = 0
      // 2. waitingDrivers is an ArrayBuffer of DriverInfo
      //    (private val waitingDrivers = new ArrayBuffer[DriverInfo]). The DriverDescription
      //    in each DriverInfo states the memory and cores the Driver needs from a Worker.
      for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
        // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
        // start from the last worker that was assigned a driver, and continue onwards until we have
        // explored all alive workers.
        var launched = false
        var numWorkersVisited = 0
        // Walk the shuffled Workers and launch the Driver (launchDriver(worker, driver)) on the
        // first one that meets its resource requirements. Once one is found, remove the Driver
        // from the waiting queue and leave the loop.
        while (numWorkersVisited < numWorkersAlive && !launched) {
          val worker = shuffledAliveWorkers(curPos)
          numWorkersVisited += 1
          if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
            launchDriver(worker, driver)
            waitingDrivers -= driver
            launched = true
          }
          curPos = (curPos + 1) % numWorkersAlive
        }
      }
      startExecutorsOnWorkers()
    }

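The round-robin placement above can be tried outside Spark. The following is a minimal sketch, not Spark's code: `Worker`, `Driver`, and `placeDrivers` are hypothetical stand-ins for WorkerInfo, DriverInfo, and the loop in schedule(), and "launching" is reduced to reserving memory and cores on the chosen worker.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

object DriverPlacementSketch {
  // Hypothetical, simplified stand-ins for Spark's WorkerInfo and DriverInfo.
  final class Worker(val id: String, var memoryFree: Int, var coresFree: Int)
  final case class Driver(id: String, mem: Int, cores: Int)

  /** Returns driverId -> workerId for every driver that could be placed. */
  def placeDrivers(
      workers: Seq[Worker],
      waiting: ArrayBuffer[Driver],
      rng: Random = new Random()): Map[String, String] = {
    val shuffled = rng.shuffle(workers.toIndexedSeq)
    val numAlive = shuffled.size
    var curPos = 0
    var placed = Map.empty[String, String]
    for (driver <- waiting.toList) { // iterate over a copy, since we remove while looping
      var launched = false
      var visited = 0
      while (visited < numAlive && !launched) {
        val w = shuffled(curPos)
        visited += 1
        if (w.memoryFree >= driver.mem && w.coresFree >= driver.cores) {
          // Stand-in for launchDriver: reserve the resources on the chosen worker.
          w.memoryFree -= driver.mem
          w.coresFree -= driver.cores
          placed += (driver.id -> w.id)
          waiting -= driver
          launched = true
        }
        // The cursor keeps advancing across drivers, which gives the round-robin effect.
        curPos = (curPos + 1) % numAlive
      }
    }
    placed
  }
}
```

A driver that fits on no worker simply stays in `waiting`, which matches the behaviour described above: it is retried the next time scheduling runs.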
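Next, startExecutorsOnWorkers() shows how Executors are launched. In this method the Master considers the usable Workers in descending order of free cores, and decides on which Workers to start executors and how many resources each executor gets.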

    /**
     * Schedule and launch executors on workers
     */
    private def startExecutorsOnWorkers(): Unit = {
      // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
      // in the queue, then the second app, etc.
      // 1. By default Spark launches executors for applications in FIFO order. All submitted
      //    applications sit in a first-in, first-out waiting queue, and earlier applications
      //    are offered resources before later ones.
      // 2. private[master] def coresLeft: Int = requestedCores - coresGranted
      //    This is the number of cores the application has requested but not yet been
      //    granted. Executors are scheduled only for applications that still need cores.
      for (app <- waitingApps if app.coresLeft > 0) {
        // 3. The number of cores each executor needs, if configured
        val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
        // Filter out workers that don't have enough resources to launch an executor
        // 4. sortBy(_.coresFree).reverse orders the Workers by free cores, largest first.
        val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
          .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
            worker.coresFree >= coresPerExecutor.getOrElse(1))
          .sortBy(_.coresFree).reverse
        // scheduleExecutorsOnWorkers decides how many executors go on which Worker
        // and how many cores each executor gets.
        val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
        // Now that we've decided how many cores to allocate on each worker, let's allocate them
        for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
          allocateWorkerResourceToExecutors(
            app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
        }
      }
    }

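The source of scheduleExecutorsOnWorkers is not reproduced in these notes. As a rough illustration of what the spreadOutApps flag changes, here is a simplified sketch under stated assumptions: it tracks cores only (no memory check, no executor limit), and `assignCores` and its parameters are hypothetical names, not Spark's API.

```scala
object CoreAssignmentSketch {
  /**
   * coresFree        free cores per usable worker (assumed already sorted, largest first)
   * coresWanted      cores the application still needs (its coresLeft)
   * coresPerExecutor cores handed out per scheduling step
   * spreadOut        true: one step per worker per round; false: fill a worker before moving on
   * Returns the number of cores assigned on each worker, by position.
   */
  def assignCores(
      coresFree: Array[Int],
      coresWanted: Int,
      coresPerExecutor: Int,
      spreadOut: Boolean): Array[Int] = {
    require(coresPerExecutor > 0, "coresPerExecutor must be positive")
    val assigned = new Array[Int](coresFree.length)
    var coresToAssign = math.min(coresWanted, coresFree.sum)

    // A worker can take another step if the app still needs cores and the worker has room.
    def canLaunch(pos: Int): Boolean =
      coresToAssign >= coresPerExecutor && coresFree(pos) - assigned(pos) >= coresPerExecutor

    var free = coresFree.indices.filter(canLaunch)
    while (free.nonEmpty) {
      free.foreach { pos =>
        var keepScheduling = true
        while (keepScheduling && canLaunch(pos)) {
          coresToAssign -= coresPerExecutor
          assigned(pos) += coresPerExecutor
          // Spreading out means moving to the next worker after a single step.
          if (spreadOut) keepScheduling = false
        }
      }
      free = free.filter(canLaunch)
    }
    assigned
  }
}
```

With three workers of 4 free cores each and an application that wants 6 cores, spreading out gives 2 cores per worker, while consolidating fills the first worker and puts the remaining 2 cores on the second.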
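II. How the Worker launches the Driver and the ExecutorBackend
To launch the Driver and ExecutorBackend processes, the Worker creates a separate Thread for each. See case LaunchDriver and case LaunchExecutor in the receive method of the Worker class for the source.
The figure below outlines how the Worker starts these two processes.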
![](https://img-blog.csdn.net/20170110233320145?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvcjAyMjIx/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center)
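III. How the Executor is started
CoarseGrainedExecutorBackend is a message endpoint (it implements ThreadSafeRpcEndpoint and ExecutorBackend). It can send messages to the Driver and receive messages from the Driver, such as a request to launch a Task.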

    private[spark] class CoarseGrainedExecutorBackend(
        override val rpcEnv: RpcEnv,
        driverUrl: String,
        executorId: String,
        hostname: String,
        cores: Int,
        userClassPath: Seq[URL],
        env: SparkEnv)
      extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging

The onStart method registers the ExecutorBackend instance with the Driver, and the case RegisteredExecutor branch of the receive method creates the Executor instance.

    override def receive: PartialFunction[Any, Unit] = {
      case RegisteredExecutor =>
        logInfo("Successfully registered with driver")
        try {
          // There is exactly one Executor per CoarseGrainedExecutorBackend.
          executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
        } catch {
          case NonFatal(e) =>
            exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
        }

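The receive method is a PartialFunction[Any, Unit], so each message type is handled by its own case. The sketch below models only that dispatch pattern. The `Registered` and `Launch` messages and the `Backend` class are hypothetical stand-ins, and the second message is included just to show why the executor field is checked for null before use.

```scala
import scala.collection.mutable.ArrayBuffer

object ReceiveSketch {
  // Hypothetical messages; they are not Spark's message classes.
  case object Registered
  final case class Launch(taskId: Long)

  final class Backend {
    // Plays the role of the `executor` field: null until registration succeeds.
    private var executor: String = null
    // Records what happened, so the behaviour can be inspected.
    val log: ArrayBuffer[String] = ArrayBuffer.empty[String]

    val receive: PartialFunction[Any, Unit] = {
      case Registered =>
        executor = "executor-1"
        log += "created executor"
      case Launch(id) =>
        if (executor == null) log += s"rejected task $id: executor was null"
        else log += s"task $id handed to $executor"
    }
  }
}
```

Because receive is partial, a message with no matching case is simply not defined for it, which `receive.isDefinedAt(msg)` reports as false.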
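The Driver process has two essential Endpoints. ClientEndpoint is mainly responsible for registering the current application with the Master. DriverEndpoint mainly acts as the driver of the application while it runs. ExecutorData is a wrapper for the data that describes an Executor.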

    /**
     * Grouping of data for an executor used by CoarseGrainedSchedulerBackend.
     */
    private[cluster] class ExecutorData(
        val executorEndpoint: RpcEndpointRef,
        val executorAddress: RpcAddress,
        override val executorHost: String,
        var freeCores: Int,
        override val totalCores: Int,
        override val logUrlMap: Map[String, String]
    ) extends ExecutorInfo(executorHost, totalCores, logUrlMap)

    private val executorDataMap = new HashMap[String, ExecutorData]

The case RegisterExecutor branch in the receiveAndReply method of CoarseGrainedSchedulerBackend (inside its DriverEndpoint) performs the actual registration.

    case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
      if (executorDataMap.contains(executorId)) {
        executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
        context.reply(true)
      } else {
        // If the executor's rpc env is not listening for incoming connections, `hostPort`
        // will be null, and the client connection should be used to contact the executor.
        val executorAddress = if (executorRef.address != null) {
            executorRef.address
          } else {
            context.senderAddress
          }
        // Record the result of the ExecutorBackend's registration.
        logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")
        addressToExecutorId(executorAddress) = executorId
        totalCoreCount.addAndGet(cores)
        totalRegisteredExecutors.addAndGet(1)
        val data = new ExecutorData(executorRef, executorRef.address, hostname,
          cores, cores, logUrls)
        // This must be synchronized because variables mutated
        // in this block are read when requesting executors
        // Author's note on synchronized: the Driver handles registrations from many
        // Executors, and the fields updated here are also read when executors are requested.
        CoarseGrainedSchedulerBackend.this.synchronized {
          executorDataMap.put(executorId, data)
          if (currentExecutorIdCounter < executorId.toInt) {
            currentExecutorIdCounter = executorId.toInt
          }
          if (numPendingExecutors > 0) {
            numPendingExecutors -= 1
            logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
          }
        }
        // Reply to the ExecutorBackend. When the receive method of the
        // CoarseGrainedExecutorBackend process gets the RegisteredExecutor message,
        // it creates an Executor instance.
        executorRef.send(RegisteredExecutor)
        // Note: some tests expect the reply to come after we put the executor in the map
        context.reply(true)
        listenerBus.post(
          SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
        makeOffers()
      }

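The core of this branch is a registry keyed by executor ID that rejects duplicates and updates shared counters under a lock. A minimal sketch of that idea follows. `Registry`, `ExecutorEntry`, and the `Reply` types are hypothetical names introduced for illustration, and the RPC replies are modelled as plain return values.

```scala
import scala.collection.mutable.HashMap

object ExecutorRegistrySketch {
  // Hypothetical, trimmed-down counterpart of ExecutorData.
  final case class ExecutorEntry(host: String, totalCores: Int, var freeCores: Int)

  sealed trait Reply
  case object Registered extends Reply
  final case class Failed(reason: String) extends Reply

  final class Registry {
    private val executors = new HashMap[String, ExecutorEntry]
    private var totalCores = 0

    /** Registers an executor, or reports a duplicate ID without changing any state. */
    def register(executorId: String, host: String, cores: Int): Reply = synchronized {
      if (executors.contains(executorId)) {
        Failed("Duplicate executor ID: " + executorId)
      } else {
        // A new executor starts with all of its cores free.
        executors.put(executorId, ExecutorEntry(host, cores, cores))
        totalCores += cores
        Registered
      }
    }

    /** Reads take the same lock, so they never observe a half-applied registration. */
    def coreCount: Int = synchronized(totalCores)
  }
}
```

Guarding reads and writes with the same lock is the point of the synchronized block in the excerpt: other code paths read these fields while registrations are being applied.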
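When an Executor is instantiated, it creates a thread pool to prepare for running Tasks. The JDK method that builds such a pool (java.util.concurrent.Executors.newCachedThreadPool) is: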

    /**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available, and uses the provided
     * ThreadFactory to create new threads when needed.
     * @param threadFactory the factory to use when creating new threads
     * @return the newly created thread pool
     * @throws NullPointerException if threadFactory is null
     */
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

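To see how such a pool is used from Scala, here is a small sketch that calls the JDK method directly with a custom ThreadFactory. The factory, the "sketch-worker" name prefix, and `runTasks` are illustrative assumptions, not the helper Spark itself uses.

```scala
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object CachedPoolSketch {
  /** A daemon thread factory with a name prefix (the prefix is illustrative). */
  def namedDaemonFactory(prefix: String): ThreadFactory = new ThreadFactory {
    private val counter = new AtomicInteger(0)
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, s"$prefix-${counter.getAndIncrement()}")
      t.setDaemon(true)
      t
    }
  }

  /** Runs n trivial tasks on a cached pool and returns how many completed. */
  def runTasks(n: Int): Int = {
    val pool = Executors.newCachedThreadPool(namedDaemonFactory("sketch-worker"))
    val done = new AtomicInteger(0)
    (1 to n).foreach { _ =>
      pool.execute(new Runnable {
        override def run(): Unit = { done.incrementAndGet(); () }
      })
    }
    // Stop accepting new work, then wait for the submitted tasks to finish.
    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)
    done.get()
  }
}
```

A cached pool has no core threads and hands each task straight to a thread through the SynchronousQueue, so it grows with the number of concurrent tasks and releases idle threads after 60 seconds.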
How the Executor actually works:
1. The case LaunchTask branch in the receive method of CoarseGrainedExecutorBackend receives the task.

    case LaunchTask(data) =>
      if (executor == null) {
        exitExecutor(1, "Received LaunchTask command but executor was null")
      } else {
        val taskDesc = ser.deserialize[TaskDescription](data.value)
        logInfo("Got assigned task " + taskDesc.taskId)
        // Hand the Task to the Executor to run.
        executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
          taskDesc.name, taskDesc.serializedTask)
      }

2. After receiving the task, the Executor wraps it in a TaskRunner.

    def launchTask(
        context: ExecutorBackend,
        taskId: Long,
        attemptNumber: Int,
        taskName: String,
        serializedTask: ByteBuffer): Unit = {
      // Wrap the Task in a TaskRunner, which implements Java's Runnable interface.
      val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
        serializedTask)
      runningTasks.put(taskId, tr)
      // Hand the wrapped Task to the thread pool. A pool thread then calls run()
      // to perform the actual computation.
      threadPool.execute(tr)
    }

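The pattern in launchTask (wrap the work in a Runnable, record it in a map of running tasks, submit it to the pool) can be sketched in isolation. `MiniExecutor` and `Runner` below are hypothetical stand-ins for Executor and TaskRunner, and the task body is an ordinary function rather than a deserialized Task. Removing the entry when the task finishes is an assumption of this sketch.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

object LaunchTaskSketch {
  final class MiniExecutor {
    private val threadPool = Executors.newCachedThreadPool()
    val runningTasks = new ConcurrentHashMap[Long, Runner]()
    val results = new ConcurrentHashMap[Long, String]()

    // Stand-in for TaskRunner: a Runnable that knows which task it is running.
    final class Runner(val taskId: Long, body: () => String) extends Runnable {
      override def run(): Unit = {
        try {
          results.put(taskId, body())
        } finally {
          // The task is no longer running, whether it succeeded or threw.
          runningTasks.remove(taskId)
        }
      }
    }

    def launchTask(taskId: Long, body: () => String): Unit = {
      val tr = new Runner(taskId, body)
      runningTasks.put(taskId, tr)
      threadPool.execute(tr) // a pool thread will call tr.run()
    }

    def shutdownAndWait(): Unit = {
      threadPool.shutdown()
      threadPool.awaitTermination(10, TimeUnit.SECONDS)
    }
  }
}
```

Keeping the runner in a map keyed by task ID is what makes it possible to look up a task that is still running, for example to cancel it.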