Hadoop 2.6.0 Study Notes (14): Hadoop Two HA: ActiveStandbyElector
2015-08-06 11:46
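Lu Chunli's work notes. Who says programmers can't have an artistic side?
This article is translated from http://johnjianfang.blogspot.com/. That site cannot be accessed directly. Why? You know why!
HA in Hadoop 2 uses Apache ZooKeeper to coordinate and share state. Both name node HA and resource manager HA use an active/standby model, with exactly one active leader at any point in time. To elect the active leader from several hosts, Hadoop provides the ActiveStandbyElector class, which works together with Apache ZooKeeper to achieve this. ActiveStandbyElector acts as a kind of ZooKeeper handler that sits between ZooKeeper and a controller such as ZKFailoverController.
Definition of the ActiveStandbyElector class: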
    package org.apache.hadoop.ha;

    @InterfaceAudience.Private
    @InterfaceStability.Evolving
    public class ActiveStandbyElector implements StatCallback, StringCallback {

      // Not explained for now; listed here for reference
      public interface ActiveStandbyElectorCallback {
        void becomeActive() throws ServiceFailedException;
        void becomeStandby();
        void enterNeutralMode();
        void notifyFatalError(String errorMessage);
        void fenceOldActive(byte[] oldActiveData);
      }

      @VisibleForTesting
      protected static final String LOCK_FILENAME = "ActiveStandbyElectorLock";
      @VisibleForTesting
      protected static final String BREADCRUMB_FILENAME = "ActiveBreadCrumb";

      // Other code omitted
    }

The central idea is to create an ephemeral (EPHEMERAL) znode named "ActiveStandbyElectorLock"; whichever machine creates it successfully becomes the leader. Because the znode is ephemeral, the current active leader may lose it when its session expires or its connection fails, which gives the other nodes a chance to create the same znode and become the leader. The previous active leader may come back after a while still believing it is active, and in that special case there could be two active leaders. To prevent this, ActiveStandbyElector also creates a persistent (PERSISTENT) znode, "ActiveBreadCrumb", in which the active leader stores its own data, so that a new active leader can fence the previous one and keep it from acting as active again.
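The interplay of the two znodes can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: `ToyElection`, `tryBecomeActive`, `sessionExpired`, and `fencedNodes` are hypothetical names invented for illustration, no ZooKeeper API is used, and "fencing" is reduced to recording the name of the previous active node.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy in-memory model of the two znodes; it does not use the ZooKeeper API. */
class ToyElection {
    private String lockOwner;   // models the EPHEMERAL lock znode (null = absent)
    private String breadcrumb;  // models the PERSISTENT breadcrumb znode
    private final List<String> fenced = new ArrayList<>();

    /** Tries to take the lock; returns true if the candidate is now active. */
    synchronized boolean tryBecomeActive(String candidate) {
        if (lockOwner != null) {
            // Lock already exists: only its owner is active, everyone else is standby.
            return lockOwner.equals(candidate);
        }
        lockOwner = candidate;
        if (breadcrumb != null && !breadcrumb.equals(candidate)) {
            // A different node was active before us: fence it first.
            fenced.add(breadcrumb);
        }
        breadcrumb = candidate; // record ourselves as the most recent active
        return true;
    }

    /** Session loss removes the ephemeral lock but leaves the breadcrumb behind. */
    synchronized void sessionExpired(String candidate) {
        if (candidate.equals(lockOwner)) {
            lockOwner = null;
        }
    }

    /** Returns a copy of the names that were fenced so far. */
    synchronized List<String> fencedNodes() {
        return new ArrayList<>(fenced);
    }

    public static void main(String[] args) {
        ToyElection election = new ToyElection();
        System.out.println("nn1 active: " + election.tryBecomeActive("nn1"));
        System.out.println("nn2 active: " + election.tryBecomeActive("nn2"));
        election.sessionExpired("nn1");
        System.out.println("nn2 active: " + election.tryBecomeActive("nn2"));
        System.out.println("fenced: " + election.fencedNodes());
    }
}
```

In this model the breadcrumb outlives the lock, which is what lets the second candidate learn that an earlier active node existed and must be fenced before it takes over.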
    /**
     * Method of ActiveStandbyElector that writes the "ActiveBreadCrumb" node,
     * indicating that this node needs to be fenced on failover
     */
    private void writeBreadCrumbNode(Stat oldBreadcrumbStat)
        throws KeeperException, InterruptedException {
      Preconditions.checkState(appData != null, "no appdata");

      LOG.info("Writing znode " + zkBreadCrumbPath +
          " to indicate that the local node is the most recent active...");
      if (oldBreadcrumbStat == null) {
        // No previous active, just create the node
        createWithRetries(zkBreadCrumbPath, appData, zkAcl,
            CreateMode.PERSISTENT);
      } else {
        // There was a previous active, update the node
        setDataWithRetries(zkBreadCrumbPath, appData,
            oldBreadcrumbStat.getVersion());
      }
    }

For example, DFSZKFailoverController stores the HAServiceTarget information in the "ActiveBreadCrumb" node: hostname, port, ZKFC port, name service ID, and name node ID.
    package org.apache.hadoop.ha;

    @InterfaceAudience.LimitedPrivate("HDFS")
    public abstract class ZKFailoverController {

      static final Log LOG = LogFactory.getLog(ZKFailoverController.class);

      public static final String ZK_QUORUM_KEY = "ha.zookeeper.quorum";
      private static final String ZK_SESSION_TIMEOUT_KEY = "ha.zookeeper.session-timeout.ms";
      private static final int ZK_SESSION_TIMEOUT_DEFAULT = 5*1000;
      private static final String ZK_PARENT_ZNODE_KEY = "ha.zookeeper.parent-znode";
      public static final String ZK_ACL_KEY = "ha.zookeeper.acl";
      private static final String ZK_ACL_DEFAULT = "world:anyone:rwcda";
      public static final String ZK_AUTH_KEY = "ha.zookeeper.auth";
      static final String ZK_PARENT_ZNODE_DEFAULT = "/hadoop-ha";

      protected static final String USAGE =
          "Usage: java zkfc [ -formatZK [-force] [-nonInteractive] ]";

      // Other code omitted
    }
    package org.apache.hadoop.hdfs.tools;

    @InterfaceAudience.Private
    public class DFSZKFailoverController extends ZKFailoverController {

      @Override
      protected byte[] targetToData(HAServiceTarget target) { // HAServiceTarget
        InetSocketAddress addr = target.getAddress();

        return ActiveNodeInfo.newBuilder()
            .setHostname(addr.getHostName())
            .setPort(addr.getPort())
            .setZkfcPort(target.getZKFCAddress().getPort())
            .setNameserviceId(localNNTarget.getNameServiceId())
            .setNamenodeId(localNNTarget.getNameNodeId())
            .build()
            .toByteArray();
      }

      // Other code omitted
    }
    package org.apache.hadoop.ha;

    @InterfaceAudience.Public
    @InterfaceStability.Evolving
    public abstract class HAServiceTarget {

      private static final String HOST_SUBST_KEY = "host";
      private static final String PORT_SUBST_KEY = "port";
      private static final String ADDRESS_SUBST_KEY = "address";

      // Other code omitted
    }
    package org.apache.hadoop.hdfs.tools;

    @InterfaceAudience.Private
    public class NNHAServiceTarget extends HAServiceTarget {

      // Keys added to the fencing script environment
      private static final String NAMESERVICE_ID_KEY = "nameserviceid";
      private static final String NAMENODE_ID_KEY = "namenodeid";

      // Other code omitted
    }

The definition of ActiveStandbyElector shows that it is in fact a ZooKeeper callback handler.
    package org.apache.hadoop.ha;

    @InterfaceAudience.Private
    @InterfaceStability.Evolving
    public class ActiveStandbyElector implements StatCallback, StringCallback {
      // Other code omitted

      private void createLockNodeAsync() {
        zkClient.create(zkLockFilePath, appData, zkAcl, CreateMode.EPHEMERAL,
            this, zkClient);
      }

      private void monitorLockNodeAsync() {
        zkClient.exists(zkLockFilePath, watcher, this, zkClient);
      }
    }

Clearly, when create() is called on ZooKeeper, ActiveStandbyElector acts as the StringCallback; when exists() is called to monitor the znode, ActiveStandbyElector acts as the StatCallback.
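How a single object can serve as both callbacks comes down to Java overloading: the two `processResult` methods differ only in the type of their last parameter. The following sketch mimics that shape with hypothetical stand-ins (`ToyStringCallback`, `ToyStatCallback`, `ToyStat`, `ToyClient`, `ToyElector`); these are not the ZooKeeper types, and the toy client invokes the callback synchronously with a made-up result code and session id, whereas a real client would call back asynchronously.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins that only mimic the shape of the two callback interfaces.
interface ToyStringCallback {
    void processResult(int rc, String path, Object ctx, String name);
}

interface ToyStatCallback {
    void processResult(int rc, String path, Object ctx, ToyStat stat);
}

class ToyStat {
    final long ephemeralOwner;
    ToyStat(long ephemeralOwner) { this.ephemeralOwner = ephemeralOwner; }
}

/** Toy client: completes each call immediately and invokes the matching callback. */
class ToyClient {
    void create(String path, ToyStringCallback cb, Object ctx) {
        cb.processResult(0, path, ctx, path);
    }
    void exists(String path, ToyStatCallback cb, Object ctx) {
        cb.processResult(0, path, ctx, new ToyStat(42L));
    }
}

/** One object plays both callback roles; the overload is chosen by parameter type. */
class ToyElector implements ToyStringCallback, ToyStatCallback {
    final List<String> log = new ArrayList<>();

    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
        log.add("create:" + rc);
    }

    @Override
    public void processResult(int rc, String path, Object ctx, ToyStat stat) {
        log.add("exists:" + rc + " owner=" + stat.ephemeralOwner);
    }

    public static void main(String[] args) {
        ToyClient client = new ToyClient();
        ToyElector elector = new ToyElector();
        client.create("/lock", elector, client); // elector used as ToyStringCallback
        client.exists("/lock", elector, client); // elector used as ToyStatCallback
        System.out.println(elector.log);
    }
}
```

Passing the same `elector` instance to both calls mirrors the way `this` is handed to `create()` and `exists()` in the excerpt above.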
interface implementation of Zookeeper callback for create
    @Override
    public synchronized void processResult(int rc, String path, Object ctx,
        String name) {
      if (isStaleClient(ctx)) return;
      LOG.debug("CreateNode result: " + rc + " for path: " + path
          + " connectionState: " + zkConnectionState + " for " + this);

      Code code = Code.get(rc);
      if (isSuccess(code)) {
        // we successfully created the znode. we are the leader. start monitoring
        if (becomeActive()) {
          monitorActiveStatus();
        } else {
          reJoinElectionAfterFailureToBecomeActive();
        }
        return;
      }

      if (isNodeExists(code)) {
        if (createRetryCount == 0) {
          // znode exists and we did not retry the operation. so a different
          // instance has created it. become standby and monitor lock.
          becomeStandby();
        }
        // if we had retried then the znode could have been created by our first
        // attempt to the server (that we lost) and this node exists response is
        // for the second attempt. verify this case via ephemeral node owner. this
        // will happen on the callback for monitoring the lock.
        monitorActiveStatus();
        return;
      }

      String errorMessage = "Received create error from Zookeeper. code:"
          + code.toString() + " for path " + path;
      LOG.debug(errorMessage);

      if (shouldRetry(code)) {
        if (createRetryCount < maxRetryNum) {
          LOG.debug("Retrying createNode createRetryCount: " + createRetryCount);
          ++createRetryCount;
          createLockNodeAsync();
          return;
        }
        errorMessage = errorMessage
            + ". Not retrying further znode create connection errors.";
      } else if (isSessionExpired(code)) {
        // This isn't fatal - the client Watcher will re-join the election
        LOG.warn("Lock acquisition failed because session was lost");
        return;
      }

      fatalError(errorMessage);
    }

interface implementation of Zookeeper callback for monitor (exists)
    @Override
    public synchronized void processResult(int rc, String path, Object ctx,
        Stat stat) {
      if (isStaleClient(ctx)) return;

      assert wantToBeInElection :
          "Got a StatNode result after quitting election";

      LOG.debug("StatNode result: " + rc + " for path: " + path
          + " connectionState: " + zkConnectionState + " for " + this);

      Code code = Code.get(rc);
      if (isSuccess(code)) {
        // the following owner check completes verification in case the lock znode
        // creation was retried
        if (stat.getEphemeralOwner() == zkClient.getSessionId()) {
          // we own the lock znode. so we are the leader
          if (!becomeActive()) {
            reJoinElectionAfterFailureToBecomeActive();
          }
        } else {
          // we dont own the lock znode. so we are a standby.
          becomeStandby();
        }
        // the watch set by us will notify about changes
        return;
      }

      if (isNodeDoesNotExist(code)) {
        // the lock znode disappeared before we started monitoring it
        enterNeutralMode();
        joinElectionInternal();
        return;
      }

      String errorMessage = "Received stat error from Zookeeper. code:"
          + code.toString();
      LOG.debug(errorMessage);

      if (shouldRetry(code)) {
        if (statRetryCount < maxRetryNum) {
          ++statRetryCount;
          monitorLockNodeAsync();
          return;
        }
        errorMessage = errorMessage
            + ". Not retrying further znode monitoring connection errors.";
      } else if (isSessionExpired(code)) {
        // This isn't fatal - the client Watcher will re-join the election
        LOG.warn("Lock monitoring failed because session was lost");
        return;
      }

      fatalError(errorMessage);
    }

ActiveStandbyElector also registers a ZooKeeper watcher to observe session and connection events.
interface implementation of Zookeeper watch events (connection and node)
    synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) {
      Event.EventType eventType = event.getType();
      if (isStaleClient(zk)) return;

      if (eventType == Event.EventType.None) {
        // the connection state has changed
        switch (event.getState()) {
        case SyncConnected:
          LOG.info("Session connected.");
          // if the listener was asked to move to safe state then it needs to be undone
          ConnectionState prevConnectionState = zkConnectionState;
          zkConnectionState = ConnectionState.CONNECTED;
          if (prevConnectionState == ConnectionState.DISCONNECTED &&
              wantToBeInElection) {
            monitorActiveStatus();
          }
          break;
        case Disconnected:
          LOG.info("Session disconnected. Entering neutral mode...");

          // ask the app to move to safe state because zookeeper connection
          // is not active and we dont know our state
          zkConnectionState = ConnectionState.DISCONNECTED;
          enterNeutralMode();
          break;
        case Expired:
          // the connection got terminated because of session timeout
          // call listener to reconnect
          LOG.info("Session expired. Entering neutral mode and rejoining...");
          enterNeutralMode();
          reJoinElection(0);
          break;
        case SaslAuthenticated:
          LOG.info("Successfully authenticated to ZooKeeper using SASL.");
          break;
        default:
          fatalError("Unexpected Zookeeper watch event state: "
              + event.getState());
          break;
        }
        return;
      }

      // On the other hand, ActiveStandbyElector also reacts to node change events.
      String path = event.getPath();
      if (path != null) {
        switch (eventType) {
        case NodeDeleted:
          if (state == State.ACTIVE) {
            enterNeutralMode();
          }
          joinElectionInternal();
          break;
        case NodeDataChanged:
          monitorActiveStatus();
          break;
        default:
          LOG.debug("Unexpected node event: " + eventType + " for path: " + path);
          monitorActiveStatus();
        }
        return;
      }

      // some unexpected error has occurred
      fatalError("Unexpected watch error from Zookeeper");
    }

The join action is simply to try to create the lock node.
    private void joinElectionInternal() {
      Preconditions.checkState(appData != null,
          "trying to join election without any app data");
      if (zkClient == null) {
        if (!reEstablishSession()) {
          fatalError("Failed to reEstablish connection with ZooKeeper");
          return;
        }
      }

      createRetryCount = 0;
      wantToBeInElection = true;
      createLockNodeAsync();
    }

Once the lock node has been created successfully, ActiveStandbyElector first tries to fence the old active leader and then writes its own data into the breadcrumb node.
    private boolean becomeActive() {
      assert wantToBeInElection;
      if (state == State.ACTIVE) {
        // already active
        return true;
      }
      try {
        Stat oldBreadcrumbStat = fenceOldActive();
        writeBreadCrumbNode(oldBreadcrumbStat);

        LOG.debug("Becoming active for " + this);
        appClient.becomeActive();
        state = State.ACTIVE;
        return true;
      } catch (Exception e) {
        LOG.warn("Exception handling the winning of election", e);
        // Caller will handle quitting and rejoining the election.
        return false;
      }
    }

If ActiveStandbyElector needs to quit the election, it tries to delete its own breadcrumb node.
    public synchronized void quitElection(boolean needFence) {
      LOG.info("Yielding from election");
      if (!needFence && state == State.ACTIVE) {
        // If active is gracefully going back to standby mode, remove
        // our permanent znode so no one fences us.
        tryDeleteOwnBreadCrumbNode();
      }
      reset();
      wantToBeInElection = false;
    }

ActiveStandbyElector provides a callback interface for its callers, such as ZKFailoverController for name node HA and EmbeddedElectorService for resource manager HA.
    public interface ActiveStandbyElectorCallback {
      // Other code omitted
    }

For example, the ElectorCallbacks class in ZKFailoverController is defined as follows.
    /**
     * Callbacks from elector
     */
    class ElectorCallbacks implements ActiveStandbyElectorCallback {
      @Override
      public void becomeActive() throws ServiceFailedException {
        ZKFailoverController.this.becomeActive();
      }

      @Override
      public void becomeStandby() {
        ZKFailoverController.this.becomeStandby();
      }

      @Override
      public void enterNeutralMode() {
      }

      @Override
      public void notifyFatalError(String errorMessage) {
        fatalError(errorMessage);
      }

      @Override
      public void fenceOldActive(byte[] data) {
        ZKFailoverController.this.fenceOldActive(data);
      }

      @Override
      public String toString() {
        synchronized (ZKFailoverController.this) {
          return "Elector callbacks for " + localTarget;
        }
      }
    }

After ActiveStandbyElector wins the election in ZooKeeper, becomeActive() in ZKFailoverController issues an RPC request to the name node so that the corresponding name node transitions to the active state.
    private synchronized void becomeActive() throws ServiceFailedException {
      LOG.info("Trying to make " + localTarget + " active...");
      try {
        HAServiceProtocolHelper.transitionToActive(localTarget.getProxy(
            conf, FailoverController.getRpcTimeoutToNewActive(conf)),
            createReqInfo());
        String msg = "Successfully transitioned " + localTarget +
            " to active state";
        LOG.info(msg);
        serviceState = HAServiceState.ACTIVE;
        recordActiveAttempt(new ActiveAttemptRecord(true, msg));
      } catch (Throwable t) {
        // omitted
      }
    }

If ActiveStandbyElector decides to become standby, the failover controller instead asks the name node to transition to standby.
    private synchronized void becomeStandby() {
      LOG.info("ZK Election indicated that " + localTarget +
          " should become standby");
      try {
        int timeout = FailoverController.getGracefulFenceTimeout(conf);
        localTarget.getProxy(conf, timeout).transitionToStandby(createReqInfo());
        LOG.info("Successfully transitioned " + localTarget +
            " to standby state");
      } catch (Exception e) {
        LOG.error("Couldn't transition " + localTarget + " to standby state",
            e);
        // TODO handle this. It's a likely case since we probably got fenced
        // at the same time.
      }
      serviceState = HAServiceState.STANDBY;
    }
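The delegation pattern used by ElectorCallbacks, an inner class that forwards each election decision to its enclosing controller, can be sketched in isolation. Everything here is hypothetical and simplified: `ToyController`, `ElectionCallback`, and `ServiceState` are illustrative names, and the RPC call to the name node is replaced by a plain field assignment.

```java
/** Toy controller showing how an inner callback class forwards to its outer instance. */
class ToyController {
    enum ServiceState { INITIALIZING, ACTIVE, STANDBY }

    /** Hypothetical, reduced version of the elector's callback interface. */
    interface ElectionCallback {
        void becomeActive();
        void becomeStandby();
    }

    private ServiceState serviceState = ServiceState.INITIALIZING;

    /** Inner class: each callback simply delegates to the enclosing controller. */
    class Callbacks implements ElectionCallback {
        @Override
        public void becomeActive() {
            ToyController.this.becomeActive();
        }

        @Override
        public void becomeStandby() {
            ToyController.this.becomeStandby();
        }
    }

    // In the real controller these methods issue RPCs; here they only record state.
    private synchronized void becomeActive() {
        serviceState = ServiceState.ACTIVE;
    }

    private synchronized void becomeStandby() {
        serviceState = ServiceState.STANDBY;
    }

    synchronized ServiceState state() {
        return serviceState;
    }

    ElectionCallback callbacks() {
        return new Callbacks();
    }

    public static void main(String[] args) {
        ToyController controller = new ToyController();
        ElectionCallback cb = controller.callbacks();
        cb.becomeStandby();
        System.out.println("after losing the election: " + controller.state());
        cb.becomeActive();
        System.out.println("after winning the election: " + controller.state());
    }
}
```

Keeping the callbacks in an inner class means the elector only ever sees the narrow callback interface, while the controller's state-changing methods can stay private.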
Setting up a Hadoop 2.6.0 HA cluster includes a step that formats ZooKeeper and starts the process responsible for switching between active and standby.
Format ZooKeeper
    bin/hdfs zkfc -formatZK

    # In the hdfs shell script:
    COMMAND=$1
    # (middle part omitted)
    elif [ "$COMMAND" = "zkfc" ] ; then
      CLASS='org.apache.hadoop.hdfs.tools.DFSZKFailoverController'
      HADOOP_OPTS="$HADOOP_OPTS $HADOOP_ZKFC_OPTS"
    elif # omitted

This runs the main method of DFSZKFailoverController:
    public static void main(String args[]) throws Exception {
      if (DFSUtil.parseHelpArgument(args,
          ZKFailoverController.USAGE, System.out, true)) {
        System.exit(0);
      }

      GenericOptionsParser parser = new GenericOptionsParser(
          new HdfsConfiguration(), args);
      DFSZKFailoverController zkfc = DFSZKFailoverController.create(
          parser.getConfiguration());

      System.exit(zkfc.run(parser.getRemainingArgs()));
    }

HdfsConfiguration extends Configuration and initializes the configuration parameters in static code. The main method of DFSZKFailoverController calls ZKFailoverController's run, which in turn calls doRun(String[] args); that is where the -formatZK argument is handled.
    private int doRun(String[] args) {
      try {
        initZK();
      } catch (KeeperException ke) {
        // ......
        return ERR_CODE_NO_ZK;
      }
      if (args.length > 0) {
        if ("-formatZK".equals(args[0])) { // "hdfs zkfc" followed by -formatZK
          boolean force = false;
          boolean interactive = true;
          for (int i = 1; i < args.length; i++) {
            if ("-force".equals(args[i])) { // -formatZK -force
              force = true;
            } else if ("-nonInteractive".equals(args[i])) { // -formatZK -nonInteractive
              interactive = false;
            } else { // invalid argument
              badArg(args[i]);
            }
          }
          // perform the format operation
          return formatZK(force, interactive);
        } else { // "hdfs zkfc" followed by an argument other than -formatZK
          badArg(args[0]);
        }
      }

      // no arguments
      if (!elector.parentZNodeExists()) {
        // ......
        return ERR_CODE_NO_PARENT_ZNODE;
      }

      try {
        localTarget.checkFencingConfigured();
      } catch (BadFencingConfigurationException e) {
        // ......
        return ERR_CODE_NO_FENCER;
      }

      initRPC();
      initHM();
      startRPC();
      try {
        mainLoop();
      } finally {
        rpcServer.stopAndJoin();

        elector.quitElection(true);
        healthMonitor.shutdown();
        healthMonitor.join();
      }
      return 0;
    }
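The argument handling at the top of doRun can be isolated into a small, testable sketch. The class `ZkfcArgs` and its `parse` method are hypothetical names introduced here; in particular, throwing a plain `IllegalArgumentException` is an assumption standing in for whatever badArg does in the real code.

```java
/** Hypothetical holder for the options accepted after "hdfs zkfc". */
class ZkfcArgs {
    final boolean formatZK;
    final boolean force;
    final boolean interactive;

    ZkfcArgs(boolean formatZK, boolean force, boolean interactive) {
        this.formatZK = formatZK;
        this.force = force;
        this.interactive = interactive;
    }

    /** Parses "[ -formatZK [-force] [-nonInteractive] ]", mirroring the loop in doRun. */
    static ZkfcArgs parse(String[] args) {
        boolean formatZK = false;
        boolean force = false;
        boolean interactive = true;
        if (args.length > 0) {
            if (!"-formatZK".equals(args[0])) {
                // Assumption: stands in for badArg(args[0]).
                throw new IllegalArgumentException("Bad argument: " + args[0]);
            }
            formatZK = true;
            for (int i = 1; i < args.length; i++) {
                if ("-force".equals(args[i])) {
                    force = true;
                } else if ("-nonInteractive".equals(args[i])) {
                    interactive = false;
                } else {
                    throw new IllegalArgumentException("Bad argument: " + args[i]);
                }
            }
        }
        return new ZkfcArgs(formatZK, force, interactive);
    }

    public static void main(String[] args) {
        ZkfcArgs parsed = parse(new String[] {"-formatZK", "-nonInteractive"});
        System.out.println("formatZK=" + parsed.formatZK
            + " force=" + parsed.force
            + " interactive=" + parsed.interactive);
    }
}
```

With no arguments the sketch reports `formatZK == false`, which corresponds to the path in doRun that skips formatting and goes on to start the failover controller.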
Start zkfc
sbin/hadoop-daemon.sh start zkfc