zookeeper ZAB协议 Follower和leader源码分析
2017-07-10 20:05
525 查看
Follower处理逻辑
void followLeader() throws InterruptedException { //... try { //获取leader server QuorumServer leaderServer = findLeader(); try { //主动向leader发起连接,TCP连接 connectToLeader(leaderServer.addr, leaderServer.hostname); //发送follower的,包括last zxid sid,并从leader读取最新的zxid,再把last zxid发送给leader。返回leader zxid //1. 首先follower发送自己的last zxid和sid,目的是为了leader确认epoch。FOLLOWERINFO //2. leader返回确认后的epoch。LEADERINFO //3. follower再次发送自己的最新zxid。ACKEPOCH //4. 返回new epoch long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO); //check to see if the leader zxid is lower than ours //this should never happen but is just a safety check //(注释有点问题,是判断的epoch而不是zxid) long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid); if (newEpoch < self.getAcceptedEpoch()) { LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid) + " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch())); throw new IOException("Error: Epoch of leader is lower"); } //和leader开始同步,首先收到一条消息,判断DIFF,TRUNC,SNAP // syncWithLeader(newEpochZxid); QuorumPacket qp = new QuorumPacket(); while (this.isRunning()) { readPacket(qp); processPacket(qp); } } }
Leader处理逻辑
void lead() throws IOException, InterruptedException { try { self.tick = 0; //初始化,清理旧的session和创建状态机树 zk.loadData(); leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid()); // 建立监听,同时处理和follower的发现,同步阶段逻辑 cnxAcceptor = new LearnerCnxAcceptor(); cnxAcceptor.start(); newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(), null, null); waitForEpochAck(self.getId(), leaderStateSummary); self.setCurrentEpoch(epoch); try { //等待NEWLEADER_ACK,说明已经同步完成 waitForNewLeaderAck(self.getId(), zk.getZxid(), LearnerType.PARTICIPANT); } //开始服务,先发送UPTODATA, startZkServer(); }
发现阶段逻辑处理
建立LearnerCnxAcceptor监听后,会启动LearnerHandler线程
public void run() { try { //读取FOLLOWERINFO,里面包含了follower的sid和peerLastZxid QuorumPacket qp = new QuorumPacket(); ia.readRecord(qp, "packet"); byte learnerInfoData[] = qp.getData(); long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid()); peerLastZxid = ss.getLastZxid(); /* the default to send to the follower */ //默认为全量同步 int packetToSend = Leader.SNAP; long zxidToSend = 0; long leaderLastZxid = 0; /** the packets that the follower needs to get updates from **/ long updates = peerLastZxid; ReentrantReadWriteLock lock = leader.zk.getZKDatabase().getLogLock(); ReadLock rl = lock.readLock(); try { rl.lock(); //读取缓存队列中最小的zxid,所有需要同步的最小值 final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog(); //读取缓存队列中最大的zxid,所有需要同步的最大值 final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog(); //获取当前leader的所有日志CommittedLog //根据上面可知,只会同步到和leader最后一个已提交日志 //不需要同步,发送一个空的DIFF if (peerLastZxid == leader.zk.getZKDatabase().getDataTreeLastProcessedZxid()) { // Follower is already sync with us, send empty diff LOG.info("leader and follower are in sync, zxid=0x{}", Long.toHexString(peerLastZxid)); packetToSend = Leader.DIFF; zxidToSend = peerLastZxid; } else if (proposals.size() != 0) { //minCommittedLog <= peerLastZxid <=maxCommittedLog,进行DIFF同步 if ((maxCommittedLog >= peerLastZxid) && (minCommittedLog <= peerLastZxid)) { //这里有一种特殊情况,需要先TRUNK,再DIFF同步 //leader的日志是50001,50002,60001,60002 //follower的日志是50003 //把需要同步的数据加入发送队列 } else if (peerLastZxid > maxCommittedLog) { //大于maxCommittedLog,直接TRUCK packetToSend = Leader.TRUNC; } else { LOG.warn("Unhandled proposal scenario"); } } leaderLastZxid = leader.startForwarding(this, updates); } QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, ZxidUtils.makeZxid(newEpoch, 0), null, null); //NEWLEADER报文加入发送队列,这时还没有发送任何报文 queuedPackets.add(newLeaderQP); bufferedOutput.flush(); //Need to set the zxidToSend to the latest zxid if (packetToSend == Leader.SNAP) { zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid(); } //发送SNAP,DIFF,或者TRUNK oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet"); bufferedOutput.flush(); /*如果不是DIFF和TRUNK,直接发送全量信息 */ if (packetToSend == Leader.SNAP) { leader.zk.getZKDatabase().serializeSnapshot(oa); oa.writeString("BenWasHere", "signature"); } bufferedOutput.flush(); // 开始发包 new Thread() { public void run() { Thread.currentThread().setName( "Sender-" + sock.getRemoteSocketAddress()); try { //发送同步报文 sendPackets(); } catch (InterruptedException e) { LOG.warn("Unexpected interruption",e); } } }.start(); //等待NEWLEADER_ACK,等到了NEWLEADER_ACK说明已经同步完成 leader.waitForNewLeaderAck(getSid(), qp.getZxid(), getLearnerType()); //等待大多数同步完成,leader starts up synchronized(leader.zk){ while(!leader.zk.isRunning() && !this.isInterrupted()){ leader.zk.wait(20); } //发送UPTODATE报文,learn开始服务 queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null)); //正常处理流程 while (true) { switch (qp.getType()) { //处理propose,commit case Leader.ACK: if (this.learnerType == LearnerType.OBSERVER) { if (LOG.isDebugEnabled()) { LOG.debug("Received ACK from Observer " + this.sid); } } syncLimitCheck.updateAck(qp.getZxid()); leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress()); break; //和follower保持session信息 case Leader.PING: // Process the touches ByteArrayInputStream bis = new ByteArrayInputStream(qp .getData()); DataInputStream dis = new DataInputStream(bis); while (dis.available() > 0) { long sess = dis.readLong(); int to = dis.readInt(); leader.zk.touch(sess, to); } break; case Leader.REVALIDATE: //延长session时间 case Leader.REQUEST: //加入处理队列 default: } } } }
总结
源码差不多看完了,整体挺复杂的,这里总结一下发现和同步的过程。
- newEpoch:提供服务的epoch
- acceptedEpoch:没有确认的epoch,LEADERINFO阶段
- currentEpoch:确认的epoch,接收到UPTODATE后
- lastLoggedZxid:最后处理的日志(包括提交,未提交)
相关文章推荐
- Zookeeper源码分析之六 Leader/Follower初始化
- 第二人生的源码分析(二十六)底层网络协议
- 第二人生的源码分析(六十)多协议文件传送库libcurl的介绍
- [FreeModbus源码分析] 1.协议简介
- Zookeeper源码分析(3)- Leader执行流程
- Glusterfs之nfs模块源码分析(上)之nfs原理和协议
- UDT4协议源码分析之数据的发送和接收
- SCTP协议源码分析--拥塞控制算法
- Glusterfs之nfs模块源码分析(上)之nfs原理和协议
- Zookeeper源码分析(11)- FollowerRequestProcessor
- 第二人生的源码分析(六十二)类Easy实现多协议文件传送
- 蔡军生先生第二人生的源码分析(六十二)类Easy实现多协议文件传送
- 蔡军生先生第二人生的源码分析(六十)多协议文件传送库libcurl的介绍
- hadoop源码分析系列(四)——org.apache.hadoop.hdfs包之协议篇
- 第二人生的源码分析(六十)多协议文件传送库libcurl的介绍
- 蔡军生先生第二人生的源码分析(二十六)底层网络协议
- zookeeper3.3.3源码分析(二)FastLeader选举算法
- Tor源码分析七 -- 握手协议
- SCTP协议源码分析--多归属特性multi-homed(二)
- 第二人生的源码分析(二十六)底层网络协议