Spark Cluster Startup: Source Code Analysis of the Master and Worker Startup Flow

2017-03-08 22:31

The Master of a Spark cluster can be started with the start-master.sh script; the details of the shell script are left for the reader to inspect.

The resulting launch command is: java -cp /home/daxin/bigdata/spark/conf/:/home/daxin/bigdata/spark/jars/*:/home/daxin/bigdata/hadoop/etc/hadoop/ -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.master.Master --host node --port 7077 --webui-port 8080

Since the script ultimately boils down to a java command that starts the Master, we need to look at the Master's main method, shown below:

val systemName = "sparkMaster"
private val actorName = "Master"
/**
* Invoked by the spark-class script to start the Master.
*
* @param argStrings
*/
def main(argStrings: Array[String]) {

SignalLogger.register(log)
// Prepare the configuration and parse the arguments
val conf = new SparkConf
val args = new MasterArguments(argStrings, conf)

// Create the ActorSystem
// (actorSystem, boundPort, portsResponse.webUIPort, portsResponse.restPort)
val (actorSystem, _, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)
actorSystem.awaitTermination()

}
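
As a rough illustration of what the argument-parsing step does with the flags in the launch command above, here is a minimal sketch in plain Scala. MasterArgs and its parse method are hypothetical stand-ins, not Spark's MasterArguments class, and the default host is an assumption made for this example; only the three flags shown in the command are handled.

```scala
// Hypothetical, simplified stand-in for MasterArguments (not the Spark class).
// The default host is an assumption for this sketch; 7077 and 8080 are the
// ports used in the launch command above.
final case class MasterArgs(host: String = "localhost", port: Int = 7077, webUiPort: Int = 8080)

object MasterArgs {
  // Consume the argument list two tokens at a time, updating the accumulator.
  @annotation.tailrec
  def parse(args: List[String], acc: MasterArgs = MasterArgs()): MasterArgs = args match {
    case "--host" :: value :: tail       => parse(tail, acc.copy(host = value))
    case "--port" :: value :: tail       => parse(tail, acc.copy(port = value.toInt))
    case "--webui-port" :: value :: tail => parse(tail, acc.copy(webUiPort = value.toInt))
    case Nil                             => acc
    case other :: _ =>
      throw new IllegalArgumentException(s"Unrecognized or incomplete argument: $other")
  }
}
```

For example, MasterArgs.parse(List("--host", "node", "--port", "7077", "--webui-port", "8080")) yields MasterArgs("node", 7077, 8080).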

The code shows that main calls startSystemAndActor to create both the ActorSystem and the Actor. Inside startSystemAndActor, the call

val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf, securityManager = securityMgr)

creates the ActorSystem, and then the call:
val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf), actorName)

creates the actor inside that ActorSystem. The Master's lifecycle method preStart is then executed:
override def preStart() {
logInfo("Starting Spark master at " + masterUrl)
logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
// Listen for remote client disconnection events, since they don't go through Akka's watch()
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
webUi.bind()
masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
/**
* Timer: the Master periodically sends itself a CheckForWorkerTimeOut message to check for timed-out Workers and removes any that it finds.
*/
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)

masterMetricsSystem.registerSource(masterSource)
masterMetricsSystem.start()
applicationMetricsSystem.start()
// Attach the master and app metrics servlet handler to the web ui after the metrics systems are started.
masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
/**
* For high availability, choose the persistence engine used to share metadata: ZOOKEEPER, FILESYSTEM, or CUSTOM.
*/
val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
case "ZOOKEEPER" =>
logInfo("Persisting recovery state to ZooKeeper")
val zkFactory =
new ZooKeeperRecoveryModeFactory(conf, SerializationExtension(context.system))
(zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
case "FILESYSTEM" =>
val fsFactory =
new FileSystemRecoveryModeFactory(conf, SerializationExtension(context.system))
(fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
case "CUSTOM" =>
val clazz = Class.forName(conf.get("spark.deploy.recoveryMode.factory"))
val factory = clazz.getConstructor(conf.getClass, Serialization.getClass)
.newInstance(conf, SerializationExtension(context.system))
.asInstanceOf[StandaloneRecoveryModeFactory]
(factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
case _ =>
(new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
}
persistenceEngine = persistenceEngine_
leaderElectionAgent = leaderElectionAgent_
}
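
The timer in preStart delivers CheckForWorkerTimeOut to the Master every WORKER_TIMEOUT milliseconds. The check that this message triggers can be sketched in plain Scala as follows. WorkerRecord and WorkerTimeout are hypothetical names for this example, and the rule used here (a Worker is timed out once its last heartbeat is older than the timeout) is an assumption, not a copy of Spark's timeOutDeadWorkers.

```scala
// Hypothetical, minimal model of a registered worker: only the fields needed here.
final case class WorkerRecord(id: String, lastHeartbeat: Long)

object WorkerTimeout {
  // Splits workers into (alive, timedOut). A worker is treated as timed out
  // when its last heartbeat is older than `timeoutMs` relative to `now`.
  def partition(
      workers: Seq[WorkerRecord],
      now: Long,
      timeoutMs: Long): (Seq[WorkerRecord], Seq[WorkerRecord]) =
    workers.partition(w => w.lastHeartbeat >= now - timeoutMs)
}
```

Passing the current time in as a parameter, instead of reading the clock inside the function, keeps the check deterministic and easy to test.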

After preStart, the actor starts handling messages in receiveWithLogging:
override def receiveWithLogging = {
// High-availability leader election
case ElectedLeader => {
val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData()
state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
RecoveryState.ALIVE
} else {
RecoveryState.RECOVERING
}
logInfo("I have been elected leader! New state: " + state)
if (state == RecoveryState.RECOVERING) {
// Begin recovering the Master's state
beginRecovery(storedApps, storedDrivers, storedWorkers)
recoveryCompletionTask = context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis, self,
CompleteRecovery)
}
}

case CompleteRecovery => completeRecovery()

case RevokedLeadership => {
logError("Leadership has been revoked -- master shutting down.")
System.exit(0)
}

/**
* Registration message sent by a Worker
*/
case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) => {

logInfo("Registering worker %s:%d with %d cores, %s RAM".format(workerHost, workerPort, cores, Utils.megabytesToString(memory)))
// If the Master's current state is RecoveryState.STANDBY, do not reply to the Worker.
if (state == RecoveryState.STANDBY) {
// ignore, don't send response
} else if (idToWorker.contains(id)) {
// If a WorkerInfo with this id already exists, reply with a registration failure
sender ! RegisterWorkerFailed("Duplicate worker ID")
} else {
// Register the new Worker
val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory, sender, workerUiPort, publicAddress)
if (registerWorker(worker)) {
// Persist the worker so that it can be recovered if the Master goes down
persistenceEngine.addWorker(worker)
// Tell the Worker that registration is complete by sending RegisteredWorker
sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
schedule()
} else {
val workerAddress = worker.actor.path.address
logWarning("Worker registration failed. Attempted to re-register worker at same " +
"address: " + workerAddress)
sender ! RegisterWorkerFailed("Attempted to re-register worker at same address: "
+ workerAddress)
}
}
}

case RequestSubmitDriver(description) => {
if (state != RecoveryState.ALIVE) {
val msg = s"Can only accept driver submissions in ALIVE state. Current state: $state."
sender ! SubmitDriverResponse(false, None, msg)
} else {
logInfo("Driver submitted " + description.command.mainClass)
val driver = createDriver(description)
persistenceEngine.addDriver(driver)
waitingDrivers += driver
drivers.add(driver)
schedule()

// TODO: It might be good to instead have the submission client poll the master to determine
// the current status of the driver. For now it's simply "fire and forget".

sender ! SubmitDriverResponse(true, Some(driver.id),
s"Driver successfully submitted as ${driver.id}")
}
}

case RequestKillDriver(driverId) => {
if (state != RecoveryState.ALIVE) {
val msg = s"Can only kill drivers in ALIVE state. Current state: $state."
sender ! KillDriverResponse(driverId, success = false, msg)
} else {
logInfo("Asked to kill driver " + driverId)
val driver = drivers.find(_.id == driverId)
driver match {
case Some(d) =>
if (waitingDrivers.contains(d)) {
waitingDrivers -= d
self ! DriverStateChanged(driverId, DriverState.KILLED, None)
} else {
// We just notify the worker to kill the driver here. The final bookkeeping occurs
// on the return path when the worker submits a state change back to the master
// to notify it that the driver was successfully killed.
d.worker.foreach { w =>
w.actor ! KillDriver(driverId)
}
}
// TODO: It would be nice for this to be a synchronous response
val msg = s"Kill request for $driverId submitted"
logInfo(msg)
sender ! KillDriverResponse(driverId, success = true, msg)
case None =>
val msg = s"Driver $driverId has already finished or does not exist"
logWarning(msg)
sender ! KillDriverResponse(driverId, success = false, msg)
}
}
}

case RequestDriverStatus(driverId) => {
(drivers ++ completedDrivers).find(_.id == driverId) match {
case Some(driver) =>
sender ! DriverStatusResponse(found = true, Some(driver.state),
driver.worker.map(_.id), driver.worker.map(_.hostPort), driver.exception)
case None =>
sender ! DriverStatusResponse(found = false, None, None, None, None)
}
}

/**
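* An application is submitted to the Master, and the Master launches executors.
*
* (If I understand correctly) the command in description should be: val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", remaining arguments omitted)
* Code location: the command in class SparkDeploySchedulerBackend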
*
*/
case RegisterApplication(description) => {
if (state == RecoveryState.STANDBY) {
// ignore, don't send response
} else {
logInfo("Registering app " + description.name)
// TODO: keep the application info in memory. Key point: sender should be the ClientActor
val app = createApplication(description, sender) // sender should be the ClientActor

registerApplication(app)
logInfo("Registered app " + description.name + " with ID " + app.id)
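// Persist the app for fault tolerance
persistenceEngine.addApplication(app)
// Reply to the AppClient that the application is registered (the recipient here is not a Worker)
sender ! RegisteredApplication(app.id, masterUrl)
// TODO: the Master starts scheduling resources, i.e. deciding which Workers to launch on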
schedule()
}
}
// TODO: message sent by a Worker to report an executor state change
case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {

val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
execOption match {
case Some(exec) => {
val appInfo = idToApp(appId)
exec.state = state
if (state == ExecutorState.RUNNING) {
appInfo.resetRetryCount()
}
// exec.application.driver = driverClient
exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus)

// Finished states are KILLED, FAILED, LOST and EXITED. Note: "finished" here does not mean "succeeded"!
if (ExecutorState.isFinished(state)) {
// Remove this executor from the worker and app
logInfo(s"Removing executor ${exec.fullId} because it is $state")
appInfo.removeExecutor(exec) // remove the executor from the appInfo
exec.worker.removeExecutor(exec) // remove the executor from the worker

val normalExit = exitStatus == Some(0) // whether the executor exited normally
// Only retry certain number of times so we don't go into an infinite loop.
if (!normalExit) {
// abnormal exit
if (appInfo.incrementRetryCount() < ApplicationState.MAX_NUM_RETRY) {
// the retry count is still below MAX_NUM_RETRY (10), so schedule again
schedule()
} else {
// the maximum number of retries has been exceeded
val execs = appInfo.executors.values // all executors of the current app
if (!execs.exists(_.state == ExecutorState.RUNNING)) {
// if no executor is running, remove the application directly
logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
s"${appInfo.retryCount} times; removing it")
removeApplication(appInfo, ApplicationState.FAILED)
}
}
}
}
}
// unknown executor
case None =>
logWarning(s"Got status update for unknown executor $appId/$execId")
}
}

/**
* Message sent by a Worker to report the current state of a driver to the Master
*
*/
case DriverStateChanged(driverId, state, exception) => {
state match {
case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
removeDriver(driverId, state, exception)
case _ =>
throw new Exception(s"Received unexpected state update for driver $driverId: $state")
}
}

case Heartbeat(workerId) => {
idToWorker.get(workerId) match {
case Some(workerInfo) =>
workerInfo.lastHeartbeat = System.currentTimeMillis()
case None =>
if (workers.map(_.id).contains(workerId)) {
logWarning(s"Got heartbeat from unregistered worker $workerId." +
" Asking it to re-register.")
sender ! ReconnectWorker(masterUrl)
} else {
logWarning(s"Got heartbeat from unregistered worker $workerId." +
" This worker was never registered, so ignoring the heartbeat.")
}
}
}

case MasterChangeAcknowledged(appId) => {
idToApp.get(appId) match {
case Some(app) =>
logInfo("Application has been re-registered: " + appId)
app.state = ApplicationState.WAITING
case None =>
logWarning("Master change ack from unknown app: " + appId)
}

if (canCompleteRecovery) {
completeRecovery()
}
}

case WorkerSchedulerStateResponse(workerId, executors, driverIds) => {
idToWorker.get(workerId) match {
case Some(worker) =>
logInfo("Worker has been re-registered: " + workerId)
worker.state = WorkerState.ALIVE

val validExecutors = executors.filter(exec => idToApp.get(exec.appId).isDefined)
for (exec <- validExecutors) {
val app = idToApp.get(exec.appId).get
val execInfo = app.addExecutor(worker, exec.cores, Some(exec.execId))
worker.addExecutor(execInfo)
execInfo.copyState(exec)
}

for (driverId <- driverIds) {
drivers.find(_.id == driverId).foreach { driver =>
driver.worker = Some(worker)
driver.state = DriverState.RUNNING
worker.drivers(driverId) = driver
}
}
case None =>
logWarning("Scheduler state from unknown worker: " + workerId)
}

if (canCompleteRecovery) {
completeRecovery()
}
}

case DisassociatedEvent(_, address, _) => {
// The disconnected client could've been either a worker or an app; remove whichever it was
logInfo(s"$address got disassociated, removing it.")
addressToWorker.get(address).foreach(removeWorker)
addressToApp.get(address).foreach(finishApplication)
if (state == RecoveryState.RECOVERING && canCompleteRecovery) {
completeRecovery()
}
}

case RequestMasterState => {
sender ! MasterStateResponse(
host, port, restServerBoundPort,
workers.toArray, apps.toArray, completedApps.toArray,
drivers.toArray, completedDrivers.toArray, state)
}

case CheckForWorkerTimeOut => {
timeOutDeadWorkers()
}

case BoundPortsRequest => {
sender ! BoundPortsResponse(port, webUi.boundPort, restServerBoundPort)
}
}
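
The RegisterWorker branch above makes a three-way decision: ignore the request while in STANDBY, reject a duplicate worker ID, and otherwise try to register. A minimal sketch of that decision as a pure function is shown below. All names (MasterRecoveryState, RegisterOutcome, WorkerRegistration) are hypothetical, and the address check is a simplified assumption standing in for the registerWorker call, whose body is not shown in this article.

```scala
// Hypothetical model of the Master's recovery state (not Spark's RecoveryState).
sealed trait MasterRecoveryState
case object StandbyState extends MasterRecoveryState
case object AliveState extends MasterRecoveryState
case object RecoveringState extends MasterRecoveryState

// Hypothetical outcome type: what the Master sends back, if anything.
sealed trait RegisterOutcome
case object Ignored extends RegisterOutcome // STANDBY: no response is sent
final case class Rejected(reason: String) extends RegisterOutcome
case object Accepted extends RegisterOutcome

object WorkerRegistration {
  def decide(
      state: MasterRecoveryState,
      knownWorkerIds: Set[String],
      knownAddresses: Set[String],
      id: String,
      address: String): RegisterOutcome =
    if (state == StandbyState) Ignored
    else if (knownWorkerIds.contains(id)) Rejected("Duplicate worker ID")
    else if (knownAddresses.contains(address)) // simplified stand-in for registerWorker
      Rejected(s"Attempted to re-register worker at same address: $address")
    else Accepted
}
```

In the handler above, the Accepted case corresponds to persisting the worker, replying with RegisteredWorker, and calling schedule().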

Note: the Master's parameter configuration is done in the org.apache.spark.util.AkkaUtils$#doCreateActorSystem method.
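
One more detail from the ExecutorStateChanged branch of the message handler is worth spelling out: what happens after an executor finishes. The sketch below restates that retry policy as a pure function. ExitAction, ExecutorRetry and the parameter names are hypothetical; the limit of 10 follows the MAX_NUM_RETRY comment in the handler, and the function assumes that the retry counter is incremented before it is compared with the limit.

```scala
// Hypothetical action type describing what the Master does next.
sealed trait ExitAction
case object NoAction extends ExitAction          // normal exit: nothing to retry
case object Reschedule extends ExitAction        // call schedule() again
case object RemoveApplication extends ExitAction // give up on the application
case object KeepApplication extends ExitAction   // limit reached, but executors still run

object ExecutorRetry {
  val MaxNumRetry = 10 // mirrors the MAX_NUM_RETRY value mentioned in the handler

  // Returns the action to take and the updated retry count.
  def onExecutorFinished(
      exitStatus: Option[Int],
      retriesSoFar: Int,
      anyExecutorRunning: Boolean): (ExitAction, Int) =
    if (exitStatus == Some(0)) (NoAction, retriesSoFar)
    else {
      val retries = retriesSoFar + 1 // increment first, then compare
      if (retries < MaxNumRetry) (Reschedule, retries)
      else if (!anyExecutorRunning) (RemoveApplication, retries)
      else (KeepApplication, retries)
    }
}
```

Note that a missing exit status (None) is treated as an abnormal exit, just as exitStatus == Some(0) is false for None in the handler.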

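Next, let's look at how a Worker starts.

There are two Worker startup scripts:

1: start-slave.sh, which takes the master URL and can only start a Worker on the local node

2: start-slaves.sh, which connects over SSH to each Worker node and starts a Worker there; internally it calls the slaves.sh script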

java -cp /home/daxin/bigdata/spark/conf/:/home/daxin/bigdata/spark/jars/*:/home/daxin/bigdata/hadoop/etc/hadoop/
-Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.worker.Worker --webui-port 8082 spark://node:7077

In the same way, this runs the Worker's main method:

/**
*
* Called by Spark's worker startup script; main starts the Worker.
*
* @param argStrings
*/

def main(argStrings: Array[String]) {
// Set up the configuration
SignalLogger.register(log)
val conf = new SparkConf
val args = new WorkerArguments(argStrings, conf)

// Create the ActorSystem
val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores, args.memory, args.masters, args.workDir)
actorSystem.awaitTermination()
}

This follows the same steps as the Master's main method. Next, let's look at the Worker's lifecycle method:
/**
* preStart: an Actor lifecycle method.
* Registration with the Master is performed in preStart.
*
*/
override def preStart() {
assert(!registered)
logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(host, port, cores, Utils.megabytesToString(memory)))

logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
logInfo("Spark home: " + sparkHome)
createWorkDir()
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
shuffleService.startIfEnabled()
webUi = new WorkerWebUI(this, workDir, webUiPort)
webUi.bind()
registerWithMaster() // register with the Master
metricsSystem.registerSource(workerSource)
metricsSystem.start()
// Attach the worker metrics servlet handler to the web ui after the metrics system is started.
metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
}

To be continued another day; it is late and I need to head back to the dorm.