Spark Learning, Part 13: Standalone HA
2015-05-28 16:14
Standalone mode provides Master high availability through ZooKeeper: ZooKeeper is used for leader election among multiple Masters and for storing the state of Workers and applications.
When a Master starts, the corresponding Master actor creates the appropriate failure-recovery mode according to RECOVERY_MODE.
This article walks through the ZooKeeper-based Master recovery process.
1. Configuration Properties
Using ZooKeeper for Master recovery requires a few system properties: spark.deploy.recoveryMode (set to ZOOKEEPER), spark.deploy.zookeeper.url, and optionally spark.deploy.zookeeper.dir. See the Spark documentation for details.
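For reference, a minimal sketch of such a configuration in conf/spark-env.sh, assuming a ZooKeeper ensemble on hosts zk1 and zk2 (the host names are placeholders; /spark is the default directory):

  export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER \
    -Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181 \
    -Dspark.deploy.zookeeper.dir=/spark"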
2. Master
2.1. Master.preStart
After the Master actor is created, Akka automatically invokes this method.

  override def preStart() {
    ......
    val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
      case "ZOOKEEPER" =>
        logInfo("Persisting recovery state to ZooKeeper")
        val zkFactory =
          new ZooKeeperRecoveryModeFactory(conf, SerializationExtension(context.system))
        (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
      case "FILESYSTEM" =>
        val fsFactory =
          new FileSystemRecoveryModeFactory(conf, SerializationExtension(context.system))
        (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
      ......
    }
    persistenceEngine = persistenceEngine_
    leaderElectionAgent = leaderElectionAgent_
  }
preStart creates the recovery mode that matches RECOVERY_MODE.
In ZooKeeper mode it:
(1) creates a ZooKeeperRecoveryModeFactory;
(2) uses that factory to create the persistence engine, which stores the state of Workers, applications and so on, and to create the leader election agent.
2.2. ZooKeeperLeaderElectionAgent
The leader election agent.

  private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: LeaderElectable,
      conf: SparkConf) extends LeaderLatchListener with LeaderElectionAgent with Logging {

    val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"

    private var zk: CuratorFramework = _
    private var leaderLatch: LeaderLatch = _
    private var status = LeadershipStatus.NOT_LEADER

    start()

    def start() {
      logInfo("Starting ZooKeeper LeaderElection agent")
      zk = SparkCuratorUtil.newClient(conf)
      leaderLatch = new LeaderLatch(zk, WORKING_DIR)
      leaderLatch.addListener(this)
      leaderLatch.start()
    }
(1) Each Master's ZooKeeperLeaderElectionAgent connects to ZooKeeper by calling SparkCuratorUtil.newClient with the SparkConf;
(2) the agent registers itself as the LeaderLatch listener, so the LeaderLatch calls back its isLeader and notLeader methods, which in turn call updateLeadershipStatus to notify the Master actor (see the sketch after this list);
(3) it starts the LeaderLatch.
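The isLeader and notLeader callbacks themselves are not quoted above; a minimal sketch of what they do (the actual Spark source is slightly more defensive, e.g. it re-checks leaderLatch.hasLeadership before acting) is:

  override def isLeader() {
    // LeaderLatch reports that this Master has become the leader.
    updateLeadershipStatus(true)
  }

  override def notLeader() {
    // LeaderLatch reports that this Master has lost leadership.
    updateLeadershipStatus(false)
  }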
2.2.1. ZooKeeperLeaderElectionAgent.updateLeadershipStatus
  def updateLeadershipStatus(isLeader: Boolean) {
    if (isLeader && status == LeadershipStatus.NOT_LEADER) {
      status = LeadershipStatus.LEADER
      masterActor.electedLeader()
    } else if (!isLeader && status == LeadershipStatus.LEADER) {
      status = LeadershipStatus.NOT_LEADER
      masterActor.revokedLeadership()
    }
  }
When the status changes, the Master actor is notified: Master.electedLeader is called when this node is elected leader, and Master.revokedLeadership when it loses leadership.
2.2.2. Master.electedLeader and revokedLeadership
  override def electedLeader() {
    self ! ElectedLeader
  }

  override def revokedLeadership() {
    self ! RevokedLeadership
  }
These two methods simply send ElectedLeader and RevokedLeadership messages, respectively, to the Master actor itself.
2.3. Handling the ElectedLeader Message
  override def receiveWithLogging = {
    case ElectedLeader => {
      val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData()
      state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
        RecoveryState.ALIVE
      } else {
        RecoveryState.RECOVERING
      }
      logInfo("I have been elected leader! New state: " + state)
      if (state == RecoveryState.RECOVERING) {
        beginRecovery(storedApps, storedDrivers, storedWorkers)
        recoveryCompletionTask = context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis,
          self, CompleteRecovery)
      }
    }
The Master has been elected leader. The handler:
(1) reads the persisted data through persistenceEngine; on the very first startup of the cluster all returned collections are empty;
(2) sets state to RecoveryState.ALIVE or RecoveryState.RECOVERING depending on whether that data is empty;
(3) if state is RecoveryState.RECOVERING, calls beginRecovery to start the Master recovery process and schedules a CompleteRecovery message after WORKER_TIMEOUT.
2.3.1. Master.beginRecovery
  def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo],
      storedWorkers: Seq[WorkerInfo]) {
    for (app <- storedApps) {
      logInfo("Trying to recover app: " + app.id)
      try {
        registerApplication(app)
        app.state = ApplicationState.UNKNOWN
        app.driver ! MasterChanged(masterUrl, masterWebUiUrl)
      } catch {
        case e: Exception => logInfo("App " + app.id + " had exception on reconnect")
      }
    }

    for (driver <- storedDrivers) {
      // Here we just read in the list of drivers. Any drivers associated with now-lost workers
      // will be re-launched when we detect that the worker is missing.
      drivers += driver
    }

    for (worker <- storedWorkers) {
      logInfo("Trying to recover worker: " + worker.id)
      try {
        registerWorker(worker)
        worker.state = WorkerState.UNKNOWN
        worker.actor ! MasterChanged(masterUrl, masterWebUiUrl)
      } catch {
        case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect")
      }
    }
  }
(1) Re-registers each application with this Master and notifies its driver actor, via a MasterChanged message, that the Master has changed;
(2) re-registers each Worker with this Master and notifies the Worker actor that the Master has changed.
Note:
When submitting an application with spark-submit, the client does not know which Master is the current leader, so the full list of Masters must be provided. Likewise, when adding a Worker node with sbin/start-slave.sh, the full list of Masters must be provided (see the sketch below). When starting the whole cluster with sbin/start-all.sh, however, no Master list needs to be provided, nor is there a way to provide one.
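For illustration, a rough sketch with two Masters on hosts master1 and master2 using the default port 7077 (the host names, application class, and jar are placeholders, and the exact start-slave.sh arguments vary slightly between Spark versions):

  # Submit an application listing both Masters; the client resolves the current leader.
  ./bin/spark-submit --master spark://master1:7077,master2:7077 \
    --class com.example.MyApp myapp.jar

  # Start a Worker, pointing it at both Masters.
  ./sbin/start-slave.sh spark://master1:7077,master2:7077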