Zookeeper-会话创建流程
2016-07-15 22:40
363 查看
初始化阶段:
初始化Zookeeper对象。调用Zookeeper的构造方法来实例化一个Zookeeper,在初始化过程中,会创建一个客户端的Watcher管理器:ClientWatchManager。设置会话默认Watcher。如果在构造方法中传入一个Watcher对象,那么客户端会将这个对象作为默认Watcher保存在ClientWatchManager。
构造Zookeeper服务器地址列表管理器:HostProvider。在构造方法中传入的服务器地址,客户端会将其存放在服务器地址列表管理器HostProvider中。
创建并初始化客户端网络连接器:ClientCnxn。Zookeeper客户端首先会创建一个网络连接器ClientCnxn。用来管理客户端与服务器的网络交互。另外,客户端在创建ClientCnxn的同时,还会初始化客户端两个核心队列outgoingQueue和pendingQueue,分别作为客户端的请求发送队列和服务器端响应的等待队列。
初始化SendThread和EventThread。客户端会创建两个核心网络线程SendThread和EventThread,前者用于管理客户端和服务端之间的所有网络I/O,后者则用于进行客户端的事件处理。同时,客户端还会将ClientCnxnSocket分配给SendThread作为底层网络I/O处理器,并初始化EventThread的待处理事件队列waitingEvents,用于存放所有等待被客户端处理的事情。
//初始化ZooKeeper
ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 5000, new Zookeeper_Watcher ()); public ZooKeeper(String connectString, intsessionTimeout, Watcher watcher, boolean canBeReadOnly) throws IOException { LOG.info("Initiating client connection, connectString=" + connectString + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher); //设置默认的watcher watchManager.defaultWatcher = watcher; //分析传入的IP地址,并存放ServerAddress ConnectStringParser connectStringParser = new ConnectStringParser( connectString); //构造Zookeeper服务地址列表管理器、底层是随机分配一个ZK服务地址。基于环形地址列表队列(可以去研究) HostProvider hostProvider = new StaticHostProvider( connectStringParser.getServerAddresses()); //创建并初始化客户端网络连接器:ClientCnxn; cnxn = newClientCnxn(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, watchManager, getClientCnxnSocket(), canBeReadOnly); cnxn.start(); } public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper, ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) { this.zooKeeper = zooKeeper; this.watcher = watcher; this.sessionId = sessionId; this.sessionPasswd = sessionPasswd; this.sessionTimeout = sessionTimeout; this.hostProvider = hostProvider; this.chrootPath = chrootPath; connectTimeout = sessionTimeout / hostProvider.size(); readTimeout = sessionTimeout * 2 / 3; readOnly = canBeReadOnly; //初始化SendThread和EventThread sendThread = new SendThread(clientCnxnSocket); eventThread = new EventThread(); }
会话创建阶段
启动SendThread和EventThread。SendThread首先会判断当前客户端的状态,进行一系列请理性工作,为客户端发送“会话创建”请求做准备。获取一个服务器地址。在开始创建TCP之前,SendThread首先需要获取一个Zookeeper服务器的目标地址,
这通常是从HostProvider中随机获取出一个地址,然后委托给ClientCnxnSocket去创建与Zookeeper服务器之间的TCP连接。
创建TCP连接。获取一个服务器地址后,ClientCnxnSocket负责和服务器创建一个TCP长连接。
ClientCnxnSocketNetty实现了ClientCnxnSocket的抽象方法,它负责连接到server,读取/写入网络流量,并作为网络数据层和更高packet层的中间层。其生命周期如下:
loop:
- try:
- - !isConnected()
- - - connect()
- - doTransport()
- catch:
- - cleanup()
close()
构造ConnectRequest请求。
在TCP连接创建完毕后,可能有的读者会认为,这样是否就说明已经和Zookeeper服务器完成连接了呢?其实不然,上面的步骤只是纯粹地从网络TCP层完成了客户端与服务端之间的Socket连接,但远未完成Zookeeper客户端的会话创建。
SendThread会负责根据当前客户端的实际设置,构造出一个ConnectRequest请求,该请求代表了客户端试图与服务端创建一个会话。同时,Zookeeper客户端还会进一步将该请求包装成网络I/O层的Packet对象,放入发送队列outgoingQueue中去。
发送请求。当客户端请求准备完毕后,就可以开始向服务端发送请求了。ClientCnxnSocket负责从outgoingQueue中取出一个待发送的Packet对象,将其序列化成ByteBuffer后,向服务端进行发送。
clientCnxnSocket.doTransport:
@Override voiddoTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn) throws IOException, InterruptedException { selector.select(waitTimeOut); Set<SelectionKey> selected; synchronized (this) { selected = selector.selectedKeys(); } // Everything below and until we get back to the select is // non blocking, so time is effectively a constant. That is // Why we just have to do this once, here updateNow(); for (SelectionKey k : selected) { SocketChannel sc = ((SocketChannel) k.channel()); if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) { if (sc.finishConnect()) { updateLastSendAndHeard(); //构造出一个ConnectRequest请求,该请求代表了客户端试图与服务端创建一个会话。同时,Zookeeper客户端还会进一步将该请求包装成网络I/O层的Packet对象,放入发送队列outgoingQueue中去。 sendThread.primeConnection(); } } elseif ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) { doIO(pendingQueue, outgoingQueue, cnxn); //处理读写操作。 } } if (sendThread.getZkState().isConnected()) { synchronized(outgoingQueue) { if (findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) { enableWrite(); } } } selected.clear(); }
响应处理阶段
接受服务器端响应。ClientCnxnSocket接受到服务端响应后,会首先判断当前的客户端状态是否是“已初始化”,如果尚未完成初始化,那么就认为该响应一定是会话创建请求的响应,直接交由readConnectResult方法来处理该响应。处理Response。ClientCnxnSocket会对接受到的服务端响应进行反序列化,得到ConnectResponse对象,并从中获取到Zookeeper服务端分配的会话SessionId。[SessionID怎么分配的了?]
连接成功。连接成功后,一方面需要通知SendThread线程,进一步对客户端进行会话参数的设置,包括readTimeout和connectTimeout等,并更新客户端状态,另一方面,需要通知地址管理器HostProvider当前成功连接的服务器地址。
生成时间:SyncConnected-None。为了能够让上层应用感知到会话的成功创建,SendThread会生成一个事件SyncConnected-None,代表客户端与服务器会话创建成功,并将该事件传递给EventThread线程。
查询Watcher。EventThread线程收到事件后,会从ClientWatchManager管理器中查询出对应的Watcher,针对SyncConnected-None事件,那么就直接找出存储的默认Watcher,然后将其放到EventThread的watingEvents队列中去。
处理事件。EventThread不断的从watingEvents队列中取出待处理的Watcher对象,然后直接调用该对象的process接口方法,以达到触发Watcher的目的。
ClientCnxnSocketNIO:
/** * @return true if a packet was received * @throws InterruptedException * @throws IOException */ voiddoIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn) throws InterruptedException, IOException { SocketChannel sock = (SocketChannel) sockKey.channel(); if (sock == null) { thrownew IOException("Socket is null!"); } if (sockKey.isReadable()) { int rc = sock.read(incomingBuffer); if (rc < 0) { thrownew EndOfStreamException( "Unable to read additional data from server sessionid 0x" + Long.toHexString(sessionId) + ", likely server has closed socket"); } if (!incomingBuffer.hasRemaining()) { incomingBuffer.flip(); if (incomingBuffer == lenBuffer) { recvCount++; readLength(); } elseif (!initialized) { readConnectResult(); //处理Response enableRead(); if (findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) { // Since SASL authentication has completed (if client is configured to do so), // outgoing packets waiting in the outgoingQueue can now be sent. enableWrite(); } lenBuffer.clear(); incomingBuffer = lenBuffer; updateLastHeard(); initialized = true; } else { sendThread.readResponse(incomingBuffer); lenBuffer.clear(); incomingBuffer = lenBuffer; updateLastHeard(); } } } if (sockKey.isWritable()) { synchronized(outgoingQueue) { Packet p = findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress()); if (p != null) { updateLastSend(); // If we already started writing p, p.bb will already exist if (p.bb == null) { if ((p.requestHeader != null) && (p.requestHeader.getType() != OpCode.ping) && (p.requestHeader.getType() != OpCode.auth)) { p.requestHeader.setXid(cnxn.getXid()); } p.createBB(); } sock.write(p.bb); if (!p.bb.hasRemaining()) { sentCount++; outgoingQueue.removeFirstOccurrence(p); if (p.requestHeader != null && p.requestHeader.getType() != OpCode.ping && p.requestHeader.getType() != OpCode.auth) { synchronized (pendingQueue) { pendingQueue.add(p); } } } } if (outgoingQueue.isEmpty()) { // No more packets to send: turn off write interest flag. // Will be turned on later by a later call to enableWrite(), // from within ZooKeeperSaslClient (if client is configured // to attempt SASL authentication), or in either doIO() or // in doTransport() if not. disableWrite(); } elseif (!initialized && p != null && !p.bb.hasRemaining()) { // On initial connection, write the complete connect request // packet, but then disable further writes until after // receiving a successful connection response. If the // session is expired, then the server sends the expiration // response and immediately closes its end of the socket. If // the client is simultaneously writing on its end, then the // TCP stack may choose to abort with RST, in which case the // client would never receive the session expired event. See // http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html disableWrite(); } else { // Just in case enableWrite(); } } } }
doIO:
响应接收:
1、如果检测到当前客户端还尚未初始化,那么说明当前客户端与服务端之间正在进行会话创建,那么直接将接收到的ByteBuffer(imcomingBuffer)序列户成ConnectResponse对象。2、如果当前客户端已经处于正常的会话周期,并且接收的服务响应的是一个事件,那么Zookeeper客户端会将接收到的ByteBuffer(imcomingBuffer)序列化成WatcherEvent对象,并将该对象放入待处理队列中,。
3.如果是一个常规的请求响应(指定是Create、GetData和Exist等操作请求),那么会从PendingQuene队列中取出一个Packet来进行相应的处理。Zookeeper客户端首先会检查服务端响应中包含的XID值来确保请求处理的顺序性,然后将接收到的ByteBuffer(imcomingBuffer)序列化成相应的Response对象。
最后会在finshPacket方法处理Watcher注册逻辑。
注:outgoingQueue和pendingQuene
ClientCnxn,两个比较核心的队列outgoingQueue和pendingQuene,outgoingQueue队列是一个请求发送队列,专门用于存储那些需要发送到服务端的Packet对象集合。pendingQuene队列是为了存储那些已经从客户端发送到服务器的,但是需要等待服务器响应的Packet集合。
在正常的情况下(即客户端与服务端之间的TCP链接正常且会话有效的情况下),会从outgoingQueue队列取出Packet一个可发送的对象,同时生成客户端请求序号XID将其设置到Packet请求头中,然后将其序列化后进行发送。“可发送的Packet对象”特指是如果检测到客户端与服务端之间正在处理SASL权限的话,那么那些不含请求头的Packet(会话创建请求)是可以发送的,其余的无法发送。请求完毕后,会立即将该Packet保存到pendingQuene队列中,以便等地啊服务端响应后进行相应的处理。
SendThred:是客户端ClientCnxn内部核心的IO调度线程,用于管理客户端和服务端之间所有网络IO 操作,在Zookeeper客户端的实际运行过程中,一方面,SendThred维护客户端与服务端之间的会话周期,器通过一定的周期频率内向服务器发送一个PING包来实现心跳机制,同时在会话周期内,如果客户端与服务端之间出现TCP连接断开情况,那么会自动且透明化的完成重连操作,,另一方面SendThred管理客户端所有请求的发送和响应接收,其将上层客户端API的操作转换成相应的请求协议发送到服务端,并完成对同步调用的返回和异步调用的回调。同时SendThred还负责将来自服务端的事件传递给EventThred处理.
EventThred:是客户端ClientCnxn另外一个核心线程,负责客户端的事件处理,并触发客户端注册的Watcher监听。EventThred中有一个waitEvents队列,用于临时存放那些需要被触发的Object。包括那么客户端注册的Wacter和异步接口注册的回调器AsyncCallBack,同时EventThred会不断从waitEvents这个队列取出Object,识别出其具体类型,并分别调用process和prodessResult接口方法来事件对事件的触发和回调。
SessionTrackerImpl: SessionTracker是Zookeeper服务端的会话管理器,负责会话创建、管理、清理工作
1、会话概述
在ZooKeeper中,客户端和服务端建立连接后,会话随之建立,生成一个全局唯一的会话ID(Session
ID)。服务器和客户端之间维持的是一个长连接,在SESSION_TIMEOUT时间内,服务器会确定客户端是否正常连接(客户端会定时向服务器发送heart_beat,服务器重置下次SESSION_TIMEOUT时间)。因此,在正常情况下,Session一直有效,并且ZK集群所有机器上都保存这个Session信息。在出现网络或其它问题情况下(例如客户端所连接的那台ZK机器挂了,或是其它原因的网络闪断),客户端与当前连接的那台服务器之间连接断了,这个时候客户端会主动在地址列表(实例化ZK对象的时候传入构造方法的那个参数connectString)中选择新的地址进行连接。
2、连接断开
好了,上面基本就是服务器与客户端之间维持会话的过程了。在这个过程中,用户可能会看到两类异常CONNECTIONLOSS(连接断开)和SESSIONEXPIRED(Session过期)。连接断开(CONNECTIONLOSS)一般发生在网络的闪断或是客户端所连接的服务器挂机的时候,这种情况下,ZooKeeper客户端自己会首先感知到这个异常,具体逻辑是在如下方法中触发的:一种场景是Server服务器挂了,这个时候,ZK客户端首选会捕获异常
所以,现在对于“连接断开”这个过程就一目了然了,核心流程如下: ZK客户端捕获“连接断开”异常 ——> 获取一个新的ZK地址 ——> 尝试连接
在这个流程中,我们可以发现,整个过程不需要开发者额外的程序介入,都是ZK客户端自己会进行的,并且,使用的会话ID都是同一个,所以结论就是:发生CONNECTIONLOSS的情况,应用不需要做什么事情,等待ZK客户端建立新的连接即可。
3、会话超时
SESSIONEXPIRED发生在上面蓝色文字部分,这个通常是ZK客户端与服务器的连接断了,试图连接上新的ZK机器,但是这个过程如果耗时过长,超过了SESSION_TIMEOUT 后还没有成功连接上服务器,那么服务器认为这个Session已经结束了(服务器无法确认是因为其它异常原因还是客户端主动结束会话),由于在ZK中,很多数据和状态都是和会话绑定的,一旦会话失效,那么ZK就开始清除和这个会话有关的信息,包括这个会话创建的临时节点和注册的所有Watcher。在这之后,由于网络恢复后,客户端可能会重新连接上服务器,但是很不幸,服务器会告诉客户端一个异常:SESSIONEXPIRED(会话过期)。此时客户端的状态变成CLOSED状态,应用要做的事情就是的看自己应用的复杂程序了,要重新实例zookeeper对象,然后重新操作所有临时数据(包括临时节点和注册Watcher),总之,会话超时在ZK使用过程中是真实存在的。
所以这里也简单总结下,一旦发生会话超时,那么存储在ZK上的所有临时数据与注册的订阅者都会被移除,此时需要重新创建一个ZooKeeper客户端实例,需要自己编码做一些额外的处理。
4、会话时间(Session
Time)
在《ZooKeeper API 使用》一文中已经提到,在实例化一个ZK客户端的时候,需要设置一个会话的超时时间。这里需要注意的一点是,客户端并不是可以随意设置这个会话超时时间,在ZK服务器端对会话超时时间是有限制的,主要是minSessionTimeout和maxSessionTimeout这两个参数设置的。(详细查看这个文章《ZooKeeper管理员指南》)Session超时时间限制,如果客户端设置的超时时间不在这个范围,那么会被强制设置为最大或最小时间。默认的Session超时时间是在2
* tickTime ~ 20 * tickTime。所以,如果应用对于这个会话超时时间有特殊的需求的话,一定要和ZK管理员沟通好,确认好服务端是否设置了对会话时间的限制。
参考从Paxos到Zookeeper分布式一致性原理与实践书籍。
相关文章推荐
- 开园篇----MacDown介绍
- 创建线程对象的三种方式
- 决心书和自我介绍224-陈一鹏 杭州
- 【android】:android工程师的学习实践之路
- mysql索引问题个人总结
- oracle基础
- hdu1215 正整数唯一分解定理应用
- 理解Swift中的Optional
- 用C语言写PHP扩展
- io读文件显示在控制台问题汇总
- getline()函数详解
- 三种查看MySQL数据库版本的方法
- Android 中LocalBroadcastManager的使用方式
- 滚屏无刷新动态加载数据
- 完全数计数
- COUNT()的进一步认识
- 网络基本功(二十八):Wireshark抓包实例分析HTTP问题(下)
- 16.Note the following functionalities of various background processes:
- HTML5面向对象的游戏开发简单实例总结
- SpringMVC中获取request对象的方式