您的位置:首页 > 运维架构

Hadoop2.6.0学习笔记(十四)Hadoop Two HA: ActiveStandbyElector

2015-08-06 11:46 435 查看
鲁春利的工作笔记,谁说程序员不能有文艺范?

本文翻译自http://johnjianfang.blogspot.com/,该网站不可直接访问,原因?你懂的!
Hadoop two HA通过Apache Zookeeper来协调和共享状态。Name node HA和Resource Manager HA均采用active/standby模式,在任意时间点只有一个active leader。为了从多个主机中选举出active leader,Hadoop提供了ActiveStandbyElector类,该类与Apache Zookeeper相结合来实现这个目标。ActiveStandbyElector更像是zookeeper管理员,工作在zookeeper和控制器(如ZKFailoverController)之间。
ActiveStandbyElector类的定义:
package org.apache.hadoop.ha;

@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ActiveStandbyElector implements StatCallback, StringCallback {
// 暂时不做说明,先列在这里
public interface ActiveStandbyElectorCallback {
void becomeActive() throws ServiceFailedException;
void becomeStandby();
void enterNeutralMode();
void notifyFatalError(String errorMessage);
void fenceOldActive(byte[] oldActiveData);
}

@VisibleForTesting
protected static final String LOCK_FILENAME = "ActiveStandbyElectorLock";
@VisibleForTesting
protected static final String BREADCRUMB_FILENAME = "ActiveBreadCrumb";

// 其他代码略
}
中心思想为创建临时(EPHEMERAL)节点"ActiveStandbyElectorLock",并且只要有一台机器成功创建了该节点就成为leader。由于该节点是临时的,当会话过期或者连接失败时当前active leader可能lose这个节点,因此其他节点就有机会来创建相同的节点而成为leader。当前的active leader可能间隔一段时间后重新恢复为active状态,在这种特殊的情况下,可能会有两个active的leader。为了避免这种情况ActiveStandbyElector创建了一个持久(PERSISTENT)的节点"ActiveBreadCrumb",用来存储它自己的数据,因此一个新的active leader可以隔离(fence)之前的active leader,避免它重新恢复为active状态。
/**
* ActiveStandbyElector类型写"ActiveBreadCrumb"节点方法,
* 表明该节点在failover时需要被隔离(fenced)
*/
private void writeBreadCrumbNode(Stat oldBreadcrumbStat) throws KeeperException, InterruptedException {
Preconditions.checkState(appData != null, "no appdata");

LOG.info("Writing znode " + zkBreadCrumbPath + " to indicate that the local node is the most recent active...");
if (oldBreadcrumbStat == null) {
// No previous active, just create the node
createWithRetries(zkBreadCrumbPath, appData, zkAcl, CreateMode.PERSISTENT);
} else {
// There was a previous active, update the node
setDataWithRetries(zkBreadCrumbPath, appData, oldBreadcrumbStat.getVersion());
}
}
例如,DFSZKFailoverController在"ActiveBreadCrumb"节点保存了HAServiceTarget对象,包括hostname, port, ZKFC port, name service id, and name node id。
package org.apache.hadoop.ha;

@InterfaceAudience.LimitedPrivate("HDFS")
public abstract class ZKFailoverController {

static final Log LOG = LogFactory.getLog(ZKFailoverController.class);

public static final String ZK_QUORUM_KEY = "ha.zookeeper.quorum";
private static final String ZK_SESSION_TIMEOUT_KEY = "ha.zookeeper.session-timeout.ms";
private static final int ZK_SESSION_TIMEOUT_DEFAULT = 5*1000;
private static final String ZK_PARENT_ZNODE_KEY = "ha.zookeeper.parent-znode";
public static final String ZK_ACL_KEY = "ha.zookeeper.acl";
private static final String ZK_ACL_DEFAULT = "world:anyone:rwcda";
public static final String ZK_AUTH_KEY = "ha.zookeeper.auth";
static final String ZK_PARENT_ZNODE_DEFAULT = "/hadoop-ha";

protected static final String USAGE = "Usage: java zkfc [ -formatZK [-force] [-nonInteractive] ]";

// 其他代码略
}
package org.apache.hadoop.hdfs.tools;

@InterfaceAudience.Private
public class DFSZKFailoverController extends ZKFailoverController {
@Override
protected byte[] targetToData(HAServiceTarget target) {    // HAServiceTarget
InetSocketAddress addr = target.getAddress();

return ActiveNodeInfo.newBuilder()
.setHostname(addr.getHostName())
.setPort(addr.getPort())
.setZkfcPort(target.getZKFCAddress().getPort())
.setNameserviceId(localNNTarget.getNameServiceId())
.setNamenodeId(localNNTarget.getNameNodeId())
.build()
.toByteArray();
}

// 其他代码略
}
package org.apache.hadoop.ha;
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class HAServiceTarget {
private static final String HOST_SUBST_KEY = "host";
private static final String PORT_SUBST_KEY = "port";
private static final String ADDRESS_SUBST_KEY = "address";

// 其他代码略
}
package org.apache.hadoop.hdfs.tools;
@InterfaceAudience.Private
public class NNHAServiceTarget extends HAServiceTarget {
// Keys added to the fencing script environment
private static final String NAMESERVICE_ID_KEY = "nameserviceid";
private static final String NAMENODE_ID_KEY = "namenodeid";

// 其他代码略
}
从ActiveStandbyElector的定义能够看到实际上它是zookeeper的回调处理程序(callback handler)。
package org.apache.hadoop.ha;

@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ActiveStandbyElector implements StatCallback, StringCallback {
// 其他代码略
private void createLockNodeAsync() {
zkClient.create(zkLockFilePath, appData, zkAcl, CreateMode.EPHEMERAL, this, zkClient);
}

private void monitorLockNodeAsync() {
zkClient.exists(zkLockFilePath, watcher, this, zkClient);
}
}
显然,当create()方法在zookeeper上被调用时,ActiveStandbyElector充当了StringCallback;而当exists()被调用去监控znode时,ActiveStandbyElector充当了StatCallback。
interface implementation of Zookeeper callback for create

@Override
public synchronized void processResult(int rc, String path, Object ctx,
String name) {
if (isStaleClient(ctx)) return;
LOG.debug("CreateNode result: " + rc + " for path: " + path
+ " connectionState: " + zkConnectionState +
"  for " + this);

Code code = Code.get(rc);
if (isSuccess(code)) {
// we successfully created the znode. we are the leader. start monitoring
if (becomeActive()) {
monitorActiveStatus();
} else {
reJoinElectionAfterFailureToBecomeActive();
}
return;
}

if (isNodeExists(code)) {
if (createRetryCount == 0) {
// znode exists and we did not retry the operation. so a different
// instance has created it. become standby and monitor lock.
becomeStandby();
}
// if we had retried then the znode could have been created by our first
// attempt to the server (that we lost) and this node exists response is
// for the second attempt. verify this case via ephemeral node owner. this
// will happen on the callback for monitoring the lock.
monitorActiveStatus();
return;
}

String errorMessage = "Received create error from Zookeeper. code:"
+ code.toString() + " for path " + path;
LOG.debug(errorMessage);

if (shouldRetry(code)) {
if (createRetryCount < maxRetryNum) {
LOG.debug("Retrying createNode createRetryCount: " + createRetryCount);
++createRetryCount;
createLockNodeAsync();
return;
}
errorMessage = errorMessage
+ ". Not retrying further znode create connection errors.";
} else if (isSessionExpired(code)) {
// This isn't fatal - the client Watcher will re-join the election
LOG.warn("Lock acquisition failed because session was lost");
return;
}

fatalError(errorMessage);
}
interface implementation of Zookeeper callback for monitor (exists)
@Override
public synchronized void processResult(int rc, String path, Object ctx,
Stat stat) {
if (isStaleClient(ctx)) return;

assert wantToBeInElection :
"Got a StatNode result after quitting election";

LOG.debug("StatNode result: " + rc + " for path: " + path
+ " connectionState: " + zkConnectionState + " for " + this);

Code code = Code.get(rc);
if (isSuccess(code)) {
// the following owner check completes verification in case the lock znode
// creation was retried
if (stat.getEphemeralOwner() == zkClient.getSessionId()) {
// we own the lock znode. so we are the leader
if (!becomeActive()) {
reJoinElectionAfterFailureToBecomeActive();
}
} else {
// we dont own the lock znode. so we are a standby.
becomeStandby();
}
// the watch set by us will notify about changes
return;
}

if (isNodeDoesNotExist(code)) {
// the lock znode disappeared before we started monitoring it
enterNeutralMode();
joinElectionInternal();
return;
}

String errorMessage = "Received stat error from Zookeeper. code:"
+ code.toString();
LOG.debug(errorMessage);

if (shouldRetry(code)) {
if (statRetryCount < maxRetryNum) {
++statRetryCount;
monitorLockNodeAsync();
return;
}
errorMessage = errorMessage
+ ". Not retrying further znode monitoring connection errors.";
} else if (isSessionExpired(code)) {
// This isn't fatal - the client Watcher will re-join the election
LOG.warn("Lock monitoring failed because session was lost");
return;
}

fatalError(errorMessage);
}
ActiveStandbyElector注册了Zookeeper的观察者用来观察session和connection相关联的事件。
interface implementation of Zookeeper watch events (connection and node)
synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) {
Event.EventType eventType = event.getType();
if (isStaleClient(zk)) return;

if (eventType == Event.EventType.None) {
// the connection state has changed
switch (event.getState()) {
case SyncConnected:
LOG.info("Session connected.");
// if the listener was asked to move to safe state then it needs to  be undone
ConnectionState prevConnectionState = zkConnectionState;
zkConnectionState = ConnectionState.CONNECTED;
if (prevConnectionState == ConnectionState.DISCONNECTED && wantToBeInElection) {
monitorActiveStatus();
}
break;
case Disconnected:
LOG.info("Session disconnected. Entering neutral mode...");

// ask the app to move to safe state because zookeeper connection
// is not active and we dont know our state
zkConnectionState = ConnectionState.DISCONNECTED;
enterNeutralMode();
break;
case Expired:
// the connection got terminated because of session timeout
// call listener to reconnect
LOG.info("Session expired. Entering neutral mode and rejoining...");
enterNeutralMode();
reJoinElection(0);
break;
case SaslAuthenticated:
LOG.info("Successfully authenticated to ZooKeeper using SASL.");
break;
default:
fatalError("Unexpected Zookeeper watch event state: " + event.getState());
break;
}

return;
}

// 另一方面,对于node的变更事件,ActiveStandbyElector也会执行相应的动作。
String path = event.getPath();
if (path != null) {
switch (eventType) {
case NodeDeleted:
if (state == State.ACTIVE) {
enterNeutralMode();
}
joinElectionInternal();
break;
case NodeDataChanged:
monitorActiveStatus();
break;
default:
LOG.debug("Unexpected node event: " + eventType + " for path: " + path);
monitorActiveStatus();
}

return;
}

// some unexpected error has occurred
fatalError("Unexpected watch error from Zookeeper");
}
The join action is simply to try to create the lock node.

private void joinElectionInternal() {
Preconditions.checkState(appData != null,
"trying to join election without any app data");
if (zkClient == null) {
if (!reEstablishSession()) {
fatalError("Failed to reEstablish connection with ZooKeeper");
return;
}
}

createRetryCount = 0;
wantToBeInElection = true;
createLockNodeAsync();
}
当lock node被成功创建后,ActiveStandbyElector首先尝试隔离(fence)旧的active leader,然后将它自己的数据写入breadcrumb节点。
private boolean becomeActive() {
assert wantToBeInElection;
if (state == State.ACTIVE) {
// already active
return true;
}
try {
Stat oldBreadcrumbStat = fenceOldActive();
writeBreadCrumbNode(oldBreadcrumbStat);

LOG.debug("Becoming active for " + this);
appClient.becomeActive();
state = State.ACTIVE;
return true;
} catch (Exception e) {
LOG.warn("Exception handling the winning of election", e);
// Caller will handle quitting and rejoining the election.
return false;
}
}
如果ActiveStandbyElector需要退出选举,它将尝试删除他自己的breadcrumb节点。
public synchronized void quitElection(boolean needFence) {
LOG.info("Yielding from election");
if (!needFence && state == State.ACTIVE) {
// If active is gracefully going back to standby mode, remove
// our permanent znode so no one fences us.
tryDeleteOwnBreadCrumbNode();
}
reset();
wantToBeInElection = false;
}
ActiveStandbyElector provides a callback interface for its caller such as
ZKFailoverController for name node HA and EmbeddedElectorService for resource
manager HA.

public interface ActiveStandbyElectorCallback {
// 其他代码略
}
For example, the ElectorCallbacks in ZKFailoverController is defined as
follows.

/**
* Callbacks from elector
*/
class ElectorCallbacks implements ActiveStandbyElectorCallback {
@Override
public void becomeActive() throws ServiceFailedException {
ZKFailoverController.this.becomeActive();
}

@Override
public void becomeStandby() {
ZKFailoverController.this.becomeStandby();
}

@Override
public void enterNeutralMode() {
}

@Override
public void notifyFatalError(String errorMessage) {
fatalError(errorMessage);
}

@Override
public void fenceOldActive(byte[] data) {
ZKFailoverController.this.fenceOldActive(data);
}

@Override
public String toString() {
synchronized (ZKFailoverController.this) {
return "Elector callbacks for " + localTarget;
}
}
}
当ActiveStandbyElector在Zookeeper赢得了选举后,ZKFailoverController中的becomeActive()被用来执行一次到name node的RPC请求,以便相应的name node转变为active状态。
private synchronized void becomeActive() throws ServiceFailedException {
LOG.info("Trying to make " + localTarget + " active...");
try {
HAServiceProtocolHelper.transitionToActive(localTarget.getProxy(
conf, FailoverController.getRpcTimeoutToNewActive(conf)), createReqInfo());
String msg = "Successfully transitioned " + localTarget + " to active state";
LOG.info(msg);
serviceState = HAServiceState.ACTIVE;
recordActiveAttempt(new ActiveAttemptRecord(true, msg));
} catch (Throwable t) {
// 略
}
}
if ActiveStandbyElector makes a decision to become standby, the
FailoverController asks name node to become standby instead.
private synchronized void becomeStandby() {
LOG.info("ZK Election indicated that " + localTarget +
" should become standby");
try {
int timeout = FailoverController.getGracefulFenceTimeout(conf);
localTarget.getProxy(conf, timeout).transitionToStandby(createReqInfo());
LOG.info("Successfully transitioned " + localTarget +
" to standby state");
} catch (Exception e) {
LOG.error("Couldn't transition " + localTarget + " to standby state",
e);
// TODO handle this. It's a likely case since we probably got fenced
// at the same time.
}
serviceState = HAServiceState.STANDBY;
}


Hadoop2.6.0HA集群搭建 完成后,其中有一步是格式化zookeeper和启动HA的active和standby切换的。
格式化zookeeper
bin/hdfs zkfc -formatZK
#
在hdfs这个shell文件中:
COMMAND=$1
# 中间略
elif [ "$COMMAND" = "zkfc" ] ; then
CLASS='org.apache.hadoop.hdfs.tools.DFSZKFailoverController'
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_ZKFC_OPTS"
elif     # 略
执行DFSZKFailoverController的main方法
public static void main(String args[]) throws Exception {
if (DFSUtil.parseHelpArgument(args,ZKFailoverController.USAGE, System.out, true)) {
System.exit(0);
}

GenericOptionsParser parser = new GenericOptionsParser(new HdfsConfiguration(), args);
DFSZKFailoverController zkfc = DFSZKFailoverController.create(parser.getConfiguration());

System.exit(zkfc.run(parser.getRemainingArgs()));
}
# HdfsConfiguration extends Configuration通过静态方法初始化了配置参数
DFSZKFailoverController的main方法中调用了ZKFailoverController的run,在run中调用doRun(String[] args)接收formatZK参数
private int doRun(String[] args) {
try {
initZK();
} catch (KeeperException ke) {
// ......
return ERR_CODE_NO_ZK;
}
if (args.length > 0) {
if ("-formatZK".equals(args[0])) {        // hdfs zkfc后接参数且为formatZK
boolean force = false;
boolean interactive = true;
for (int i = 1; i < args.length; i++) {
if ("-force".equals(args[i])) {       // formatZK -force
force = true;
} else if ("-nonInteractive".equals(args[i])) {     // formatZK -nonInteractive
interactive = false;
} else {                    // 参数错误
badArg(args[i]);
}
}        // 执行格式化操作
return formatZK(force, interactive);
} else {                                  // hdfs zkfc后接参数但非formatZK
badArg(args[0]);
}
}

// 无参数
if (!elector.parentZNodeExists()) {
// ......
return ERR_CODE_NO_PARENT_ZNODE;
}

try {
localTarget.checkFencingConfigured();
} catch (BadFencingConfigurationException e) {
// ......
return ERR_CODE_NO_FENCER;
}

initRPC();
initHM();
startRPC();
try {
mainLoop();
} finally {
rpcServer.stopAndJoin();
elector.quitElection(true);
healthMonitor.shutdown();
healthMonitor.join();
}
return 0;
}


启动zkfc

sbin/hadoop-daemon.sh start zkfc
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: