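ZooKeeper Client-Server Connection and Session Management: Source Code Analysis (Including the Server-Side Bucketing Strategy for Session Idle Timeouts)
Client Connection Source Code Analysis
ZKClient client, Curator client
The conclusion first:
To create a connection, a Client first creates a local ZooKeeper object that represents the Server it connects to.
Once the connection succeeds, the connection's transient data is initialized into this zk object.
Once the connection is closed, the zk object representing the Server is deleted.
The two commonly used ZK client libraries are ZKClient and Curator. When a client connects to the ZK Server, it is configured with the cluster's server list. Which server it actually connects to is decided by round-robin: the configured addresses are first shuffled, then polled in order (this is the default; a reconnection strategy can also be specified).
ZKClient source code analysis:
Below is a demo that uses ZKClient: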
public class ZKClientTest {
    // ZK cluster address
    private static final String CLUSTER = "zkOS:2181";
    // Node path
    private static final String PATH = "/mylog";

    public static void main(String[] args) {
        // ---------------- Create the session -----------
        // Create the zkClient
        ZkClient zkClient = new ZkClient(CLUSTER);
        // Set the serializer for zkClient
        zkClient.setZkSerializer(new SerializableSerializer());

        // ---------------- Create a node -----------
        // Create a persistent node
        CreateMode mode = CreateMode.PERSISTENT;
        // Node data
        String data = "first log";
        // Create the node
        String nodeName = zkClient.create(PATH, data, mode);
        ...
Let's trace the ZKClient source to see how the connection is made, starting from the ZkClient constructor:
public class ZkClient implements Watcher {
    ...
    public ZkClient(String serverstring) {
        this(serverstring, Integer.MAX_VALUE);
    }

    public ZkClient(String zkServers, int connectionTimeout) {
        // Key point: a ZkConnection object is created here
        this(new ZkConnection(zkServers), connectionTimeout);
    }
    ...
    // The constructor chain eventually reaches this constructor
    public ZkClient(final IZkConnection zkConnection, final int connectionTimeout, final ZkSerializer zkSerializer, final long operationRetryTimeout) {
        if (zkConnection == null) {
            throw new NullPointerException("Zookeeper connection is null!");
        }
        // Store the ZkConnection in the _connection field
        _connection = zkConnection;
        _zkSerializer = zkSerializer;
        _operationRetryTimeoutInMillis = operationRetryTimeout;
        _isZkSaslEnabled = isZkSaslEnabled();
        connect(connectionTimeout, this);
    }

    public void connect(final long maxMsToWaitUntilConnected, Watcher watcher) throws ZkInterruptedException, ZkTimeoutException, IllegalStateException {
        boolean started = false;
        acquireEventLock();
        try {
            setShutdownTrigger(false);
            _eventThread = new ZkEventThread(_connection.getServers());
            _eventThread.start();
            // Call ZkConnection.connect to establish the connection
            _connection.connect(watcher);

            LOG.debug("Awaiting connection to Zookeeper server");
            boolean waitSuccessful = waitUntilConnected(maxMsToWaitUntilConnected, TimeUnit.MILLISECONDS);
            if (!waitSuccessful) {
                throw new ZkTimeoutException("Unable to connect to zookeeper server '" + _connection.getServers() + "' with timeout of " + maxMsToWaitUntilConnected + " ms");
            }
            started = true;
        } finally {
            getEventLock().unlock();

            // we should close the zookeeper instance, otherwise it would keep
            // on trying to connect
            if (!started) {
                close();
            }
        }
    }
}
The trace above shows that ZKClient actually connects through ZkConnection.connect, so we continue with ZkConnection:
public class ZkConnection implements IZkConnection {
    ...
    // The key object: ZooKeeper
    private ZooKeeper _zk = null;
    ...
    public ZkConnection(String zkServers, int sessionTimeOut) {
        _servers = zkServers;
        _sessionTimeOut = sessionTimeOut;
    }

    @Override
    public void connect(Watcher watcher) {
        _zookeeperLock.lock();
        try {
            if (_zk != null) {
                throw new IllegalStateException("zk client has already been started");
            }
            try {
                LOG.debug("Creating new ZookKeeper instance to connect to " + _servers + ".");
                // Note: ZKClient connects to the server through the ZooKeeper object
                _zk = new ZooKeeper(_servers, _sessionTimeOut, watcher);
            } catch (IOException e) {
                throw new ZkException("Unable to connect to " + _servers, e);
            }
        } finally {
            _zookeeperLock.unlock();
        }
    }
}
Curator source code analysis:
Below is a demo that uses Curator:
public class FluentTest {
    public static void main(String[] args) throws Exception {
        // ---------------- Create the session -----------
        // Create the retry policy: base sleep time of 1 second between retries
        // (the wait grows exponentially), at most 3 retries
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
        // Create the client
        CuratorFramework client = CuratorFrameworkFactory
                .builder()
                .connectString("zkOS:2181")
                .sessionTimeoutMs(15000)
                .connectionTimeoutMs(13000)
                .retryPolicy(retryPolicy)
                // namespace: the root path; all operations are relative to it
                .namespace("logs")
                .build();
        // Start the client
        client.start();
        ...
Let's trace the Curator source to see how the connection is made, starting from client.start():
public class CuratorFrameworkImpl implements CuratorFramework{
    ...
    @Override
    public void start(){
        log.info("Starting");
        if ( !state.compareAndSet(CuratorFrameworkState.LATENT, CuratorFrameworkState.STARTED) ){
            throw new IllegalStateException("Cannot be started more than once");
        }
        try{
            ...
            this.getConnectionStateListenable().addListener(listener);
            client.start();
            ...
        }catch ( Exception e ){
            ThreadUtils.checkInterrupted(e);
            handleBackgroundOperationException(null, e);
        }
    }
}
Focus on the client.start() call:
public class CuratorZookeeperClient implements Closeable{
    ...
    public void start() throws Exception {
        log.debug("Starting");
        if ( !started.compareAndSet(false, true) ) {
            throw new IllegalStateException("Already started");
        }
        state.start();
    }
    ...
}
Continue into state.start():
class ConnectionState implements Watcher, Closeable{
    ...
    void start() throws Exception{
        log.debug("Starting");
        ensembleProvider.start();
        reset();
    }

    synchronized void reset() throws Exception{
        log.debug("reset");
        instanceIndex.incrementAndGet();
        isConnected.set(false);
        connectionStartMs = System.currentTimeMillis();
        handleHolder.closeAndReset();
        handleHolder.getZooKeeper(); // initiate connection
    }
}
The key point is the handleHolder.getZooKeeper() method:
class HandleHolder{
    ...
    ZooKeeper getZooKeeper() throws Exception{
        return (helper != null) ? helper.getZooKeeper() : null;
    }
    ...
}

class Helper{
    private final Data data;
    ...
    ZooKeeper getZooKeeper() throws Exception{
        return data.zooKeeperHandle;
    }
    ...
}
The handle is taken straight from data. So when is the Helper created? Go back to org.apache.curator.ConnectionState#reset and look at handleHolder.closeAndReset():
class HandleHolder{
    ...
    void closeAndReset() throws Exception{
        internalClose(0);
        Helper.Data data = new Helper.Data();
        helper = new Helper(data){
            @Override
            ZooKeeper getZooKeeper() throws Exception{
                synchronized(this){
                    if ( data.zooKeeperHandle == null ){
                        resetConnectionString(ensembleProvider.getConnectionString());
                        data.zooKeeperHandle = zookeeperFactory.newZooKeeper(data.connectionString, sessionTimeout, watcher, canBeReadOnly);
                    }
                    helper = new Helper(data);
                    return super.getZooKeeper();
                }
            }
        };
    }
    ...
}
Let's see how data.zooKeeperHandle is actually created:
public class NonAdminZookeeperFactory implements ZookeeperFactory{
    @Override
    public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception{
        return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
    }
}
As we can see, whichever client library is used, a ZooKeeper object is eventually created locally. Next we analyze the ZooKeeper class in the ZK source code.
The ZooKeeper client object in the ZK source code
We locate the ZooKeeper class in the ZK source code (its constructors are shown below):
// Follow the constructor
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException {
    this(connectString, sessionTimeout, watcher, false);
}

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws IOException {
    LOG.info("Initiating client connection, connectString=" + connectString + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
    watchManager.defaultWatcher = watcher;
    // Create a parser for the zk cluster connect string; each parsed host and port
    // is turned into an address instance and stored in a cached collection
    ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
    // Shuffle the addresses in the cached collection
    HostProvider hostProvider = new StaticHostProvider(connectStringParser.getServerAddresses());
    // Create a connection instance
    cnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, watchManager, getClientCnxnSocket(), canBeReadOnly);
    // Start the connection
    cnxn.start();
}
ConnectStringParser connectStringParser = new ConnectStringParser(connectString)
- Creates a parser for the zk cluster connect string; each parsed host and port is turned into an address instance and stored in a cached collection
public ConnectStringParser(String connectString) {
    // parse out chroot, if any
    // (an alternative form of the connect string; not the focus here)
    int off = connectString.indexOf('/');
    if (off >= 0) {
        String chrootPath = connectString.substring(off);
        // ignore "/" chroot spec, same as null
        if (chrootPath.length() == 1) {
            this.chrootPath = null;
        } else {
            PathUtils.validatePath(chrootPath);
            this.chrootPath = chrootPath;
        }
        connectString = connectString.substring(0, off);
    } else {
        this.chrootPath = null;
    }
    // Split on commas
    String hostsList[] = connectString.split(",");
    for (String host : hostsList) {
        int port = DEFAULT_PORT;
        int pidx = host.lastIndexOf(':');
        if (pidx >= 0) {
            // otherwise : is at the end of the string, ignore
            if (pidx < host.length() - 1) {
                port = Integer.parseInt(host.substring(pidx + 1));
            }
            host = host.substring(0, pidx);
        }
        // Build an address from the parsed host and port and add it to serverAddresses
        serverAddresses.add(InetSocketAddress.createUnresolved(host, port));
    }
}
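To make the parsing rules concrete, here is a minimal stand-alone sketch that mirrors the logic above without depending on ZooKeeper. The class name ConnectStringSketch, its fields, and the sample connect string are illustrative assumptions, and the default port 2181 is assumed to match DEFAULT_PORT; path validation is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone sketch; not the real ConnectStringParser.
class ConnectStringSketch {
    static final int DEFAULT_PORT = 2181; // assumed default client port

    final String chrootPath;                        // null when no chroot (or only "/") is given
    final List<String> hosts = new ArrayList<>();   // "host:port" entries in input order

    ConnectStringSketch(String connectString) {
        // Everything from the first '/' onward is the chroot suffix.
        int off = connectString.indexOf('/');
        if (off >= 0) {
            String chroot = connectString.substring(off);
            chrootPath = chroot.length() == 1 ? null : chroot;
            connectString = connectString.substring(0, off);
        } else {
            chrootPath = null;
        }
        // The remainder is a comma-separated host list; a missing port falls back to the default.
        for (String host : connectString.split(",")) {
            int port = DEFAULT_PORT;
            int pidx = host.lastIndexOf(':');
            if (pidx >= 0) {
                if (pidx < host.length() - 1) {
                    port = Integer.parseInt(host.substring(pidx + 1));
                }
                host = host.substring(0, pidx);
            }
            hosts.add(host + ":" + port);
        }
    }

    public static void main(String[] args) {
        ConnectStringSketch p = new ConnectStringSketch("zk1:2181,zk2,zk3:2183/app/logs");
        System.out.println(p.hosts);      // [zk1:2181, zk2:2181, zk3:2183]
        System.out.println(p.chrootPath); // /app/logs
    }
}
```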
HostProvider hostProvider = new StaticHostProvider(connectStringParser.getServerAddresses());
- Creates the host provider and shuffles the addresses in the cached collection
public StaticHostProvider(Collection<InetSocketAddress> serverAddresses) {
    // Instantiate an address resolver
    this.resolver = new Resolver() {
        // Look up all IPs that belong to a host name
        @Override
        public InetAddress[] getAllByName(String name) throws UnknownHostException {
            return InetAddress.getAllByName(name);
        }
    };
    // Shuffle the addresses
    init(serverAddresses);
}

private void init(Collection<InetSocketAddress> serverAddresses) {
    if (serverAddresses.isEmpty()) {
        throw new IllegalArgumentException(
                "A HostProvider may not be empty!");
    }
    // Initialize the local serverAddresses
    this.serverAddresses.addAll(serverAddresses);
    // Shuffle the elements of serverAddresses
    Collections.shuffle(this.serverAddresses);
}
The purpose of shuffling is load balancing; otherwise every client's round-robin would start with the first server in the list.
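The effect of "shuffle once, then round-robin" can be sketched in isolation. The class ShuffledRoundRobin and the host names below are hypothetical and only illustrate the idea; the sketch leaves out the spin delay and DNS resolution that the real host provider performs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical sketch of "shuffle once, then round-robin"; not the real StaticHostProvider.
class ShuffledRoundRobin {
    private final List<String> servers;
    private int currentIndex = -1;

    ShuffledRoundRobin(List<String> configured, Random random) {
        if (configured.isEmpty()) {
            throw new IllegalArgumentException("server list may not be empty");
        }
        // Copy, then shuffle once so each client starts from a different server.
        servers = new ArrayList<>(configured);
        Collections.shuffle(servers, random);
    }

    // Returns the next server in the shuffled order, wrapping around at the end.
    String next() {
        currentIndex = (currentIndex + 1) % servers.size();
        return servers.get(currentIndex);
    }

    public static void main(String[] args) {
        List<String> cluster = Arrays.asList("zk1:2181", "zk2:2181", "zk3:2181");
        ShuffledRoundRobin a = new ShuffledRoundRobin(cluster, new Random());
        ShuffledRoundRobin b = new ShuffledRoundRobin(cluster, new Random());
        // The two clients will usually, but not always, start with different servers.
        System.out.println("client A tries " + a.next() + ", then " + a.next());
        System.out.println("client B tries " + b.next() + ", then " + b.next());
    }
}
```

Without the shuffle, every client configured with the same list would try zk1 first, so zk1 would carry most of the connections.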
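cnxn = new ClientCnxn(…);
- Creates a connection instance
cnxn.start();
- Starts the connection
public void start() {
    // Start the connection thread
    sendThread.start();
    // Start the event thread
    eventThread.start();
}
Now look at the run method of the connection thread, sendThread: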
public void run() {
    clientCnxnSocket.introduce(this,sessionId);
    clientCnxnSocket.updateNow();
    clientCnxnSocket.updateLastSendAndHeard();
    int to;
    long lastPingRwServer = Time.currentElapsedTime();
    final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
    InetSocketAddress serverAddress = null;
    // Loop while the current connection object is alive
    while (state.isAlive()) {
        try {
            // If the connection is not currently connected to a server
            if (!clientCnxnSocket.isConnected()) {
                // If this is not the first connection attempt, sleep for a random time below 1 second
                // to reduce the load on the servers
                if(!isFirstConnect){
                    try {
                        Thread.sleep(r.nextInt(1000));
                    } catch (InterruptedException e) {
                        LOG.warn("Unexpected exception", e);
                    }
                }
                // don't re-establish connection if we are closing
                // If the connection was closed or became unusable during the sleep, exit
                if (closing || !state.isAlive()) {
                    break;
                }
                // rwServerAddress is a dynamically configured address; the Client connects to it first
                // and falls back to the other addresses if that fails
                if (rwServerAddress != null) {
                    serverAddress = rwServerAddress;
                    rwServerAddress = null;
                } else {
                    // Get the address of the zkServer to connect to
                    serverAddress = hostProvider.next(1000);
                }
                // Start a connection attempt; if it fails, the loop fetches the next address and retries
                startConnect(serverAddress);
                clientCnxnSocket.updateLastSendAndHeard();
            }

            if (state.isConnected()) {
                // determine whether we need to send an AuthFailed event.
                if (zooKeeperSaslClient != null) {
                    boolean sendAuthEvent = false;
                    if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
                        try {
                            zooKeeperSaslClient.initialize(ClientCnxn.this);
                        } catch (SaslException e) {
                            LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
                            state = States.AUTH_FAILED;
                            sendAuthEvent = true;
                        }
                    }
                    KeeperState authState = zooKeeperSaslClient.getKeeperState();
                    if (authState != null) {
                        if (authState == KeeperState.AuthFailed) {
                            // An authentication error occurred during authentication with the Zookeeper Server.
                            state = States.AUTH_FAILED;
                            sendAuthEvent = true;
                        } else {
                            if (authState == KeeperState.SaslAuthenticated) {
                                sendAuthEvent = true;
                            }
                        }
                    }

                    if (sendAuthEvent == true) {
                        eventThread.queueEvent(new WatchedEvent(
                                Watcher.Event.EventType.None,
                                authState,null));
                    }
                }
                to = readTimeout - clientCnxnSocket.getIdleRecv();
            } else {
                to = connectTimeout - clientCnxnSocket.getIdleRecv();
            }

            if (to <= 0) {
                String warnInfo;
                warnInfo = "Client session timed out, have not heard from server in " + clientCnxnSocket.getIdleRecv() + "ms" + " for sessionid 0x" + Long.toHexString(sessionId);
                LOG.warn(warnInfo);
                throw new SessionTimeoutException(warnInfo);
            }
            if (state.isConnected()) {
                //1000(1 second) is to prevent race condition missing to send the second ping
                //also make sure not to send too many pings when readTimeout is small
                int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() - ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
                //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
                if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
                    sendPing();
                    clientCnxnSocket.updateLastSend();
                } else {
                    if (timeToNextPing < to) {
                        to = timeToNextPing;
                    }
                }
            }

            // If we are in read-only mode, seek for read/write server
            if (state == States.CONNECTEDREADONLY) {
                long now = Time.currentElapsedTime();
                int idlePingRwServer = (int) (now - lastPingRwServer);
                if (idlePingRwServer >= pingRwTimeout) {
                    lastPingRwServer = now;
                    idlePingRwServer = 0;
                    pingRwTimeout = Math.min(2*pingRwTimeout, maxPingRwTimeout);
                    pingRwServer();
                }
                to = Math.min(to, pingRwTimeout - idlePingRwServer);
            }

            clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
        } catch (Throwable e) {
            if (closing) {
                if (LOG.isDebugEnabled()) {
                    // closing so this is expected
                    LOG.debug("An exception was thrown while closing send thread for session 0x" + Long.toHexString(getSessionId()) + " : " + e.getMessage());
                }
                break;
            } else {
                // this is ugly, you have a better way speak up
                if (e instanceof SessionExpiredException) {
                    LOG.info(e.getMessage() + ", closing socket connection");
                } else if (e instanceof SessionTimeoutException) {
                    LOG.info(e.getMessage() + RETRY_CONN_MSG);
                } else if (e instanceof EndOfStreamException) {
                    LOG.info(e.getMessage() + RETRY_CONN_MSG);
                } else if (e instanceof RWServerFoundException) {
                    LOG.info(e.getMessage());
                } else if (e instanceof SocketException) {
                    LOG.info("Socket error occurred: {}: {}", serverAddress, e.getMessage());
                } else {
                    LOG.warn("Session 0x{} for server {}, unexpected error{}", Long.toHexString(getSessionId()), serverAddress, RETRY_CONN_MSG, e);
                }
                cleanup();
                if (state.isAlive()) {
                    eventThread.queueEvent(new WatchedEvent(
                            Event.EventType.None,
                            Event.KeeperState.Disconnected,
                            null));
                }
                clientCnxnSocket.updateNow();
                clientCnxnSocket.updateLastSendAndHeard();
            }
        }
    }
    cleanup();
    clientCnxnSocket.close();
    if (state.isAlive()) {
        eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null));
    }
    ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(), "SendThread exited loop for session: 0x" + Long.toHexString(getSessionId()));
}
while (state.isAlive()) {
- Checks whether the current connection object is alive. The possible states are:
public enum States {
    CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY,
    CLOSED, AUTH_FAILED, NOT_CONNECTED;

    public boolean isAlive() {
        return this != CLOSED && this != AUTH_FAILED;
    }

    /**
     * Returns whether we are connected to a server (which
     * could possibly be read-only, if this client is allowed
     * to go to read-only mode)
     * */
    public boolean isConnected() {
        return this == CONNECTED || this == CONNECTEDREADONLY;
    }
}
serverAddress = hostProvider.next(1000);
- Gets the address of the zkServer to connect to
// Pick the address of the server to connect to by round-robin.
// If all hosts have been tried once, wait spinDelay milliseconds.
public InetSocketAddress next(long spinDelay) {
    // Round-robin: increment the index and take it modulo the number of hosts
    currentIndex = ++currentIndex % serverAddresses.size();
    // lastIndex records the address that was last connected successfully.
    // currentIndex equals lastIndex only when zk runs in standalone mode
    // or when a full round of polling has completed.
    // In that case, if spinDelay is specified, sleep for spinDelay.
    if (currentIndex == lastIndex && spinDelay > 0) {
        try {
            Thread.sleep(spinDelay);
        } catch (InterruptedException e) {
            LOG.warn("Unexpected exception", e);
        }
    } else if (lastIndex == -1) {
        // Handle the first connection attempt
        // We don't want to sleep on the first ever connect attempt.
        lastIndex = 0;
    }
    // Fetch the address at the round-robin index from the address list
    InetSocketAddress curAddr = serverAddresses.get(currentIndex);
    try {
        // Get the host name of the address object.
        // If the host name is available it is used; otherwise the string form of the IP address is returned.
        // Java 7 has getHostString, but earlier versions do not support it;
        // this method is a substitute for InetSocketAddress.getHostString().
        String curHostString = getHostString(curAddr);
        // this.resolver.getAllByName(curHostString) returns all IPs of the host name as an array
        // (one host name may map to several IP addresses,
        // for example a host with multiple network interfaces)
        List<InetAddress> resolvedAddresses = new ArrayList<InetAddress>(Arrays.asList(this.resolver.getAllByName(curHostString)));
        if (resolvedAddresses.isEmpty()) {
            return curAddr;
        }
        // Shuffle the IP list
        Collections.shuffle(resolvedAddresses);
        // Use the first IP of the shuffled list as the address to connect to
        return new InetSocketAddress(resolvedAddresses.get(0), curAddr.getPort());
    } catch (UnknownHostException e) {
        return curAddr;
    }
}
startConnect(serverAddress);
- Starts a connection attempt (it may fail, in which case the loop fetches the next address and tries again)
private void startConnect(InetSocketAddress addr) throws IOException {
    // initializing it for new connection
    saslLoginFailed = false;
    // Change the state to CONNECTING
    state = States.CONNECTING;

    // Set the name of the current connection thread
    setName(getName().replaceAll("\\(.*\\)", "(" + addr.getHostName() + ":" + addr.getPort() + ")"));
    // If SASL authentication is enabled for the client, create a SASL client.
    // Simple Authentication and Security Layer (SASL) is a mechanism
    // that extends the authentication capabilities of client/server protocols.
    if (ZooKeeperSaslClient.isEnabled()) {
        try {
            String principalUserName = System.getProperty(
                    ZK_SASL_CLIENT_USERNAME, "zookeeper");
            zooKeeperSaslClient = new ZooKeeperSaslClient(
                    principalUserName+"/"+addr.getHostName());
        } catch (LoginException e) {
            // An authentication error occurred when the SASL client tried to initialize:
            // for Kerberos this means that the client failed to authenticate with the KDC.
            // This is different from an authentication error that occurs during communication
            // with the Zookeeper server, which is handled below.
            LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server without " + "SASL authentication, if Zookeeper server allows it.");
            eventThread.queueEvent(new WatchedEvent(
                    Watcher.Event.EventType.None,
                    Watcher.Event.KeeperState.AuthFailed, null));
            saslLoginFailed = true;
        }
    }
    logStartConnect(addr);
    // Connect to the given address
    clientCnxnSocket.connect(addr);
}
@Override
void connect(InetSocketAddress addr) throws IOException {
    // Get the NIO client channel
    SocketChannel sock = createSock();
    try {
        // Register and connect
        registerAndConnect(sock, addr);
    } catch (IOException e) {
        LOG.error("Unable to open socket to " + addr);
        sock.close();
        throw e;
    }
    initialized = false;

    /*
     * Reset incomingBuffer
     */
    lenBuffer.clear();
    incomingBuffer = lenBuffer;
}

/**
 * As shown here, the actual connection is made with JDK NIO
 * */
void registerAndConnect(SocketChannel sock, InetSocketAddress addr) throws IOException {
    // Register the client channel with the selector
    sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
    // Connect the channel to the given Server address
    boolean immediateConnect = sock.connect(addr);
    if (immediateConnect) {
        // connect returns true when the connection is established immediately; run the related initialization
        sendThread.primeConnection();
    }
}
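Server-Side Connection Source Code Analysis
ZooKeeper session theory
The session is one of the most important concepts in zk: every interaction between client and server is tied to a session.
When a ZooKeeper client starts, it first establishes a long-lived TCP connection to a zk server. As soon as the connection is established, the lifecycle of the client session begins.
(1) Session states
There are three common session states:
- CONNECTING: connecting. To create a connection, a Client first creates a local zk object that represents the Server it connects to.
- CONNECTED: connected. Once the connection succeeds, the connection's transient data is initialized into the zk object.
- CLOSED: closed. Once the connection is closed, the zk object representing the Server is deleted.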
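(2) Session connection timeout management, maintained by the client
Session connection timeout management here means that the client tracks how long it has been trying to reach the server. The clock starts when the client's current session first initiates a connection to the server.
ZK is a CP system: while the servers are synchronizing data, they do not serve external requests. This phase is very short, however. Within the connection timeout the client keeps retrying until it succeeds, so the client does not notice the period during which the servers are unavailable.
(3) Session connection events
When the long-lived connection between client and server fails, the client reconnects. During reconnection the client may see three kinds of session connection events:
- CONNECTION_LOSS: the connection is lost.
- SESSION_MOVED: the session has moved. If the client reconnects within the connection timeout, but to a different Server than before (another machine in the cluster), the session is moved.
- SESSION_EXPIRED: the session has expired. If the client reconnects only after the connection timeout has elapsed, and the Server has already removed the sessionId of this session, the session is expired.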
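(4) Session idle timeout management, maintained by the server
The session connection timeout concerns the client; the session idle timeout concerns the server.
For every client session, the server records how long the session has been idle since the last interaction, and the point in time at which the session will hit its idle timeout, counted from the end of the last interaction. Once the idle time exceeds the timeout, the server removes the session's SessionId. This is why an idle client must send heartbeats to the server at regular intervals: they keep the session alive. The server relies on idle timeout management to decide whether a session has been interrupted.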
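The client-side half of this heartbeat can be read off the SendThread.run method shown earlier in this article. The sketch below isolates only that ping decision; the class PingTiming and its method names are hypothetical, and the read timeout is passed in as a plain parameter rather than derived from the negotiated session timeout.

```java
// Hypothetical helper that isolates the ping decision from SendThread.run.
class PingTiming {
    static final int MAX_SEND_PING_INTERVAL = 10000; // 10 seconds, as in SendThread.run

    // Milliseconds until the next ping is due; a value <= 0 means "ping now".
    // The extra 1000 ms is subtracted once the client has been idle for more than a second.
    static int timeToNextPing(int readTimeoutMs, int idleSendMs) {
        return readTimeoutMs / 2 - idleSendMs - (idleSendMs > 1000 ? 1000 : 0);
    }

    // A ping is sent when it is due, or when nothing was sent for MAX_SEND_PING_INTERVAL.
    static boolean shouldPing(int readTimeoutMs, int idleSendMs) {
        return timeToNextPing(readTimeoutMs, idleSendMs) <= 0
                || idleSendMs > MAX_SEND_PING_INTERVAL;
    }

    public static void main(String[] args) {
        int readTimeout = 10000;
        System.out.println(timeToNextPing(readTimeout, 500)); // 4500
        System.out.println(shouldPing(readTimeout, 500));     // false
        System.out.println(shouldPing(readTimeout, 4500));    // true
    }
}
```

Because a ping goes out after roughly half the read timeout, an idle but healthy client keeps touching its session on the server well before the idle timeout is reached.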
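To manage session idle timeouts, the server uses a special technique: the bucketing strategy.
A. Bucketing strategy
The bucketing strategy places sessions whose idle timeout points are close together into the same bucket and manages them as a group, which reduces management complexity. When checking for timeouts, only the sessions still left in the bucket need to be examined: sessions that have not timed out have already been moved out of the bucket, so the sessions remaining in it are exactly the ones that have timed out.
zk does not manage session idle timeouts precisely; that is, the timeout handling is not executed at the exact instant a session times out.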
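B. Bucketing basis
Buckets are computed from the following quantities:
CurrentTime: the current time (a point on the time axis)
SessionTimeout: the session timeout (a duration)
ExpirationTime: the point in time at which the session will next time out (a point on the time axis)
ExpirationInterval: the size of a bucket (a duration)
BucketTime: the bucket that contains the session's next timeout point
ExpirationTime = CurrentTime + SessionTimeout
BucketTime = (ExpirationTime / ExpirationInterval + 1) * ExpirationInterval
The division is integer division; these two formulas match roundToInterval and touchSession in SessionTrackerImpl, shown later in this article. From these formulas, each bucket spans ExpirationInterval. All sessions whose ExpirationTime falls into the same bucket have their timeouts managed together.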
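A small worked example makes the bucket computation concrete. It follows roundToInterval from SessionTrackerImpl, shown later in this article: ExpirationTime = CurrentTime + SessionTimeout, and BucketTime = (ExpirationTime / ExpirationInterval + 1) * ExpirationInterval with integer division. The class BucketMath, the 2000 ms interval, and the sample times are assumptions chosen for illustration.

```java
// Hypothetical worked example of the bucket formula; all times are in milliseconds.
class BucketMath {
    // Point in time at which the session will next time out.
    static long expirationTime(long currentTime, long sessionTimeout) {
        return currentTime + sessionTimeout;
    }

    // Boundary of the bucket that contains expirationTime (integer division, then round up).
    static long bucketTime(long expirationTime, long expirationInterval) {
        return (expirationTime / expirationInterval + 1) * expirationInterval;
    }

    public static void main(String[] args) {
        long interval = 2000; // assumed bucket size
        long timeout = 5000;  // assumed session timeout

        // Sessions A and B interact 600 ms apart but land in the same bucket.
        System.out.println(bucketTime(expirationTime(10300, timeout), interval)); // 16000
        System.out.println(bucketTime(expirationTime(10900, timeout), interval)); // 16000
        // Session C interacts slightly later and falls into the next bucket.
        System.out.println(bucketTime(expirationTime(11200, timeout), interval)); // 18000
    }
}
```

Sessions A and B are checked together when the 16000 boundary passes, which is why expiration is approximate rather than exact: a session can outlive its nominal timeout by up to one interval.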
Server-side connection source code
Find the ZooKeeperServer.startup method; it is triggered as soon as the Server starts.
// Executed on startup
public synchronized void startup() {
    if (sessionTracker == null) {
        // Create the sessionTracker thread
        createSessionTracker();
    }
    // Start the sessionTracker thread
    startSessionTracker();
    setupRequestProcessors();

    registerJMX();

    setState(State.RUNNING);
    notifyAll();
}
createSessionTracker();
- Creates the sessionTracker (session tracker) thread
protected void createSessionTracker() {
    sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(), tickTime, 1, getZooKeeperServerListener());
}

// The SessionTrackerImpl constructor that is called
public SessionTrackerImpl(SessionExpirer expirer, ConcurrentHashMap<Long, Integer> sessionsWithTimeout, int tickTime, long sid, ZooKeeperServerListener listener) {
    super("SessionTracker", listener);
    this.expirer = expirer;
    // Bucket size, i.e. the expiration interval
    this.expirationInterval = tickTime;
    this.sessionsWithTimeout = sessionsWithTimeout;
    // Compute the session bucket that contains the current time
    nextExpirationTime = roundToInterval(Time.currentElapsedTime());
    // Initialize the sessionId to use for the next session that gets created
    this.nextSessionId = initializeNextSession(sid);
    // Reload previously stored session data from memory into the sessionTracker
    // (presumably for cases such as a restart after an election)
    for (Entry<Long, Integer> e : sessionsWithTimeout.entrySet()) {
        // Create the session if it does not exist and put it into the matching bucket
        addSession(e.getKey(), e.getValue());
    }
}

// Compute the bucket that contains the given time
private long roundToInterval(long time) {
    // We give a one interval grace period
    // Number of whole buckets already elapsed, plus one, times the bucket size:
    // this gives the expiration boundary of the bucket
    return (time / expirationInterval + 1) * expirationInterval;
}

// Create the session and put it into the matching bucket
synchronized public void addSession(long id, int sessionTimeout) {
    sessionsWithTimeout.put(id, sessionTimeout);
    if (sessionsById.get(id) == null) {
        // Create the session instance; a newly created session is not in any bucket
        SessionImpl s = new SessionImpl(id, sessionTimeout, 0);
        // Put the session into the cache map; note that this map is not a session bucket
        sessionsById.put(id, s);
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
                    "SessionTrackerImpl --- Adding session 0x" + Long.toHexString(id) + " " + sessionTimeout);
        }
    } else {
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
                    "SessionTrackerImpl --- Existing session 0x" + Long.toHexString(id) + " " + sessionTimeout);
        }
    }
    // Put the session into a bucket
    touchSession(id, sessionTimeout);
}

// First, what creating a session does
public static class SessionImpl implements Session {
    SessionImpl(long sessionId, int timeout, long expireTime) {
        this.sessionId = sessionId;
        this.timeout = timeout;
        // Note: tickTime records the bucket the session currently belongs to.
        // The initial value 0 means the session is not in any bucket.
        this.tickTime = expireTime;
        isClosing = false;
    }
    ...
}

// Put the session into a bucket; the return value tells whether the session is still valid
synchronized public boolean touchSession(long sessionId, int timeout) {
    if (LOG.isTraceEnabled()) {
        ZooTrace.logTraceMessage(LOG, ZooTrace.CLIENT_PING_TRACE_MASK,
                "SessionTrackerImpl --- Touch session: 0x" + Long.toHexString(sessionId) + " with timeout " + timeout);
    }
    // Look up the Session in the cache map by its sessionId
    SessionImpl s = sessionsById.get(sessionId);
    // Return false, if the session doesn't exists or marked as closing
    if (s == null || s.isClosing()) {
        return false;
    }
    // Compute the bucket that contains the next timeout point after this interaction
    long expireTime = roundToInterval(Time.currentElapsedTime() + timeout);
    // s.tickTime records the bucket the session currently belongs to.
    // If s.tickTime < expireTime, the session must change buckets.
    // If s.tickTime == expireTime, no change is needed.
    // s.tickTime > expireTime is not expected in practice.
    if (s.tickTime >= expireTime) {
        // Nothing needs to be done
        return true;
    }
    // Reaching this point means s.tickTime < expireTime, so the session changes buckets.
    // sessionSets is the bucket collection: the key is the bucket's boundary time,
    // the value is the bucket instance, a SessionSet
    SessionSet set = sessionSets.get(s.tickTime);
    // If the old bucket exists, remove the session from it
    if (set != null) {
        set.sessions.remove(s);
    }
    // Update the id of the bucket the session belongs to, i.e. the bucket's boundary time
    s.tickTime = expireTime;
    // Look up the new bucket in the bucket collection
    set = sessionSets.get(s.tickTime);
    // If the bucket does not exist, create it and add it to the bucket collection
    if (set == null) {
        set = new SessionSet();
        sessionSets.put(expireTime, set);
    }
    // Put the session into the new bucket
    set.sessions.add(s);
    return true;
}

// SessionSet is the bucket instance; it holds Sessions
static class SessionSet {
    HashSet<SessionImpl> sessions = new HashSet<SessionImpl>();
}
The touchSession call described above happens while the sessionTracker thread is being created. The method is also called in many other scenarios, for example:
- When a session interacts with the current Server
// Triggered whenever a session interacts with the current Server
void touch(ServerCnxn cnxn) throws MissingSessionException {
    if (cnxn == null) {
        return;
    }
    long id = cnxn.getSessionId();
    int to = cnxn.getSessionTimeout();
    // As noted above, the return value of touchSession tells whether the session is still valid.
    // If the session is no longer valid, throw an exception; otherwise the bucket change, if any, has been done.
    if (!sessionTracker.touchSession(id, to)) {
        throw new MissingSessionException(
                "No session with sessionid 0x" + Long.toHexString(id) + " exists, probably expired and removed");
    }
}
- When the client issues a new connection request after the connection was lost
// After a connection loss, the client has reconnected to the Server; the session must be validated to see whether it has expired
protected void revalidateSession(ServerCnxn cnxn, long sessionId, int sessionTimeout) throws IOException {
    // Check whether the given session is still valid
    boolean rc = sessionTracker.touchSession(sessionId, sessionTimeout);
    if (LOG.isTraceEnabled()) {
        ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,
                "Session 0x" + Long.toHexString(sessionId) + " is valid: " + rc);
    }
    // Handle the session differently depending on rc
    finishSessionInit(cnxn, rc);
}

public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
    // register with JMX
    try {
        // If the Session is still valid, allow it to register
        if (valid) {
            serverCnxnFactory.registerConnection(cnxn);
        }
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
    }

    try {
        // Create the connect response object
        ConnectResponse rsp = new ConnectResponse(0, valid ? cnxn.getSessionTimeout()
                : 0, valid ? cnxn.getSessionId() : 0, // send 0 if session is no
                        // longer valid
                        valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]);
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
        bos.writeInt(-1, "len");
        rsp.serialize(bos, "connect");
        if (!cnxn.isOldClient) {
            bos.writeBool(
                    this instanceof ReadOnlyZooKeeperServer, "readOnly");
        }
        baos.close();
        ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
        bb.putInt(bb.remaining() - 4).rewind();
        cnxn.sendBuffer(bb);

        // If the session is invalid, tell the client to close; otherwise enable the connection
        if (!valid) {
            LOG.info("Invalid session 0x" + Long.toHexString(cnxn.getSessionId()) + " for client " + cnxn.getRemoteSocketAddress() + ", probably expired");
            cnxn.sendBuffer(ServerCnxnFactory.closeConn);
        } else {
            LOG.info("Established session 0x" + Long.toHexString(cnxn.getSessionId()) + " with negotiated timeout " + cnxn.getSessionTimeout() + " for client " + cnxn.getRemoteSocketAddress());
            // Enable receiving on the accepted connection
            cnxn.enableRecv();
        }
    } catch (Exception e) {
        LOG.warn("Exception while establishing session, closing", e);
        cnxn.close();
    }
}
So far we have only covered creating buckets and moving sessions between buckets, not how expiration is handled. Expiration handling happens in startSessionTracker().
startSessionTracker();
- Starts the sessionTracker thread. Let's look at the run method of SessionTrackerImpl:
@Override
synchronized public void run() {
    try {
        // The loop keeps running as long as the server is in the running state
        while (running) {
            currentTime = Time.currentElapsedTime();
            // nextExpirationTime is the boundary time of the bucket currently in use;
            // it is assigned at startup.
            // If the current time has not passed the boundary, the bucket has not expired yet and nothing needs to be done
            if (nextExpirationTime > currentTime) {
                this.wait(nextExpirationTime - currentTime);
                continue;
            }
            // Reaching this point means nextExpirationTime <= currentTime:
            // the nextExpirationTime bucket has expired, so it must be removed from the bucket collection
            // and the sessions in it must be closed
            SessionSet set;
            // Remove the current bucket
            set = sessionSets.remove(nextExpirationTime);
            // Close every session in the bucket
            if (set != null) {
                for (SessionImpl s : set.sessions) {
                    setSessionClosing(s.sessionId);
                    expirer.expire(s);
                }
            }
            // Advance to the next bucket id
            nextExpirationTime += expirationInterval;
        }
    } catch (InterruptedException e) {
        handleException(this.getName(), e);
    }
    LOG.info("SessionTrackerImpl exited loop!");
}
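The interplay of touchSession and the run loop can be condensed into a small single-threaded model. The class BucketTrackerSketch below is a hypothetical simplification, not the real SessionTrackerImpl: it takes explicit timestamps instead of reading a clock, has no locking or waiting, and returns the expired session ids instead of calling an expirer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical single-threaded sketch of the bucketing idea; not the real SessionTrackerImpl.
class BucketTrackerSketch {
    private final long interval;
    private final Map<Long, Long> bucketOfSession = new HashMap<>(); // sessionId -> bucket boundary
    private final Map<Long, Set<Long>> buckets = new HashMap<>();    // bucket boundary -> sessionIds
    private long nextExpirationTime;

    BucketTrackerSketch(long interval, long now) {
        this.interval = interval;
        this.nextExpirationTime = roundToInterval(now);
    }

    private long roundToInterval(long time) {
        return (time / interval + 1) * interval;
    }

    // Called on every interaction: move the session into the bucket of its new deadline.
    void touch(long sessionId, long now, long timeout) {
        long newBucket = roundToInterval(now + timeout);
        Long oldBucket = bucketOfSession.get(sessionId);
        if (oldBucket != null) {
            if (oldBucket >= newBucket) {
                return; // already in the right bucket
            }
            buckets.get(oldBucket).remove(sessionId);
        }
        bucketOfSession.put(sessionId, newBucket);
        buckets.computeIfAbsent(newBucket, k -> new HashSet<>()).add(sessionId);
    }

    // Drop every bucket whose boundary is <= now and return the sessions left in them.
    List<Long> expireUpTo(long now) {
        List<Long> expired = new ArrayList<>();
        while (nextExpirationTime <= now) {
            Set<Long> set = buckets.remove(nextExpirationTime);
            if (set != null) {
                for (Long id : set) {
                    bucketOfSession.remove(id);
                    expired.add(id);
                }
            }
            nextExpirationTime += interval;
        }
        return expired;
    }

    public static void main(String[] args) {
        BucketTrackerSketch tracker = new BucketTrackerSketch(2000, 0);
        tracker.touch(1L, 0, 3000);    // session 1 -> bucket 4000
        tracker.touch(2L, 0, 3000);    // session 2 -> bucket 4000
        tracker.touch(1L, 2500, 3000); // session 1 pings again -> moves to bucket 6000
        System.out.println(tracker.expireUpTo(4000)); // [2]
        System.out.println(tracker.expireUpTo(6000)); // [1]
    }
}
```

Session 1 survives the first check only because its later ping moved it out of the 4000 bucket, which is the property the strategy relies on: whatever is still in a bucket when its boundary passes has timed out.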