学习笔记之ZooKeeper源码分析
2019-01-01 20:55
113 查看
zookeeper源码分析
(个人学习笔记,如有错误欢迎指正!!!)
服务器启动
首先zookeeper的服务器启动类为
org.apache.zookeeper.server.quorum.QuorumPeerMain该类中包含
main()方法:
/**
 * Entry point of a quorum (cluster-mode) ZooKeeper server.
 * Parses the config file given on the command line and starts the peer.
 */
public static void main(String[] args) {
    QuorumPeerMain main = new QuorumPeerMain();
    try {
        // As the name suggests: initialize the peer from args, then run it
        main.initializeAndRun(args);
    } catch (IllegalArgumentException e) {
        ......
    } catch (ConfigException e) {
        ......
    } catch (Exception e) {
        ......
    }
    LOG.info("Exiting normally");
    System.exit(0);
}

protected void initializeAndRun(String[] args) throws ConfigException, IOException{
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (args.length == 1) {
        // Parse the configuration file passed as the single argument
        config.parse(args[0]);
    }
    ......
    if (args.length == 1 && config.servers.size() > 0) {
        // Cluster mode: more than one server configured, run from the config
        runFromConfig(config);
    } else {
        ......
        // Standalone mode falls through to the single-server main
        ZooKeeperServerMain.main(args);
    }
}

public void runFromConfig(QuorumPeerConfig config) throws IOException {
    ......
    try {
        // Configure the quorumPeer from the parsed config;
        // generally one QuorumPeer instance corresponds to one server
        ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
        cnxnFactory.configure(config.getClientPortAddress(),
                config.getMaxClientCnxns());
        quorumPeer = new QuorumPeer();
        quorumPeer.setClientPortAddress(config.getClientPortAddress());
        ......
        // Start the server (QuorumPeer is a Thread) and block until it exits
        quorumPeer.start();
        quorumPeer.join();
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Quorum Peer interrupted", e);
    }
}
其中,
org.apache.zookeeper.server.quorum.QuorumPeer间接继承了
java.lang.Thread类,所以该类可以看作是一个线程,
start()方法便是线程的启动方法。
/**
 * Starts this peer: loads the database, opens the client port,
 * prepares leader election, then starts the peer's own thread.
 */
@Override
public synchronized void start() {
    loadDataBase();
    // Start listening for client connections
    cnxnFactory.start();
    // Prepare leader election: right after startup, a server must
    // determine its role in the ensemble through an election
    startLeaderElection();
    super.start();
}

synchronized public void startLeaderElection() {
    try {
        // Vote for itself initially: (own id, latest logged zxid, current epoch)
        currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
    } catch(IOException e) {
        ......
    }
    // Walk the view of all servers to find and remember our own quorum address
    for (QuorumServer p : getView().values()) {
        if (p.id == myid) {
            myQuorumAddr = p.addr;
            break;
        }
    }
    if (myQuorumAddr == null) {
        ......
    }
    // electionType was set from the config when the quorumPeer was configured;
    // its default value is 3, so this UDP-based branch normally does not run
    if (electionType == 0) {
        try {
            udpSocket = new DatagramSocket(myQuorumAddr.getPort());
            responder = new ResponderThread();
            responder.start();
        } catch (SocketException e) {
            throw new RuntimeException(e);
        }
    }
    // Create the election algorithm selected by electionType
    this.electionAlg = createElectionAlgorithm(electionType);
}
因为
electionType的默认值为3,所以默认使用的leader选举策略为
FastLeaderElection。
/**
 * Creates the leader-election strategy for the given algorithm id.
 * 0 = LeaderElection, 1/2 = AuthFastLeaderElection, 3 = FastLeaderElection
 * (the default; the only non-deprecated one).
 */
protected Election createElectionAlgorithm(int electionAlgorithm){
    Election le=null;
    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
    case 0:
        le = new LeaderElection(this);
        break;
    case 1:
        le = new AuthFastLeaderElection(this);
        break;
    case 2:
        le = new AuthFastLeaderElection(this, true);
        break;
    case 3:
        // QuorumCnxManager sends election messages to the other servers and
        // receives theirs; it maintains two kinds of worker threads,
        // SendWorker and RecvWorker, for sending and receiving respectively
        qcm = new QuorumCnxManager(this);
        QuorumCnxManager.Listener listener = qcm.listener;
        if(listener != null){
            listener.start();
            le = new FastLeaderElection(this, qcm);
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        assert false;
    }
    return le;
}
创建完leader的选举策略之后,便调用了
super.start()方法,所以便会执行
run()方法:
/**
 * Main loop of the peer thread (invoked via super.start()).
 * Registers JMX beans, then loops on the peer's current state:
 * LOOKING runs leader election, FOLLOWING/LEADING/OBSERVING run the
 * corresponding role until it fails, then fall back to LOOKING.
 */
@Override
public void run() {
    setName("QuorumPeer" + "[myid=" + getId() + "]" +
            cnxnFactory.getLocalAddress());
    LOG.debug("Starting quorum peer");
    try {
        // Register this peer and every member of the view with JMX
        jmxQuorumBean = new QuorumBean(this);
        MBeanRegistry.getInstance().register(jmxQuorumBean, null);
        for(QuorumServer s: getView().values()){
            ZKMBeanInfo p;
            if (getId() == s.id) {
                p = jmxLocalPeerBean = new LocalPeerBean(this);
                try {
                    MBeanRegistry.getInstance().register(p, jmxQuorumBean);
                } catch (Exception e) {
                    LOG.warn("Failed to register with JMX", e);
                    jmxLocalPeerBean = null;
                }
            } else {
                p = new RemotePeerBean(s);
                try {
                    MBeanRegistry.getInstance().register(p, jmxQuorumBean);
                } catch (Exception e) {
                    LOG.warn("Failed to register with JMX", e);
                }
            }
        }
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
        jmxQuorumBean = null;
    }
    try {
        // Main loop: act according to this peer's current role
        while (running) {
            switch (getPeerState()) {
            case LOOKING:
                LOG.info("LOOKING");
                if (Boolean.getBoolean("readonlymode.enabled")) {
                    LOG.info("Attempting to start ReadOnlyZooKeeperServer");
                    // Create read-only server but don't start it immediately
                    final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(
                            logFactory, this,
                            new ZooKeeperServer.BasicDataTreeBuilder(),
                            this.zkDb);
                    // Instead of starting roZk immediately, wait some grace
                    // period before we decide we're partitioned.
                    //
                    // Thread is used here because otherwise it would require
                    // changes in each of election strategy classes which is
                    // unnecessary code coupling.
                    Thread roZkMgr = new Thread() {
                        public void run() {
                            try {
                                // lower-bound grace period to 2 secs
                                sleep(Math.max(2000, tickTime));
                                if (ServerState.LOOKING.equals(getPeerState())) {
                                    roZk.startup();
                                }
                            } catch (InterruptedException e) {
                                LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                            } catch (Exception e) {
                                LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
                            }
                        }
                    };
                    try {
                        roZkMgr.start();
                        setBCVote(null);
                        // Block in leader election until a leader is chosen
                        setCurrentVote(makeLEStrategy().lookForLeader());
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception",e);
                        setPeerState(ServerState.LOOKING);
                    } finally {
                        // If the thread is in the the grace period, interrupt
                        // to come out of waiting.
                        roZkMgr.interrupt();
                        roZk.shutdown();
                    }
                } else {
                    try {
                        setBCVote(null);
                        setCurrentVote(makeLEStrategy().lookForLeader());
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception", e);
                        setPeerState(ServerState.LOOKING);
                    }
                }
                break;
            case OBSERVING:
                try {
                    LOG.info("OBSERVING");
                    setObserver(makeObserver(logFactory));
                    observer.observeLeader();
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e );
                } finally {
                    observer.shutdown();
                    setObserver(null);
                    // On any failure, go back to election
                    setPeerState(ServerState.LOOKING);
                }
                break;
            case FOLLOWING:
                try {
                    LOG.info("FOLLOWING");
                    setFollower(makeFollower(logFactory));
                    follower.followLeader();
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e);
                } finally {
                    follower.shutdown();
                    setFollower(null);
                    setPeerState(ServerState.LOOKING);
                }
                break;
            case LEADING:
                LOG.info("LEADING");
                try {
                    setLeader(makeLeader(logFactory));
                    leader.lead();
                    setLeader(null);
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e);
                } finally {
                    if (leader != null) {
                        leader.shutdown("Forcing shutdown");
                        setLeader(null);
                    }
                    setPeerState(ServerState.LOOKING);
                }
                break;
            }
        }
    } finally {
        LOG.warn("QuorumPeer main thread exited");
        try {
            MBeanRegistry.getInstance().unregisterAll();
        } catch (Exception e) {
            LOG.warn("Failed to unregister with JMX", e);
        }
        jmxQuorumBean = null;
        jmxLocalPeerBean = null;
    }
}
leader选举策略
LeaderElection / AuthFastLeaderElection / FastLeaderElection
但是目前真正使用的是 FastLeaderElection,其他的两个都被标记为
@Deprecated
下面分析
FastLeaderElection选举策略的实现过程:
首先介绍各类之间的关系:
org.apache.zookeeper.server.quorum.QuorumCnxManager类主要负责向指定的server发送选举信息,并且接受其他server发送过来的选举信息,该类有两个内部类
SendWorker和
RecvWorker,这两个类均间接继承了
java.lang.Thread。
并且该类维护如下信息:
//记录server对应的SendWorker final ConcurrentHashMap<Long, SendWorker> senderWorkerMap; //SendWorker线程会根据自己负责的server来获取queueSendMap中指定的阻塞队列,并且判断该线程中是否 //有信息,如果有则获取循环获取其中的信息,如果没有则将lastMessageSent中保存的上一次发送给该 //server的选举信息 final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap; //保存上一次发送给各个server的选举信息 final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent; //接受队列,存放接收到的信息 public final ArrayBlockingQueue<Message> recvQueue;
SendWorker会从
queueSendMap和
lastMessageSent获取信息,并将选举信息发送给指定的server;
RecvWorker会接受指定的server发送的选举信息,并将信息保存到
recvQueue中。
org.apache.zookeeper.server.quorum.QuorumCnxManager.SendWorker:
/**
 * Sends election messages to the one server (sid) this worker is bound to.
 * If its queue is empty on startup, re-sends the last message so the peer
 * is guaranteed to have seen our latest vote.
 */
@Override
public void run() {
    threadCnt.incrementAndGet();
    try {
        // If there is nothing pending, re-send the last election message.
        // Look up the message queue for the target server (sid)
        ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
        if (bq == null || isSendQueueEmpty(bq)) {
            ByteBuffer b = lastMessageSent.get(sid);
            if (b != null) {
                LOG.debug("Attempting to send lastMessage to sid=" + sid);
                send(b);
            }
        }
    } catch (IOException e) {
        LOG.error("Failed to send last message. Shutting down thread.", e);
        this.finish();
    }
    try {
        // Loop sending queued election messages until told to stop
        while (running && !shutdown && sock != null) {
            ByteBuffer b = null;
            try {
                // Look up the message queue for the target server (sid)
                ArrayBlockingQueue<ByteBuffer> bq = queueSendMap
                        .get(sid);
                if (bq != null) {
                    // Take the next message to send (with timeout)
                    b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
                } else {
                    LOG.error("No queue of incoming messages for " +
                              "server " + sid);
                    break;
                }
                if(b != null){
                    // Remember it as the last message sent to this server
                    lastMessageSent.put(sid, b);
                    // Write it to the socket
                    send(b);
                }
            } catch (InterruptedException e) {
                LOG.warn("Interrupted while waiting for message on queue",
                        e);
            }
        }
    } catch (Exception e) {
        LOG.warn("Exception when using channel: for id " + sid + " my id = " +
                self.getId() + " error = " + e);
    }
    this.finish();
    LOG.warn("Send worker leaving thread");
}
org.apache.zookeeper.server.quorum.QuorumCnxManager.RecvWorker:
/**
 * Receives length-prefixed election messages from the one server (sid)
 * this worker is bound to and hands them to the receive queue.
 */
@Override
public void run() {
    threadCnt.incrementAndGet();
    try {
        while (running && !shutdown && sock != null) {
            // Each message is length-prefixed; reject bogus lengths
            int length = din.readInt();
            if (length <= 0 || length > PACKETMAXSIZE) {
                throw new IOException(
                        "Received packet with invalid packet: "
                                + length);
            }
            // Read the full payload and append it to recvQueue
            byte[] msgArray = new byte[length];
            din.readFully(msgArray, 0, length);
            ByteBuffer message = ByteBuffer.wrap(msgArray);
            addToRecvQueue(new Message(message.duplicate(), sid));
        }
    } catch (Exception e) {
        LOG.warn("Connection broken for id " + sid + ", my id = " +
                self.getId() + ", error = " , e);
    } finally {
        // A broken receive side also terminates the paired SendWorker
        LOG.warn("Interrupting SendWorker");
        sw.finish();
        if (sock != null) {
            closeSocket(sock);
        }
    }
}
此外,
org.apache.zookeeper.server.quorum.FastLeaderElection中定义了内部类
org.apache.zookeeper.server.quorum.FastLeaderElection.Messenger,该类中又定义了两个内部类:
WorkerReceiver和
WorkerSender,这两个类都间接继承了
java.lang.Thread。
并且
FastLeaderElection中定义了如下队列,真正实现leader选举信息处理逻辑是在该类的
lookForLeader()方法中:
//经过lookForLeader()方法处理之后的选举信息,ToSend是对发送的选举信息的封装 LinkedBlockingQueue<ToSend> sendqueue; //lookForLeader()方法从该队列中读取选举信息进行处理,Notification是对接受的选举信息的封装 LinkedBlockingQueue<Notification> recvqueue;
WorkerSender主要的工作是从
sendqueue队列中读取发送的选举信息,并且根据目标server的sid,将信息放置到
QuorumCnxManager中
queueSendMap的对应 key的value中;
WorkerReceiver主要的工作是从
QuorumCnxManager中
recvQueue读取接受的选举信息,放置到
recvqueue中,方便
lookForLeader()方法读取。
org.apache.zookeeper.server.quorum.FastLeaderElection.Messenger.WorkerSender:
/**
 * Drains sendqueue and forwards each ToSend to the QuorumCnxManager,
 * which routes it to the queue of the target server.
 */
public void run() {
    while (!stop) {
        try {
            ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
            if(m == null) continue;
            process(m);
        } catch (InterruptedException e) {
            break;
        }
    }
    LOG.info("WorkerSender is down");
}

// process() serializes the vote and hands it to QuorumCnxManager.toSend()
void process(ToSend m) {
    ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),
                                        m.leader,
                                        m.zxid,
                                        m.electionEpoch,
                                        m.peerEpoch);
    manager.toSend(m.sid, requestBuffer);
}
该方法是
org.apache.zookeeper.server.quorum.QuorumCnxManager类中的方法:
/**
 * Routes an election message to server {@code sid}.
 * A message addressed to ourselves is short-circuited straight into
 * recvQueue; otherwise it is enqueued for the SendWorker of that server.
 */
public void toSend(Long sid, ByteBuffer b) {
    // If the message is addressed to ourselves, put it directly on recvQueue
    if (self.getId() == sid) {
        b.position(0);
        addToRecvQueue(new Message(b.duplicate(), sid));
    } else {
        // If there is no send queue for this sid yet, create one
        // and enqueue the message into it
        if (!queueSendMap.containsKey(sid)) {
            ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(
                    SEND_CAPACITY);
            queueSendMap.put(sid, bq);
            addToSendQueue(bq, b);
        } else {
            ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
            if(bq != null){
                addToSendQueue(bq, b);
            } else {
                LOG.error("No queue for server " + sid);
            }
        }
        // Make sure a connection to that server exists
        connectOne(sid);
    }
}
org.apache.zookeeper.server.quorum.FastLeaderElection.Messenger.WorkerReceiver:
/**
 * Consumes raw messages from QuorumCnxManager's recvQueue, decodes them
 * into Notification objects, and either hands them to lookForLeader()
 * via recvqueue or replies directly with our current vote
 * (for observers, stale epochs, or peers still LOOKING after the election).
 */
public void run() {
    Message response;
    while (!stop) {
        try{
            // Pull the next raw election message from recvQueue
            response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
            if(response == null) continue;
            // If the sender is not in the voting view (i.e. an observer),
            // reply with our current vote instead of processing it
            if(!self.getVotingView().containsKey(response.sid)){
                Vote current = self.getCurrentVote();
                ToSend notmsg = new ToSend(ToSend.mType.notification,
                        current.getId(),
                        current.getZxid(),
                        logicalclock,
                        self.getPeerState(),
                        response.sid,
                        current.getPeerEpoch());
                sendqueue.offer(notmsg);
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Receive new notification message. My id = "
                            + self.getId());
                }
                if (response.buffer.capacity() < 28) {
                    LOG.error("Got a short response: "
                            + response.buffer.capacity());
                    continue;
                }
                // 28-byte messages come from peers running an older protocol
                boolean backCompatibility = (response.buffer.capacity() == 28);
                response.buffer.clear();
                // Decode the wire format into a Notification
                Notification n = new Notification();
                // First int encodes the sender's server state
                QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
                switch (response.buffer.getInt()) {
                case 0:
                    ackstate = QuorumPeer.ServerState.LOOKING;
                    break;
                case 1:
                    ackstate = QuorumPeer.ServerState.FOLLOWING;
                    break;
                case 2:
                    ackstate = QuorumPeer.ServerState.LEADING;
                    break;
                case 3:
                    ackstate = QuorumPeer.ServerState.OBSERVING;
                    break;
                default:
                    continue;
                }
                n.leader = response.buffer.getLong();
                n.zxid = response.buffer.getLong();
                n.electionEpoch = response.buffer.getLong();
                n.state = ackstate;
                n.sid = response.sid;
                if(!backCompatibility){
                    n.peerEpoch = response.buffer.getLong();
                } else {
                    if(LOG.isInfoEnabled()){
                        LOG.info("Backward compatibility mode, server id=" + n.sid);
                    }
                    // Old peers do not send the epoch; derive it from the zxid
                    n.peerEpoch = ZxidUtils.getEpochFromZxid(n.zxid);
                }
                n.version = (response.buffer.remaining() >= 4) ?
                        response.buffer.getInt() : 0x0;
                if(LOG.isInfoEnabled()){
                    printNotification(n);
                }
                // If we are LOOKING ourselves, queue the notification for
                // lookForLeader()
                if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
                    recvqueue.offer(n);
                    // If the sender is LOOKING too but its election epoch is
                    // behind ours (a stale vote), send our proposal back
                    if((ackstate == QuorumPeer.ServerState.LOOKING)
                            && (n.electionEpoch < logicalclock)){
                        Vote v = getVote();
                        ToSend notmsg = new ToSend(ToSend.mType.notification,
                                v.getId(),
                                v.getZxid(),
                                logicalclock,
                                self.getPeerState(),
                                response.sid,
                                v.getPeerEpoch());
                        sendqueue.offer(notmsg);
                    }
                } else {
                    // We are not LOOKING, but the sender is: tell it the
                    // outcome by sending our current vote back
                    Vote current = self.getCurrentVote();
                    if(ackstate == QuorumPeer.ServerState.LOOKING){
                        if(LOG.isDebugEnabled()){
                            LOG.debug("Sending new notification. My id =  " +
                                    self.getId() + " recipient=" +
                                    response.sid + " zxid=0x" +
                                    Long.toHexString(current.getZxid()) +
                                    " leader=" + current.getId());
                        }
                        // Wrap the vote in a ToSend and reply to that server
                        ToSend notmsg;
                        if(n.version > 0x0) {
                            notmsg = new ToSend(
                                    ToSend.mType.notification,
                                    current.getId(),
                                    current.getZxid(),
                                    current.getElectionEpoch(),
                                    self.getPeerState(),
                                    response.sid,
                                    current.getPeerEpoch());
                        } else {
                            // Older peers get the backward-compatible vote
                            Vote bcVote = self.getBCVote();
                            notmsg = new ToSend(
                                    ToSend.mType.notification,
                                    bcVote.getId(),
                                    bcVote.getZxid(),
                                    bcVote.getElectionEpoch(),
                                    self.getPeerState(),
                                    response.sid,
                                    bcVote.getPeerEpoch());
                        }
                        sendqueue.offer(notmsg);
                    }
                }
            }
        } catch (InterruptedException e) {
            System.out.println("Interrupted Exception while waiting for new message" +
                    e.toString());
        }
    }
    LOG.info("WorkerReceiver is down");
}
下面分析
FastLeaderElection中
lookForLeader()的处理逻辑:
其中优先级的判断方法:
- zxid大的优先级高,zxid表示事务id,越大表示操作越新
- 如果zxid相同,则sid越大的优先级越大
/**
 * Core of the FastLeaderElection algorithm. Bumps the local election epoch,
 * proposes itself, then processes incoming Notifications until a quorum
 * agrees on a leader. Vote precedence: higher epoch wins, then higher zxid,
 * then higher server id. Returns the final winning Vote.
 */
public Vote lookForLeader() throws InterruptedException {
    try {
        self.jmxLeaderElectionBean = new LeaderElectionBean();
        MBeanRegistry.getInstance().register(
                self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
        self.jmxLeaderElectionBean = null;
    }
    if (self.start_fle == 0) {
        self.start_fle = System.currentTimeMillis();
    }
    try {
        // Votes received from peers participating in this election round
        HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
        // Votes from peers that have already finished electing (FOLLOWING/LEADING)
        HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
        int notTimeout = finalizeWait;
        synchronized(this){
            // New election round: bump the logical clock and propose ourselves
            logicalclock++;
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }
        LOG.info("New election. My id =  " + self.getId() +
                ", proposed zxid=0x" + Long.toHexString(proposedZxid));
        sendNotifications();
        // Keep looping until a leader is found (or we stop LOOKING)
        while ((self.getPeerState() == ServerState.LOOKING) &&
                (!stop)){
            // Take the next notification from the receive queue
            Notification n = recvqueue.poll(notTimeout,
                    TimeUnit.MILLISECONDS);
            // Timed out without receiving anything
            if(n == null){
                // Either everything we queued was delivered...
                if(manager.haveDelivered()){
                    // ...so re-broadcast our proposal
                    sendNotifications();
                // ...or connections may be broken
                } else {
                    // so reconnect to all peers
                    manager.connectAll();
                }
                // Exponential backoff of the poll timeout, capped
                int tmpTimeOut = notTimeout*2;
                notTimeout = (tmpTimeOut < maxNotificationInterval?
                        tmpTimeOut : maxNotificationInterval);
                LOG.info("Notification time out: " + notTimeout);
            }
            // Only consider notifications from voting members
            else if(self.getVotingView().containsKey(n.sid)) {
                // Dispatch on the sender's state
                switch (n.state) {
                case LOOKING:
                    // A higher election epoch means our round is stale
                    if (n.electionEpoch > logicalclock) {
                        logicalclock = n.electionEpoch;
                        // Discard votes gathered for the old round
                        recvset.clear();
                        // Compare votes by precedence to decide the winner
                        if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                            // Their candidate wins: adopt it as our proposal
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            // Otherwise keep proposing ourselves
                            updateProposal(getInitId(),
                                    getInitLastLoggedZxid(),
                                    getPeerEpoch());
                        }
                        // Broadcast the (possibly updated) proposal
                        sendNotifications();
                    // A lower epoch means the notification is stale: ignore it
                    } else if (n.electionEpoch < logicalclock) {
                        if(LOG.isDebugEnabled()){
                            LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                    + Long.toHexString(n.electionEpoch)
                                    + ", logicalclock=0x" + Long.toHexString(logicalclock));
                        }
                        break;
                    // Same epoch: if their vote beats ours, adopt and re-broadcast
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                            proposedLeader, proposedZxid, proposedEpoch)) {
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        sendNotifications();
                    }
                    if(LOG.isDebugEnabled()){
                        LOG.debug("Adding vote: from=" + n.sid +
                                ", proposed leader=" + n.leader +
                                ", proposed zxid=0x" + Long.toHexString(n.zxid) +
                                ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                    }
                    // Record this peer's vote
                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                    // Check whether our current proposal has a quorum
                    if (termPredicate(recvset,
                            new Vote(proposedLeader, proposedZxid,
                                    logicalclock, proposedEpoch))) {
                        // Linger briefly: if a better vote arrives, requeue it
                        // and continue the election instead of finalizing
                        while((n = recvqueue.poll(finalizeWait,
                                TimeUnit.MILLISECONDS)) != null){
                            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    proposedLeader, proposedZxid, proposedEpoch)){
                                recvqueue.put(n);
                                break;
                            }
                        }
                        // Nothing better arrived: finalize the leader
                        if (n == null) {
                            // If we are the elected leader become LEADING,
                            // otherwise FOLLOWING (or OBSERVING)
                            self.setPeerState((proposedLeader == self.getId()) ?
                                    ServerState.LEADING: learningState());
                            Vote endVote = new Vote(proposedLeader,
                                    proposedZxid,
                                    logicalclock,
                                    proposedEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    break;
                // Observers don't vote, so their notifications are ignored
                case OBSERVING:
                    LOG.debug("Notification from observer: " + n.sid);
                    break;
                // FOLLOWING and LEADING are handled the same way
                case FOLLOWING:
                case LEADING:
                    // A non-LOOKING sender means an election already concluded.
                    // If it is from our epoch, check whether its leader has a
                    // quorum and an ack from the leader itself; if so, join it
                    if(n.electionEpoch == logicalclock){
                        recvset.put(n.sid, new Vote(n.leader,
                                n.zxid,
                                n.electionEpoch,
                                n.peerEpoch));
                        if(ooePredicate(recvset, outofelection, n)) {
                            self.setPeerState((n.leader == self.getId()) ?
                                    ServerState.LEADING: learningState());
                            Vote endVote = new Vote(n.leader,
                                    n.zxid,
                                    n.electionEpoch,
                                    n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    // Also track it among out-of-election votes so we can
                    // join an ensemble from a different epoch
                    outofelection.put(n.sid, new Vote(n.version,
                            n.leader,
                            n.zxid,
                            n.electionEpoch,
                            n.peerEpoch,
                            n.state));
                    if(ooePredicate(outofelection, outofelection, n)) {
                        synchronized(this){
                            logicalclock = n.electionEpoch;
                            self.setPeerState((n.leader == self.getId()) ?
                                    ServerState.LEADING: learningState());
                        }
                        Vote endVote = new Vote(n.leader,
                                n.zxid,
                                n.electionEpoch,
                                n.peerEpoch);
                        leaveInstance(endVote);
                        return endVote;
                    }
                    break;
                default:
                    LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)",
                            n.state, n.sid);
                    break;
                }
            } else {
                LOG.warn("Ignoring notification from non-cluster member " + n.sid);
            }
        }
        return null;
    } finally {
        try {
            if(self.jmxLeaderElectionBean != null){
                MBeanRegistry.getInstance().unregister(
                        self.jmxLeaderElectionBean);
            }
        } catch (Exception e) {
            LOG.warn("Failed to unregister with JMX", e);
        }
        self.jmxLeaderElectionBean = null;
    }
}
watcher原理
watch机制的大致流程:
- 客户端创建watcher实例并向zookeeper服务器端进行注册
- 向服务器端注册成功后,将watcher保存到本地的WatchManager中
- 当注册的监听事件触发后,zookeeper服务器端会通知客户端
- 客户端接到通知后从WatchManager获取监听该事件的watcher
- 执行这些watcher的process方法
zookeeper中可以进行watcher注册的方法有
exists(),
getData(),
getChildren(),这里我们以
getData()方法为例进行分析:
//getData()方法提供了两种实现方式,一种是传入具体的watcher实例,另一种是传入boolean变量 //当传入true时,采用默认 watcher进行注册,传入false时,不进行注册 //其中,默认watcher是指在创建zookeeper实例时传入的watcher public byte[] getData(String path, boolean watch, Stat stat) throws KeeperException, InterruptedException { return getData(path, watch ? watchManager.defaultWatcher : null, stat); } public void getData(final String path, Watcher watcher, DataCallback cb, Object ctx) { final String clientPath = path; PathUtils.validatePath(clientPath); WatchRegistration wcb = null; if (watcher != null) { //此处创建的实例对象会根据调用的方法的不同而不同 //getData():DataWatchRegistration //exists():ExistsWatchRegistration //getChildren():ChildWatchRegistration //这些类都继承了WatchRegistration类 wcb = new DataWatchRegistration(watcher, clientPath); } final String serverPath = prependChroot(clientPath); //对请求信息进行封装 RequestHeader h = new RequestHeader(); h.setType(ZooDefs.OpCode.getData); GetDataRequest request = new GetDataRequest(); request.setPath(serverPath); request.setWatch(watcher != null) 23ff7 ; GetDataResponse response = new GetDataResponse(); //使用ClientCnxn实例将请求内容发送到服务端,这里的请求内容就是watcher的注册信息 cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, clientPath, serverPath, ctx, wcb); }
下面来看一下ZooKeeper的构造方法:
/**
 * Creates a client session. The supplied watcher becomes the session's
 * default watcher; a ClientCnxn is created and its two worker threads
 * (SendThread / EventThread) are started.
 */
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
        long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)
        throws IOException {
    LOG.info("Initiating client connection, connectString=" + connectString
            + " sessionTimeout=" + sessionTimeout
            + " watcher=" + watcher
            + " sessionId=" + Long.toHexString(sessionId)
            + " sessionPasswd="
            + (sessionPasswd == null ? "<null>" : "<hidden>"));
    // As described above: the watcher passed in becomes the default watcher
    watchManager.defaultWatcher = watcher;
    ConnectStringParser connectStringParser = new ConnectStringParser(
            connectString);
    HostProvider hostProvider = new StaticHostProvider(
            connectStringParser.getServerAddresses());
    // Create the connection manager
    cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
            hostProvider, sessionTimeout, this, watchManager,
            getClientCnxnSocket(), sessionId, sessionPasswd, canBeReadOnly);
    cnxn.seenRwServerBefore = true;
    // Start the SendThread and EventThread
    cnxn.start();
}
因为
ClientCnxn类中有两个线程,分别为
EventThread和
SendThread,
start()方法就是启动这两个线程,这两个线程后面再介绍。
// Starts ClientCnxn's two worker threads: SendThread (network I/O)
// and EventThread (watcher/callback dispatch).
public void start() {
    sendThread.start();
    eventThread.start();
}
在
getData()方法中 ,调用了
queuePacket()方法:
/**
 * Wraps a request into a Packet — the smallest unit of data ZooKeeper
 * sends on the wire — and appends it to the outgoing queue, then wakes
 * the selector so the SendThread can transmit it.
 */
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
        Record response, AsyncCallback cb, String clientPath,
        String serverPath, Object ctx, WatchRegistration watchRegistration)
{
    Packet packet = null;
    // Build the packet under the queue lock so state checks and enqueue
    // are atomic
    synchronized (outgoingQueue) {
        packet = new Packet(h, r, request, response, watchRegistration);
        packet.cb = cb;
        packet.ctx = ctx;
        packet.clientPath = clientPath;
        packet.serverPath = serverPath;
        if (!state.isAlive() || closing) {
            // Session is dead/closing: complete the packet with
            // connection-loss instead of sending
            conLossPacket(packet);
        } else {
            if (h.getType() == OpCode.closeSession) {
                closing = true;
            }
            // Append the packet to the outgoing queue
            outgoingQueue.add(packet);
        }
    }
    sendThread.getClientCnxnSocket().wakeupCnxn();
    return packet;
}
下面便是sendThread线程发送注册信息的过程:
/**
 * Main loop of the client's SendThread: (re)connects to a server, tracks
 * timeouts, and drives all socket I/O via clientCnxnSocket.doTransport().
 * (Quoted with elisions — "......" marks code omitted by the article.)
 */
@Override
public void run() {
    // The concrete clientCnxnSocket type is ClientCnxnSocketNIO,
    // which communicates using Java NIO
    clientCnxnSocket.introduce(this,sessionId);
    clientCnxnSocket.updateNow();
    clientCnxnSocket.updateLastSendAndHeard();
    int to;
    long lastPingRwServer = System.currentTimeMillis();
    final int MAX_SEND_PING_INTERVAL = 10000;
    while (state.isAlive()) {
        try {
            if (!clientCnxnSocket.isConnected()) {
                if(!isFirstConnect){
                    try {
                        Thread.sleep(r.nextInt(1000));
                    } catch (InterruptedException e) {
                        LOG.warn("Unexpected exception", e);
                    }
                }
                // Guard against the session having been closed while the
                // thread restarts
                if (closing || !state.isAlive()) {
                    break;
                }
                // Initiate the connection
                startConnect();
                clientCnxnSocket.updateLastSendAndHeard();
            }
            // Once connected...
            if (state.isConnected()) {
                // ...check whether an auth-failure event must be delivered
                if (zooKeeperSaslClient != null) {
                    ......
                }
                KeeperState authState = zooKeeperSaslClient.getKeeperState();
                if (authState != null) {
                    ......
                }
                // If an auth-failure event is pending...
                if (sendAuthEvent == true) {
                    // ...hand it to the eventThread for dispatch
                    eventThread.queueEvent(new WatchedEvent(
                            Watcher.Event.EventType.None,
                            authState,null));
                }
            }
            to = readTimeout - clientCnxnSocket.getIdleRecv();
        } else {
            to = connectTimeout - clientCnxnSocket.getIdleRecv();
        }
        if (to <= 0) {
            ......
        }
        if (state.isConnected()) {
            ......
        }
        if (state == States.CONNECTEDREADONLY) {
            ......
        }
        // After all the bookkeeping, doTransport() performs the actual
        // sending and receiving of data
        clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue,
                ClientCnxn.this);
        } catch (Throwable e) {
            ......
        }
    }
    cleanup();
    clientCnxnSocket.close();
    if (state.isAlive()) {
        ......
    }
    ......
}
在
doTransport()方法中,主要是使用NIO的方式:
/**
 * One NIO selector pass: finishes pending connects, delegates readable /
 * writable keys to doIO(), and re-enables write interest when the outgoing
 * queue has a sendable packet.
 */
@Override
void doTransport(int waitTimeOut, List<Packet> pendingQueue,
        LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
        throws IOException, InterruptedException {
    selector.select(waitTimeOut);
    Set<SelectionKey> selected;
    synchronized (this) {
        selected = selector.selectedKeys();
    }
    updateNow();
    for (SelectionKey k : selected) {
        SocketChannel sc = ((SocketChannel) k.channel());
        if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
            // Non-blocking connect completed: prime the session
            if (sc.finishConnect()) {
                updateLastSendAndHeard();
                sendThread.primeConnection();
            }
        } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
            // Readable or writable: let doIO() handle the transfer
            doIO(pendingQueue, outgoingQueue, cnxn);
        }
    }
    if (sendThread.getZkState().isConnected()) {
        synchronized(outgoingQueue) {
            if (findSendablePacket(outgoingQueue,
                    cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
                enableWrite();
            }
        }
    }
    selected.clear();
}
在该方法中,最终会调用
org.apache.zookeeper.ClientCnxn.SendThread#readResponse()方法对接受的信息进行处理。
/**
 * Performs the actual socket reads and writes for one selector wake-up.
 * Reads are framed (length prefix, then payload) and completed payloads go
 * to SendThread.readResponse(); writes drain the next sendable Packet from
 * outgoingQueue and move it to pendingQueue to await its reply.
 */
void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
        throws InterruptedException, IOException {
    SocketChannel sock = (SocketChannel) sockKey.channel();
    if (sock == null) {
        throw new IOException("Socket is null!");
    }
    // If the key is readable
    if (sockKey.isReadable()) {
        int rc = sock.read(incomingBuffer);
        if (rc < 0) {
            throw new EndOfStreamException(
                    "Unable to read additional data from server sessionid 0x"
                            + Long.toHexString(sessionId)
                            + ", likely server has closed socket");
        }
        if (!incomingBuffer.hasRemaining()) {
            incomingBuffer.flip();
            if (incomingBuffer == lenBuffer) {
                // We just read the 4-byte length prefix; allocate the payload buffer
                recvCount++;
                readLength();
            } else if (!initialized) {
                // First full payload of the session is the connect result
                readConnectResult();
                enableRead();
                if (findSendablePacket(outgoingQueue,
                        cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
                    // Since SASL authentication has completed (if client is configured to do so),
                    // outgoing packets waiting in the outgoingQueue can now be sent.
                    enableWrite();
                }
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
                updateLastHeard();
                initialized = true;
            } else {
                // A complete response payload: hand it to readResponse()
                sendThread.readResponse(incomingBuffer);
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
                updateLastHeard();
            }
        }
    }
    // If the key is writable
    if (sockKey.isWritable()) {
        synchronized(outgoingQueue) {
            Packet p = findSendablePacket(outgoingQueue,
                    cnxn.sendThread.clientTunneledAuthenticationInProgress());
            if (p != null) {
                updateLastSend();
                if (p.bb == null) {
                    // Assign an xid to everything except pings and auth packets
                    if ((p.requestHeader != null) &&
                            (p.requestHeader.getType() != OpCode.ping) &&
                            (p.requestHeader.getType() != OpCode.auth)) {
                        p.requestHeader.setXid(cnxn.getXid());
                    }
                    p.createBB();
                }
                // Write the packet's bytes to the socket
                sock.write(p.bb);
                if (!p.bb.hasRemaining()) {
                    // Fully sent: move it from outgoing to pending
                    // (where it waits for the server's reply)
                    sentCount++;
                    outgoingQueue.removeFirstOccurrence(p);
                    if (p.requestHeader != null
                            && p.requestHeader.getType() != OpCode.ping
                            && p.requestHeader.getType() != OpCode.auth) {
                        synchronized (pendingQueue) {
                            pendingQueue.add(p);
                        }
                    }
                }
            }
            if (outgoingQueue.isEmpty()) {
                disableWrite();
            } else if (!initialized && p != null &&
                    !p.bb.hasRemaining()) {
                disableWrite();
            } else {
                enableWrite();
            }
        }
    }
}
该方法是
ClientCnxn的内部类
SendThread类中的方法,该方法主要是针对接受的消息中
replyHdr标识的不同而对不同的数据进行不同的处理。
/**
 * Decodes one server response. Dispatches on the reply header's xid:
 * -2 = ping, -4 = auth result, -1 = watch-event notification; everything
 * else is matched against the head of pendingQueue and finished via
 * finishPacket().
 */
void readResponse(ByteBuffer incomingBuffer) throws IOException {
    ByteBufferInputStream bbis = new ByteBufferInputStream(
            incomingBuffer);
    BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
    ReplyHeader replyHdr = new ReplyHeader();
    replyHdr.deserialize(bbia, "header");
    if (replyHdr.getXid() == -2) {
        // Xid -2 marks a ping reply: nothing to do
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got ping response for sessionid: 0x"
                    + Long.toHexString(sessionId)
                    + " after "
                    + ((System.nanoTime() - lastPingSentNs) / 1000000)
                    + "ms");
        }
        return;
    }
    if (replyHdr.getXid() == -4) {
        // Xid -4 marks an authentication reply
        if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
            state = States.AUTH_FAILED;
            eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None,
                    Watcher.Event.KeeperState.AuthFailed, null) );
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got auth sessionid:0x"
                    + Long.toHexString(sessionId));
        }
        return;
    }
    if (replyHdr.getXid() == -1) {
        // Xid -1 marks a watch-event notification from the server
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got notification sessionid:0x"
                + Long.toHexString(sessionId));
        }
        // Deserialize the event
        WatcherEvent event = new WatcherEvent();
        event.deserialize(bbia, "response");
        // Strip the chroot prefix from the event's path, if any
        if (chrootPath != null) {
            String serverPath = event.getPath();
            if(serverPath.compareTo(chrootPath)==0)
                event.setPath("/");
            else if (serverPath.length() > chrootPath.length())
                event.setPath(serverPath.substring(chrootPath.length()));
            else {
                LOG.warn("Got server path " + event.getPath()
                        + " which is too short for chroot path "
                        + chrootPath);
            }
        }
        WatchedEvent we = new WatchedEvent(event);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got " + we + " for sessionid 0x"
                    + Long.toHexString(sessionId));
        }
        // Enqueue the event for the eventThread to dispatch
        eventThread.queueEvent( we );
        return;
    }
    if (clientTunneledAuthenticationInProgress()) {
        GetSASLRequest request = new GetSASLRequest();
        request.deserialize(bbia,"token");
        zooKeeperSaslClient.respondToServer(request.getToken(),
            ClientCnxn.this);
        return;
    }
    // None of the special xids: this must answer an outstanding request
    Packet packet;
    synchronized (pendingQueue) {
        if (pendingQueue.size() == 0) {
            throw new IOException("Nothing in the queue, but got "
                    + replyHdr.getXid());
        }
        // Take the oldest outstanding packet
        packet = pendingQueue.remove();
    }
    // Replies must arrive in order: a mismatched xid indicates an error
    try {
        if (packet.requestHeader.getXid() != replyHdr.getXid()) {
            packet.replyHeader.setErr(
                    KeeperException.Code.CONNECTIONLOSS.intValue());
            throw new IOException("Xid out of order. Got Xid "
                    + replyHdr.getXid() + " with err " +
                    + replyHdr.getErr() +
                    " expected Xid "
                    + packet.requestHeader.getXid()
                    + " for a packet with details: "
                    + packet );
        }
        // Copy the reply header into the packet
        packet.replyHeader.setXid(replyHdr.getXid());
        packet.replyHeader.setErr(replyHdr.getErr());
        packet.replyHeader.setZxid(replyHdr.getZxid());
        if (replyHdr.getZxid() > 0) {
            lastZxid = replyHdr.getZxid();
        }
        if (packet.response != null && replyHdr.getErr() == 0) {
            packet.response.deserialize(bbia, "response");
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Reading reply sessionid:0x"
                    + Long.toHexString(sessionId) + ", packet:: " + packet);
        }
    } finally {
        // Complete the packet (watch registration + callback dispatch)
        finishPacket(packet);
    }
}
finishPacket()方法是
ClientCnxn类中的方法:
/**
 * Completes a packet: registers its watcher locally (if the server accepted
 * it), then either wakes a synchronous caller or hands the packet to the
 * eventThread for asynchronous callback dispatch.
 */
private void finishPacket(Packet p) {
    // If the packet carries a watcher registration, register it now —
    // the watcher is stored in a local map keyed by path
    if (p.watchRegistration != null) {
        p.watchRegistration.register(p.replyHeader.getErr());
    }
    if (p.cb == null) {
        // Synchronous call: wake up the thread blocked on this packet
        synchronized (p) {
            p.finished = true;
            p.notifyAll();
        }
    } else {
        // Asynchronous call: let the eventThread run the callback
        p.finished = true;
        eventThread.queuePacket(p);
    }
}
下面来看eventThread线程的工作过程,该线程主要是负责对事件进行处理,相较于sendThread要简单的多:
/**
 * Main loop of the EventThread: takes events off waitingEvents and
 * dispatches them via processEvent(); a sentinel "eventOfDeath" entry
 * makes the thread drain the queue and exit.
 */
@Override
public void run() {
    try {
        isRunning = true;
        while (true) {
            Object event = waitingEvents.take();
            // The sentinel kill event marks the thread for shutdown
            if (event == eventOfDeath) {
                wasKilled = true;
            } else {
                // Dispatch the event to its watchers / callback
                processEvent(event);
            }
            if (wasKilled)
                // Killed: leave the loop once the queue is drained
                synchronized (waitingEvents) {
                    if (waitingEvents.isEmpty()) {
                        isRunning = false;
                        break;
                    }
                }
        }
    } catch (InterruptedException e) {
        LOG.error("Event thread exiting due to interruption", e);
    }
    LOG.info("EventThread shut down for session: 0x{}",
             Long.toHexString(getSessionId()));
}
处理事件方法:
rivate void processEvent(Object event) { try { //主要分为两类,一类是处理event,event会在queueEvent方法中封装成WatcherSetEventPair if (event instanceof WatcherSetEventPair) { // each watcher will process the event WatcherSetEventPair pair = (WatcherSetEventPair) event; //调用所有监听该事件的watcher的process方法 for (Watcher watcher : pair.watchers) { try { watcher.process(pair.event); } catch (Throwable t) { LOG.error("Error while calling watcher ", t); } } //第二类,根据packet中数据的类型,进行不同的处理 } else { Packet p = (Packet) event; int rc = 0; String clientPath = p.clientPath; if (p.replyHeader.getErr() != 0) { rc = p.replyHeader.getErr(); } if (p.cb == null) { ...... } else if (p.response instanceof ExistsResponse || p.response instanceof SetDataResponse || p.response instanceof SetACLResponse) { ...... } else if (p.response instanceof GetDataResponse) { ...... } else if (p.response instanceof GetACLResponse) { ...... } else if (p.response instanceof GetChildrenResponse) { ...... } else if (p.response instanceof GetChildren2Response) { ...... } else if (p.response instanceof CreateResponse) { ...... } else if (p.response instanceof MultiResponse) { ...... } else if (p.cb instanceof VoidCallback) { ...... } } } catch (Throwable t) { ...... } } }
queueEvent()和
queuepacket()方法:
/**
 * Wraps a WatchedEvent together with the watchers that should see it
 * (materialized from the local watch manager) and enqueues the pair
 * for the EventThread.
 */
public void queueEvent(WatchedEvent event) {
    if (event.getType() == EventType.None
            && sessionState == event.getState()) {
        return;
    }
    sessionState = event.getState();
    // Bundle the event with its interested watchers
    WatcherSetEventPair pair = new WatcherSetEventPair(
            watcher.materialize(event.getState(), event.getType(),
                    event.getPath()),
                    event);
    // Enqueue for the EventThread
    waitingEvents.add(pair);
}

/**
 * Enqueues a completed async Packet for callback dispatch; if the thread
 * was already killed, process it inline when the thread has stopped.
 */
public void queuePacket(Packet packet) {
    if (wasKilled) {
        synchronized (waitingEvents) {
            if (isRunning) waitingEvents.add(packet);
            else processEvent(packet);
        }
    } else {
        // Normal case: append the packet to waitingEvents
        waitingEvents.add(packet);
    }
}
上述是客户端中watch机制的处理方式,服务器端的代码没有列出。
相关文章推荐
- Eureka学习笔记001--Eureka概念和源码分析
- OpenCV学习笔记(28)KAZE 算法原理与源码分析(二)非线性尺度空间构建
- Openstack学习笔记之——Neutron-server服务加载与启动源码分析(三)
- jQuery源码研究分析学习笔记-jQuery.fn.init()(五)
- TQ2440 学习笔记—— 31、移植U-Boot【U-Boot 的启动过程第二阶段源码分析】
- netty5源码分析(2)--学习笔记
- 学习笔记之ConcurrentLinkedQueue源码分析整理
- tomcat源码分析学习笔记(五)
- Cocos2d-x学习笔记(19)(TestCpp源码分析-3)
- memcached学习笔记——存储命令源码分析下篇
- TQ2440 学习笔记—— 30、移植U-Boot【U-Boot 的启动过程第一阶段源码分析】
- TensorFlow学习笔记之源码分析(3)---- retrain.py
- Vue.js 源码学习笔记 -- 分析前准备1 -- vue三大利器
- mangos0.9源码分析学习笔记(二)
- redis源码分析(八)、redis数据结构之压缩ziplist--------ziplist.c ziplist.h学习笔记
- bootstrap-modal 学习笔记 源码分析
- Hadoop学习笔记-WordCount源码分析
- JFinal个人学习笔记之源码分析2
- OpenCV学习笔记(27)KAZE 算法原理与源码分析(一)非线性扩散滤波