ZooKeeper Source Code Analysis: Server Startup Flow
2017-07-15 19:36
I. The Startup Class
From bin/zkServer.cmd we can see that the startup class is org.apache.zookeeper.server.quorum.QuorumPeerMain. Its structure is simple: an ordinary class holding a QuorumPeer field (when ZooKeeper starts in cluster mode, each node is represented by a QuorumPeer), the main entry point, and two initialization methods.
II. The Startup Flow
QuorumPeerMain.initializeAndRun(args) parses the configuration, starts the scheduled data-purge task, and, in cluster mode, runs the cluster startup code.

1 Parsing the configuration
QuorumPeerConfig.parse:

```java
public void parse(String path) throws ConfigException {
    LOG.info("Reading configuration from: " + path);

    try {
        // Builder pattern
        File configFile = (new VerifyingFileFactory.Builder(LOG)
            .warnForRelativePath()
            .failForNonExistingPath()
            .build()).create(path);

        Properties cfg = new Properties();
        FileInputStream in = new FileInputStream(configFile);
        try {
            cfg.load(in);
            configFileStr = path;
        } finally {
            in.close();
        }

        parseProperties(cfg);
    } catch (IOException e) {
        throw new ConfigException("Error processing " + path, e);
    } catch (IllegalArgumentException e) {
        throw new ConfigException("Error processing " + path, e);
    }

    // Dynamic reconfiguration support
    if (dynamicConfigFileStr != null) {
        try {
            Properties dynamicCfg = new Properties();
            FileInputStream inConfig = new FileInputStream(dynamicConfigFileStr);
            ...
            setupQuorumPeerConfig(dynamicCfg, false);
            ...
}
```
2 The automatic data purge task
DatadirCleanupManager contains a Timer and a PurgeTask. First, note that ZooKeeper persists two kinds of files: snapshots and transaction logs. A snapshot is a point-in-time copy of the data tree, and the log is the transaction log consistent with that snapshot.
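For orientation, here is a minimal sketch of that Timer/PurgeTask pattern. The directory paths, retain count, and interval below are assumptions made for illustration; PurgeTxnLog.purge is the ZooKeeper utility that the purge task ultimately relies on to delete old snapshots and the transaction logs they no longer need.

```java
import java.io.File;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;

import org.apache.zookeeper.server.PurgeTxnLog;

// A minimal sketch of the Timer + PurgeTask pattern used by DatadirCleanupManager.
public class PurgeSketch {
    public static void main(String[] args) {
        final File snapDir = new File("/var/lib/zookeeper/data");       // assumed snapshot dir
        final File dataLogDir = new File("/var/lib/zookeeper/datalog"); // assumed txn log dir
        final int snapRetainCount = 3;      // keep the 3 newest snapshots (ZooKeeper's minimum)
        final long purgeIntervalHours = 1;  // assumed purge interval

        TimerTask purgeTask = new TimerTask() {
            @Override
            public void run() {
                try {
                    // Deletes old snapshots plus the txn logs no longer needed by them.
                    PurgeTxnLog.purge(dataLogDir, snapDir, snapRetainCount);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };

        // Daemon timer: first run immediately, then once per interval
        // (in QuorumPeerMain the process itself keeps running).
        Timer timer = new Timer("PurgeTask", true);
        timer.scheduleAtFixedRate(purgeTask, 0, TimeUnit.HOURS.toMillis(purgeIntervalHours));
    }
}
```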
3 Cluster-mode startup code
Here a QuorumPeer instance is constructed, populated with setters from the configuration, and started.

3.1 Important configuration properties of QuorumPeer (an illustrative zoo.cfg follows the list):
ServerCnxnFactory cnxnFactory: the default implementation is NIOServerCnxnFactory; it is responsible for accepting client connections and handling communication with clients.
FileTxnSnapLog logFactory: used for transaction-log and snapshot storage. It can load data from disk into the ZKDatabase, and can also save the ZKDatabase contents and sessions into snapshot/log files.
int electionType: the type of leader election algorithm. The default is 3, which uses the FastLeaderElection algorithm.
long myid: the number written in the myid file; it identifies the node.
int tickTime: the heartbeat interval used for session checks.
minSessionTimeout, maxSessionTimeout: bound the sessionTimeout requested by clients.
initLimit: during the initialization phase, the timeout for communicating with the leader and for receiving responses from a quorum of nodes; the timeout is initLimit * tickTime.
syncLimit: after the initialization phase, it takes over from initLimit as the timeout for subsequent communication; the timeout is syncLimit * tickTime.
ZKDatabase zkDb: stores ZooKeeper's tree data; this is the server's in-memory database.
quorumConfig: used to verify whether a proposal has been acknowledged by a quorum of nodes. The default is QuorumMaj, the simplest rule: a plain majority, without considering weights.
learnerType: one of PARTICIPANT or OBSERVER. A PARTICIPANT takes part in voting and may become a Follower or the Leader. An OBSERVER does not vote and its role never changes.
quorumPeers: each QuorumServer holds a node's IP address, the port used for communication with the Leader, and the port used for election voting.
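As a reference point, here is a hedged sample zoo.cfg that feeds the properties above; the paths, host names, and server ids are made up for illustration.

```
# heartbeat unit in ms; session checks and the *Limit timeouts are multiples of it
tickTime=2000
# initialization-phase timeout = initLimit * tickTime
initLimit=10
# post-initialization timeout = syncLimit * tickTime
syncLimit=5
dataDir=/var/lib/zookeeper/data
dataLogDir=/var/lib/zookeeper/datalog
clientPort=2181
# election algorithm 3 = FastLeaderElection (the default)
electionAlg=3
# server.<myid>=<host>:<leader-communication-port>:<election-port>
server.1=zk1.example.com:2888:3888
server.2=zk2.example.com:2888:3888
server.3=zk3.example.com:2888:3888
```

Each node also writes its own id (1, 2, or 3 here) into the myid file under dataDir.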
```java
public void runFromConfig(QuorumPeerConfig config)
        throws IOException, AdminServerException {
    try {
        ManagedUtil.registerLog4jMBeans();
    } catch (JMException e) {
        LOG.warn("Unable to register log4j JMX control", e);
    }

    LOG.info("Starting quorum peer");
    try {
        ServerCnxnFactory cnxnFactory = null;
        ServerCnxnFactory secureCnxnFactory = null;

        if (config.getClientPortAddress() != null) {
            cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(),
                    config.getMaxClientCnxns(),
                    false);
        }

        if (config.getSecureClientPortAddress() != null) {
            secureCnxnFactory = ServerCnxnFactory.createFactory();
            secureCnxnFactory.configure(config.getSecureClientPortAddress(),
                    config.getMaxClientCnxns(),
                    true);
        }

        quorumPeer = getQuorumPeer();
        quorumPeer.setTxnFactory(new FileTxnSnapLog(
                config.getDataLogDir(),
                config.getDataDir()));
        quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
        quorumPeer.enableLocalSessionsUpgrading(
                config.isLocalSessionsUpgradingEnabled());
        //quorumPeer.setQuorumPeers(config.getAllMembers());
        quorumPeer.setElectionType(config.getElectionAlg());
        quorumPeer.setMyid(config.getServerId());
        quorumPeer.setTickTime(config.getTickTime());
        quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
        quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
        quorumPeer.setInitLimit(config.getInitLimit());
        quorumPeer.setSyncLimit(config.getSyncLimit());
        quorumPeer.setConfigFileName(config.getConfigFilename());
        quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
        quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
        if (config.getLastSeenQuorumVerifier() != null) {
            quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
        }
        quorumPeer.initConfigInZKDatabase();
        quorumPeer.setCnxnFactory(cnxnFactory);
        quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
        quorumPeer.setLearnerType(config.getPeerType());
        quorumPeer.setSyncEnabled(config.getSyncEnabled());
        quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());

        quorumPeer.start();
        quorumPeer.join();
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Quorum Peer interrupted", e);
    }
}
```
3.2 quorumPeer.start()
QuorumPeer extends Thread. Its start flow (shown as a flowchart in the original) is roughly:
1. Load the database.
2. Start the server and listen for client requests.
3. Start the Jetty admin service, which provides the management pages.
4. Start leader election.
5. Enter the run loop, until shutdown.
3.2.1 Loading the database
ZKDatabase: its class comment describes it as ZooKeeper's in-memory database, holding the sessions, the DataTree, and the committed log.

```java
protected DataTree dataTree;
// session id -> session timeout; the session id can be correlated with the log
protected ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;
protected LinkedList<Proposal> committedLog = new LinkedList<Proposal>();
```
DataTree:
```java
// path -> node
private final ConcurrentHashMap<String, DataNode> nodes =
    new ConcurrentHashMap<String, DataNode>();
// watches triggered by data updates
private final WatchManager dataWatches = new WatchManager();
// watches triggered by child-node updates
private final WatchManager childWatches = new WatchManager();
// tracks quota nodes
private final PathTrie pTrie = new PathTrie();
// session id -> ephemeral nodes owned by that session
private final Map<Long, HashSet<String>> ephemerals =
    new ConcurrentHashMap<Long, HashSet<String>>();
```
A DataNode holds its children, a data[] byte array with the node's payload, and a stat holding the node's persisted metadata.
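To make the flat, path-keyed layout concrete, here is a tiny illustrative sketch (not ZooKeeper code; MiniTree and MiniNode are hypothetical stand-ins): the whole tree lives in one map keyed by full path, while each node remembers only its children's names.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-ins for DataTree/DataNode, for illustration only.
class MiniNode {
    final byte[] data;
    final Set<String> children = ConcurrentHashMap.newKeySet();
    MiniNode(byte[] data) { this.data = data; }
}

class MiniTree {
    // Like DataTree.nodes: the whole tree is a flat map keyed by full path.
    private final ConcurrentHashMap<String, MiniNode> nodes = new ConcurrentHashMap<>();

    MiniTree() { nodes.put("/", new MiniNode(new byte[0])); }

    void create(String path, byte[] data) {
        String parent = path.substring(0, Math.max(path.lastIndexOf('/'), 1));
        String name = path.substring(path.lastIndexOf('/') + 1);
        nodes.get(parent).children.add(name); // register the name under the parent
        nodes.put(path, new MiniNode(data));  // and the node in the flat path index
    }

    byte[] getData(String path) { return nodes.get(path).data; }
}
```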
TODO: WatchManager analysis
QuorumPeer.loadDataBase calls zkDb.loadDataBase, which restores zkDb from the newest snapshot and then replays the not-yet-snapshotted transactions from the newest logs.
After zkDb.loadDataBase, the latest zxid is available, from which the server determines which epoch the election has entered.
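The zxid packs the epoch into its high 32 bits and a per-epoch counter into the low 32 bits, so deriving the epoch is a shift. The sketch below mirrors ZooKeeper's ZxidUtils; the sample zxid value is made up.

```java
public class ZxidEpochSketch {
    // High 32 bits: epoch; low 32 bits: counter within that epoch.
    static long getEpochFromZxid(long zxid) {
        return zxid >> 32L;
    }

    static long getCounterFromZxid(long zxid) {
        return zxid & 0xffffffffL;
    }

    public static void main(String[] args) {
        long zxid = 0x500000007L; // hypothetical zxid: epoch 5, 7th transaction in it
        System.out.println("epoch=" + getEpochFromZxid(zxid)
                + " counter=" + getCounterFromZxid(zxid)); // prints epoch=5 counter=7
    }
}
```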
3.2.3 Starting the client listener service
The default implementation is NIOServerCnxnFactory; its start():

```java
public void start() {
    stopped = false;
    if (workerPool == null) {
        workerPool = new WorkerService(
            "NIOWorker", numWorkerThreads, false);
    }
    for (SelectorThread thread : selectorThreads) {
        if (thread.getState() == Thread.State.NEW) {
            thread.start();
        }
    }
    // ensure thread is started once and only once
    if (acceptThread.getState() == Thread.State.NEW) {
        acceptThread.start();
    }
    if (expirerThread.getState() == Thread.State.NEW) {
        expirerThread.start();
    }
}
```
The class comment:

```java
/**
 * NIOServerCnxnFactory implements a multi-threaded ServerCnxnFactory using
 * NIO non-blocking socket calls. Communication between threads is handled via
 * queues.
 *
 *   - 1   accept thread, which accepts new connections and assigns to a
 *         selector thread
 *   - 1-N selector threads, each of which selects on 1/N of the connections.
 *         The reason the factory supports more than one selector thread is that
 *         with large numbers of connections, select() itself can become a
 *         performance bottleneck.
 *   - 0-M socket I/O worker threads, which perform basic socket reads and
 *         writes. If configured with 0 worker threads, the selector threads
 *         do the socket I/O directly.
 *   - 1   connection expiration thread, which closes idle connections; this is
 *         necessary to expire connections on which no session is established.
 *
 * Typical (default) thread counts are: on a 32 core machine, 1 accept thread,
 * 1 connection expiration thread, 4 selector threads, and 64 worker threads.
 */
```
A worker thread pool, workerPool, containing ArrayList<ExecutorService> workers; each worker is a fixed thread pool with a single thread:

```java
workers.add(Executors.newFixedThreadPool(1, new DaemonThreadFactory(threadNamePrefix, i)));
```
Multiple selector threads, each selecting only over its own share of the connections, so that with very many connections a single selector does not become the performance bottleneck.
One accept thread, which only handles newly arriving connections and hands each one off to a selector thread.
One expirer thread, which checks for and closes idle connections.
start() launches all of the threads above.
First, the accept thread:
It is initialized in NIOServerCnxnFactory.configure(): a ServerSocketChannel is opened, bound to the port, and passed, together with selectorThreads, to the AcceptThread constructor. When constructed, acceptThread registers for the server socket's OP_ACCEPT events. In its run loop it waits for new-connection events, and finally in doAccept() it hands the accepted socket to one of the selector threads by calling selectorThread.addAcceptedConnection().
```java
public void configure(InetSocketAddress addr, int maxcc, boolean secure) throws IOException {
    ...
    this.ss = ServerSocketChannel.open();
    ss.socket().setReuseAddress(true);
    LOG.info("binding to port " + addr);
    ss.socket().bind(addr);
    ss.configureBlocking(false);
    acceptThread = new AcceptThread(ss, addr, selectorThreads);
}

public AcceptThread(ServerSocketChannel ss, InetSocketAddress addr,
        Set<SelectorThread> selectorThreads) throws IOException {
    super("NIOServerCxnFactory.AcceptThread:" + addr);
    this.acceptSocket = ss;
    this.acceptKey = acceptSocket.register(selector, SelectionKey.OP_ACCEPT);
    this.selectorThreads = Collections.unmodifiableList(
            new ArrayList<SelectorThread>(selectorThreads));
    selectorIterator = this.selectorThreads.iterator();
}
```
```java
private boolean doAccept() {
    boolean accepted = false;
    SocketChannel sc = null;
    try {
        sc = acceptSocket.accept();
        ...
        SelectorThread selectorThread = selectorIterator.next();
        if (!selectorThread.addAcceptedConnection(sc)) {
            throw new IOException(
                "Unable to add connection to selector queue"
                + (stopped ? " (shutdown in progress)" : ""));
        }
    } catch (IOException e) {
        ...
    }
    return accepted;
}
```
Next, the SelectorThread. Its run loop:
```java
public void run() {
    try {
        while (!stopped) {
            try {
                // Receive read/write events and hand the I/O over to worker threads
                select();
                // Handle newly accepted connections: register interest in reads
                // and attach an NIOServerCnxn
                processAcceptedConnections();
                // After select(), read/write interest on a socket is temporarily
                // cleared; restore it here
                processInterestOpsUpdateRequests();
            } catch (RuntimeException e) {
                LOG.warn("Ignoring unexpected runtime exception", e);
            } catch (Exception e) {
                LOG.warn("Ignoring unexpected exception", e);
            }
        }
        ...
        // After leaving the loop, close and clean up the connections
}

private void select() {
    ...
    selector.select();

    Set<SelectionKey> selected = selector.selectedKeys();
    ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);
    Collections.shuffle(selectedList);
    Iterator<SelectionKey> selectedKeys = selectedList.iterator();
    while (!stopped && selectedKeys.hasNext()) {
        ...
        if (key.isReadable() || key.isWritable()) {
            handleIO(key);
        }
    }
    ...
}

private void handleIO(SelectionKey key) {
    IOWorkRequest workRequest = new IOWorkRequest(this, key);
    NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();

    // Stop selecting this key while processing on its
    // connection
    cnxn.disableSelectable();
    key.interestOps(0);
    touchCnxn(cnxn);
    workerPool.schedule(workRequest);
}
```
SelectorThread.addAcceptedConnection enqueues the newly accepted socket and calls wakeupSelector() to break the blocking select() in the run loop, so that processAcceptedConnections can run immediately.
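As a standalone, hedged sketch of that enqueue-then-wakeup pattern (names such as SelectorLoopSketch and acceptedQueue are assumptions; the real method also deals with shutdown and queue rejection):

```java
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative only: a selector-owning thread that accepts sockets from another thread.
class SelectorLoopSketch implements Runnable {
    private final Selector selector;
    private final Queue<SocketChannel> acceptedQueue = new LinkedBlockingQueue<>();
    private volatile boolean stopped = false;

    SelectorLoopSketch(Selector selector) { this.selector = selector; }

    // Called by the accept thread: queue the socket, then wake the blocked select().
    boolean addAcceptedConnection(SocketChannel sc) {
        if (stopped || !acceptedQueue.offer(sc)) {
            return false;
        }
        selector.wakeup(); // unblocks selector.select() in run()
        return true;
    }

    @Override
    public void run() {
        try {
            while (!stopped) {
                selector.select();      // returns early once wakeup() is called
                // ... handle the ready keys here ...
                SocketChannel sc;
                while ((sc = acceptedQueue.poll()) != null) {
                    sc.configureBlocking(false);
                    sc.register(selector, SelectionKey.OP_READ); // start reading from it
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```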
The threads in workerPool process the socket data; a request is ultimately handled by ZooKeeperServer.processPacket. For how the data itself is handled, see the section on data serialization.
3.2.4 Starting the admin service
By default the admin service is provided by Jetty and occupies port 8080; when deploying a pseudo-cluster on a single machine, Jetty has to be disabled.

3.2.5 Election
An election algorithm is chosen via createElectionAlgorithm(3). The default is 3, fast leader election; the other algorithms are on their way to being deprecated.

```java
FastLeaderElection fle = new FastLeaderElection(this, qcm);
fle.start();
```
FastLeaderElection needs a QuorumPeer (the local node), a QuorumCnxManager (which manages connections to the other nodes), two queue fields, sendqueue and recvqueue, and a Messenger, which is responsible for sending and receiving election messages.
```java
public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager) {
    this.stop = false;
    this.manager = manager;
    starter(self, manager);
}

private void starter(QuorumPeer self, QuorumCnxManager manager) {
    this.self = self;
    proposedLeader = -1;
    proposedZxid = -1;

    sendqueue = new LinkedBlockingQueue<ToSend>();
    recvqueue = new LinkedBlockingQueue<Notification>();
    this.messenger = new Messenger(manager);
}

Messenger(QuorumCnxManager manager) {
    this.ws = new WorkerSender(manager);

    this.wsThread = new Thread(this.ws,
            "WorkerSender[myid=" + self.getId() + "]");
    this.wsThread.setDaemon(true);

    this.wr = new WorkerReceiver(manager);

    this.wrThread = new Thread(this.wr,
            "WorkerReceiver[myid=" + self.getId() + "]");
    this.wrThread.setDaemon(true);
}
```
FastLeaderElection.start() simply calls messenger.start(), which starts message sending and receiving.
Message sending is implemented by WorkerSender:
```java
class WorkerSender extends ZooKeeperThread {
    volatile boolean stop;
    QuorumCnxManager manager;

    WorkerSender(QuorumCnxManager manager) {
        super("WorkerSender");
        this.stop = false;
        this.manager = manager;
    }

    public void run() {
        while (!stop) {
            try {
                ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
                if (m == null) continue;

                process(m);
            } catch (InterruptedException e) {
                break;
            }
        }
        LOG.info("WorkerSender is down");
    }

    /**
     * Called by run() once there is a new message to send.
     *
     * @param m     message to send
     */
    void process(ToSend m) {
        ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),
                                            m.leader,
                                            m.zxid,
                                            m.electionEpoch,
                                            m.peerEpoch,
                                            m.configData);

        manager.toSend(m.sid, requestBuffer);
    }
}
```
Message receiving is implemented by WorkerReceiver:
```java
Message response;
while (!stop) {
    // Sleeps on receive
    response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
    // unpack the message
    ...
    Notification n = new Notification();
    // continue unpacking, check the version
    // If the message comes from a server that does not take part in the election,
    // reply directly with this server's current vote
    if (!self.getCurrentAndNextConfigVoters().contains(response.sid)) {
        Vote current = self.getCurrentVote();
        QuorumVerifier qv = self.getQuorumVerifier();
        ToSend notmsg = new ToSend(ToSend.mType.notification,
                current.getId(),
                current.getZxid(),
                logicalclock.get(),
                self.getPeerState(),
                response.sid,
                current.getPeerEpoch(),
                qv.toString().getBytes());

        sendqueue.offer(notmsg);
    } else {
        /*
         * If this server is looking, then send proposed leader
         */
        if (self.getPeerState() == QuorumPeer.ServerState.LOOKING) {
            recvqueue.offer(n);

            /*
             * Send a notification back if the peer that sent this
             * message is also looking and its logical clock is
             * lagging behind.
             */
            if ((ackstate == QuorumPeer.ServerState.LOOKING)
                    && (n.electionEpoch < logicalclock.get())) {
                Vote v = getVote();
                QuorumVerifier qv = self.getQuorumVerifier();
                ToSend notmsg = new ToSend(ToSend.mType.notification,
                        v.getId(),
                        v.getZxid(),
                        logicalclock.get(),
                        self.getPeerState(),
                        response.sid,
                        v.getPeerEpoch(),
                        qv.toString().getBytes());
                sendqueue.offer(notmsg);
            }
        } else {
            /*
             * If this server is not looking, but the one that sent the ack
             * is looking, then send back what it believes to be the leader.
             */
            Vote current = self.getCurrentVote();
            if (ackstate == QuorumPeer.ServerState.LOOKING) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Sending new notification. My id ={} recipient={} zxid=0x{} leader={} config version = {}",
                            self.getId(),
                            response.sid,
                            Long.toHexString(current.getZxid()),
                            current.getId(),
                            Long.toHexString(self.getQuorumVerifier().getVersion()));
                }

                QuorumVerifier qv = self.getQuorumVerifier();
                ToSend notmsg = new ToSend(
                        ToSend.mType.notification,
                        current.getId(),
                        current.getZxid(),
                        current.getElectionEpoch(),
                        self.getPeerState(),
                        response.sid,
                        current.getPeerEpoch(),
                        qv.toString().getBytes());
                sendqueue.offer(notmsg);
            }
        }
```
To summarize the receive flow (shown as a flowchart in the original): decode the message, then check whether the sender takes part in the election.
- If it does not, reply with this server's current vote and finish.
- If it does, check whether this server itself is still electing (LOOKING):
  - If yes, enqueue the notification; if the sender's election round lags behind, also send back the current vote.
  - If no, and the sender is still looking, send it what this server believes the election result to be.
3.2.6 The run loop
Inside while (running), the handling depends on the server state (a simplified sketch of this switch follows the list):
LOOKING: the election phase; the node keeps taking part in the election (lookForLeader()) until the election finishes, at which point the state is switched.
OBSERVING: the observing phase; the node communicates with the Leader and obtains its own state from it.
LEADING: the node is the Leader; it builds the Leader service and blocks in lead(); only when leadership is lost does it return to the LOOKING state.
FOLLOWING: the node is a Follower; it builds the Follower service and blocks in followLeader(); only when that fails does it return to the LOOKING state.
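A self-contained, hedged sketch of that state machine, not the verbatim QuorumPeer.run() (which additionally handles read-only mode, JMX beans, and shutdown); the enum and handler methods here are illustrative stand-ins.

```java
// A simplified sketch of the QuorumPeer run-loop state machine.
public class RunLoopSketch {
    enum ServerState { LOOKING, OBSERVING, FOLLOWING, LEADING }

    private volatile boolean running = true;
    private volatile ServerState state = ServerState.LOOKING;

    void run() {
        while (running) {
            switch (state) {
            case LOOKING:
                // Election phase: keep voting until a leader emerges
                // (QuorumPeer calls makeLEStrategy().lookForLeader() here),
                // then switch state according to the outcome.
                state = lookForLeader();
                break;
            case OBSERVING:
                // Connect to the leader and mirror its state without voting;
                // on failure, fall back to LOOKING.
                state = observeLeader();
                break;
            case FOLLOWING:
                // Build the follower service and block in followLeader();
                // on failure, fall back to LOOKING.
                state = followLeader();
                break;
            case LEADING:
                // Build the leader service and block in lead();
                // when leadership is lost, fall back to LOOKING.
                state = lead();
                break;
            }
        }
    }

    // Illustrative stubs; the real work happens in FastLeaderElection,
    // Observer, Follower, and Leader respectively.
    ServerState lookForLeader() { return ServerState.FOLLOWING; }
    ServerState observeLeader() { return ServerState.LOOKING; }
    ServerState followLeader()  { return ServerState.LOOKING; }
    ServerState lead()          { return ServerState.LOOKING; }
}
```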