
Spark Source Code Analysis: Worker Startup and Communication Mechanism

The Worker is Spark's worker node. It is mainly responsible for accepting instructions from the Master to launch or kill Executors and Drivers, reporting Driver and Executor state back to the Master, and sending periodic heartbeats to the Master.

1 Important Attributes

RpcEnv rpcEnv: used to register and maintain RpcEndpoints and RpcEndpointRefs

Int webUiPort: port of the worker's web UI

Int cores: number of CPU cores allocated to this worker

Int coresUsed: number of CPU cores currently in use on this worker

Int coresFree = cores - coresUsed: number of CPU cores still available (see the sketch after this list)

Int memory: amount of memory (MB) allocated to this worker

Int memoryUsed: amount of memory (MB) currently in use on this worker

Int memoryFree = memory - memoryUsed: amount of memory still available (see the sketch after this list)

Array[RpcAddress] masterRpcAddresses: array of the Masters' RpcAddresses

String endpointName: name of the worker's RPC endpoint

String workDirPath: path of the work directory

forwordMessageScheduler: a background scheduler thread that sends messages at fixed intervals

cleanupThreadExecutor: a background thread that cleans up the work directory

Option[RpcEndpointRef] master: reference to the Master's RPC endpoint

String activeMasterUrl: URL of the currently active Master

String activeMasterWebUiUrl: web UI URL of the currently active Master

String workerWebUiUrl: the worker's web UI URL

String workerUri: the worker's URI

Boolean registered: whether this worker has registered with the Master

Boolean connected: whether this worker is connected to the Master

String workerId: the worker's id

HashMap[String, DriverRunner] drivers: mapping maintained by the worker from driver id to DriverRunner

HashMap[String, ExecutorRunner] executors: mapping maintained by the worker from executor id to ExecutorRunner

LinkedHashMap[String, DriverRunner] finishedDrivers: mapping maintained by the worker from driver id to DriverRunner for drivers that have finished

HashMap[String, Seq[String]] appDirectories: mapping maintained by the worker from application id to the application's local directories

HashSet[String] finishedApps: applications that have finished on this worker

HEARTBEAT_MILLIS: interval at which heartbeats are sent to the Master

INITIAL_REGISTRATION_RETRIES: initial number of registration retries against the Master, 6 by default

TOTAL_REGISTRATION_RETRIES: total number of registration attempts against the Master

INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS: initial registration retry interval

PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS: prolonged registration retry interval

CLEANUP_ENABLED: whether the cleanup feature is enabled

CLEANUP_INTERVAL_MILLIS: cleanup interval

APP_DATA_RETENTION_SECONDS: how long application data is retained
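
The free-resource values in the list above are derived rather than independently tracked: when executors and drivers are launched or torn down, only the "used" counters change, and the free amounts are recomputed from them. A minimal sketch of this accounting (class and field names mirror the list above; this is an illustration, not the verbatim Spark source):

// Illustrative sketch of the worker's resource accounting; not the verbatim source.
class WorkerResources(val cores: Int, val memory: Int /* MB */) {
  var coresUsed: Int = 0
  var memoryUsed: Int = 0

  // Free resources are always derived as "allocated minus used"
  def coresFree: Int = cores - coresUsed
  def memoryFree: Int = memory - memoryUsed
}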

 

2 Core Methods

2.1 The main method

def main(argStrings: Array[String]) {
  Utils.initDaemon(log)
  val conf = new SparkConf
  // Parse the startup argument list
  val args = new WorkerArguments(argStrings, conf)
  // Start the RPC environment and the Worker RPC endpoint
  val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,
    args.memory, args.masters, args.workDir, conf = conf)
  rpcEnv.awaitTermination()
}
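
main delegates the real work to startRpcEnvAndEndpoint, which creates the RpcEnv and registers the Worker endpoint inside it; registering the endpoint is what eventually triggers onStart below. A simplified sketch of that method, based on the Spark 2.x source (the system-name suffix, SecurityManager wiring and parameter list may differ slightly between versions):

def startRpcEnvAndEndpoint(
    host: String,
    port: Int,
    webUiPort: Int,
    cores: Int,
    memory: Int,
    masterUrls: Array[String],
    workDir: String,
    conf: SparkConf = new SparkConf): RpcEnv = {
  // Create the RPC environment that will host the Worker endpoint
  val securityMgr = new SecurityManager(conf)
  val rpcEnv = RpcEnv.create("sparkWorker", host, port, conf, securityMgr)
  // Resolve the Masters' spark:// URLs into RpcAddresses
  val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL)
  // Register the Worker endpoint; the RpcEnv invokes its onStart() once it is registered
  rpcEnv.setupEndpoint("Worker", new Worker(rpcEnv, webUiPort, cores, memory,
    masterAddresses, "Worker", workDir, conf, securityMgr))
  rpcEnv
}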

2.2 onStart: start the Worker

onStart does three things:

1. Create the work directory
2. Create the web UI and bind it
3. Register with the Master

override def onStart() {
  assert(!registered)
  logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
    host, port, cores, Utils.megabytesToString(memory)))
  logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
  logInfo("Spark home: " + sparkHome)
  // Create the work directory
  createWorkDir()
  // If the ExternalShuffleService is enabled, start it
  shuffleService.startIfEnabled()
  // Create the worker's web UI and bind it
  webUi = new WorkerWebUI(this, workDir, webUiPort)
  webUi.bind()

  workerWebUiUrl = s"http://$publicAddress:${webUi.boundPort}"
  // Register with the Master
  registerWithMaster()

  metricsSystem.registerSource(workerSource)
  metricsSystem.start()
  // Attach the worker metrics servlet handler to the web ui after the metrics system is started.
  metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
}
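
startIfEnabled only launches the external shuffle server when the corresponding flag is set, so a worker running without spark.shuffle.service.enabled=true skips this step entirely. A minimal, self-contained sketch of that guard (the class and method bodies here are placeholders, not Spark's actual ExternalShuffleService):

import org.apache.spark.SparkConf

// Placeholder sketch of the "start only if enabled" guard; not Spark's actual class.
class ExternalShuffleServiceSketch(conf: SparkConf) {
  private val enabled = conf.getBoolean("spark.shuffle.service.enabled", false)

  def startIfEnabled(): Unit = {
    if (enabled) {
      start()
    }
  }

  private def start(): Unit = {
    // In Spark this binds the shuffle server; here it is only a stand-in.
    println("External shuffle service started")
  }
}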

 

2.3 createWorkDir: create the work directory

/**
 * Create the directory where the worker stores per-application data, e.g.:
 * app-20170613113959-0000
 * app-20170613114457-0001
 * app-20170613114710-0002
 */
private def createWorkDir() {
  // Resolve the work directory: workDirPath if given, otherwise $SPARK_HOME/work
  workDir = Option(workDirPath).map(new File(_)).getOrElse(new File(sparkHome, "work"))
  try {
    // Create the directory
    workDir.mkdirs()
    // If the directory does not exist or is not a directory, exit
    if ( !workDir.exists() || !workDir.isDirectory) {
      logError("Failed to create work directory " + workDir)
      System.exit(1)
    }
    assert (workDir.isDirectory)
  } catch {
    case e: Exception =>
      logError("Failed to create work directory " + workDir, e)
      System.exit(1)
  }
}

 

2.4 registerWithMaster(): register with the Master

private def registerWithMaster() {
  registrationRetryTimer match {
    // If there is no retry timer yet, the worker has not registered, so start registering
    case None =>
      // The initial registration state is false
      registered = false
      // Try to register with all Masters
      registerMasterFutures = tryRegisterAllMasters()
      // Reset the connection attempt counter
      connectionAttemptCount = 0
      // Schedule a background task that periodically sends ReregisterWithMaster to self;
      // if registration has already succeeded by then, the handler does nothing
      registrationRetryTimer = Some(forwordMessageScheduler.scheduleAtFixedRate(
        new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
            Option(self).foreach(_.send(ReregisterWithMaster))
          }
        },
        INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
        INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
        TimeUnit.SECONDS))
    // If a registrationRetryTimer already exists, do nothing
    case Some(_) =>

  }
}

 

2.5 tryRegisterAllMasters: try to register with every Master in the cluster

private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  masterRpcAddresses.map { masterAddress =>
    registerMasterThreadPool.submit(new Runnable {
      override def run(): Unit = {
        try {
          logInfo("Connecting to master " + masterAddress + "...")
          // Build an RpcEndpointRef to the Master, used to send it messages and requests
          val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
          // Register with this specific Master
          registerWithMaster(masterEndpoint)
        } catch {
          case ie: InterruptedException => // Cancelled
          case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
        }
      }
    })
  }
}
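
Each registration attempt is submitted to registerMasterThreadPool, so attempts against different Masters run in parallel instead of blocking one another; the pool is sized to the number of Masters. A self-contained, plain-JDK approximation of such a pool (Spark builds it with an internal helper; the names below are illustrative):

import java.util.concurrent.{ExecutorService, Executors, ThreadFactory}

// Illustrative approximation of the worker's register-master pool:
// daemon threads, one per Master, so all registrations can run concurrently.
def newRegisterMasterPool(numMasters: Int): ExecutorService = {
  val daemonFactory = new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "worker-register-master")
      t.setDaemon(true)
      t
    }
  }
  Executors.newFixedThreadPool(numMasters, daemonFactory)
}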

2.6 registerWithMaster(masterEndpoint: RpcEndpointRef)

Register with the given Master endpoint.

private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
  // Send a RegisterWorker request to the Master
  masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
    workerId, host, port, self, cores, memory, workerWebUiUrl))
    .onComplete {
      // On success, hand the response to handleRegisterResponse
      case Success(msg) =>
        Utils.tryLogNonFatalError {
          handleRegisterResponse(msg)
        }
      // On failure, exit
      case Failure(e) =>
        logError(s"Cannot register with master: ${masterEndpoint.address}", e)
        System.exit(1)
    }(ThreadUtils.sameThread)
}
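
Judging from the ask call above, the RegisterWorker message bundles everything the Master needs to track this worker, including a reference to the worker's own endpoint so the Master can reply and later push commands to it. Its shape is roughly the following case class (inferred from the call site; field names and order may differ slightly between Spark versions):

import org.apache.spark.rpc.RpcEndpointRef

// Inferred from the ask(...) call above; a sketch, not necessarily the verbatim definition.
case class RegisterWorker(
    id: String,              // workerId
    host: String,
    port: Int,
    worker: RpcEndpointRef,  // self: lets the Master call back to this worker
    cores: Int,
    memory: Int,             // MB
    workerWebUiUrl: String)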

 

2.7 handleRegisterResponse: handle the result of the registration callback

private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {
  msg match {
    // RegisteredWorker means registration succeeded
    case RegisteredWorker(masterRef, masterWebUiUrl) =>
      logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
      registered = true // update the registered flag
      changeMaster(masterRef, masterWebUiUrl)
      // Schedule a background task that periodically sends heartbeats to the Master
      forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = Utils.tryLogNonFatalError {
          self.send(SendHeartbeat)
        }
      }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
      // If cleanup is enabled, schedule a background task that periodically sends
      // WorkDirCleanup to clean the work directory
      if (CLEANUP_ENABLED) {
        logInfo(
          s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
        forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
            self.send(WorkDirCleanup)
          }
        }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
      }
      // Build an ExecutorDescription for every executor this worker holds
      val execs = executors.values.map { e =>
        new ExecutorDescription(e.appId, e.execId, e.cores, e.state)
      }
      // Send WorkerLatestState to the Master to report the worker's latest state
      masterRef.send(WorkerLatestState(workerId, execs.toList, drivers.keys.toSeq))
    // RegisterWorkerFailed means registration failed
    case RegisterWorkerFailed(message) =>
      // If the worker has not registered successfully yet, exit
      if (!registered) {
        logError("Worker registration failed: " + message)
        System.exit(1)
      }
    // MasterInStandby: do nothing
    case MasterInStandby =>
      // Ignore. Master not yet ready.
  }
}
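
The three match arms above imply that RegisterWorkerResponse is a small sealed hierarchy with one case per outcome. A sketch of what those message types look like (inferred from the handler; not necessarily the verbatim definitions in Spark's DeployMessages):

import org.apache.spark.rpc.RpcEndpointRef

// Inferred from handleRegisterResponse; a sketch, not necessarily the verbatim definitions.
sealed trait RegisterWorkerResponse
case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String)
  extends RegisterWorkerResponse
case class RegisterWorkerFailed(message: String) extends RegisterWorkerResponse
case object MasterInStandby extends RegisterWorkerResponse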

 

2.8 receive: handle messages that do not require a reply

override def receive: PartialFunction[Any, Unit] = synchronized {
  // SendHeartbeat: send a heartbeat to the Master
  case SendHeartbeat =>
    if (connected) { sendToMaster(Heartbeat(workerId, self)) }
  // WorkDirCleanup: clean up the work directory
  case WorkDirCleanup =>
    // First collect the set of application ids that still have executors on this worker
    val appIds = executors.values.map(_.appId).toSet
    // Find the directories of finished applications and delete them recursively,
    // wrapping the work in a Future executed on the cleanup thread
    val cleanupFuture = concurrent.Future {
      // List all files under the work directory
      val appDirs = workDir.listFiles()
      if (appDirs == null) {
        throw new IOException("ERROR: Failed to list files in " + appDirs)
      }

      appDirs.filter { dir =>
        val appIdFromDir = dir.getName // the directory name is the application id
        val isAppStillRunning = appIds.contains(appIdFromDir) // is that application still running?
        // Keep directories of applications that are no longer running and that
        // have not received any new files within the retention window
        dir.isDirectory && !isAppStillRunning &&
        !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECONDS)
      }.foreach { dir =>
        logInfo(s"Removing directory: ${dir.getPath}")
        Utils.deleteRecursively(dir)
      }
    }(cleanupThreadExecutor)

    cleanupFuture.onFailure {
      case e: Throwable =>
        logError("App dir cleanup failed: " + e.getMessage, e)
    }(cleanupThreadExecutor)
  // MasterChanged: the Master has changed
  case MasterChanged(masterRef, masterWebUiUrl) =>
    logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
    // Record the new Master's reference and URLs, mark the worker as connected,
    // and cancel any pending registration retries
    changeMaster(masterRef, masterWebUiUrl)
    // Build an ExecutorDescription for every executor on this node
    val execs = executors.values.
      map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
    // Send WorkerSchedulerStateResponse to the new Master so it can rebuild its state
    masterRef.send(WorkerSchedulerStateResponse(workerId, execs.toList, drivers.keys.toSeq))
  // ReconnectWorker: the worker was disconnected and needs to reconnect
  case ReconnectWorker(masterUrl) =>
    logInfo(s"Master with url $masterUrl requested this worker to reconnect.")
    // After a disconnect, register with the Master again
    registerWithMaster()
  // LaunchExecutor: launch an executor
  case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
    // Check that the request comes from the active Master
    if (masterUrl != activeMasterUrl) {
      logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
    } else {
      try {
        logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))

        // Create the executor directory: workDir/appId/execId
        val executorDir = new File(workDir, appId + "/" + execId)
        if (!executorDir.mkdirs()) {
          throw new IOException("Failed to create directory " + executorDir)
        }

        // Fetch the application's local directories, creating them if necessary;
        // these directories are deleted when the application finishes
        val appLocalDirs = appDirectories.getOrElse(appId,
          Utils.getOrCreateLocalRootDirs(conf).map { dir =>
            val appDir = Utils.createDirectory(dir, namePrefix = "executor")
            Utils.chmod700(appDir)
            appDir.getAbsolutePath()
          }.toSeq)
        appDirectories(appId) = appLocalDirs
        // Create an ExecutorRunner, which manages the executor process
        val manager = new ExecutorRunner(
          appId,
          execId,
          appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
          cores_,
          memory_,
          self,
          workerId,
          host,
          webUi.boundPort,
          publicAddress,
          sparkHome,
          executorDir,
          workerUri,
          conf,
          appLocalDirs, ExecutorState.RUNNING)
        // Add the new ExecutorRunner to the executor id -> ExecutorRunner mapping
        executors(appId + "/" + execId) = manager
        // Start the ExecutorRunner
        manager.start()
        // Update the amounts of CPU cores and memory in use
        coresUsed += cores_
        memoryUsed += memory_
        // Send ExecutorStateChanged to the Master
        sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
      } catch {
        case e: Exception =>
          logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
          if (executors.contains(appId + "/" + execId)) {
            executors(appId + "/" + execId).kill()
            executors -= appId + "/" + execId
          }
          sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
            Some(e.toString), None))
      }
    }
  // ExecutorStateChanged: an executor's state has changed
  case executorStateChanged @ ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
    handleExecutorStateChanged(executorStateChanged)
  // KillExecutor: kill the given executor process
  case KillExecutor(masterUrl, appId, execId) =>
    if (masterUrl != activeMasterUrl) {
      logWarning("Invalid Master (" + masterUrl + ") attempted to kill executor " + execId)
    } else {
      val fullId = appId + "/" + execId
      executors.get(fullId) match {
        case Some(executor) =>
          logInfo("Asked to kill executor " + fullId)
          executor.kill()
        case None =>
          logInfo("Asked to kill unknown executor " + fullId)
      }
    }
  // LaunchDriver: launch a driver
  case LaunchDriver(driverId, driverDesc) =>
    logInfo(s"Asked to launch driver $driverId")
    // Create a DriverRunner, which manages the driver process and its resources
    val driver = new DriverRunner(
      conf,
      driverId,
      workDir,
      sparkHome,
      driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
      self,
      workerUri,
      securityMgr)
    // Add it to the drivers mapping
    drivers(driverId) = driver
    // Start the driver
    driver.start()
    // Update the amounts of CPU cores and memory in use on this worker
    coresUsed += driverDesc.cores
    memoryUsed += driverDesc.mem
  // KillDriver: kill the given driver
  case KillDriver(driverId) =>
    logInfo(s"Asked to kill driver $driverId")
    drivers.get(driverId) match {
      case Some(runner) =>
        runner.kill()
      case None =>
        logError(s"Asked to kill unknown driver $driverId")
    }
  // DriverStateChanged: a driver's state has changed
  case driverStateChanged @ DriverStateChanged(driverId, state, exception) =>
    handleDriverStateChanged(driverStateChanged)
  // ReregisterWithMaster: re-register with the Master
  case ReregisterWithMaster =>
    reregisterWithMaster()
  // ApplicationFinished: an application has finished running
  case ApplicationFinished(id) =>
    finishedApps += id
    // The application's directories may now need to be cleaned up
    maybeCleanupApplication(id)
}

 

2.9 receiveAndReply: handle messages that require a reply

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  // RequestWorkerState: reply with the worker's current state
  case RequestWorkerState =>
    context.reply(WorkerStateResponse(host, port, workerId, executors.values.toList,
      finishedExecutors.values.toList, drivers.values.toList,
      finishedDrivers.values.toList, activeMasterUrl, cores, memory,
      coresUsed, memoryUsed, activeMasterWebUiUrl))
}
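
RequestWorkerState is what the worker's own web UI sends to build its status page: it asks the Worker endpoint and renders the WorkerStateResponse it gets back. A hedged sketch of such a query (these deploy messages are package-private inside Spark, and the blocking ask method is named askSync in recent versions, askWithRetry in older ones):

// Conceptual sketch, assuming access to Spark's internal deploy messages.
def fetchWorkerState(workerEndpoint: RpcEndpointRef): WorkerStateResponse = {
  // Blocks until the Worker endpoint answers via context.reply(...)
  workerEndpoint.askSync[WorkerStateResponse](RequestWorkerState)
}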

 

2.10 changeMaster

Record the new Master's reference and URLs, mark the worker as connected, and cancel any pending registration retries, since a new Master has been found.

private def changeMaster(masterRef: RpcEndpointRef, uiUrl: String) {
  // Record the new Master's URL and reference
  activeMasterUrl = masterRef.address.toSparkURL
  activeMasterWebUiUrl = uiUrl
  master = Some(masterRef)
  connected = true // mark the worker as connected
  if (conf.getBoolean("spark.ui.reverseProxy", false)) {
    logInfo(s"WorkerWebUI is available at $activeMasterWebUiUrl/proxy/$workerId")
  }
  // Cancel any pending registration retries, since a new Master has been found
  cancelLastRegistrationRetry()
}

 

2.11 handleExecutorStateChanged: handle executor state changes

private[worker] def handleExecutorStateChanged(executorStateChanged: ExecutorStateChanged):
  Unit = {
  // First forward the ExecutorStateChanged message to the Master
  sendToMaster(executorStateChanged)
  // Get the executor's state
  val state = executorStateChanged.state
  // If the executor has reached a finished state
  if (ExecutorState.isFinished(state)) {
    // Get the application id this executor belongs to
    val appId = executorStateChanged.appId
    // fullId has the form appId/execId
    val fullId = appId + "/" + executorStateChanged.execId
    val message = executorStateChanged.message
    val exitStatus = executorStateChanged.exitStatus
    // Look up the ExecutorRunner in the executor id -> ExecutorRunner mapping
    executors.get(fullId) match {
      case Some(executor) =>
        logInfo("Executor " + fullId + " finished with state " + state +
          message.map(" message " + _).getOrElse("") +
          exitStatus.map(" exitStatus " + _).getOrElse(""))
        // Remove the ExecutorRunner from the executors mapping
        executors -= fullId
        // Move it into the finishedExecutors mapping
        finishedExecutors(fullId) = executor
        // Trim finished executors if necessary
        trimFinishedExecutorsIfNecessary()
        // Release the CPU cores and memory
        coresUsed -= executor.cores
        memoryUsed -= executor.memory
      case None =>
        logInfo("Unknown Executor " + fullId + " finished with state " + state +
          message.map(" message " + _).getOrElse("") +
          exitStatus.map(" exitStatus " + _).getOrElse(""))
    }
    // The application's work directory may now need to be cleaned up
    maybeCleanupApplication(appId)
  }
}

 

2.12 handleDriverStateChanged: handle driver state changes

private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = {
  // Get the driver id
  val driverId = driverStateChanged.driverId
  val exception = driverStateChanged.exception
  // Get the driver's state
  val state = driverStateChanged.state
  state match {
    case DriverState.ERROR =>
      logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
    case DriverState.FAILED =>
      logWarning(s"Driver $driverId exited with failure")
    case DriverState.FINISHED =>
      logInfo(s"Driver $driverId exited successfully")
    case DriverState.KILLED =>
      logInfo(s"Driver $driverId was killed by user")
    case _ =>
      logDebug(s"Driver $driverId changed state to $state")
  }
  // Send DriverStateChanged to the Master
  sendToMaster(driverStateChanged)
  // Remove the driver from the drivers mapping and add it to finishedDrivers
  val driver = drivers.remove(driverId).get
  finishedDrivers(driverId) = driver
  // Trim finished drivers if necessary
  trimFinishedDriversIfNecessary()
  // Release the CPU cores and memory
  memoryUsed -= driver.driverDesc.mem
  coresUsed -= driver.driverDesc.cores
}

 

2.13 reregisterWithMaster: re-register with the Master

When the worker hits a network error or the Master fails, it needs to re-register with the Master. If the number of registration attempts exceeds the configured limit, the worker exits.

private def reregisterWithMaster(): Unit = {
  Utils.tryOrExit {
    // Increment the connection attempt counter
    connectionAttemptCount += 1
    // If registration has already succeeded, cancel the pending registration retries
    if (registered) {
      cancelLastRegistrationRetry()
    }
    // If the attempt count is within the total number of retries, try again; otherwise exit
    else if (connectionAttemptCount <= TOTAL_REGISTRATION_RETRIES) {
      logInfo(s"Retrying connection to master (attempt # $connectionAttemptCount)")
      // Re-register with the active Master if there is one. If there is none, the worker is
      // still bootstrapping and has not yet established a connection with any Master; in that
      // case it should re-register with all Masters.
      master match {
        // If a Master reference exists but registered is still false, the connection to the
        // Master was lost, so the Master RpcEndpointRef has to be re-created
        case Some(masterRef) =>
          if (registerMasterFutures != null) {
            registerMasterFutures.foreach(_.cancel(true))
          }
          val masterAddress = masterRef.address
          registerMasterFutures = Array(registerMasterThreadPool.submit(new Runnable {
            override def run(): Unit = {
              try {
                logInfo("Connecting to master " + masterAddress + "...")
                // Re-create the Master endpoint reference
                val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
                // Then send a registration request to the new Master
                registerWithMaster(masterEndpoint)
              } catch {
                case ie: InterruptedException => // Cancelled
                case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
              }
            }
          }))
        // If there is no Master yet, register with all Masters; registering with all Masters
        // when one is already known could cause duplicate-worker errors
        case None =>
          if (registerMasterFutures != null) {
            registerMasterFutures.foreach(_.cancel(true))
          }
          registerMasterFutures = tryRegisterAllMasters()
      }
      // Once the number of attempts reaches the initial retry threshold,
      // switch to the longer (prolonged) retry interval
      if (connectionAttemptCount == INITIAL_REGISTRATION_RETRIES) {
        registrationRetryTimer.foreach(_.cancel(true))
        registrationRetryTimer = Some(
          forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
            override def run(): Unit = Utils.tryLogNonFatalError {
              self.send(ReregisterWithMaster)
            }
          }, PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS,
            PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS,
            TimeUnit.SECONDS))
      }
    } else {
      logError("All masters are unresponsive! Giving up.")
      System.exit(1)
    }
  }
}

 