Spark Core 2.0: CoarseGrainedSchedulerBackend, SchedulerBackend, and ExecutorAllocationClient source code analysis
2017-01-09 16:05
CoarseGrainedSchedulerBackend is a scheduler backend that waits for coarse-grained executors to connect. This backend holds onto each executor for the duration of the Spark job, rather than relinquishing an executor whenever one of its tasks finishes and asking the scheduler to launch a new executor for each new task. Executors may be launched in a variety of ways, such as Mesos tasks in coarse-grained Mesos mode, or standalone processes in Spark's standalone deploy mode.
/**
 * A scheduler backend that waits for coarse-grained executors to connect.
 * This backend holds onto each executor for the duration of the Spark job rather than relinquishing
 * executors whenever a task is done and asking the scheduler to launch a new executor for
 * each new task. Executors may be launched in a variety of ways, such as Mesos tasks for the
 * coarse-grained Mesos mode or standalone processes for Spark's standalone deploy mode
 * (spark.deploy.*).
 */
private[spark] class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
  extends ExecutorAllocationClient with SchedulerBackend with Logging {

  // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
  protected val totalCoreCount = new AtomicInteger(0)
  // Total number of executors that are currently registered
  protected val totalRegisteredExecutors = new AtomicInteger(0)
  protected val conf = scheduler.sc.conf
  private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
  private val defaultAskTimeout = RpcUtils.askRpcTimeout(conf)
  // Submit tasks only after (registered resources / total expected resources)
  // is equal to at least this value, that is double between 0 and 1.
  private val _minRegisteredRatio =
    math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0))
  // Submit tasks after maxRegisteredWaitingTime milliseconds
  // if minRegisteredRatio has not yet been reached
  private val maxRegisteredWaitingTimeMs =
    conf.getTimeAsMs("spark.scheduler.maxRegisteredResourcesWaitingTime", "30s")
  private val createTime = System.currentTimeMillis()

  // Accessing `executorDataMap` in `DriverEndpoint.receive/receiveAndReply` doesn't need any
  // protection. But accessing `executorDataMap` out of `DriverEndpoint.receive/receiveAndReply`
  // must be protected by `CoarseGrainedSchedulerBackend.this`. Besides, `executorDataMap` should
  // only be modified in `DriverEndpoint.receive/receiveAndReply` with protection by
  // `CoarseGrainedSchedulerBackend.this`.
  private val executorDataMap = new HashMap[String, ExecutorData]

  // Number of executors requested from the cluster manager that have not registered yet
  @GuardedBy("CoarseGrainedSchedulerBackend.this")
  private var numPendingExecutors = 0

  private val listenerBus = scheduler.sc.listenerBus

  // Executors we have requested the cluster manager to kill that have not died yet; maps
  // the executor ID to whether it was explicitly killed by the driver (and thus shouldn't
  // be considered an app-related failure).
  @GuardedBy("CoarseGrainedSchedulerBackend.this")
  private val executorsPendingToRemove = new HashMap[String, Boolean]

  // A map to store hostname with its possible task number running on it
  @GuardedBy("CoarseGrainedSchedulerBackend.this")
  protected var hostToLocalTaskCount: Map[String, Int] = Map.empty

  // The number of pending tasks which is locality required
  @GuardedBy("CoarseGrainedSchedulerBackend.this")
  protected var localityAwareTasks = 0

  // The num of current max ExecutorId used to re-register appMaster
  @volatile protected var currentExecutorIdCounter = 0

  class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
    extends ThreadSafeRpcEndpoint with Logging {

    // Executors that have been lost, but for which we don't yet know the real exit reason.
    protected val executorsPendingLossReason = new HashSet[String]

    // If this DriverEndpoint is changed to support multiple threads,
    // then this may need to be changed so that we don't share the serializer
    // instance across threads
    private val ser = SparkEnv.get.closureSerializer.newInstance()

    protected val addressToExecutorId = new HashMap[RpcAddress, String]

    private val reviveThread =
      ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")

    override def onStart() {
      // Periodically revive offers to allow delay scheduling to work
      val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")

      reviveThread.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = Utils.tryLogNonFatalError {
          Option(self).foreach(_.send(ReviveOffers))
        }
      }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
    }

    override def receive: PartialFunction[Any, Unit] = {
      case StatusUpdate(executorId, taskId, state, data) =>
        scheduler.statusUpdate(taskId, state, data.value)
        if (TaskState.isFinished(state)) {
          executorDataMap.get(executorId) match {
            case Some(executorInfo) =>
              executorInfo.freeCores += scheduler.CPUS_PER_TASK
              makeOffers(executorId)
            case None =>
              // Ignoring the update since we don't know about the executor.
              logWarning(s"Ignored task status update ($taskId state $state) " +
                s"from unknown executor with ID $executorId")
          }
        }

      case ReviveOffers =>
        makeOffers()

      case KillTask(taskId, executorId, interruptThread) =>
        executorDataMap.get(executorId) match {
          case Some(executorInfo) =>
            executorInfo.executorEndpoint.send(KillTask(taskId, executorId, interruptThread))
          case None =>
            // Ignoring the task kill since the executor is not registered.
            logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
        }
    }

    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
      case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
        if (executorDataMap.contains(executorId)) {
          executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
          context.reply(true)
        } else {
          // If the executor's rpc env is not listening for incoming connections, `hostPort`
          // will be null, and the client connection should be used to contact the executor.
          val executorAddress = if (executorRef.address != null) {
              executorRef.address
            } else {
              context.senderAddress
            }
          logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")
          addressToExecutorId(executorAddress) = executorId
          totalCoreCount.addAndGet(cores)
          totalRegisteredExecutors.addAndGet(1)
          val data = new ExecutorData(executorRef, executorRef.address, hostname,
            cores, cores, logUrls)
          // This must be synchronized because variables mutated
          // in this block are read when requesting executors
          CoarseGrainedSchedulerBackend.this.synchronized {
            executorDataMap.put(executorId, data)
            if (currentExecutorIdCounter < executorId.toInt) {
              currentExecutorIdCounter = executorId.toInt
            }
            if (numPendingExecutors > 0) {
              numPendingExecutors -= 1
              logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
            }
          }
          executorRef.send(RegisteredExecutor)
          // Note: some tests expect the reply to come after we put the executor in the map
          context.reply(true)
          listenerBus.post(
            SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
          makeOffers()
        }

      case StopDriver =>
        context.reply(true)
        stop()

      case StopExecutors =>
        logInfo("Asking each executor to shut down")
        for ((_, executorData) <- executorDataMap) {
          executorData.executorEndpoint.send(StopExecutor)
        }
        context.reply(true)

      case RemoveExecutor(executorId, reason) =>
        // We will remove the executor's state and cannot restore it. However, the connection
        // between the driver and the executor may be still alive so that the executor won't exit
        // automatically, so try to tell the executor to stop itself. See SPARK-13519.
        executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))
        removeExecutor(executorId, reason)
        context.reply(true)

      case RetrieveSparkProps =>
        context.reply(sparkProperties)
    }

    // Make fake resource offers on all executors
    private def makeOffers() {
      // Filter out executors under killing
      val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
      val workOffers = activeExecutors.map { case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
      }.toSeq
      launchTasks(scheduler.resourceOffers(workOffers))
    }

    override def onDisconnected(remoteAddress: RpcAddress): Unit = {
      addressToExecutorId
        .get(remoteAddress)
        .foreach(removeExecutor(_, SlaveLost("Remote RPC client disassociated. Likely due to " +
          "containers exceeding thresholds, or network issues. Check driver logs for WARN " +
          "messages.")))
    }

    // Make fake resource offers on just one executor
    private def makeOffers(executorId: String) {
      // Filter out executors under killing
      if (executorIsAlive(executorId)) {
        val executorData = executorDataMap(executorId)
        val workOffers = Seq(
          new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))
        launchTasks(scheduler.resourceOffers(workOffers))
      }
    }

    private def executorIsAlive(executorId: String): Boolean = synchronized {
      !executorsPendingToRemove.contains(executorId) &&
        !executorsPendingLossReason.contains(executorId)
    }

    // Launch tasks returned by a set of resource offers
    private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
      for (task <- tasks.flatten) {
        val serializedTask = ser.serialize(task)
        if (serializedTask.limit >= maxRpcMessageSize) {
          scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
            try {
              var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
                "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
                "spark.rpc.message.maxSize or using broadcast variables for large values."
              msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)
              taskSetMgr.abort(msg)
            } catch {
              case e: Exception => logError("Exception in error callback", e)
            }
          }
        } else {
          val executorData = executorDataMap(task.executorId)
          executorData.freeCores -= scheduler.CPUS_PER_TASK

          logInfo(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
            s"${executorData.executorHost}.")

          executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
        }
      }
    }

    // Remove a disconnected slave from the cluster
    private def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
      logDebug(s"Asked to remove executor $executorId with reason $reason")
      executorDataMap.get(executorId) match {
        case Some(executorInfo) =>
          // This must be synchronized because variables mutated
          // in this block are read when requesting executors
          val killed = CoarseGrainedSchedulerBackend.this.synchronized {
            addressToExecutorId -= executorInfo.executorAddress
            executorDataMap -= executorId
            executorsPendingLossReason -= executorId
            executorsPendingToRemove.remove(executorId).getOrElse(false)
          }
          totalCoreCount.addAndGet(-executorInfo.totalCores)
          totalRegisteredExecutors.addAndGet(-1)
          scheduler.executorLost(executorId, if (killed) ExecutorKilled else reason)
          listenerBus.post(
            SparkListenerExecutorRemoved(System.currentTimeMillis(), executorId, reason.toString))
        case None =>
          // SPARK-15262: If an executor is still alive even after the scheduler has removed
          // its metadata, we may receive a heartbeat from that executor and tell its block
          // manager to reregister itself. If that happens, the block manager master will know
          // about the executor, but the scheduler will not. Therefore, we should remove the
          // executor from the block manager when we hit this case.
          scheduler.sc.env.blockManager.master.removeExecutorAsync(executorId)
          logInfo(s"Asked to remove non-existent executor $executorId")
      }
    }

    /**
     * Stop making resource offers for the given executor. The executor is marked as lost with
     * the loss reason still pending.
     *
     * @return Whether executor should be disabled
     */
    protected def disableExecutor(executorId: String): Boolean = {
      val shouldDisable = CoarseGrainedSchedulerBackend.this.synchronized {
        if (executorIsAlive(executorId)) {
          executorsPendingLossReason += executorId
          true
        } else {
          // Returns true for explicitly killed executors, we also need to get pending loss reasons;
          // For others return false.
          executorsPendingToRemove.contains(executorId)
        }
      }

      if (shouldDisable) {
        logInfo(s"Disabling executor $executorId.")
        scheduler.executorLost(executorId, LossReasonPending)
      }

      shouldDisable
    }

    override def onStop() {
      reviveThread.shutdownNow()
    }
  }

  var driverEndpoint: RpcEndpointRef = null

  protected def minRegisteredRatio: Double = _minRegisteredRatio

  override def start() {
    val properties = new ArrayBuffer[(String, String)]
    for ((key, value) <- scheduler.sc.conf.getAll) {
      if (key.startsWith("spark.")) {
        properties += ((key, value))
      }
    }

    // TODO (prashant) send conf instead of properties
    driverEndpoint = createDriverEndpointRef(properties)
  }

  protected def createDriverEndpointRef(
      properties: ArrayBuffer[(String, String)]): RpcEndpointRef = {
    rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
  }

  protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
    new DriverEndpoint(rpcEnv, properties)
  }

  def stopExecutors() {
    try {
      if (driverEndpoint != null) {
        logInfo("Shutting down all executors")
        driverEndpoint.askWithRetry[Boolean](StopExecutors)
      }
    } catch {
      case e: Exception =>
        throw new SparkException("Error asking standalone scheduler to shut down executors", e)
    }
  }

  override def stop() {
    stopExecutors()
    try {
      if (driverEndpoint != null) {
        driverEndpoint.askWithRetry[Boolean](StopDriver)
      }
    } catch {
      case e: Exception =>
        throw new SparkException("Error stopping standalone scheduler's driver endpoint", e)
    }
  }

  /**
   * Reset the state of CoarseGrainedSchedulerBackend to the initial state. Currently it will only
   * be called in the yarn-client mode when AM re-registers after a failure.
   */
  protected def reset(): Unit = synchronized {
    numPendingExecutors = 0
    executorsPendingToRemove.clear()

    // Remove all the lingering executors that should be removed but not yet. The reason might be
    // because (1) disconnected event is not yet received; (2) executors die silently.
    executorDataMap.toMap.foreach { case (eid, _) =>
      driverEndpoint.askWithRetry[Boolean](
        RemoveExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered.")))
    }
  }

  override def reviveOffers() {
    driverEndpoint.send(ReviveOffers)
  }

  override def killTask(taskId: Long, executorId: String, interruptThread: Boolean) {
    driverEndpoint.send(KillTask(taskId, executorId, interruptThread))
  }

  override def defaultParallelism(): Int = {
    conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
  }

  /**
   * Called by subclasses when notified of a lost worker. It just fires the message and returns
   * at once.
   */
  protected def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
    // Only log the failure since we don't care about the result.
    driverEndpoint.ask[Boolean](RemoveExecutor(executorId, reason)).onFailure { case t =>
      logError(t.getMessage, t)
    }(ThreadUtils.sameThread)
  }

  def sufficientResourcesRegistered(): Boolean = true

  override def isReady(): Boolean = {
    if (sufficientResourcesRegistered) {
      logInfo("SchedulerBackend is ready for scheduling beginning after " +
        s"reached minRegisteredResourcesRatio: $minRegisteredRatio")
      return true
    }
    if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTimeMs) {
      logInfo("SchedulerBackend is ready for scheduling beginning after waiting " +
        s"maxRegisteredResourcesWaitingTime: $maxRegisteredWaitingTimeMs(ms)")
      return true
    }
    false
  }

  /**
   * Return the number of executors currently registered with this backend.
   */
  private def numExistingExecutors: Int = executorDataMap.size

  override def getExecutorIds(): Seq[String] = {
    executorDataMap.keySet.toSeq
  }

  /**
   * Request an additional number of executors from the cluster manager.
   * @return whether the request is acknowledged.
   */
  final override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
    if (numAdditionalExecutors < 0) {
      throw new IllegalArgumentException(
        "Attempted to request a negative number of additional executor(s) " +
          s"$numAdditionalExecutors from the cluster manager. Please specify a positive number!")
    }
    logInfo(s"Requesting $numAdditionalExecutors additional executor(s) from the cluster manager")

    val response = synchronized {
      numPendingExecutors += numAdditionalExecutors
      logDebug(s"Number of pending executors is now $numPendingExecutors")

      // Account for executors pending to be added or removed
      doRequestTotalExecutors(
        numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
    }

    defaultAskTimeout.awaitResult(response)
  }

  /**
   * Update the cluster manager on our scheduling needs. Three bits of information are included
   * to help it make decisions.
   * @param numExecutors The total number of executors we'd like to have. The cluster manager
   *                     shouldn't kill any running executor to reach this number, but,
   *                     if all existing executors were to die, this is the number of executors
   *                     we'd want to be allocated.
   * @param localityAwareTasks The number of tasks in all active stages that have locality
   *                           preferences. This includes running, pending, and completed tasks.
   * @param hostToLocalTaskCount A map of hosts to the number of tasks from all active stages
   *                             that would like to run on that host.
   *                             This includes running, pending, and completed tasks.
   * @return whether the request is acknowledged by the cluster manager.
   */
  final override def requestTotalExecutors(
      numExecutors: Int,
      localityAwareTasks: Int,
      hostToLocalTaskCount: Map[String, Int]
    ): Boolean = {
    if (numExecutors < 0) {
      throw new IllegalArgumentException(
        "Attempted to request a negative number of executor(s) " +
          s"$numExecutors from the cluster manager. Please specify a positive number!")
    }

    val response = synchronized {
      this.localityAwareTasks = localityAwareTasks
      this.hostToLocalTaskCount = hostToLocalTaskCount

      numPendingExecutors =
        math.max(numExecutors - numExistingExecutors + executorsPendingToRemove.size, 0)

      doRequestTotalExecutors(numExecutors)
    }

    defaultAskTimeout.awaitResult(response)
  }

  /**
   * Request executors from the cluster manager by specifying the total number desired,
   * including existing pending and running executors.
   *
   * The semantics here guarantee that we do not over-allocate executors for this application,
   * since a later request overrides the value of any prior request. The alternative interface
   * of requesting a delta of executors risks double counting new executors when there are
   * insufficient resources to satisfy the first request. We make the assumption here that the
   * cluster manager will eventually fulfill all requests when resources free up.
   *
   * @return a future whose evaluation indicates whether the request is acknowledged.
   */
  protected def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] =
    Future.successful(false)

  /**
   * Request that the cluster manager kill the specified executors.
   * @return whether the kill request is acknowledged. If list to kill is empty, it will return
   *         false.
   */
  final override def killExecutors(executorIds: Seq[String]): Boolean = {
    killExecutors(executorIds, replace = false, force = false)
  }

  /**
   * Request that the cluster manager kill the specified executors.
   *
   * When asking the executor to be replaced, the executor loss is considered a failure, and
   * killed tasks that are running on the executor will count towards the failure limits. If no
   * replacement is being requested, then the tasks will not count towards the limit.
   *
   * @param executorIds identifiers of executors to kill
   * @param replace whether to replace the killed executors with new ones
   * @param force whether to force kill busy executors
   * @return whether the kill request is acknowledged. If list to kill is empty, it will return
   *         false.
   */
  final def killExecutors(
      executorIds: Seq[String],
      replace: Boolean,
      force: Boolean): Boolean = {
    logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")

    val response = synchronized {
      val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains)
      unknownExecutors.foreach { id =>
        logWarning(s"Executor to kill $id does not exist!")
      }

      // If an executor is already pending to be removed, do not kill it again (SPARK-9795)
      // If this executor is busy, do not kill it unless we are told to force kill it (SPARK-9552)
      val executorsToKill = knownExecutors
        .filter { id => !executorsPendingToRemove.contains(id) }
        .filter { id => force || !scheduler.isExecutorBusy(id) }
      executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace }

      // If we do not wish to replace the executors we kill, sync the target number of executors
      // with the cluster manager to avoid allocating new ones. When computing the new target,
      // take into account executors that are pending to be added or removed.
      val adjustTotalExecutors =
        if (!replace) {
          doRequestTotalExecutors(
            numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
        } else {
          numPendingExecutors += knownExecutors.size
          Future.successful(true)
        }

      val killExecutors: Boolean => Future[Boolean] =
        if (!executorsToKill.isEmpty) {
          _ => doKillExecutors(executorsToKill)
        } else {
          _ => Future.successful(false)
        }

      adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread)
    }

    defaultAskTimeout.awaitResult(response)
  }

  /**
   * Kill the given list of executors through the cluster manager.
   * @return whether the kill request is acknowledged.
   */
  protected def doKillExecutors(executorIds: Seq[String]): Future[Boolean] =
    Future.successful(false)

}

private[spark] object CoarseGrainedSchedulerBackend {
  val ENDPOINT_NAME = "CoarseGrainedScheduler"
}
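Note that doRequestTotalExecutors and doKillExecutors default to Future.successful(false): they are template-method hooks that concrete cluster-manager backends override (in Spark 2.0, YarnSchedulerBackend, StandaloneSchedulerBackend, and MesosCoarseGrainedSchedulerBackend). The following is a minimal sketch of how a subclass plugs in; ToySchedulerBackend is hypothetical, not part of Spark, and assumes it is compiled inside the org.apache.spark.scheduler.cluster package against Spark 2.0:

package org.apache.spark.scheduler.cluster

import scala.concurrent.Future

import org.apache.spark.rpc.RpcEnv
import org.apache.spark.scheduler.TaskSchedulerImpl

// Hypothetical subclass, for illustration only.
private[spark] class ToySchedulerBackend(scheduler: TaskSchedulerImpl, rpcEnv: RpcEnv)
  extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {

  // A real backend forwards the new executor total to the cluster manager; the
  // base class has already folded pending additions/removals into `requestedTotal`.
  override protected def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
    logInfo(s"(toy) asking the cluster manager for a total of $requestedTotal executors")
    Future.successful(true)
  }

  // A real backend asks the cluster manager to tear down the containers hosting
  // these executors; the base class has already marked them pending-to-remove.
  override protected def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
    logInfo(s"(toy) asking the cluster manager to kill executors ${executorIds.mkString(", ")}")
    Future.successful(true)
  }

  // Let isReady() unblock scheduling as soon as at least one executor registers.
  override def sufficientResourcesRegistered(): Boolean =
    totalRegisteredExecutors.get() >= 1
}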
SchedulerBackend is a backend interface for scheduling systems that allows plugging in different backends under TaskSchedulerImpl. We assume a Mesos-like model, in which the application receives resource offers as machines become available and can launch tasks on them.

/**
 * A backend interface for scheduling systems that allows plugging in different ones under
 * TaskSchedulerImpl. We assume a Mesos-like model where the application gets resource offers as
 * machines become available and can launch tasks on them.
 */
private[spark] trait SchedulerBackend {
  private val appId = "spark-application-" + System.currentTimeMillis

  def start(): Unit
  def stop(): Unit
  def reviveOffers(): Unit
  def defaultParallelism(): Int

  def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit =
    throw new UnsupportedOperationException
  def isReady(): Boolean = true

  /**
   * Get an application ID associated with the job.
   *
   * @return An application ID
   */
  def applicationId(): String = appId

  /**
   * Get the attempt ID for this run, if the cluster manager supports multiple
   * attempts. Applications run in client mode will not have attempt IDs.
   *
   * @return The application attempt id, if available.
   */
  def applicationAttemptId(): Option[String] = None

  /**
   * Get the URLs for the driver logs. These URLs are used to display the links in the UI
   * Executors tab for the driver.
   * @return Map containing the log names and their respective URLs
   */
  def getDriverLogUrls: Option[Map[String, String]] = None
}
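Only four members are abstract (start, stop, reviveOffers, defaultParallelism), so a pluggable backend can be very small. A hypothetical no-op sketch, assuming it lives under org.apache.spark.scheduler in a Spark 2.0 build:

package org.apache.spark.scheduler

// Hypothetical, for illustration only: the smallest possible SchedulerBackend.
// A real backend would acquire executors in start() and hand resource offers
// back to TaskSchedulerImpl from reviveOffers().
private[spark] class NoOpSchedulerBackend(totalCores: Int) extends SchedulerBackend {
  override def start(): Unit = ()
  override def stop(): Unit = ()
  override def reviveOffers(): Unit = ()
  // Fallback parallelism used when spark.default.parallelism is not set.
  override def defaultParallelism(): Int = totalCores
}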
ExecutorAllocationClient is a client that communicates with the cluster manager to request or kill executors. It is currently supported only in YARN mode.
The requestTotalExecutors method updates the cluster manager on our scheduling needs. Three pieces of information are included to help it make decisions (a sketch of a call follows the list).
First, numExecutors: the total number of executors we would like to have. The cluster manager should not kill any running executor to reach this number, but if all existing executors were to die, this is the number of executors we would want to be allocated.
Second, localityAwareTasks: the number of tasks in all active stages that have locality preferences. This includes running, pending, and completed tasks.
Third, hostToLocalTaskCount: a map from each host to the number of tasks from all active stages that would prefer to run on that host. This includes running, pending, and completed tasks.
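To make the three parameters concrete, here is a sketch of a call with invented values, assuming some `client: ExecutorAllocationClient` is in scope (the interface is private[spark], so real callers live inside Spark itself, such as ExecutorAllocationManager):

// Hypothetical values, for illustration only.
val acknowledged: Boolean = client.requestTotalExecutors(
  numExecutors = 10,          // desired total; running executors are never killed to reach it
  localityAwareTasks = 20,    // tasks across active stages with locality preferences
  hostToLocalTaskCount = Map( // preferred task counts per host
    "host-a" -> 12,
    "host-b" -> 8))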
/**
 * A client that communicates with the cluster manager to request or kill executors.
 * This is currently supported only in YARN mode.
 */
private[spark] trait ExecutorAllocationClient {

  /** Get the list of currently active executors */
  private[spark] def getExecutorIds(): Seq[String]

  /**
   * Update the cluster manager on our scheduling needs. Three bits of information are included
   * to help it make decisions.
   * @param numExecutors The total number of executors we'd like to have. The cluster manager
   *                     shouldn't kill any running executor to reach this number, but,
   *                     if all existing executors were to die, this is the number of executors
   *                     we'd want to be allocated.
   * @param localityAwareTasks The number of tasks in all active stages that have locality
   *                           preferences. This includes running, pending, and completed tasks.
   * @param hostToLocalTaskCount A map of hosts to the number of tasks from all active stages
   *                             that would like to run on that host.
   *                             This includes running, pending, and completed tasks.
   * @return whether the request is acknowledged by the cluster manager.
   */
  private[spark] def requestTotalExecutors(
      numExecutors: Int,
      localityAwareTasks: Int,
      hostToLocalTaskCount: Map[String, Int]): Boolean

  /**
   * Request an additional number of executors from the cluster manager.
   * @return whether the request is acknowledged by the cluster manager.
   */
  def requestExecutors(numAdditionalExecutors: Int): Boolean

  /**
   * Request that the cluster manager kill the specified executors.
   * @return whether the request is acknowledged by the cluster manager.
   */
  def killExecutors(executorIds: Seq[String]): Boolean

  /**
   * Request that the cluster manager kill the specified executor.
   * @return whether the request is acknowledged by the cluster manager.
   */
  def killExecutor(executorId: String): Boolean = killExecutors(Seq(executorId))
}
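In practice these methods are rarely invoked directly: ExecutorAllocationManager drives them when dynamic allocation is enabled, and SparkContext forwards its developer-API methods to the backend. A minimal configuration sketch (the app name and all numeric values are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

object DynamicAllocationDemo {
  def main(args: Array[String]): Unit = {
    // Dynamic allocation also requires the external shuffle service on each node (e.g. on YARN).
    val conf = new SparkConf()
      .setAppName("dynamic-allocation-demo")
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.dynamicAllocation.minExecutors", "1")
      .set("spark.dynamicAllocation.maxExecutors", "20")
      .set("spark.shuffle.service.enabled", "true")
    val sc = new SparkContext(conf)

    // SparkContext also exposes the client operations as a developer API and
    // forwards them to the CoarseGrainedSchedulerBackend shown above:
    sc.requestExecutors(2)          // delta request -> requestExecutors(2)
    sc.killExecutors(Seq("1", "2")) // kill by executor ID -> killExecutors(...)

    sc.stop()
  }
}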