zookeeper leader选举算法源码
2017-07-10 20:04
525 查看
服务器状态
在QuorumPeer中有定义,这个类是一个线程。
- LOOKING:寻找Leader状态。处于该状态时,它会认为当前集群中没有Leader,进入选举流程。
- FOLLOWING:
- LEADING
- OBSERVING
选票数据结构
public class Vote { // final private int version; //被选举leader的服务器ID final private long id; //被选举leader的事务ID final private long zxid; //逻辑时钟,判断多个选票是否处于同一个选举周期, final private long electionEpoch; //被推举leader的选举轮次 final private long peerEpoch; //状态 final private ServerState state;
QuorumCnxManager:网络IO
负责选举leader时的网络通信
消息队列
SendWork和RevWork都是一个线程
/* * 分别是发送器,发送队列,最后发送的消息。每个连接都有 */ final ConcurrentHashMap<Long, SendWorker> senderWorkerMap;//SendWork里面有RevWork对象 final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap; final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent; /* * 接受队列只有一个 */ public final ArrayBlockingQueue<Message> recvQueue;
建立连接
zookeeper为Leader选举会建立一条连接,默认端口是3888。为了防止两台服务器有重复链接,zookeeper定义了规则,只能sid大的去连接sid小的。如果sid小的连接了sid大的,在连接处理程序中会断掉这条连接,然后重新发起连接。
main->receiveConnection->handleConnection(创建sendwork和revwork,并且加入队列集)
消息的接收和发送
消息的接收过程是由消息接收器recvwork负责,它源源不断从TCP读取数据,加入recvQueue(唯一)。
- 消息发送器主要有两条逻辑 启动sendWork线程后如果发现发送队列是null,从lastMessageSent获取这条数据重新发送。(为了解决由于收到消息前后服务器挂掉,导致消息未正确处理)
- sendWork从队列queueSendMap里面获取数据,通过调用队列的poll函数从队列获取数据
FastLeaderElection
这是选举选法的核心部分,主要在FastLeaderElection中
选票管理
public class FastLeaderElection implements Election{ //发送队列,用于保存待发送的选票 LinkedBlockingQueue<ToSend> sendqueue; //接收队列,用于保存接收的外部选票 LinkedBlockingQueue<Notification> recvqueue; //选票发送器和接收器线程 Messenger messenger; protected class Messenger { //选票接收器线程,接受选票,如果当前状态不为locking,将leader信息发回 class WorkerReceiver extends ZooKeeperThread{} //选票发送器线程,发送选票。 //负责把选票转化为消息,放入QuorumCnxManager的发送队列, //如果是投给自己的,直接放入接收队列 class WorkerSender extends ZooKeeperThread {} } }
核心算法——lookForLeader
- 调用流程:QuorumPeer->locking状态(可以启动只读模式和阻塞模式)->lookForLeader
public Vote lookForLeader() throws InterruptedException { //... try { //用于选票归档 HashMap<Long, Vote> recvset = new HashMap<Long, Vote>(); HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>(); int notTimeout = finalizeWait; synchronized(this){ //自增logicalclock, logicalclock++; //初始化选票,投给自己,使用lastProcessedZxid(最后已提交的日志投票) updateProposal(getInitId(),getInitLastLoggedZxid(), getPeerEpoch()); } //初始化选票,然后WorkerSender发送 sendNotifications(); /* * Loop in which we exchange notifications until we find a leader */ while ((self.getPeerState() == ServerState.LOOKING) && (!stop)){ /* * Remove next notification from queue, times out after 2 times * the termination time */ Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS); //没有获得外部选票 if(n == null){ //如果连接仍然保持,重新发送投票 if(manager.haveDelivered()){ sendNotifications(); } else { //连接失效,重新建立连接。开始的时候是这样建立连接的? manager.connectAll(); } //修改超时参数... } //处理选票 else if(self.getVotingView().containsKey(n.sid)) { switch (n.state) { case LOOKING: // 大于当前选举轮次 if (n.electionEpoch > logicalclock) { logicalclock = n.electionEpoch; //清空接受的选票 recvset.clear(); //选票PK,外部更新。有3条规则 if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) { //变更选票 updateProposal(n.leader, n.zxid, n.peerEpoch); } else { //不变更选票 updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); } sendNotifications(); } // 小于当前选举轮次,直接丢弃 else if (n.electionEpoch < logicalclock) { break; } //等于当前选举轮次,直接PK else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) { updateProposal(n.leader, n.zxid, n.peerEpoch); sendNotifications(); } //无论是否重新投票,都要选票归档,<sid, 选票> //都是和自己的提议对比 recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); //统计投票,决定是否终止投票 if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, logicalclock, proposedEpoch))) { // 判断leader是否改变 while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null){ if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)){ recvqueue.put(n); break; } } if (n == null) { //设置状态,如果leader是自己,状态为Leading //如果leader是其他节点,状态可能为observing或者following self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING: learningState()); Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock, proposedEpoch); //清空接收队列 leaveInstance(endVote); return endVote; } } break; case OBSERVING: break; //已经选出结果 case FOLLOWING: case LEADING: //除了做出过半判断,同时还要检查leader是否给自己发送过投票信息,从投票信息中确认该leader是不是LEADING状态(防止出现时间差)。 /* 同一轮投票选出leader,那么判断是不是半数以上的服务器都选举同一个leader,如果是设置角色并退出选举 */ if(n.electionEpoch == logicalclock){ recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); if(ooePredicate(recvset, outofelection, n)) { self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING: learningState()); Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch); leaveInstance(endVote); return endVote; } } /* 非同一轮次,例如宕机很久的机器重新启动/某个节点延迟很大变为locking,需要收集过半选票。*/ outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state)); if(ooePredicate(outofelection, outofelection, n)) { synchronized(this){ logicalclock = n.electionEpoch; self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING: learningState()); } Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch); leaveInstance(endVote); return endVote; } break; default: break; } } else { LOG.warn("Ignoring notification from non-cluster member " + n.sid); } } return null; } }
- 初始选票 (sid, LastLoggedZxid, currentEpoch)
- LastLoggedZxid为处理(包括提交,未提交)
-
选票状态
-
New epoch更高
模块图总结
相关文章推荐
- zookeeper3.3.3源码分析(二)FastLeader选举算法
- zookeeper3.3.3源码分析(二)FastLeader选举算法
- zookeeper3.3.3源码分析(二)FastLeader选举算法
- zookeeper3.3.3源码分析(二)FastLeader选举算法
- zookeeper FastLeader选举算法
- 图解zookeeper FastLeader选举算法
- Zookeeper系列(三十三)Zookeeper之leader选举算法补充
- Zookeeper中的FastLeaderElection选举算法简述
- 图解zookeeper FastLeader选举算法
- 【Zookeeper】源码分析之Leader选举(二)
- zookeeper Leader选举算法分析
- 【Zookeeper】源码分析之Leader选举(一)
- zookeeper leader选举源码实现
- ZooKeeper系统模型之Leader选举算法分析。
- zookeeper选举算法之FastLeaderElection
- 自己动手实现zookeeper的FastLeaderELection选举算法和心跳同步
- Zookeeper选举算法( FastLeader选主)
- Zookeeper fast leader选举算法
- ZooKeeper源码解析(六):ZooKeeper的三种选举算法
- 【Zookeeper】源码分析之Leader选举(二)