您的位置:首页 > 其它

Zookeeper-会话创建流程

2016-07-15 22:40 363 查看


初始化阶段:

初始化Zookeeper对象。调用Zookeeper的构造方法来实例化一个Zookeeper,在初始化过程中,会创建一个客户端的Watcher管理器:ClientWatchManager。
设置会话默认Watcher。如果在构造方法中传入一个Watcher对象,那么客户端会将这个对象作为默认Watcher保存在ClientWatchManager。
构造Zookeeper服务器地址列表管理器:HostProvider。在构造方法中传入的服务器地址,客户端会将其存放在服务器地址列表管理器HostProvider中。
创建并初始化客户端网络连接器:ClientCnxn。Zookeeper客户端首先会创建一个网络连接器ClientCnxn。用来管理客户端与服务器的网络交互。另外,客户端在创建ClientCnxn的同时,还会初始化客户端两个核心队列outgoingQueue和pendingQueue,分别作为客户端的请求发送队列和服务器端响应的等待队列。
初始化SendThread和EventThread。客户端会创建两个核心网络线程SendThread和EventThread,前者用于管理客户端和服务端之间的所有网络I/O,后者则用于进行客户端的事件处理。同时,客户端还会将ClientCnxnSocket分配给SendThread作为底层网络I/O处理器,并初始化EventThread的待处理事件队列waitingEvents,用于存放所有等待被客户端处理的事情。

//初始化ZooKeeper

ZooKeeper  zk = new ZooKeeper("127.0.0.1:2181", 5000, new Zookeeper_Watcher ());

public ZooKeeper(String connectString, intsessionTimeout, Watcher watcher,

boolean canBeReadOnly)

throws IOException

{

LOG.info("Initiating client connection, connectString=" + connectString

+ " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);

//设置默认的watcher

watchManager.defaultWatcher = watcher;

//分析传入的IP地址,并存放ServerAddress

ConnectStringParser connectStringParser = new ConnectStringParser(

connectString);

//构造Zookeeper服务地址列表管理器、底层是随机分配一个ZK服务地址。基于环形地址列表队列(可以去研究)

HostProvider hostProvider = new StaticHostProvider(

connectStringParser.getServerAddresses());

//创建并初始化客户端网络连接器:ClientCnxn;

cnxn = newClientCnxn(connectStringParser.getChrootPath(),

hostProvider, sessionTimeout, this, watchManager,

getClientCnxnSocket(), canBeReadOnly);

cnxn.start();

}

public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,

ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,

long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {

this.zooKeeper = zooKeeper;

this.watcher = watcher;

this.sessionId = sessionId;

this.sessionPasswd = sessionPasswd;

this.sessionTimeout = sessionTimeout;

this.hostProvider = hostProvider;

this.chrootPath = chrootPath;

connectTimeout = sessionTimeout / hostProvider.size();

readTimeout = sessionTimeout * 2 / 3;

readOnly = canBeReadOnly;

//初始化SendThread和EventThread

sendThread = new SendThread(clientCnxnSocket);

eventThread = new EventThread();

}

会话创建阶段

启动SendThread和EventThread。SendThread首先会判断当前客户端的状态,进行一系列请理性工作,为客户端发送“会话创建”请求做准备。
获取一个服务器地址。在开始创建TCP之前,SendThread首先需要获取一个Zookeeper服务器的目标地址,
这通常是从HostProvider中随机获取出一个地址,然后委托给ClientCnxnSocket去创建与Zookeeper服务器之间的TCP连接。
创建TCP连接。获取一个服务器地址后,ClientCnxnSocket负责和服务器创建一个TCP长连接。

ClientCnxnSocketNetty实现了ClientCnxnSocket的抽象方法,它负责连接到server,读取/写入网络流量,并作为网络数据层和更高packet层的中间层。其生命周期如下:
loop:
- try:
- - !isConnected()
- - - connect()
- - doTransport()
- catch:
- - cleanup()
close()

构造ConnectRequest请求。
在TCP连接创建完毕后,可能有的读者会认为,这样是否就说明已经和Zookeeper服务器完成连接了呢?其实不然,上面的步骤只是纯粹地从网络TCP层完成了客户端与服务端之间的Socket连接,但远未完成Zookeeper客户端的会话创建。
SendThread会负责根据当前客户端的实际设置,构造出一个ConnectRequest请求,该请求代表了客户端试图与服务端创建一个会话。同时,Zookeeper客户端还会进一步将该请求包装成网络I/O层的Packet对象,放入发送队列outgoingQueue中去。
发送请求。当客户端请求准备完毕后,就可以开始向服务端发送请求了。ClientCnxnSocket负责从outgoingQueue中取出一个待发送的Packet对象,将其序列化成ByteBuffer后,向服务端进行发送。

clientCnxnSocket.doTransport:
@Override

voiddoTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,

ClientCnxn cnxn)

throws IOException, InterruptedException {

selector.select(waitTimeOut);

Set<SelectionKey> selected;

synchronized (this) {

selected = selector.selectedKeys();

}

// Everything below and until we get back to the select is

// non blocking, so time is effectively a constant. That is

// Why we just have to do this once, here

updateNow();

for (SelectionKey k : selected) {

SocketChannel sc = ((SocketChannel) k.channel());

if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {

if (sc.finishConnect()) {

updateLastSendAndHeard();

//构造出一个ConnectRequest请求,该请求代表了客户端试图与服务端创建一个会话。同时,Zookeeper客户端还会进一步将该请求包装成网络I/O层的Packet对象,放入发送队列outgoingQueue中去。

sendThread.primeConnection();

}

} elseif ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {

doIO(pendingQueue, outgoingQueue, cnxn);   //处理读写操作。

}

}

if (sendThread.getZkState().isConnected()) {

synchronized(outgoingQueue) {

if (findSendablePacket(outgoingQueue,

cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {

enableWrite();

}

}

}

selected.clear();

}


响应处理阶段

接受服务器端响应。ClientCnxnSocket接受到服务端响应后,会首先判断当前的客户端状态是否是“已初始化”,如果尚未完成初始化,那么就认为该响应一定是会话创建请求的响应,直接交由readConnectResult方法来处理该响应。
处理Response。ClientCnxnSocket会对接受到的服务端响应进行反序列化,得到ConnectResponse对象,并从中获取到Zookeeper服务端分配的会话SessionId。[SessionID怎么分配的了?]
连接成功。连接成功后,一方面需要通知SendThread线程,进一步对客户端进行会话参数的设置,包括readTimeout和connectTimeout等,并更新客户端状态,另一方面,需要通知地址管理器HostProvider当前成功连接的服务器地址。
生成时间:SyncConnected-None。为了能够让上层应用感知到会话的成功创建,SendThread会生成一个事件SyncConnected-None,代表客户端与服务器会话创建成功,并将该事件传递给EventThread线程。
查询Watcher。EventThread线程收到事件后,会从ClientWatchManager管理器中查询出对应的Watcher,针对SyncConnected-None事件,那么就直接找出存储的默认Watcher,然后将其放到EventThread的watingEvents队列中去。
处理事件。EventThread不断的从watingEvents队列中取出待处理的Watcher对象,然后直接调用该对象的process接口方法,以达到触发Watcher的目的。

ClientCnxnSocketNIO:
/**

* @return true if a packet was received

* @throws InterruptedException

* @throws IOException

*/

voiddoIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)

throws InterruptedException, IOException {

SocketChannel sock = (SocketChannel) sockKey.channel();

if (sock == null) {

thrownew IOException("Socket is null!");

}

if (sockKey.isReadable()) {

int rc = sock.read(incomingBuffer);

if (rc < 0) {

thrownew EndOfStreamException(

"Unable to read additional data from server sessionid 0x"

+ Long.toHexString(sessionId)

+ ", likely server has closed socket");

}

if (!incomingBuffer.hasRemaining()) {

incomingBuffer.flip();

if (incomingBuffer == lenBuffer) {

recvCount++;

readLength();

} elseif (!initialized) {

readConnectResult(); //处理Response

enableRead();

if (findSendablePacket(outgoingQueue,

cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {

// Since SASL authentication has completed (if client is configured to do so),

// outgoing packets waiting in the outgoingQueue can now be sent.

enableWrite();

}

lenBuffer.clear();

incomingBuffer = lenBuffer;

updateLastHeard();

initialized = true;

} else {

sendThread.readResponse(incomingBuffer);

lenBuffer.clear();

incomingBuffer = lenBuffer;

updateLastHeard();

}

}

}

if (sockKey.isWritable()) {

synchronized(outgoingQueue) {

Packet p = findSendablePacket(outgoingQueue,

cnxn.sendThread.clientTunneledAuthenticationInProgress());

if (p != null) {

updateLastSend();

// If we already started writing p, p.bb will already exist

if (p.bb == null) {

if ((p.requestHeader != null) &&

(p.requestHeader.getType() != OpCode.ping) &&

(p.requestHeader.getType() != OpCode.auth)) {

p.requestHeader.setXid(cnxn.getXid());

}

p.createBB();

}

sock.write(p.bb);

if (!p.bb.hasRemaining()) {

sentCount++;

outgoingQueue.removeFirstOccurrence(p);

if (p.requestHeader != null

&& p.requestHeader.getType() != OpCode.ping

&& p.requestHeader.getType() != OpCode.auth) {

synchronized (pendingQueue) {

pendingQueue.add(p);

}

}

}

}

if (outgoingQueue.isEmpty()) {

// No more packets to send: turn off write interest flag.

// Will be turned on later by a later call to enableWrite(),

// from within ZooKeeperSaslClient (if client is configured

// to attempt SASL authentication), or in either doIO() or

// in doTransport() if not.

disableWrite();

} elseif (!initialized && p != null && !p.bb.hasRemaining()) {

// On initial connection, write the complete connect request

// packet, but then disable further writes until after

// receiving a successful connection response.  If the

// session is expired, then the server sends the expiration

// response and immediately closes its end of the socket.  If

// the client is simultaneously writing on its end, then the

// TCP stack may choose to abort with RST, in which case the

// client would never receive the session expired event.  See

// http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html 
disableWrite();

} else {

// Just in case

enableWrite();

}

}

}

}


doIO:

响应接收:

1、如果检测到当前客户端还尚未初始化,那么说明当前客户端与服务端之间正在进行会话创建,那么直接将接收到的ByteBuffer(imcomingBuffer)序列户成ConnectResponse对象。
2、如果当前客户端已经处于正常的会话周期,并且接收的服务响应的是一个事件,那么Zookeeper客户端会将接收到的ByteBuffer(imcomingBuffer)序列化成WatcherEvent对象,并将该对象放入待处理队列中,。
3.如果是一个常规的请求响应(指定是Create、GetData和Exist等操作请求),那么会从PendingQuene队列中取出一个Packet来进行相应的处理。Zookeeper客户端首先会检查服务端响应中包含的XID值来确保请求处理的顺序性,然后将接收到的ByteBuffer(imcomingBuffer)序列化成相应的Response对象。
最后会在finshPacket方法处理Watcher注册逻辑。

注:outgoingQueue和pendingQuene
ClientCnxn,两个比较核心的队列outgoingQueue和pendingQuene,outgoingQueue队列是一个请求发送队列,专门用于存储那些需要发送到服务端的Packet对象集合。pendingQuene队列是为了存储那些已经从客户端发送到服务器的,但是需要等待服务器响应的Packet集合。
在正常的情况下(即客户端与服务端之间的TCP链接正常且会话有效的情况下),会从outgoingQueue队列取出Packet一个可发送的对象,同时生成客户端请求序号XID将其设置到Packet请求头中,然后将其序列化后进行发送。“可发送的Packet对象”特指是如果检测到客户端与服务端之间正在处理SASL权限的话,那么那些不含请求头的Packet(会话创建请求)是可以发送的,其余的无法发送。请求完毕后,会立即将该Packet保存到pendingQuene队列中,以便等地啊服务端响应后进行相应的处理。
SendThred:是客户端ClientCnxn内部核心的IO调度线程,用于管理客户端和服务端之间所有网络IO 操作,在Zookeeper客户端的实际运行过程中,一方面,SendThred维护客户端与服务端之间的会话周期,器通过一定的周期频率内向服务器发送一个PING包来实现心跳机制,同时在会话周期内,如果客户端与服务端之间出现TCP连接断开情况,那么会自动且透明化的完成重连操作,,另一方面SendThred管理客户端所有请求的发送和响应接收,其将上层客户端API的操作转换成相应的请求协议发送到服务端,并完成对同步调用的返回和异步调用的回调。同时SendThred还负责将来自服务端的事件传递给EventThred处理.
EventThred:是客户端ClientCnxn另外一个核心线程,负责客户端的事件处理,并触发客户端注册的Watcher监听。EventThred中有一个waitEvents队列,用于临时存放那些需要被触发的Object。包括那么客户端注册的Wacter和异步接口注册的回调器AsyncCallBack,同时EventThred会不断从waitEvents这个队列取出Object,识别出其具体类型,并分别调用process和prodessResult接口方法来事件对事件的触发和回调。
SessionTrackerImpl: SessionTracker是Zookeeper服务端的会话管理器,负责会话创建、管理、清理工作

1、会话概述

在ZooKeeper中,客户端和服务端建立连接后,会话随之建立,生成一个全局唯一的会话ID(Session
ID)。服务器和客户端之间维持的是一个长连接,在SESSION_TIMEOUT时间内,服务器会确定客户端是否正常连接(客户端会定时向服务器发送heart_beat,服务器重置下次SESSION_TIMEOUT时间)。因此,在正常情况下,Session一直有效,并且ZK集群所有机器上都保存这个Session信息。在出现网络或其它问题情况下(例如客户端所连接的那台ZK机器挂了,或是其它原因的网络闪断),客户端与当前连接的那台服务器之间连接断了,这个时候客户端会主动在地址列表(实例化ZK对象的时候传入构造方法的那个参数connectString)中选择新的地址进行连接。

2、连接断开

好了,上面基本就是服务器与客户端之间维持会话的过程了。在这个过程中,用户可能会看到两类异常CONNECTIONLOSS(连接断开)和SESSIONEXPIRED(Session过期)。连接断开(CONNECTIONLOSS)一般发生在网络的闪断或是客户端所连接的服务器挂机的时候,这种情况下,ZooKeeper客户端自己会首先感知到这个异常,具体逻辑是在如下方法中触发的:一种场景是Server服务器挂了,这个时候,ZK客户端首选会捕获异常

所以,现在对于“连接断开”这个过程就一目了然了,核心流程如下: ZK客户端捕获“连接断开”异常 ——> 获取一个新的ZK地址 ——> 尝试连接

在这个流程中,我们可以发现,整个过程不需要开发者额外的程序介入,都是ZK客户端自己会进行的,并且,使用的会话ID都是同一个,所以结论就是:发生CONNECTIONLOSS的情况,应用不需要做什么事情,等待ZK客户端建立新的连接即可。

3、会话超时

SESSIONEXPIRED发生在上面蓝色文字部分,这个通常是ZK客户端与服务器的连接断了,试图连接上新的ZK机器,但是这个过程如果耗时过长,超过了SESSION_TIMEOUT 后还没有成功连接上服务器,那么服务器认为这个Session已经结束了(服务器无法确认是因为其它异常原因还是客户端主动结束会话),由于在ZK中,很多数据和状态都是和会话绑定的,一旦会话失效,那么ZK就开始清除和这个会话有关的信息,包括这个会话创建的临时节点和注册的所有Watcher。在这之后,由于网络恢复后,客户端可能会重新连接上服务器,但是很不幸,服务器会告诉客户端一个异常:SESSIONEXPIRED(会话过期)。此时客户端的状态变成CLOSED状态,应用要做的事情就是的看自己应用的复杂程序了,要重新实例zookeeper对象,然后重新操作所有临时数据(包括临时节点和注册Watcher),总之,会话超时在ZK使用过程中是真实存在的。

所以这里也简单总结下,一旦发生会话超时,那么存储在ZK上的所有临时数据与注册的订阅者都会被移除,此时需要重新创建一个ZooKeeper客户端实例,需要自己编码做一些额外的处理。

4、会话时间(Session
Time)


在《ZooKeeper API 使用》一文中已经提到,在实例化一个ZK客户端的时候,需要设置一个会话的超时时间。这里需要注意的一点是,客户端并不是可以随意设置这个会话超时时间,在ZK服务器端对会话超时时间是有限制的,主要是minSessionTimeout和maxSessionTimeout这两个参数设置的。(详细查看这个文章《ZooKeeper管理员指南》)Session超时时间限制,如果客户端设置的超时时间不在这个范围,那么会被强制设置为最大或最小时间。默认的Session超时时间是在2
* tickTime ~ 20 * tickTime。所以,如果应用对于这个会话超时时间有特殊的需求的话,一定要和ZK管理员沟通好,确认好服务端是否设置了对会话时间的限制。

参考从Paxos到Zookeeper分布式一致性原理与实践书籍。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: