您的位置:首页 > 其它

Spark技术内幕:Executor分配详解

2016-08-25 18:32 531 查看
当用户应用new SparkContext后,集群就会为在Worker上分配executor,那么这个过程是什么呢?本文以Standalone的Cluster为例,详细的阐述这个过程。序列图如下:




1. SparkContext创建TaskScheduler和DAG Scheduler

SparkContext是用户应用和Spark集群的交换的主要接口,用户应用一般首先要创建它。如果你使用SparkShell,你不必自己显式去创建它,系统会自动创建一个名字为sc的SparkContext的实例。创建SparkContext的实例,主要的工作除了设置一些conf,比如executor使用到的memory的大小。如果系统的配置文件有,那么就读取该配置。否则则读取环境变量。如果都没有设置,那么取默认值为512M。当然了这个数值还是很保守的,特别是在内存已经那么昂贵的今天。

[java] view
plain copy

private[spark] val executorMemory = conf.getOption("spark.executor.memory")  

    .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))  

    .orElse(Option(System.getenv("SPARK_MEM")).map(warnSparkMem))  

    .map(Utils.memoryStringToMb)  

    .getOrElse(512)  

除了加载这些集群的参数,它完成了TaskScheduler和DAGScheduler的创建:

[java] view
plain copy

// Create and start t
4000
he scheduler  

private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)  

private val heartbeatReceiver = env.actorSystem.actorOf(  

  Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")  

@volatile private[spark] var dagScheduler: DAGScheduler = _  

try {  

  dagScheduler = new DAGScheduler(this)  

} catch {  

  case e: Exception => throw  

    new SparkException("DAGScheduler cannot be initialized due to %s".format(e.getMessage))  

}  

  

// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's  

// constructor  

taskScheduler.start()  

TaskScheduler是通过不同的SchedulerBackend来调度和管理任务。它包含资源分配和任务调度。它实现了FIFO调度和FAIR调度,基于此来决定不同jobs之间的调度顺序。并且管理任务,包括任务的提交和终止,为饥饿任务启动备份任务。

不同的Cluster,包括local模式,都是通过不同的SchedulerBackend的实现其不同的功能。这个模块的类图如下:





2. TaskScheduler通过SchedulerBackend创建AppClient

SparkDeploySchedulerBackend是Standalone模式的SchedulerBackend。通过创建AppClient,可以向Standalone的Master注册Application,然后Master会通过Application的信息为它分配Worker,包括每个worker上使用CPU core的数目等。

[java] view
plain copy

private[spark] class SparkDeploySchedulerBackend(  

    scheduler: TaskSchedulerImpl,  

    sc: SparkContext,  

    masters: Array[String])  

  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)  

  with AppClientListener  

  with Logging {  

  

15165
  var client: AppClient = null  <span style="font-family: Arial, Helvetica, sans-serif;">//注:Application与Master的接口</span>  

  

  val maxCores = conf.getOption("spark.cores.max").map(_.toInt) <span style="font-family: Arial, Helvetica, sans-serif;">//注:获得每个executor最多的CPU core数目</span>  

  override def start() {  

    super.start()  

  

    // The endpoint for executors to talk to us  

    val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(  

      SparkEnv.driverActorSystemName,  

      conf.get("spark.driver.host"),  

      conf.get("spark.driver.port"),  

      CoarseGrainedSchedulerBackend.ACTOR_NAME)  

    //注:现在executor还没有申请,因此关于executor的所有信息都是未知的。  

    //这些参数将会在org.apache.spark.deploy.worker.ExecutorRunner启动ExecutorBackend的时候替换这些参数  

    val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")  

    //注:设置executor运行时需要的环境变量  

    val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")  

      .map(Utils.splitCommandString).getOrElse(Seq.empty)  

    val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath").toSeq.flatMap { cp =>  

      cp.split(java.io.File.pathSeparator)  

    }  

    val libraryPathEntries =  

      sc.conf.getOption("spark.executor.extraLibraryPath").toSeq.flatMap { cp =>  

        cp.split(java.io.File.pathSeparator)  

      }  

  

    // Start executors with a few necessary configs for registering with the scheduler  

    val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)  

    val javaOpts = sparkJavaOpts ++ extraJavaOpts  

    //注:在Worker上通过org.apache.spark.deploy.worker.ExecutorRunner启动  

    // org.apache.spark.executor.CoarseGrainedExecutorBackend,这里准备启动它需要的参数  

    val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",  

      args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts)  

    //注:org.apache.spark.deploy.ApplicationDescription包含了所有注册这个Application的所有信息。  

    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,  

      sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))  

  

    client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)  

    client.start()  

    //注:在Master返回注册Application成功的消息后,AppClient会回调本class的connected,完成了Application的注册。  

    waitForRegistration()  

  }  

org.apache.spark.deploy.client.AppClientListener是一个trait,主要为了SchedulerBackend和AppClient之间的函数回调,在以下四种情况下,AppClient会回调相关函数以通知SchedulerBackend:

向Master成功注册Application,即成功链接到集群;
断开连接,如果当前SparkDeploySchedulerBackend::stop == false,那么可能原来的Master实效了,待新的Master ready后,会重新恢复原来的连接;
Application由于不可恢复的错误停止了,这个时候需要重新提交出错的TaskSet;
添加一个Executor,在这里的实现仅仅是打印了log,并没有额外的逻辑;
删除一个Executor,可能有两个原因,一个是Executor退出了,这里可以得到Executor的退出码,或者由于Worker的退出导致了运行其上的Executor退出,这两种情况需要不同的逻辑来处理。

[java] view
plain copy

private[spark] trait AppClientListener {  

  def connected(appId: String): Unit  

  

  /** Disconnection may be a temporary state, as we fail over to a new Master. */  

  def disconnected(): Unit  

  

  /** An application death is an unrecoverable failure condition. */  

  def dead(reason: String): Unit  

  

  def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int)  

  

  def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]): Unit  

}  

小结:SparkDeploySchedulerBackend装备好启动Executor的必要参数后,创建AppClient,并通过一些回调函数来得到Executor和连接等信息;通过org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverActor与ExecutorBackend来进行通信。

3. AppClient向Master提交Application

AppClient是Application和Master交互的接口。它的包含一个类型为org.apache.spark.deploy.client.AppClient.ClientActor的成员变量actor。它负责了所有的与Master的交互。actor首先向Master注册Application。如果超过20s没有接收到注册成功的消息,那么会重新注册;如果重试超过3次仍未成功,那么本次提交就以失败结束了。

[java] view
plain copy

def tryRegisterAllMasters() {  

  for (masterUrl <- masterUrls) {  

    logInfo("Connecting to master " + masterUrl + "...")  

    val actor = context.actorSelection(Master.toAkkaUrl(masterUrl))  

    actor ! RegisterApplication(appDescription) // 向Master注册  

  }  

}  

  

def registerWithMaster() {  

  tryRegisterAllMasters()  

  import context.dispatcher  

  var retries = 0  

  registrationRetryTimer = Some { // 如果注册20s内未收到成功的消息,那么再次重复注册  

    context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {  

      Utils.tryOrExit {  

        retries += 1  

        if (registered) { // 注册成功,那么取消所有的重试  

          registrationRetryTimer.foreach(_.cancel())  

        } else if (retries >= REGISTRATION_RETRIES) { // 重试超过指定次数(3次),则认为当前Cluster不可用,退出  

          markDead("All masters are unresponsive! Giving up.")  

        } else { // 进行新一轮的重试<span
style="margin: 0px; padding: 0px; border: non
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: