您的位置:首页 > 其它

学习笔记之ZooKeeper源码分析

2019-01-01 20:55 113 查看

zookeeper源码分析

(个人学习笔记,如有错误欢迎指正!!!)

服务器启动

首先zookeeper的服务器启动类为

org.apache.zookeeper.server.quorum.QuorumPeerMain
该类中包含
main()
方法:

// Entry point of a replicated (quorum-mode) ZooKeeper server.
public static void main(String[] args) {
QuorumPeerMain main = new QuorumPeerMain();
try {
// As the name suggests: initialize from the args, then run.
main.initializeAndRun(args);
} catch (IllegalArgumentException e) {
......
} catch (ConfigException e) {
......
} catch (Exception e) {
......
}
LOG.info("Exiting normally");
System.exit(0);
}

protected void initializeAndRun(String[] args)
throws ConfigException, IOException{
QuorumPeerConfig config = new QuorumPeerConfig();
if (args.length == 1) {
// Parse the configuration file whose path is the single argument.
config.parse(args[0]);
}
......
if (args.length == 1 && config.servers.size() > 0) {
// Cluster mode: multiple servers configured, run as a quorum peer.
runFromConfig(config);
} else {
......
// Standalone mode: delegate to the single-server main class.
ZooKeeperServerMain.main(args);
}
}

public void runFromConfig(QuorumPeerConfig config) throws IOException {
......
try {
// Configure the quorumPeer from the parsed config; one QuorumPeer
// generally corresponds to one server in the ensemble.
ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns());
quorumPeer = new QuorumPeer();
quorumPeer.setClientPortAddress(config.getClientPortAddress());
......
// Start the server (QuorumPeer is a Thread) and wait for it to exit.
quorumPeer.start();
quorumPeer.join();
} catch (InterruptedException e) {
// warn, but generally this is ok
LOG.warn("Quorum Peer interrupted", e);
}
}

其中,

org.apache.zookeeper.server.quorum.QuorumPeer
间接继承了
java.lang.Thread
类,所以该类可以看作是一个线程,
start()
方法便是线程的启动方法。

@Override
public synchronized void start() {
loadDataBase();
// Start listening for client connections.
cnxnFactory.start();
// Kick off leader election: right after startup the server must determine
// its role in the ensemble before it can serve.
startLeaderElection();
super.start();
}
synchronized public void startLeaderElection() {
try {
// Propose ourselves first: (own server id, latest logged zxid, current epoch).
currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
} catch(IOException e) {
......
}
// Walk the whole view to find and remember our own quorum address.
for (QuorumServer p : getView().values()) {
if (p.id == myid) {
myQuorumAddr = p.addr;
break;
}
}
if (myQuorumAddr == null) {
......
}
// electionType was set earlier when the quorumPeer was configured; its
// default is 3, so this legacy UDP branch normally does not execute.
if (electionType == 0) {
try {
udpSocket = new DatagramSocket(myQuorumAddr.getPort());
responder = new ResponderThread();
responder.start();
} catch (SocketException e) {
throw new RuntimeException(e);
}
}
// Create the election algorithm selected by electionType.
this.electionAlg = createElectionAlgorithm(electionType);
}

因为

electionType
的默认值为3,所以默认使用的leader选举策略为
FastLeaderElection

protected Election createElectionAlgorithm(int electionAlgorithm){
Election le=null;

//TODO: use a factory rather than a switch
switch (electionAlgorithm) {
case 0:
le = new LeaderElection(this);
break;
case 1:
le = new AuthFastLeaderElection(this);
break;
case 2:
le = new AuthFastLeaderElection(this, true);
break;
case 3:
// QuorumCnxManager sends election messages to the designated servers and
// receives messages from the other servers; it maintains two kinds of
// threads, SendWorker and RecvWorker, for outbound and inbound traffic.
qcm = new QuorumCnxManager(this);
QuorumCnxManager.Listener listener = qcm.listener;
if(listener != null){
listener.start();
le = new FastLeaderElection(this, qcm);
} else {
LOG.error("Null listener when initializing cnx manager");
}
break;
default:
assert false;
}
return le;
}

创建完leader的选举策略之后,便调用了

super.start()
方法,所以便会执行
run()
方法:

@Override
public void run() {
setName("QuorumPeer" + "[myid=" + getId() + "]" +
cnxnFactory.getLocalAddress());

LOG.debug("Starting quorum peer");
try {
jmxQuorumBean = new QuorumBean(this);
MBeanRegistry.getInstance().register(jmxQuorumBean, null);
for(QuorumServer s: getView().values()){
ZKMBeanInfo p;
if (getId() == s.id) {
p = jmxLocalPeerBean = new LocalPeerBean(this);
try {
MBeanRegistry.getInstance().register(p, jmxQuorumBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
jmxLocalPeerBean = null;
}
} else {
p = new RemotePeerBean(s);
try {
MBeanRegistry.getInstance().register(p, jmxQuorumBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
}
}
}
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
jmxQuorumBean = null;
}

try {
// Main loop: act according to this peer's current role
// (LOOKING / OBSERVING / FOLLOWING / LEADING).
while (running) {
switch (getPeerState()) {
case LOOKING:
LOG.info("LOOKING");

if (Boolean.getBoolean("readonlymode.enabled")) {
LOG.info("Attempting to start ReadOnlyZooKeeperServer");

// Create read-only server but don't start it immediately
final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(
logFactory, this,
new ZooKeeperServer.BasicDataTreeBuilder(),
this.zkDb);

// Instead of starting roZk immediately, wait some grace
// period before we decide we're partitioned.
//
// Thread is used here because otherwise it would require
// changes in each of election strategy classes which is
// unnecessary code coupling.
Thread roZkMgr = new Thread() {
public void run() {
try {
// lower-bound grace period to 2 secs
sleep(Math.max(2000, tickTime));
if (ServerState.LOOKING.equals(getPeerState())) {
roZk.startup();
}
} catch (InterruptedException e) {
LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
} catch (Exception e) {
LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
}
}
};
try {
roZkMgr.start();
setBCVote(null);
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
setPeerState(ServerState.LOOKING);
} finally {
// If the thread is in the the grace period, interrupt
// to come out of waiting.
roZkMgr.interrupt();
roZk.shutdown();
}
} else {
try {
setBCVote(null);
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
}
}
break;
case OBSERVING:
try {
LOG.info("OBSERVING");
setObserver(makeObserver(logFactory));
observer.observeLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception",e );
} finally {
observer.shutdown();
setObserver(null);
setPeerState(ServerState.LOOKING);
}
break;
case FOLLOWING:
try {
LOG.info("FOLLOWING");
setFollower(makeFollower(logFactory));
follower.followLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
follower.shutdown();
setFollower(null);
setPeerState(ServerState.LOOKING);
}
break;
case LEADING:
LOG.info("LEADING");
try {
setLeader(makeLeader(logFactory));
leader.lead();
setLeader(null);
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
if (leader != null) {
leader.shutdown("Forcing shutdown");
setLeader(null);
}
setPeerState(ServerState.LOOKING);
}
break;
}
}
} finally {
LOG.warn("QuorumPeer main thread exited");
try {
MBeanRegistry.getInstance().unregisterAll();
} catch (Exception e) {
LOG.warn("Failed to unregister with JMX", e);
}
jmxQuorumBean = null;
jmxLocalPeerBean = null;
}
}

leader选举策略

leaderElection / AuthFastLeaderElection / FastLeaderElection

但是目前真正使用的是 FastLeaderElection,其他的两个都被标记为

@Deprecated

下面分析

FastLeaderElection
选举策略的实现过程:

首先介绍各类之间的关系:

org.apache.zookeeper.server.quorum.QuorumCnxManager
类主要负责向指定的server发送选举信息,并且接受其他server发送过来的选举信息,该类有两个内部类
SendWorker
RecvWorker
,这两个类均间接继承了
java.lang.Thread

并且该类维护如下信息:

// Maps each peer server id (sid) to the SendWorker serving it.
final ConcurrentHashMap<Long, SendWorker> senderWorkerMap;
// Per-sid outgoing queues. A SendWorker drains the blocking queue for the
// server it is responsible for; when that queue is empty it re-sends the
// last message recorded in lastMessageSent for that server.
final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;
// Last election message sent to each server, kept for re-delivery.
final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;
// Inbound queue holding messages received from other servers.
public final ArrayBlockingQueue<Message> recvQueue;

SendWorker
会从
queueSendMap
lastMessageSent
获取信息,并将选举信息发送给指定的server;

RecvWorker
会接受指定的server发送的选举信息,并将信息保存到
recvQueue
中。

org.apache.zookeeper.server.quorum.QuorumCnxManager.SendWorker

@Override
public void run() {
threadCnt.incrementAndGet();
try {
// If nothing is queued for this server, re-send the last message we
// sent it (covers losses across reconnects).
// Look up the outgoing queue for this server (sid).
ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
if (bq == null || isSendQueueEmpty(bq)) {
ByteBuffer b = lastMessageSent.get(sid);
if (b != null) {
LOG.debug("Attempting to send lastMessage to sid=" + sid);
send(b);
}
}
} catch (IOException e) {
LOG.error("Failed to send last message. Shutting down thread.", e);
this.finish();
}

try {
// Drain the queue, sending each election message; exit on shutdown.
while (running && !shutdown && sock != null) {

ByteBuffer b = null;
try {
// Look up the outgoing queue for this server (sid).
ArrayBlockingQueue<ByteBuffer> bq = queueSendMap
.get(sid);
if (bq != null) {
// Wait (up to 1s) for the next message to send.
b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
} else {
LOG.error("No queue of incoming messages for " +
"server " + sid);
break;
}

if(b != null){
// Remember it as the last message sent to this server...
lastMessageSent.put(sid, b);
// ...then put it on the wire.
send(b);
}
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for message on queue",
e);
}
}
} catch (Exception e) {
LOG.warn("Exception when using channel: for id " + sid + " my id = " +
self.getId() + " error = " + e);
}
this.finish();
LOG.warn("Send worker leaving thread");
}

org.apache.zookeeper.server.quorum.QuorumCnxManager.RecvWorker

@Override
public void run() {
threadCnt.incrementAndGet();
try {
while (running && !shutdown && sock != null) {
int length = din.readInt();
if (length <= 0 || length > PACKETMAXSIZE) {
throw new IOException(
"Received packet with invalid packet: "
+ length);
}
// Read the payload and push it onto the recvQueue.
byte[] msgArray = new byte[length];
din.readFully(msgArray, 0, length);
ByteBuffer message = ByteBuffer.wrap(msgArray);
addToRecvQueue(new Message(message.duplicate(), sid));
}
} catch (Exception e) {
LOG.warn("Connection broken for id " + sid + ", my id = " +
self.getId() + ", error = " , e);
} finally {
LOG.warn("Interrupting SendWorker");
sw.finish();
if (sock != null) {
closeSocket(sock);
}
}
}

此外,

org.apache.zookeeper.server.quorum.FastLeaderElection
中定义了内部类
org.apache.zookeeper.server.quorum.FastLeaderElection.Messenger
,该类中又定义了两个内部类:
WorkerReceiver
WorkerSender
,这两个类都间接继承了
java.lang.Thread

并且

FastLeaderElection
中定义了如下队列,真正实现leader选举信息处理逻辑是在该类的
lookForLeader()
方法中:

// Election messages produced by lookForLeader(); ToSend wraps an outgoing vote.
LinkedBlockingQueue<ToSend> sendqueue;
// Election messages consumed by lookForLeader(); Notification wraps a received vote.
LinkedBlockingQueue<Notification> recvqueue;

WorkerSender
主要的工作是从
sendqueue
队列中读取发送的选举信息,并且根据目标server的sid,将信息放置到
QuorumCnxManager
queueSendMap
的对应 key的value中;

WorkerReceiver
主要的工作是从
QuorumCnxManager
recvQueue
读取接受的选举信息,放置到
recvqueue
中,方便
lookForLeader()
方法读取。

org.apache.zookeeper.server.quorum.FastLeaderElection.Messenger.WorkerSender

public void run() {
while (!stop) {
try {
ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
if(m == null) continue;
process(m);
} catch (InterruptedException e) {
break;
}
}
LOG.info("WorkerSender is down");
}
// process() serializes the vote and hands it to QuorumCnxManager.toSend().
void process(ToSend m) {
ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),
m.leader,
m.zxid,
m.electionEpoch,
m.peerEpoch);
manager.toSend(m.sid, requestBuffer);
}

该方法是

org.apache.zookeeper.server.quorum.QuorumCnxManager
类中的方法:

public void toSend(Long sid, ByteBuffer b) {
// A message addressed to ourselves short-circuits straight into recvQueue.
if (self.getId() == sid) {
b.position(0);
addToRecvQueue(new Message(b.duplicate(), sid));
} else {
// No send queue for this sid yet: create one and enqueue the message.
if (!queueSendMap.containsKey(sid)) {
ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(
SEND_CAPACITY);
queueSendMap.put(sid, bq);
addToSendQueue(bq, b);

} else {
ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
if(bq != null){
addToSendQueue(bq, b);
} else {
LOG.error("No queue for server " + sid);
}
}
// Make sure a connection to that server exists.
connectOne(sid);

}
}

org.apache.zookeeper.server.quorum.FastLeaderElection.Messenger.WorkerReceiver

public void run() {
Message response;
while (!stop) {
try{
// Pull the next raw election message from QuorumCnxManager's recvQueue.
response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
if(response == null) continue;
// If the sender's sid is not in the voting view (e.g. it is an
// observer), just answer with our current vote.
if(!self.getVotingView().containsKey(response.sid)){
Vote current = self.getCurrentVote();
ToSend notmsg = new ToSend(ToSend.mType.notification,
current.getId(),
current.getZxid(),
logicalclock,
self.getPeerState(),
response.sid,
current.getPeerEpoch());

sendqueue.offer(notmsg);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Receive new notification message. My id = "
+ self.getId());
}
if (response.buffer.capacity() < 28) {
LOG.error("Got a short response: "
+ response.buffer.capacity());
continue;
}
boolean backCompatibility = (response.buffer.capacity() == 28);
response.buffer.clear();

// Deserialize the wire message into a Notification.
Notification n = new Notification();

// Decode the sender server's state.
QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
switch (response.buffer.getInt()) {
case 0:
ackstate = QuorumPeer.ServerState.LOOKING;
break;
case 1:
ackstate = QuorumPeer.ServerState.FOLLOWING;
break;
case 2:
ackstate = QuorumPeer.ServerState.LEADING;
break;
case 3:
ackstate = QuorumPeer.ServerState.OBSERVING;
break;
default:
continue;
}

n.leader = response.buffer.getLong();
n.zxid = response.buffer.getLong();
n.electionEpoch = response.buffer.getLong();
n.state = ackstate;
n.sid = response.sid;
if(!backCompatibility){
n.peerEpoch = response.buffer.getLong();
} else {
if(LOG.isInfoEnabled()){
LOG.info("Backward compatibility mode, server id=" + n.sid);
}
n.peerEpoch = ZxidUtils.getEpochFromZxid(n.zxid);
}
n.version = (response.buffer.remaining() >= 4) ?
response.buffer.getInt() : 0x0;
if(LOG.isInfoEnabled()){
printNotification(n);
}
// If this server is still LOOKING, hand the notification to
// lookForLeader() via recvqueue.
if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
recvqueue.offer(n);
// If the sender is also LOOKING but its election epoch is behind
// ours (i.e. its vote is stale), send our own proposal back to it.
if((ackstate == QuorumPeer.ServerState.LOOKING)
&& (n.electionEpoch < logicalclock)){
Vote v = getVote();
ToSend notmsg = new ToSend(ToSend.mType.notification,
v.getId(),
v.getZxid(),
logicalclock,
self.getPeerState(),
response.sid,
v.getPeerEpoch());
sendqueue.offer(notmsg);
}
} else {
// This server is no longer LOOKING; if the sender still is,
// tell it the outcome we already know.
Vote current = self.getCurrentVote();
if(ackstate == QuorumPeer.ServerState.LOOKING){
if(LOG.isDebugEnabled()){
LOG.debug("Sending new notification. My id =  " +
self.getId() + " recipient=" +
response.sid + " zxid=0x" +
Long.toHexString(current.getZxid()) +
" leader=" + current.getId());
}
// Wrap our current vote in a ToSend and route it back.
ToSend notmsg;
if(n.version > 0x0) {
notmsg = new ToSend(
ToSend.mType.notification,
current.getId(),
current.getZxid(),
current.getElectionEpoch(),
self.getPeerState(),
response.sid,
current.getPeerEpoch());

} else {
Vote bcVote = self.getBCVote();
notmsg = new ToSend(
ToSend.mType.notification,
bcVote.getId(),
bcVote.getZxid(),
bcVote.getElectionEpoch(),
self.getPeerState(),
response.sid,
bcVote.getPeerEpoch());
}
sendqueue.offer(notmsg);
}
}
}
} catch (InterruptedException e) {
System.out.println("Interrupted Exception while waiting for new message" +
e.toString());
}
}
LOG.info("WorkerReceiver is down");
}

下面分析

FastLeaderElection
lookForLeader()
的处理逻辑:

其中优先级的判断方法:

  • peerEpoch(选举周期)大的优先级高;peerEpoch相同时,zxid大的优先级高,zxid表示事务id,越大表示操作越新
  • 如果peerEpoch和zxid都相同,则sid越大的优先级越大
public Vote lookForLeader() throws InterruptedException {
try {
self.jmxLeaderElectionBean = new LeaderElectionBean();
MBeanRegistry.getInstance().register(
self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
self.jmxLeaderElectionBean = null;
}
if (self.start_fle == 0) {
self.start_fle = System.currentTimeMillis();
}
try {
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
int notTimeout = finalizeWait;
synchronized(this){
logicalclock++;
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
LOG.info("New election. My id =  " + self.getId() +
", proposed zxid=0x" + Long.toHexString(proposedZxid));
sendNotifications();
// Keep looping until a leader is found (or we are stopped).
while ((self.getPeerState() == ServerState.LOOKING) &&
(!stop)){
// Take the next notification from the receive queue.
Notification n = recvqueue.poll(notTimeout,
TimeUnit.MILLISECONDS);
// Nothing arrived within the timeout.
if(n == null){
// Our messages may simply not have been delivered yet...
if(manager.haveDelivered()){
// ...so re-broadcast our current proposal...
sendNotifications();
// ...or the connections may have dropped...
} else {
// ...in which case reconnect to all peers.
manager.connectAll();
}
int tmpTimeOut = notTimeout*2;
notTimeout = (tmpTimeOut < maxNotificationInterval?
tmpTimeOut : maxNotificationInterval);
LOG.info("Notification time out: " + notTimeout);
}
// Only process votes from servers inside the voting view.
else if(self.getVotingView().containsKey(n.sid)) {
// Dispatch on the sender's state.
switch (n.state) {
case LOOKING:
// A higher election epoch means our own round is stale.
if (n.electionEpoch > logicalclock) {
logicalclock = n.electionEpoch;
// Discard the votes collected so far.
recvset.clear();
// Decide by total order (epoch, zxid, sid) which vote wins...
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
// ...adopt the incoming proposal...
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
// ...or fall back to proposing ourselves.
updateProposal(getInitId(),
getInitLastLoggedZxid(),
getPeerEpoch());
}
// Broadcast the (possibly updated) proposal.
sendNotifications();
// A lower election epoch means the incoming vote is stale: ignore it.
} else if (n.electionEpoch < logicalclock) {
if(LOG.isDebugEnabled()){
LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
+ Long.toHexString(n.electionEpoch)
+ ", logicalclock=0x" + Long.toHexString(logicalclock));
}
break;
// Same epoch: adopt the incoming vote if it wins, and re-broadcast.
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}
if(LOG.isDebugEnabled()){
LOG.debug("Adding vote: from=" + n.sid +
", proposed leader=" + n.leader +
", proposed zxid=0x" + Long.toHexString(n.zxid) +
", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
}
// Record the received vote.
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
// Has some server won the election (a quorum agrees with our proposal)?
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock, proposedEpoch))) {
// Grace period: if a better vote still arrives, push it back
// onto the queue and go around the loop again.
while((n = recvqueue.poll(finalizeWait,
TimeUnit.MILLISECONDS)) != null){
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)){
recvqueue.put(n);
break;
}
}
// No new vote during the grace period: the leader is final.
if (n == null) {
// Become LEADING if we won; otherwise FOLLOWING
// (or OBSERVING, per learningState()).
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState());

Vote endVote = new Vote(proposedLeader,
proposedZxid,
logicalclock,
proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
break;
// Observers do not vote, so their notifications are ignored.
case OBSERVING:
LOG.debug("Notification from observer: " + n.sid);
break;
// FOLLOWING and LEADING are handled the same way.
case FOLLOWING:
case LEADING:
// Same epoch: the sender already knows the outcome (it is not
// LOOKING anymore); check whether the announced leader is
// established and whether it is us.
if(n.electionEpoch == logicalclock){
recvset.put(n.sid, new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch));

if(ooePredicate(recvset, outofelection, n)) {
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());

Vote endVote = new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}

outofelection.put(n.sid, new Vote(n.version,
n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch,
n.state));

if(ooePredicate(outofelection, outofelection, n)) {
synchronized(this){
logicalclock = n.electionEpoch;
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
}
Vote endVote = new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
break;
default:
LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)",
n.state, n.sid);
break;
}
} else {
LOG.warn("Ignoring notification from non-cluster member " + n.sid);
}
}
return null;
} finally {
try {
if(self.jmxLeaderElectionBean != null){
MBeanRegistry.getInstance().unregister(
self.jmxLeaderElectionBean);
}
} catch (Exception e) {
LOG.warn("Failed to unregister with JMX", e);
}
self.jmxLeaderElectionBean = null;
}
}

watcher原理

watch机制的大致流程:

  • 客户端创建watcher实例并向zookeeper服务器端进行注册
  • 向服务器端注册成功后,将watcher保存到本地的WatchManager中
  • 当注册的监听事件触发后,zookeeper服务器端会通知客户端
  • 客户端接到通知后从WatchManager获取监听该事件的watcher
  • 执行这些watcher的process方法

zookeeper中可以进行watcher注册的方法有

exists()
,
getData()
,
getChildren()
,这里我们以
getData()
方法为例进行分析:

// getData() comes in two flavors: one takes an explicit Watcher instance, the
// other a boolean. Passing true registers the default watcher (the one supplied
// when the ZooKeeper client was constructed); passing false registers nothing.
public byte[] getData(String path, boolean watch, Stat stat)
throws KeeperException, InterruptedException {
return getData(path, watch ? watchManager.defaultWatcher : null, stat);
}
public void getData(final String path, Watcher watcher,
DataCallback cb, Object ctx)
{
final String clientPath = path;
PathUtils.validatePath(clientPath);

WatchRegistration wcb = null;
if (watcher != null) {
//此处创建的实例对象会根据调用的方法的不同而不同
//getData():DataWatchRegistration
//exists():ExistsWatchRegistration
//getChildren():ChildWatchRegistration
//这些类都继承了WatchRegistration类
wcb = new DataWatchRegistration(watcher, clientPath);
}
final String serverPath = prependChroot(clientPath);
//对请求信息进行封装
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.getData);
GetDataRequest request = new GetDataRequest();
request.setPath(serverPath);
request.setWatch(watcher != null)
23ff7
;
GetDataResponse response = new GetDataResponse();
//使用ClientCnxn实例将请求内容发送到服务端,这里的请求内容就是watcher的注册信息
cnxn.queuePacket(h, new ReplyHeader(), request, response, cb,
clientPath, serverPath, ctx, wcb);
}

下面来看一下ZooKeeper的构造方法:

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)
throws IOException
{
LOG.info("Initiating client connection, connectString=" + connectString
+ " sessionTimeout=" + sessionTimeout
+ " watcher=" + watcher
+ " sessionId=" + Long.toHexString(sessionId)
+ " sessionPasswd="
+ (sessionPasswd == null ? "<null>" : "<hidden>"));
// As described above: the watcher passed here becomes the default watcher.
watchManager.defaultWatcher = watcher;
ConnectStringParser connectStringParser = new ConnectStringParser(
connectString);
HostProvider hostProvider = new StaticHostProvider(
connectStringParser.getServerAddresses());
// Create the client connection object...
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
getClientCnxnSocket(), sessionId, sessionPasswd, canBeReadOnly);
cnxn.seenRwServerBefore = true;
// ...and start its SendThread and EventThread.
cnxn.start();
}

因为

ClientCnxn
类中有两个线程,分别为
EventThread
SendThread
start()
方法就是启动这两个线程,这两个线程后面再介绍。

// Starts ClientCnxn's two worker threads: SendThread (network I/O) and
// EventThread (watcher/callback dispatch).
public void start() {
sendThread.start();
eventThread.start();
}

getData()
方法中 ,调用了
queuePacket()
方法:

Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
Record response, AsyncCallback cb, String clientPath,
String serverPath, Object ctx, WatchRegistration watchRegistration)
{
Packet packet = null;
// Wrap everything in a Packet — ZooKeeper's smallest unit of transmission.
synchronized (outgoingQueue) {
packet = new Packet(h, r, request, response, watchRegistration);
packet.cb = cb;
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
if (!state.isAlive() || closing) {
conLossPacket(packet);
} else {
if (h.getType() == OpCode.closeSession) {
closing = true;
}
// Queue the packet for the SendThread to transmit.
outgoingQueue.add(packet);
}
}
sendThread.getClientCnxnSocket().wakeupCnxn();
return packet;
}

下面便是sendThread线程发送注册信息的过程:

@Override
public void run() {
// clientCnxnSocket is concretely a ClientCnxnSocketNIO, i.e. NIO-based I/O.
clientCnxnSocket.introduce(this,sessionId);
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
int to;
long lastPingRwServer = System.currentTimeMillis();
final int MAX_SEND_PING_INTERVAL = 10000;
while (state.isAlive()) {
try {
if (!clientCnxnSocket.isConnected()) {
if(!isFirstConnect){
try {
Thread.sleep(r.nextInt(1000));
} catch (InterruptedException e) {
LOG.warn("Unexpected exception", e);
}
}
// Guard against racing with a close: don't reconnect when closing.
if (closing || !state.isAlive()) {
break;
}
// Establish the connection.
startConnect();
clientCnxnSocket.updateLastSendAndHeard();
}
// Once the state is connected...
if (state.isConnected()) {
// ...check whether a SASL auth-failure event must be raised.
if (zooKeeperSaslClient != null) {
......
}
KeeperState authState = zooKeeperSaslClient.getKeeperState();
if (authState != null) {
......
}
// If an auth-failure event is pending...
if (sendAuthEvent == true) {
// ...hand it to the EventThread for dispatch.
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
authState,null));
}
}
to = readTimeout - clientCnxnSocket.getIdleRecv();
} else {
to = connectTimeout - clientCnxnSocket.getIdleRecv();
}

if (to <= 0) {
......
}
if (state.isConnected()) {
......
}
if (state == States.CONNECTEDREADONLY) {
......
}
// After the housekeeping above, doTransport() finally moves the
// outgoing and incoming bytes.
clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
} catch (Throwable e) {
......
}
}
cleanup();
clientCnxnSocket.close();
if (state.isAlive()) {
......
}
......
}

doTransport()
方法中,主要是使用NIO的方式:

@Override
void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
ClientCnxn cnxn)
throws IOException, InterruptedException {
selector.select(waitTimeOut);
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
}
updateNow();
for (SelectionKey k : selected) {
SocketChannel sc = ((SocketChannel) k.channel());
if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
if (sc.finishConnect()) {
updateLastSendAndHeard();
sendThread.primeConnection();
}

} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
// Readable/writable: delegate the byte shuffling to doIO().
doIO(pendingQueue, outgoingQueue, cnxn);
}
}
if (sendThread.getZkState().isConnected()) {
synchronized(outgoingQueue) {
if (findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
enableWrite();
}
}
}
selected.clear();
}

在该方法中,最终会调用

org.apache.zookeeper.ClientCnxn.SendThread#readResponse()
方法对接受的信息进行处理。

void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
throws InterruptedException, IOException {
SocketChannel sock = (SocketChannel) sockKey.channel();
if (sock == null) {
throw new IOException("Socket is null!");
}
// If the key is readable, pull bytes off the socket.
if (sockKey.isReadable()) {
int rc = sock.read(incomingBuffer);
if (rc < 0) {
throw new EndOfStreamException(
"Unable to read additional data from server sessionid 0x"
+ Long.toHexString(sessionId)
+ ", likely server has closed socket");
}
if (!incomingBuffer.hasRemaining()) {
incomingBuffer.flip();
if (incomingBuffer == lenBuffer) {
recvCount++;
readLength();
} else if (!initialized) {
readConnectResult();
enableRead();
if (findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
// Since SASL authentication has completed (if client is configured to do so),
// outgoing packets waiting in the outgoingQueue can now be sent.
enableWrite();
}
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
initialized = true;
} else {
// A full response frame is buffered: let readResponse() handle it.
sendThread.readResponse(incomingBuffer);
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
}
}
}
// If the key is writable, push the next sendable packet.
if (sockKey.isWritable()) {
synchronized(outgoingQueue) {
Packet p = findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress());
if (p != null) {
updateLastSend();
if (p.bb == null) {
if ((p.requestHeader != null) &&
(p.requestHeader.getType() != OpCode.ping) &&
(p.requestHeader.getType() != OpCode.auth)) {
p.requestHeader.setXid(cnxn.getXid());
}
p.createBB();
}
// Write the packet's bytes to the socket.
sock.write(p.bb);
if (!p.bb.hasRemaining()) {
sentCount++;
outgoingQueue.removeFirstOccurrence(p);
if (p.requestHeader != null
&& p.requestHeader.getType() != OpCode.ping
&& p.requestHeader.getType() != OpCode.auth) {
synchronized (pendingQueue) {
pendingQueue.add(p);
}
}
}
}

if (outgoingQueue.isEmpty()) {
disableWrite();
} else if (!initialized && p != null && !p.bb.hasRemaining()) {
disableWrite();
} else {
enableWrite();
}
}
}
}

该方法是

ClientCnxn
的内部类
SendThread
类中的方法,该方法主要是针对接受的消息中
replyHdr
标识的不同而对不同的数据进行不同的处理。

void readResponse(ByteBuffer incomingBuffer) throws IOException {
ByteBufferInputStream bbis = new ByteBufferInputStream(
incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ReplyHeader replyHdr = new ReplyHeader();

replyHdr.deserialize(bbia, "header");
if (replyHdr.getXid() == -2) {
// Xid == -2 marks a ping response; nothing to do.
if (LOG.isDebugEnabled()) {
LOG.debug("Got ping response for sessionid: 0x"
+ Long.toHexString(sessionId)
+ " after "
+ ((System.nanoTime() - lastPingSentNs) / 1000000)
+ "ms");
}
return;
}
if (replyHdr.getXid() == -4) {
// Xid == -4 marks an authentication reply.
if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
state = States.AUTH_FAILED;
eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None,
Watcher.Event.KeeperState.AuthFailed, null) );
}
if (LOG.isDebugEnabled()) {
LOG.debug("Got auth sessionid:0x"
+ Long.toHexString(sessionId));
}
return;
}
if (replyHdr.getXid() == -1) {
// Xid == -1 marks a watch-event notification from the server.
if (LOG.isDebugEnabled()) {
LOG.debug("Got notification sessionid:0x"
+ Long.toHexString(sessionId));
}
// Deserialize the event.
WatcherEvent event = new WatcherEvent();
event.deserialize(bbia, "response");
if (chrootPath != null) {
String serverPath = event.getPath();
if(serverPath.compareTo(chrootPath)==0)
event.setPath("/");
else if (serverPath.length() > chrootPath.length())
event.setPath(serverPath.substring(chrootPath.length()));
else {
LOG.warn("Got server path " + event.getPath()
+ " which is too short for chroot path "
+ chrootPath);
}
}
WatchedEvent we = new WatchedEvent(event);
if (LOG.isDebugEnabled()) {
LOG.debug("Got " + we + " for sessionid 0x"
+ Long.toHexString(sessionId));
}
// Queue the event for the EventThread to dispatch to watchers.
eventThread.queueEvent( we );
return;
}
if (clientTunneledAuthenticationInProgress()) {
GetSASLRequest request = new GetSASLRequest();
request.deserialize(bbia,"token");
zooKeeperSaslClient.respondToServer(request.getToken(),
ClientCnxn.this);
return;
}
// Otherwise this is a reply to a pending request.
Packet packet;
synchronized (pendingQueue) {
if (pendingQueue.size() == 0) {
throw new IOException("Nothing in the queue, but got "
+ replyHdr.getXid());
}
// Take the oldest pending packet (replies arrive in request order).
packet = pendingQueue.remove();
}
// A mismatch between the request Xid and the reply Xid signals an error.
try {
if (packet.requestHeader.getXid() != replyHdr.getXid()) {
packet.replyHeader.setErr(
KeeperException.Code.CONNECTIONLOSS.intValue());
throw new IOException("Xid out of order. Got Xid "
+ replyHdr.getXid() + " with err " +
+ replyHdr.getErr() +
" expected Xid "
+ packet.requestHeader.getXid()
+ " for a packet with details: "
+ packet );
}
// Copy the reply header details into the packet.
packet.replyHeader.setXid(replyHdr.getXid());
packet.replyHeader.setErr(replyHdr.getErr());
packet.replyHeader.setZxid(replyHdr.getZxid());
if (replyHdr.getZxid() > 0) {
lastZxid = replyHdr.getZxid();
}
if (packet.response != null && replyHdr.getErr() == 0) {
packet.response.deserialize(bbia, "response");
}
if (LOG.isDebugEnabled()) {
LOG.debug("Reading reply sessionid:0x"
+ Long.toHexString(sessionId) + ", packet:: " + packet);
}
} finally {
// Complete the packet: register watches and fire callbacks.
finishPacket(packet);
}
}

finishPacket()
方法是
ClientCnxn
类中的方法:

private void finishPacket(Packet p) {
// If the packet carried a watch registration, commit it now: the watcher
// is stored into the client-side watch manager's map.
if (p.watchRegistration != null) {
p.watchRegistration.register(p.replyHeader.getErr());
}
if (p.cb == null) {
synchronized (p) {
p.finished = true;
p.notifyAll();
}
} else {
p.finished = true;
eventThread.queuePacket(p);
}
}

下面来看eventThread线程的工作过程,该线程主要是负责对事件进行处理,相较于sendThread要简单的多:

@Override
public void run() {
try {
isRunning = true;
while (true) {
Object event = waitingEvents.take();
// The special eventOfDeath sentinel asks this thread to shut down.
if (event == eventOfDeath) {
wasKilled = true;
} else {
// Dispatch the event to its watchers/callbacks.
processEvent(event);
}
if (wasKilled)
// Once killed, exit the main loop after draining remaining events.
synchronized (waitingEvents) {
if (waitingEvents.isEmpty()) {
isRunning = false;
break;
}
}
}
} catch (InterruptedException e) {
LOG.error("Event thread exiting due to interruption", e);
}

LOG.info("EventThread shut down for session: 0x{}",
Long.toHexString(getSessionId()));
}

处理事件方法:

rivate void processEvent(Object event) {
try {
//主要分为两类,一类是处理event,event会在queueEvent方法中封装成WatcherSetEventPair
if (event instanceof WatcherSetEventPair) {
// each watcher will process the event
WatcherSetEventPair pair = (WatcherSetEventPair) event;
//调用所有监听该事件的watcher的process方法
for (Watcher watcher : pair.watchers) {
try {
watcher.process(pair.event);
} catch (Throwable t) {
LOG.error("Error while calling watcher ", t);
}
}
//第二类,根据packet中数据的类型,进行不同的处理
} else {
Packet p = (Packet) event;
int rc = 0;
String clientPath = p.clientPath;
if (p.replyHeader.getErr() != 0) {
rc = p.replyHeader.getErr();
}
if (p.cb == null) {
......
} else if (p.response instanceof ExistsResponse
|| p.response instanceof SetDataResponse
|| p.response instanceof SetACLResponse) {
......
} else if (p.response instanceof GetDataResponse) {
......
} else if (p.response instanceof GetACLResponse) {
......
} else if (p.response instanceof GetChildrenResponse) {
......
} else if (p.response instanceof GetChildren2Response) {
......
} else if (p.response instanceof CreateResponse) {
......
} else if (p.response instanceof MultiResponse) {
......
}  else if (p.cb instanceof VoidCallback) {
......
}
}
} catch (Throwable t) {
......
}
}
}

queueEvent()
queuepacket()
方法:

public void queueEvent(WatchedEvent event) {
if (event.getType() == EventType.None
&& sessionState == event.getState()) {
return;
}
sessionState = event.getState();

// Materialize the interested watchers and pair them with the event.
WatcherSetEventPair pair = new WatcherSetEventPair(
watcher.materialize(event.getState(), event.getType(),
event.getPath()),
event);
// Hand the pair to the event loop via waitingEvents.
waitingEvents.add(pair);
}

public void queuePacket(Packet packet) {
if (wasKilled) {
synchronized (waitingEvents) {
if (isRunning) waitingEvents.add(packet);
else processEvent(packet);
}
} else {
// Normal case: queue the packet for the event loop.
waitingEvents.add(packet);
}
}

上述是客户端中watch机制的处理方式,服务器端的代码没有列出。

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: