Spark 1.3 from Creation to Submission: 1) Master and Worker Startup Flow Source Analysis
2016-12-27 21:04
1. Launching the Spark Cluster via Shell Scripts

1.1 start-all.sh

First, the simplest way to start the whole cluster from the master node, sbin/start-all.sh:

```
# Load the Spark configuration
. "$sbin/spark-config.sh"

# Start Master
"$sbin"/start-master.sh $TACHYON_STR

# Start Workers
"$sbin"/start-slaves.sh $TACHYON_STR
```

As the script shows, after loading the local configuration the call chain is sbin/start-all.sh -> sbin/start-master.sh and sbin/start-slaves.sh.
1.2 start-master.sh

```
sbin/spark-daemon.sh start org.apache.spark.deploy.master.Master 1 \
  --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT
```

After reading the local variables, sbin/start-master.sh calls sbin/spark-daemon.sh.
1.3 spark-daemon.sh

spark-daemon.sh does quite a lot: it handles start and stop of the master as well as job submission. Here we only look at starting the master:

```
nohup nice -n $SPARK_NICENESS "$SPARK_PREFIX"/bin/spark-class $command "$@" >> "$log" 2>&1 < /dev/null
```

Here command=$1, i.e. org.apache.spark.deploy.master.Master, and $@ holds all the arguments that start-master.sh passed to spark-daemon.sh. So the chain continues: sbin/spark-daemon.sh -> bin/spark-class.
1.4 spark-class

```
exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
```

This launches the JVM ($RUNNER is the java command) directly on a class, here org.apache.spark.deploy.master.Master, which starts the master process. Starting the slaves is similar, except that ssh is used to launch them on the remote worker machines.
2. org.apache.spark.deploy.master.Master

First, look at the Master class. The main method of its companion object:

```
def main(argStrings: Array[String]) {
  SignalLogger.register(log)
  val conf = new SparkConf
  val args = new MasterArguments(argStrings, conf)
  val (actorSystem, _, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)
  actorSystem.awaitTermination()
}
```

It mainly calls the startSystemAndActor method:
```
def startSystemAndActor(
    host: String,
    port: Int,
    webUiPort: Int,
    conf: SparkConf): (ActorSystem, Int, Int, Option[Int]) = {
  val securityMgr = new SecurityManager(conf)
  val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf,
    securityManager = securityMgr)
  val actor = actorSystem.actorOf(
    Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf), actorName)
  val timeout = AkkaUtils.askTimeout(conf)
  val portsRequest = actor.ask(BoundPortsRequest)(timeout)
  val portsResponse = Await.result(portsRequest, timeout).asInstanceOf[BoundPortsResponse]
  (actorSystem, boundPort, portsResponse.webUIPort, portsResponse.restPort)
}
```

Here AkkaUtils.createActorSystem creates an ActorSystem, and actorSystem.actorOf creates the Master actor. Because Master extends Actor, its preStart method gets invoked, and its receiveWithLogging method keeps listening for incoming messages. The branch of receiveWithLogging that handles worker registration looks like this:
```
case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) => {
  logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
    workerHost, workerPort, cores, Utils.megabytesToString(memory)))
  if (state == RecoveryState.STANDBY) {
    // ignore, don't send response
  } else if (idToWorker.contains(id)) {
    sender ! RegisterWorkerFailed("Duplicate worker ID")
  } else {
    val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
      sender, workerUiPort, publicAddress)
    if (registerWorker(worker)) {
      persistenceEngine.addWorker(worker)
      sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
      schedule()
    } else {
      val workerAddress = worker.actor.path.address
      logWarning("Worker registration failed. Attempted to re-register worker at same " +
        "address: " + workerAddress)
      sender ! RegisterWorkerFailed("Attempted to re-register worker at same address: " +
        workerAddress)
    }
  }
}
```

If the current master is not a STANDBY node (used for high availability with dual masters) and this worker has not registered yet, the master wraps the information sent by the worker into a WorkerInfo object and replies to the worker with a RegisteredWorker message.
To summarize the master startup flow: main -> startSystemAndActor -> create the ActorSystem -> create the Master actor -> preStart -> receive messages from workers.
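The registration branch above can be sketched in plain Scala without Akka. This is a minimal sketch of the duplicate-ID bookkeeping only; the class names mirror Spark's messages but are simplified stand-ins (no STANDBY check, persistence, or scheduling):

```scala
import scala.collection.mutable

// Stand-ins for Spark's registration messages (simplified assumptions, not Spark's API).
case class WorkerInfo(id: String, host: String, port: Int, cores: Int, memory: Int)
sealed trait RegisterResponse
case class RegisteredWorker(masterUrl: String) extends RegisterResponse
case class RegisterWorkerFailed(reason: String) extends RegisterResponse

class MasterState(masterUrl: String) {
  // Mirrors the master's idToWorker map used for the duplicate check.
  private val idToWorker = mutable.Map[String, WorkerInfo]()

  def handleRegister(w: WorkerInfo): RegisterResponse =
    if (idToWorker.contains(w.id)) {
      RegisterWorkerFailed("Duplicate worker ID") // same reply the real master sends
    } else {
      idToWorker(w.id) = w // the real code also persists the worker and calls schedule()
      RegisteredWorker(masterUrl)
    }

  def numWorkers: Int = idToWorker.size
}
```

A second registration with the same id is rejected while the first succeeds, which is exactly the idToWorker.contains(id) branch above.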
3. org.apache.spark.deploy.worker.Worker

Worker is almost identical to Master; compare its main method:

```
def main(argStrings: Array[String]) {
  SignalLogger.register(log)
  val conf = new SparkConf
  val args = new WorkerArguments(argStrings, conf)
  val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
    args.memory, args.masters, args.workDir)
  actorSystem.awaitTermination()
}
```

And its startSystemAndActor method:
```
def startSystemAndActor(
    host: String,
    port: Int,
    webUiPort: Int,
    cores: Int,
    memory: Int,
    masterUrls: Array[String],
    workDir: String,
    workerNumber: Option[Int] = None,
    conf: SparkConf = new SparkConf): (ActorSystem, Int) = {
  // The LocalSparkCluster runs multiple local sparkWorkerX actor systems
  val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
  val actorName = "Worker"
  val securityMgr = new SecurityManager(conf)
  val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port,
    conf = conf, securityManager = securityMgr)
  val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem)))
  actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,
    masterAkkaUrls, systemName, actorName, workDir, conf, securityMgr), name = actorName)
  (actorSystem, boundPort)
}
```

Creating the Worker actor here triggers its preStart method, which in turn calls the important registerWithMaster method:
```
def registerWithMaster() {
  // DisassociatedEvent may be triggered multiple times, so don't attempt registration
  // if there are outstanding registration attempts scheduled.
  registrationRetryTimer match {
    case None =>
      registered = false
      tryRegisterAllMasters()
      connectionAttemptCount = 0
      registrationRetryTimer = Some {
        context.system.scheduler.schedule(INITIAL_REGISTRATION_RETRY_INTERVAL,
          INITIAL_REGISTRATION_RETRY_INTERVAL, self, ReregisterWithMaster)
      }
    case Some(_) =>
      logInfo("Not spawning another attempt to register with the master, since there is an" +
        " attempt scheduled already.")
  }
}
```

Because of possible network failures, the worker retries registration several times. The core of this is tryRegisterAllMasters, which obtains a reference to the master actor via context.actorSelection(masterAkkaUrl) and sends it a message:
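The Option-based guard in registerWithMaster can be sketched without Akka's scheduler. The point is that only the first call starts a registration attempt and schedules retries; later calls are no-ops. The names below are simplified assumptions, not Spark's API:

```scala
// Sketch: registrationRetryTimer acts as a one-shot latch around retry scheduling.
class RetryGuard {
  // Stands in for the Option[Cancellable] held by the real worker.
  private var registrationRetryTimer: Option[String] = None
  var attemptsStarted = 0

  def registerWithMaster(): Unit = registrationRetryTimer match {
    case None =>
      attemptsStarted += 1                       // would call tryRegisterAllMasters() here
      registrationRetryTimer = Some("scheduled") // would schedule ReregisterWithMaster ticks
    case Some(_) =>
      () // an attempt is already scheduled, so do nothing
  }
}
```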
```
private def tryRegisterAllMasters() {
  for (masterAkkaUrl <- masterAkkaUrls) {
    logInfo("Connecting to master " + masterAkkaUrl + "...")
    val actor = context.actorSelection(masterAkkaUrl)
    actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
  }
}
```

Next, look at the core receiveWithLogging method, which handles the RegisteredWorker reply sent back by the master on successful registration and drives the worker's heartbeat to the master:
```
override def receiveWithLogging = {
  case RegisteredWorker(masterUrl, masterWebUiUrl) =>
    logInfo("Successfully registered with master " + masterUrl)
    registered = true
    changeMaster(masterUrl, masterWebUiUrl)
    context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)
    if (CLEANUP_ENABLED) {
      logInfo(s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
      context.system.scheduler.schedule(CLEANUP_INTERVAL_MILLIS millis,
        CLEANUP_INTERVAL_MILLIS millis, self, WorkDirCleanup)
    }

  case SendHeartbeat =>
    if (connected) { master ! Heartbeat(workerId) }

  // ... other cases
}
```

At this point the master and worker have established their communication channel, and jobs can be submitted.
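On the master side, these periodic Heartbeat messages are what keep a worker marked as alive. A rough sketch of that bookkeeping in plain Scala (no Akka; the class below is a simplified assumption, not Spark's actual code):

```scala
import scala.collection.mutable

// Sketch: a worker counts as alive if its last heartbeat arrived within the timeout window.
class HeartbeatTracker(timeoutMs: Long) {
  private val lastHeartbeat = mutable.Map[String, Long]()

  // Called when a Heartbeat(workerId) message arrives.
  def onHeartbeat(workerId: String, nowMs: Long): Unit =
    lastHeartbeat(workerId) = nowMs

  // Workers whose most recent heartbeat is still within the timeout.
  def aliveWorkers(nowMs: Long): Set[String] =
    lastHeartbeat.collect { case (id, t) if nowMs - t <= timeoutMs => id }.toSet
}
```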
Postscript: I previously wrote up a simple RPC implementation with Akka whose content is close to what is described above: Spark: Akka实现简单RPC通信.