您的位置:首页 > 其它

spark 源码解析其一(资源的申请,任务调度)

2020-01-14 14:07 218 查看
一,资源的申请 以下来自于 Master
1,首先receiveAndReply 方法接收EndpointRef ask及其衍生方法发送的信息(模式匹配RequestSubmitDriver)
1),创建 driver 信息
2),driver 持久化 , 存储 driver 运行记录, 以便于恢复信息
3), 调用 schedule() 检查并配置 executor信息, 提交 job
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case RequestSubmitDriver(description) =>
if (state != RecoveryState.ALIVE) {
val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
"Can only accept driver submissions in ALIVE state."
context.reply(SubmitDriverResponse(self, false, None, msg))
} else {
logInfo("Driver submitted " + description.command.mainClass)
// 创建 driver 信息
val driver = createDriver(description)
//        driver 持久化 , 存储 driver 运行记录, 以便于恢复信息
persistenceEngine.addDriver(driver)

waitingDrivers += driver
drivers.add(driver)
//        调度 driver
schedule()

// TODO: It might be good to instead have the submission client poll the master to determine
//       the current status of the driver. For now it's simply "fire and forget".
//        提交 Driver 资源
context.reply(SubmitDriverResponse(self, true, Some(driver.id),
s"Driver successfully submitted as ${driver.id}"))
}
View Code
  2,schedule 配置文件获取信息
  1),打散集群 所有的 worker
  2),配置文件读取 driver memory 与 core
  3),创建 executor
private def schedule(): Unit = {
//    判断 driver 是不是  Alive
if (state != RecoveryState.ALIVE) {
return
}
// Drivers take strict precedence over executors
//    打散集群 所有的 worker
val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
val numWorkersAlive = shuffledAliveWorkers.size
var curPos = 0
for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
// We assign workers to each waiting driver in a round-robin fashion. For each driver, we
// start from the last worker that was assigned a driver, and continue onwards until we have
// explored all alive workers.
var launched = false
var numWorkersVisited = 0
while (numWorkersVisited < numWorkersAlive && !launched) {
val worker = shuffledAliveWorkers(curPos)
numWorkersVisited += 1
//         判读 driver memory 与 core
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
//           启动 driver work: driver 所要运行的 位置
//          driver : 当前 driver 的信息, SparkContext(各种任务调度器)
launchDriver(worker, driver)
waitingDrivers -= driver
launched = true
}
curPos = (curPos + 1) % numWorkersAlive
}
}
//    给 job 分配资源, 对于 job 来说 ,资源指的是 Executor
startExecutorsOnWorkers()
}
View Code
  3,startExecutorsOnWorkers
  1),遍历集群之中所有的满足 app 要求的 core 和 memory 的 work 信息,并返回给Schedule(),在此返回给 RequestSubmitDriver 提交job
private def startExecutorsOnWorkers(): Unit = {
// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
// in the queue, then the second app, etc.
//    waitingApps 代表的是 所有 master 注册的 作业(Executors)
for (app <- waitingApps) {
val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
// If the cores left is less than the coresPerExecutor,the cores left will not be allocated
//      app.coresLeft = --toal-excutor-cores - 分配的 core
//      coresPerExecutor = --executor-cores(每个 excutor 使用的 core)
if (app.coresLeft >= coresPerExecutor) {
// Filter out workers that don't have enough resources to launch an executor
//       work 指的是 集群所有的 worker 信息
//        遍历集群之中所有的满足 app 要求的 core 和 memory 的 work 信息,剩余的 core 多到少之中排序
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
.filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
worker.coresFree >= coresPerExecutor)
.sortBy(_.coresFree).reverse
//    每个 worker 上可以为 当前 application 提供的 core 数
val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

// Now that we've decided how many cores to allocate on each worker, let's allocate them
for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
allocateWorkerResourceToExecutors(
app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos))
}
}
}
}
View Code
二,任务调度源码解析(DAGScheduler 类), 主要是(getMissingParentStages()与submitStage()两个方法):
1,首先从 foreach() 开始, 根据最后一个 RDD 创建 finalStage
2, 切割 RDD 划分 stage(getMissingParentStages())
1),如果failedStages的父stage是宽依赖,就存到 missing:HashSet
2),如果failedStages的父stage是窄依赖, 那么就会压栈,然后一直遍历栈,直至划分完最后的 RDD
3),getMissingParentStages() 最终 返回划分好的 stage 也就是 missing.toList
3,submitMissingTasks() 之中, 根据 missing(每stage划分好的job) 递归提交所有的job
// 切割 RDD 划分 stage
//  递归方法 递归提交父 stage,从 finalStage 开始
// finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
private def getMissingParentStages(stage: Stage): List[Stage] = {
val missing = new HashSet[Stage]
val visited = new HashSet[RDD[_]]
// We are manually maintaining a stack here to prevent StackOverflowError
// caused by recursively visiting
val waitingForVisit = new ArrayStack[RDD[_]]
def visit(rdd: RDD[_]) {
if (!visited(rdd)) {
visited += rdd
val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
if (rddHasUncachedPartitions) {
for (dep <- rdd.dependencies) {
dep match {
//                窄依赖
case shufDep: ShuffleDependency[_, _, _] =>
//                创建 stage
val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
if (!mapStage.isAvailable) {
//                  // 将 stage 放置  missing 之中
missing += mapStage
}
//                宽依赖
case narrowDep: NarrowDependency[_] =>
waitingForVisit.push(narrowDep.rdd)
}
}
}
}
}
waitingForVisit.push(stage.rdd)
while (waitingForVisit.nonEmpty) {
visit(waitingForVisit.pop())
}
//    // 最后返回划分好的 stage
missing.toList
}

/** Submits stage, but first recursively submits any missing parents. */
//  递归方法 递归提交父 stage
private def submitStage(stage: Stage) {
val jobId = activeJobForStage(stage)
if (jobId.isDefined) {
logDebug("submitStage(" + stage + ")")
if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
//        寻找 丢失的父 stage
val missing = getMissingParentStages(stage).sortBy(_.id)
logDebug("missing: " + missing)
if (missing.isEmpty) {
logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
//          递归完成所有的
submitMissingTasks(stage, jobId.get)
} else {
for (parent <- missing) {
//            递归
submitStage(parent)
}
waitingStages += stage
}
}
} else {
abortStage(stage, "No active job for stage " + stage.id, None)
}
}
View Code

 

转载于:https://www.cnblogs.com/shi-qi/articles/11254559.html

  • 点赞
  • 收藏
  • 分享
  • 文章举报
GYY22897 发布了0 篇原创文章 · 获赞 0 · 访问量 217 私信 关注
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: