
ZooKeeper Client/Server Connection and Session Management: Source Code Analysis (Server-Side Session Idle Timeout Bucketing Strategy)

2020-07-04 14:19


Client Connection Source Analysis

Clients covered: ZKClient and Curator

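Conclusion first:
To create a connection, the client first creates a ZooKeeper object locally to represent the server it connects to.
Once connected, the connection's transient data is initialized into that zk object.
When the connection is closed, the zk object representing the server is discarded.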


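The most common ZK client libraries are ZKClient and Curator. When connecting to ZK, the client is configured with the cluster's address list. Which server it actually connects to is decided by shuffling the configured addresses and then trying them in round-robin order (this is the default; a reconnection policy can also be specified).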

ZKClient source analysis:
Here is a ZKClient demo:

public class ZKClientTest {
// The zk cluster to connect to
private static final String CLUSTER = "zkOS:2181";
// The node path
private static final String PATH = "/mylog";
public static void main(String[] args) {
// ---------------- Create the session -----------
// Create the zkClient
ZkClient zkClient = new ZkClient(CLUSTER);
// Set a serializer for the zkClient
zkClient.setZkSerializer(new SerializableSerializer());
// ---------------- Create a node -----------
// Create a persistent node
CreateMode mode = CreateMode.PERSISTENT;
// Node data
String data = "first log";
// Create the node
String nodeName = zkClient.create(PATH, data, mode);
...

Let's trace the ZKClient source to see how it connects, starting from the ZkClient constructor:

public class ZkClient implements Watcher {
...
public ZkClient(String serverstring) {
this(serverstring, Integer.MAX_VALUE);
}
public ZkClient(String zkServers, int connectionTimeout) {
// Key point: a ZkConnection object is created here
this(new ZkConnection(zkServers), connectionTimeout);
}
...
// The constructor chain eventually reaches this constructor
public ZkClient(final IZkConnection zkConnection, final int connectionTimeout, final ZkSerializer zkSerializer, final long operationRetryTimeout) {
if (zkConnection == null) {
throw new NullPointerException("Zookeeper connection is null!");
}
// Store the newly created ZkConnection in the _connection field
_connection = zkConnection;
_zkSerializer = zkSerializer;
_operationRetryTimeoutInMillis = operationRetryTimeout;
_isZkSaslEnabled = isZkSaslEnabled();
connect(connectionTimeout, this);
}

public void connect(final long maxMsToWaitUntilConnected, Watcher watcher) throws ZkInterruptedException, ZkTimeoutException, IllegalStateException {
boolean started = false;
acquireEventLock();
try {
setShutdownTrigger(false);
_eventThread = new ZkEventThread(_connection.getServers());
_eventThread.start();
// Delegate the actual connection to ZkConnection.connect
_connection.connect(watcher);

LOG.debug("Awaiting connection to Zookeeper server");
boolean waitSuccessful = waitUntilConnected(maxMsToWaitUntilConnected, TimeUnit.MILLISECONDS);
if (!waitSuccessful) {
throw new ZkTimeoutException("Unable to connect to zookeeper server '" + _connection.getServers() + "' with timeout of " + maxMsToWaitUntilConnected + " ms");
}
started = true;
} finally {
getEventLock().unlock();

// we should close the zookeeper instance, otherwise it would keep
// on trying to connect
if (!started) {
close();
}
}
}
}

From the trace above, ZKClient actually connects through ZkConnection.connect. Let's continue with ZkConnection:

public class ZkConnection implements IZkConnection {
...
// The key object: ZooKeeper
private ZooKeeper _zk = null;
...
public ZkConnection(String zkServers, int sessionTimeOut) {
_servers = zkServers;
_sessionTimeOut = sessionTimeOut;
}

@Override
public void connect(Watcher watcher) {
_zookeeperLock.lock();
try {
if (_zk != null) {
throw new IllegalStateException("zk client has already been started");
}
try {
LOG.debug("Creating new ZookKeeper instance to connect to " + _servers + ".");
// Note: ZkClient's connection to the server is really just a ZooKeeper object
_zk = new ZooKeeper(_servers, _sessionTimeOut, watcher);
} catch (IOException e) {
throw new ZkException("Unable to connect to " + _servers, e);
}
} finally {
_zookeeperLock.unlock();
}
}
}
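ZkClient.connect starts the connection and then blocks in waitUntilConnected, because the ZooKeeper constructor returns before the session is actually established and the watcher only reports the connected state later. A minimal sketch of that start-then-wait-with-timeout pattern, assuming a CountDownLatch is an acceptable stand-in for ZkClient's own waiting logic; FakeAsyncConnection and ConnectAndWait are hypothetical names, not ZkClient or ZooKeeper classes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for an asynchronous client such as new ZooKeeper(...):
// connect() returns immediately and a background thread signals success later.
class FakeAsyncConnection {
    private final long delayMs;

    FakeAsyncConnection(long delayMs) {
        this.delayMs = delayMs;
    }

    void connect(Runnable onConnected) {
        Thread t = new Thread(() -> {
            try {
                Thread.sleep(delayMs);
                onConnected.run(); // plays the role of a SyncConnected watcher event
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        t.setDaemon(true); // do not keep the JVM alive if nobody is waiting any more
        t.start();
    }
}

class ConnectAndWait {
    // Start connecting, then block until connected or until timeoutMs elapses.
    static boolean connect(FakeAsyncConnection conn, long timeoutMs) {
        CountDownLatch connected = new CountDownLatch(1);
        conn.connect(connected::countDown);
        try {
            return connected.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

When connect returns false, a caller in ZkClient's position should close the pending connection, as the finally block in ZkClient.connect does, so that it stops retrying in the background.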

Curator source analysis:
Here is a Curator demo:

public class FluentTest {
public static void main(String[] args) throws Exception {
// ---------------- Create the session -----------
// Retry policy: exponential backoff with a base sleep time of 1 second and at most 3 retries
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
// Create the client
CuratorFramework client = CuratorFrameworkFactory
.builder()
.connectString("zkOS:2181")
.sessionTimeoutMs(15000)
.connectionTimeoutMs(13000)
.retryPolicy(retryPolicy)
// namespace: the root path; all operations are relative to it
.namespace("logs")
.build();
// Start the client
client.start();
...
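In ExponentialBackoffRetry(1000, 3), 1000 is the base sleep time in milliseconds and 3 the maximum number of retries; the wait grows with each attempt instead of staying at a fixed 1 second. A minimal sketch of a randomized exponential backoff of this kind. BackoffSketch is a hypothetical class written for illustration, not Curator's source.

```java
import java.util.Random;

// Hypothetical sketch of randomized exponential backoff (not Curator's actual code).
class BackoffSketch {
    private final int baseSleepMs;
    private final int maxRetries;
    private final Random random;

    BackoffSketch(int baseSleepMs, int maxRetries, Random random) {
        this.baseSleepMs = baseSleepMs;
        this.maxRetries = maxRetries;
        this.random = random;
    }

    // True if another attempt is allowed after retryCount retries have been made.
    boolean allowRetry(int retryCount) {
        return retryCount < maxRetries;
    }

    // Sleep before retry number retryCount (0-based):
    // base * a random factor in [1, 2^(retryCount+1) - 1].
    long sleepMs(int retryCount) {
        int factor = Math.max(1, random.nextInt(1 << (retryCount + 1)));
        return (long) baseSleepMs * factor;
    }
}
```

With a base of 1000 ms the first retry waits exactly 1 s, the second between 1 s and 3 s, the third between 1 s and 7 s. The randomness spreads reconnect attempts from many clients over time.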

Let's trace the Curator source to see how it connects, starting from client.start():

public class CuratorFrameworkImpl implements CuratorFramework{
...
@Override
public void start(){
log.info("Starting");
if ( !state.compareAndSet(CuratorFrameworkState.LATENT, CuratorFrameworkState.STARTED) ){
throw new IllegalStateException("Cannot be started more than once");
}

try{
...
this.getConnectionStateListenable().addListener(listener);

client.start();
...
}catch ( Exception e ){
ThreadUtils.checkInterrupted(e);
handleBackgroundOperationException(null, e);
}
}

}

Focus on the client.start() call (client here is a CuratorZookeeperClient):

public class CuratorZookeeperClient implements Closeable{
...
public void start() throws Exception
{
log.debug("Starting");

if ( !started.compareAndSet(false, true) )
{
throw new IllegalStateException("Already started");
}

state.start();
}
...
}

Next, follow state.start():

class ConnectionState implements Watcher, Closeable{
...
void start() throws Exception{
log.debug("Starting");
ensembleProvider.start();
reset();
}

synchronized void reset() throws Exception{
log.debug("reset");

instanceIndex.incrementAndGet();

isConnected.set(false);
connectionStartMs = System.currentTimeMillis();
handleHolder.closeAndReset();
handleHolder.getZooKeeper();   // initiate connection
}
}

The key is the handleHolder.getZooKeeper() method:

class HandleHolder{
...
ZooKeeper getZooKeeper() throws Exception{
return (helper != null) ? helper.getZooKeeper() : null;
}
...
}

class Helper{
private final Data data;
...
ZooKeeper getZooKeeper() throws Exception{
return data.zooKeeperHandle;
}
...
}

It simply reads the handle from data. So when is the Helper created? Go back to org.apache.curator.ConnectionState#reset and look at handleHolder.closeAndReset():

class HandleHolder{
...

void closeAndReset() throws Exception{
internalClose(0);

Helper.Data data = new Helper.Data();
helper = new Helper(data){
@Override
ZooKeeper getZooKeeper() throws Exception{
synchronized(this){
if ( data.zooKeeperHandle == null ){
resetConnectionString(ensembleProvider.getConnectionString());
data.zooKeeperHandle = zookeeperFactory.newZooKeeper(data.connectionString, sessionTimeout, watcher, canBeReadOnly);
}

helper = new Helper(data);

return super.getZooKeeper();
}
}
};
}
...
}
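closeAndReset installs a Helper subclass whose getZooKeeper() creates the ZooKeeper handle on first use (under a lock) and then replaces itself with a plain Helper that only returns the cached handle, so later calls skip the synchronized block. A minimal sketch of this self-replacing lazy-initialization trick with hypothetical names: LazyHolder stands in for HandleHolder, and a Supplier stands in for zookeeperFactory.newZooKeeper(...).

```java
import java.util.function.Supplier;

// Hypothetical sketch of the "self-replacing helper" lazy-initialization pattern.
class LazyHolder<T> {
    // Plain helper: returns the value it was constructed with.
    static class Helper<V> {
        V value;

        Helper(V value) {
            this.value = value;
        }

        V get() {
            return value;
        }
    }

    private volatile Helper<T> helper;
    int creations = 0; // how many times the factory ran (for illustration)

    LazyHolder(Supplier<T> factory) {
        // Initial helper: creates the value on first use, then swaps in a plain helper.
        helper = new Helper<T>(null) {
            @Override
            T get() {
                synchronized (this) {
                    if (value == null) {
                        creations++;
                        value = factory.get();
                    }
                    helper = new Helper<>(value); // later calls bypass this lock
                    return value;
                }
            }
        };
    }

    T get() {
        return helper.get();
    }
}
```

Note that nothing connects until the first get(); in Curator this is why reset() calls handleHolder.getZooKeeper() right after closeAndReset() to initiate the connection.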

Now let's see how data.zooKeeperHandle is actually created:

public class NonAdminZookeeperFactory implements ZookeeperFactory{
@Override
public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception{
return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
}
}

So whichever client library is used, it ultimately creates a ZooKeeper object locally. Next, let's analyze the ZooKeeper class in the ZK source.

The ZooKeeper Client Object in the ZK Source

Here is the ZooKeeper class from the ZK source (its constructors):

// Follow the constructors
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
throws IOException
{
this(connectString, sessionTimeout, watcher, false);
}

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
boolean canBeReadOnly)
throws IOException
{
LOG.info("Initiating client connection, connectString=" + connectString
+ " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);

watchManager.defaultWatcher = watcher;

// Create a connect-string parser: each parsed ip:port becomes an address added to a cached list
ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
// Shuffle the cached addresses
HostProvider hostProvider = new StaticHostProvider(connectStringParser.getServerAddresses());
// Create the connection instance
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
getClientCnxnSocket(), canBeReadOnly);
// Start the connection
cnxn.start();
}

ConnectStringParser connectStringParser = new ConnectStringParser(connectString)

  • Creates a connect-string parser that turns each parsed ip and port into an address and adds it to a cached list
    public ConnectStringParser(String connectString) {
    // parse out chroot, if any
    // Parse the chroot suffix if present (an alternative connect-string form; not important here)
    int off = connectString.indexOf('/');
    if (off >= 0) {
    String chrootPath = connectString.substring(off);
    // ignore "/" chroot spec, same as null
    if (chrootPath.length() == 1) {
    this.chrootPath = null;
    } else {
    PathUtils.validatePath(chrootPath);
    this.chrootPath = chrootPath;
    }
    connectString = connectString.substring(0, off);
    } else {
    this.chrootPath = null;
    }
    // Split on commas
    String hostsList[] = connectString.split(",");
    for (String host : hostsList) {
    int port = DEFAULT_PORT;
    int pidx = host.lastIndexOf(':');
    if (pidx >= 0) {
    // otherwise : is at the end of the string, ignore
    if (pidx < host.length() - 1) {
    port = Integer.parseInt(host.substring(pidx + 1));
    }
    host = host.substring(0, pidx);
    }
    // Build an address from the parsed host and port and add it to serverAddresses
    serverAddresses.add(InetSocketAddress.createUnresolved(host, port));
    }
    }
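The parser's logic fits in a few lines: split off an optional chroot at the first '/', split the rest on commas, and take the text after the last ':' as the port, defaulting to 2181. A minimal sketch that mirrors the code above; ConnectStringSketch is a hypothetical class, and it omits the PathUtils.validatePath check.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch mirroring ConnectStringParser's logic (not the ZooKeeper class itself).
class ConnectStringSketch {
    static final int DEFAULT_PORT = 2181;

    final String chrootPath; // null when absent or just "/"
    final List<InetSocketAddress> servers = new ArrayList<>();

    ConnectStringSketch(String connectString) {
        int off = connectString.indexOf('/');
        if (off >= 0) {
            String chroot = connectString.substring(off);
            chrootPath = chroot.length() == 1 ? null : chroot;
            connectString = connectString.substring(0, off);
        } else {
            chrootPath = null;
        }
        for (String host : connectString.split(",")) {
            int port = DEFAULT_PORT;
            int pidx = host.lastIndexOf(':');
            if (pidx >= 0) {
                if (pidx < host.length() - 1) {
                    port = Integer.parseInt(host.substring(pidx + 1));
                }
                host = host.substring(0, pidx);
            }
            // Unresolved: DNS lookup is deferred until the address is actually used.
            servers.add(InetSocketAddress.createUnresolved(host, port));
        }
    }
}
```

For example, "zk1:2181,zk2:2182,zk3/logs" yields three addresses (zk3 on the default port) and the chroot "/logs".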

HostProvider hostProvider = new StaticHostProvider(connectStringParser.getServerAddresses());

  • Creates the host provider, which shuffles the cached addresses
    public StaticHostProvider(Collection<InetSocketAddress> serverAddresses) {
    // Create an address resolver
    this.resolver = new Resolver() {
    // Resolve all IPs for a host name
    @Override
    public InetAddress[] getAllByName(String name) throws UnknownHostException {
    return InetAddress.getAllByName(name);
    }
    };
    // Shuffle the addresses
    init(serverAddresses);
    }
    
    private void init(Collection<InetSocketAddress> serverAddresses) {
    if (serverAddresses.isEmpty()) {
    throw new IllegalArgumentException(
    "A HostProvider may not be empty!");
    }
    // Copy the addresses into the local serverAddresses
    this.serverAddresses.addAll(serverAddresses);
    // Shuffle the elements of serverAddresses
    Collections.shuffle(this.serverAddresses);
    }

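    The point of shuffling is load balancing: since every client walks the list in the same order, without shuffling they would all connect to the first server.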
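Combined with its next(spinDelay) method, StaticHostProvider amounts to: shuffle the list once, then hand out addresses round-robin, pausing for spinDelay whenever the index wraps back to the last server that was connected. A minimal sketch under simplifying assumptions: RoundRobinHosts is a hypothetical name, hosts are plain strings, and DNS resolution is omitted.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical sketch of a shuffled round-robin host provider (no DNS resolution).
class RoundRobinHosts {
    private final List<String> hosts;
    private int currentIndex = -1;
    private int lastIndex = -1; // index of the last connected host (-1 before any attempt)

    RoundRobinHosts(Collection<String> servers, Random random) {
        if (servers.isEmpty()) {
            throw new IllegalArgumentException("A HostProvider may not be empty!");
        }
        hosts = new ArrayList<>(servers);
        // Shuffle once so that different clients start at different servers.
        Collections.shuffle(hosts, random);
    }

    String next(long spinDelayMs) {
        currentIndex = (currentIndex + 1) % hosts.size();
        if (currentIndex == lastIndex && spinDelayMs > 0) {
            try {
                Thread.sleep(spinDelayMs); // wrapped around: back off before retrying
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        } else if (lastIndex == -1) {
            lastIndex = 0; // first ever attempt: never sleep
        }
        return hosts.get(currentIndex);
    }

    // Call after a successful connection so the next wrap-around sleeps at this host.
    void onConnected() {
        lastIndex = currentIndex;
    }
}
```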

cnxn = new ClientCnxn(…);

  • Creates the connection instance

cnxn.start();

  • Starts the connection
    public void start() {
    // Start the send (connection) thread
    sendThread.start();
    // Start the event thread
    eventThread.start();
    }

Now look at the run method of the send thread, sendThread:

public void run() {
clientCnxnSocket.introduce(this,sessionId);
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
int to;
long lastPingRwServer = Time.currentElapsedTime();
final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
InetSocketAddress serverAddress = null;
// Loop while this connection object is alive
while (state.isAlive()) {
try {
// If not currently connected to a server
if (!clientCnxnSocket.isConnected()) {
// If this is not the first connection attempt, sleep for a random time under 1 second
// to reduce load on the servers
if(!isFirstConnect){
try {
Thread.sleep(r.nextInt(1000));
} catch (InterruptedException e) {
LOG.warn("Unexpected exception", e);
}
}
// don't re-establish connection if we are closing
// If the connection was closed or became unusable during the sleep, stop
if (closing || !state.isAlive()) {
break;
}
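// rwServerAddress is a read/write server discovered while in read-only mode;
// if it is set, the client connects to it first and falls back to the other addresses otherwise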
if (rwServerAddress != null) {
serverAddress = rwServerAddress;
rwServerAddress = null;
} else {
// Get the address of the zk server to connect to
serverAddress = hostProvider.next(1000);
}
// Attempt to connect; on failure the loop fetches the next address and tries again
startConnect(serverAddress);
clientCnxnSocket.updateLastSendAndHeard();
}

if (state.isConnected()) {
// determine whether we need to send an AuthFailed event.
if (zooKeeperSaslClient != null) {
boolean sendAuthEvent = false;
if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
try {
zooKeeperSaslClient.initialize(ClientCnxn.this);
} catch (SaslException e) {
LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
state = States.AUTH_FAILED;
sendAuthEvent = true;
}
}
KeeperState authState = zooKeeperSaslClient.getKeeperState();
if (authState != null) {
if (authState == KeeperState.AuthFailed) {
// An authentication error occurred during authentication with the Zookeeper Server.
state = States.AUTH_FAILED;
sendAuthEvent = true;
} else {
if (authState == KeeperState.SaslAuthenticated) {
sendAuthEvent = true;
}
}
}

if (sendAuthEvent == true) {
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
authState,null));
}
}
to = readTimeout - clientCnxnSocket.getIdleRecv();
} else {
to = connectTimeout - clientCnxnSocket.getIdleRecv();
}

if (to <= 0) {
String warnInfo;
warnInfo = "Client session timed out, have not heard from server in "
+ clientCnxnSocket.getIdleRecv()
+ "ms"
+ " for sessionid 0x"
+ Long.toHexString(sessionId);
LOG.warn(warnInfo);
throw new SessionTimeoutException(warnInfo);
}
if (state.isConnected()) {
//1000(1 second) is to prevent race condition missing to send the second ping
//also make sure not to send too many pings when readTimeout is small
int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() -
((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
//send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
sendPing();
clientCnxnSocket.updateLastSend();
} else {
if (timeToNextPing < to) {
to = timeToNextPing;
}
}
}

// If we are in read-only mode, seek for read/write server
if (state == States.CONNECTEDREADONLY) {
long now = Time.currentElapsedTime();
int idlePingRwServer = (int) (now - lastPingRwServer);
if (idlePingRwServer >= pingRwTimeout) {
lastPingRwServer = now;
idlePingRwServer = 0;
pingRwTimeout =
Math.min(2*pingRwTimeout, maxPingRwTimeout);
pingRwServer();
}
to = Math.min(to, pingRwTimeout - idlePingRwServer);
}

clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
} catch (Throwable e) {
if (closing) {
if (LOG.isDebugEnabled()) {
// closing so this is expected
LOG.debug("An exception was thrown while closing send thread for session 0x"
+ Long.toHexString(getSessionId())
+ " : " + e.getMessage());
}
break;
} else {
// this is ugly, you have a better way speak up
if (e instanceof SessionExpiredException) {
LOG.info(e.getMessage() + ", closing socket connection");
} else if (e instanceof SessionTimeoutException) {
LOG.info(e.getMessage() + RETRY_CONN_MSG);
} else if (e instanceof EndOfStreamException) {
LOG.info(e.getMessage() + RETRY_CONN_MSG);
} else if (e instanceof RWServerFoundException) {
LOG.info(e.getMessage());
} else if (e instanceof SocketException) {
LOG.info("Socket error occurred: {}: {}", serverAddress, e.getMessage());
} else {
LOG.warn("Session 0x{} for server {}, unexpected error{}",
Long.toHexString(getSessionId()),
serverAddress,
RETRY_CONN_MSG,
e);
}
cleanup();
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(
Event.EventType.None,
Event.KeeperState.Disconnected,
null));
}
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
}
}
}
cleanup();
clientCnxnSocket.close();
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
Event.KeeperState.Disconnected, null));
}
ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
"SendThread exited loop for session: 0x"
+ Long.toHexString(getSessionId()));
}
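The timeouts in this loop reduce to a little arithmetic. Assuming readTimeout and connectTimeout are derived from the session timeout as in ZooKeeper 3.4's ClientCnxn (readTimeout = sessionTimeout * 2 / 3, connectTimeout = sessionTimeout / number of hosts), the sketch below computes how long the client may keep waiting and whether a ping is due. PingMath is a hypothetical helper class.

```java
// Hypothetical sketch of the timeout/ping arithmetic in SendThread.run().
class PingMath {
    static final int MAX_SEND_PING_INTERVAL = 10_000; // 10 seconds

    static int readTimeout(int sessionTimeoutMs) {
        return sessionTimeoutMs * 2 / 3;
    }

    static int connectTimeout(int sessionTimeoutMs, int hostCount) {
        return sessionTimeoutMs / hostCount;
    }

    // Time left before the client gives up on hearing from the server (<= 0 means timed out).
    static int remaining(boolean connected, int readTimeout, int connectTimeout, int idleRecvMs) {
        return (connected ? readTimeout : connectTimeout) - idleRecvMs;
    }

    // A ping is due at half the read timeout (minus 1 s of slack once idle > 1 s),
    // or whenever nothing has been sent for MAX_SEND_PING_INTERVAL.
    static boolean pingDue(int readTimeout, int idleSendMs) {
        int timeToNextPing = readTimeout / 2 - idleSendMs - (idleSendMs > 1000 ? 1000 : 0);
        return timeToNextPing <= 0 || idleSendMs > MAX_SEND_PING_INTERVAL;
    }
}
```

With a 15 s session timeout, readTimeout is 10 s, so a ping goes out after about 4 s without sending anything (5 s minus the 1 s slack). This is the heartbeat that keeps the server-side session from idling out.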
  • while (state.isAlive()) {

    Checks whether this connection object is alive
    // The full set of states
    public enum States {
    CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY,
    CLOSED, AUTH_FAILED, NOT_CONNECTED;
    
    public boolean isAlive() {
    return this != CLOSED && this != AUTH_FAILED;
    }
    
    /**
    * Returns whether we are connected to a server (which
    * could possibly be read-only, if this client is allowed
    * to go to read-only mode)
    * */
    public boolean isConnected() {
    return this == CONNECTED || this == CONNECTEDREADONLY;
    }
    }
  • serverAddress = hostProvider.next(1000);

      Gets the address of the zk server to connect to
      // Round-robin over the server addresses
      // If every host has been tried once, sleep for spinDelay milliseconds.
      public InetSocketAddress next(long spinDelay) {
      // Round robin: increment the index modulo the number of hosts
      currentIndex = ++currentIndex % serverAddresses.size();
      // lastIndex records the address that was last connected to
      // currentIndex equals lastIndex only in standalone mode (a single server) or after a full round
      // In either case, if spinDelay is set, sleep for spinDelay
      if (currentIndex == lastIndex && spinDelay > 0) {
      try {
      Thread.sleep(spinDelay);
      } catch (InterruptedException e) {
      LOG.warn("Unexpected exception", e);
      }
      } else if (lastIndex == -1) {   // Handle the very first connection attempt
      // We don't want to sleep on the first ever connect attempt.
      lastIndex = 0;
      }
      
      // Get the address at the round-robin index
      InetSocketAddress curAddr = serverAddresses.get(currentIndex);
      try {
      // Get the host string of the address:
      // the host name if available, otherwise the string form of the IP address.
      // Java 7 has InetSocketAddress.getHostString(), but earlier versions lack it,
      // so this method serves as a replacement for it.
      String curHostString = getHostString(curAddr);
      // this.resolver.getAllByName(curHostString) returns all IPs for the host name as an array (one name can map to several IPs),
      // e.g. a host with several network interfaces has several IPs under one name
      List<InetAddress> resolvedAddresses = new ArrayList<InetAddress>(Arrays.asList(this.resolver.getAllByName(curHostString)));
      if (resolvedAddresses.isEmpty()) {
      return curAddr;
      }
      // Shuffle the IP list
      Collections.shuffle(resolvedAddresses);
      // Use the first IP of the shuffled list as the address to connect to
      return new InetSocketAddress(resolvedAddresses.get(0), curAddr.getPort());
      } catch (UnknownHostException e) {
      return curAddr;
      }
      }
  • startConnect(serverAddress);

      Attempts a connection (this may fail; on failure the loop fetches the next address and tries again)
      private void startConnect(InetSocketAddress addr) throws IOException {
      // initializing it for new connection
      saslLoginFailed = false;
      // Set the state to CONNECTING
      state = States.CONNECTING;
      // Rename the send thread to include the target host:port
      setName(getName().replaceAll("\\(.*\\)",
      "(" + addr.getHostName() + ":" + addr.getPort() + ")"));
      // If client-side SASL is enabled, create a SASL client
      // SASL (Simple Authentication and Security Layer)
      // is a framework for adding authentication to client/server protocols.
      if (ZooKeeperSaslClient.isEnabled()) {
      try {
      String principalUserName = System.getProperty(
      ZK_SASL_CLIENT_USERNAME, "zookeeper");
      zooKeeperSaslClient =
      new ZooKeeperSaslClient(
      principalUserName+"/"+addr.getHostName());
      } catch (LoginException e) {
      // An authentication error occurred when the SASL client tried to initialize:
      // for Kerberos this means that the client failed to authenticate with the KDC.
      // This is different from an authentication error that occurs during communication
      // with the Zookeeper server, which is handled below.
      LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server without "
      + "SASL authentication, if Zookeeper server allows it.");
      eventThread.queueEvent(new WatchedEvent(
      Watcher.Event.EventType.None,
      Watcher.Event.KeeperState.AuthFailed, null));
      saslLoginFailed = true;
      }
      }
      logStartConnect(addr);
      // Connect to the given address
      clientCnxnSocket.connect(addr);
      }
    • clientCnxnSocket.connect(addr); connects to the given address
      @Override
      void connect(InetSocketAddress addr) throws IOException {
      // Create the NIO client channel
      SocketChannel sock = createSock();
      try {
      // Register with the selector and connect
      registerAndConnect(sock, addr);
      } catch (IOException e) {
      LOG.error("Unable to open socket to " + addr);
      sock.close();
      throw e;
      }
      initialized = false;
      
      /*
      * Reset incomingBuffer
      */
      lenBuffer.clear();
      incomingBuffer = lenBuffer;
      }
      
      /**
      * The actual connection uses plain JDK NIO
      *
      */
      void registerAndConnect(SocketChannel sock, InetSocketAddress addr) throws IOException {
      // Register the client channel with the selector for OP_CONNECT
      sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
      // Connect the channel to the server address
      boolean immediateConnect = sock.connect(addr);
      if (immediateConnect) {
      // connect() returns true if the connection completed immediately; then do the initial connection setup
      sendThread.primeConnection();
      }
      }
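registerAndConnect follows the standard non-blocking NIO connect sequence: the channel is registered for OP_CONNECT, connect() usually returns false, and once the selector reports the key as connectable, finishConnect() completes the handshake. The channel must already be non-blocking (register() throws IllegalBlockingModeException otherwise), which in ZooKeeper is presumably done in createSock. A minimal self-contained sketch; NioConnectSketch is a hypothetical class, and unlike ZooKeeper it waits for the result itself instead of returning to a shared event loop.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

// Hypothetical sketch of a non-blocking NIO connect with a selector.
class NioConnectSketch {
    static boolean connect(InetSocketAddress addr, long timeoutMs) throws IOException {
        try (Selector selector = Selector.open();
             SocketChannel sock = SocketChannel.open()) {
            sock.configureBlocking(false); // required before register()
            SelectionKey key = sock.register(selector, SelectionKey.OP_CONNECT);
            if (sock.connect(addr)) {
                return true; // connected immediately
            }
            long deadline = System.currentTimeMillis() + timeoutMs;
            while (System.currentTimeMillis() < deadline) {
                selector.select(Math.max(1, deadline - System.currentTimeMillis()));
                if (key.isConnectable() && sock.finishConnect()) {
                    return true; // TCP handshake completed
                }
                selector.selectedKeys().clear();
            }
            return false; // timed out
        }
    }
}
```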

    Server-Side Connection Source Analysis

    ZooKeeper Session Fundamentals

    The session is one of the most important concepts in zk: every interaction between a client and the server takes place within a session.

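    When a ZooKeeper client starts, it first establishes a long-lived TCP connection with a zk server. Once the connection is established, the lifecycle of the client session begins.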

    (1) Session states

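    There are three common session states:

    • CONNECTING: connecting. To create a connection, the client first creates a local zk object that represents the server it connects to.
    • CONNECTED: connected. After the connection succeeds, the connection's transient data is initialized into the zk object.
    • CLOSED: closed. After the connection is closed, the zk object representing the server is discarded.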

    (2) Session connection timeout management (maintained by the client)

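    Connection timeout management here means the client keeps track of how long it has been trying to connect to a server, timed from the moment the current session first initiated a connection.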

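    ZK is a CP system: while the servers are synchronizing data they do not serve clients, but this process is usually very fast. Within the connection timeout the client keeps retrying until it succeeds, so as long as the outage is shorter than that timeout, the client's session survives and the outage is essentially invisible to it.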

    (3) Session connection events

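    When the long-lived connection between client and server breaks, the client reconnects. During reconnection, the client can see three kinds of connection events:

    • CONNECTION_LOSS: the connection was lost.
    • SESSION_MOVED: the session moved. If the client reconnects within its connection timeout, but to a different server in the cluster than before, the session moves to that server.
    • SESSION_EXPIRED: the session expired. If the client reconnects only after the connection timeout has elapsed, and the server has already removed the session's sessionId, the session is no longer valid.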

    (4) Session idle timeout management (maintained by the server)

    The connection timeout is a client-side concern; the idle timeout is a server-side one.

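    For every client session, the server records how long the session has been idle since its last interaction, i.e. the point in time, counted from the end of the last interaction, at which the session's idle timeout will expire. Once the idle time exceeds the timeout, the server removes the session's SessionId. This is why an idle client must periodically send heartbeats to the server: they keep the long-lived session alive. The server relies on this idle timeout management to decide whether a session has been interrupted.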

    For session idle timeouts, the server uses a particular technique: the bucketing strategy.

    A. Bucketing strategy

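    Bucketing means putting sessions whose idle timeouts expire at similar times into the same bucket and managing them together, which reduces management complexity. When checking for timeouts, the server only has to look at the sessions left in a bucket: sessions that have not timed out have already been moved out of it into later buckets, so whatever remains in the bucket has timed out.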

    zk does not manage session idle timeouts exactly: timeout handling does not run the instant a session times out.

    B. Bucket calculation

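    The bucket is computed from the following quantities:

    CurrentTime: the current time (a point on the timeline)
    SessionTimeout: the session timeout (a duration)
    ExpirationTime: the point in time at which the session will next time out (a point on the timeline)
    ExpirationInterval: the bucket size (a duration)
    BucketTime: the bucket containing the session's next timeout point

    $$\text{ExpirationTime} = \text{CurrentTime} + \text{SessionTimeout}$$
    $$\text{BucketTime} = \left(\left\lfloor \frac{\text{ExpirationTime}}{\text{ExpirationInterval}} \right\rfloor + 1\right) \times \text{ExpirationInterval}$$

    From these formulas, each bucket spans one ExpirationInterval. All sessions whose ExpirationTime falls into the same bucket have their timeouts handled together.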
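As a worked example, assume ExpirationInterval = 2000 ms (in SessionTrackerImpl the bucket size is tickTime) and SessionTimeout = 10000 ms. The sketch below mirrors roundToInterval from SessionTrackerImpl; BucketMath is a hypothetical class name.

```java
// Hypothetical sketch of the bucket computation (mirrors SessionTrackerImpl.roundToInterval).
class BucketMath {
    // Round a point in time up to the end of its bucket; the "+ 1" gives one interval of grace.
    static long roundToInterval(long time, long expirationInterval) {
        return (time / expirationInterval + 1) * expirationInterval;
    }

    // The bucket a session touched at currentTime with the given timeout belongs to.
    static long bucketTime(long currentTime, long sessionTimeout, long expirationInterval) {
        long expirationTime = currentTime + sessionTimeout;
        return roundToInterval(expirationTime, expirationInterval);
    }
}
```

Sessions touched at 100500 ms and 101900 ms both land in bucket 112000, while one touched at 102000 ms lands in 114000. A session may therefore live up to one interval longer than its nominal timeout, which is why the management is not exact.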







    Server-Side Session Tracking Source Analysis

    Find the ZooKeeperServer.startup method, which runs once the server starts:

    // Executed at startup
    public synchronized void startup() {
    if (sessionTracker == null) {
    // Create the sessionTracker thread
    createSessionTracker();
    }
    // Start the sessionTracker thread
    startSessionTracker();
    setupRequestProcessors();
    
    registerJMX();
    
    setState(State.RUNNING);
    notifyAll();
    }

    createSessionTracker();

    Creates the sessionTracker (session tracker) thread

    protected void createSessionTracker() {
    sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(),
    tickTime, 1, getZooKeeperServerListener());
    }
    
    // The SessionTrackerImpl constructor being called
    public SessionTrackerImpl(SessionExpirer expirer,
    ConcurrentHashMap<Long, Integer> sessionsWithTimeout, int tickTime,
    long sid, ZooKeeperServerListener listener)
    {
    super("SessionTracker", listener);
    this.expirer = expirer;
    // Bucket size, i.e. the expiration interval
    this.expirationInterval = tickTime;
    this.sessionsWithTimeout = sessionsWithTimeout;
    // Compute the bucket for the current time
    nextExpirationTime = roundToInterval(Time.currentElapsedTime());
    // Initialize the sessionId to assign to the next new session
    this.nextSessionId = initializeNextSession(sid);
    // Reload session data previously held in memory into the sessionTracker (presumably for cases such as a restart after leader election)
    for (Entry<Long, Integer> e : sessionsWithTimeout.entrySet()) {
    // Create the session if it does not exist, and put it into the corresponding bucket
    addSession(e.getKey(), e.getValue());
    }
    }
    
    // Compute the bucket that the given time falls into
    private long roundToInterval(long time) {
    // We give a one interval grace period
    // i.e. count the whole intervals elapsed so far, add 1, and multiply by the interval size to get this bucket's expiry point
    return (time / expirationInterval + 1) * expirationInterval;
    }
    
    // Create the session and put it into the corresponding bucket
    synchronized public void addSession(long id, int sessionTimeout) {
    sessionsWithTimeout.put(id, sessionTimeout);
    if (sessionsById.get(id) == null) {
    // Create the session instance; a new session is not in any bucket yet
    SessionImpl s = new SessionImpl(id, sessionTimeout, 0);
    // Put the session into the cache map (note: this map is not a bucket)
    sessionsById.put(id, s);
    if (LOG.isTraceEnabled()) {
    ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
    "SessionTrackerImpl --- Adding session 0x"
    + Long.toHexString(id) + " " + sessionTimeout);
    }
    } else {
    if (LOG.isTraceEnabled()) {
    ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
    "SessionTrackerImpl --- Existing session 0x"
    + Long.toHexString(id) + " " + sessionTimeout);
    }
    }
    // Put the session into a bucket
    touchSession(id, sessionTimeout);
    }
    
    // First, what creating a session does
    public static class SessionImpl implements Session {
    SessionImpl(long sessionId, int timeout, long expireTime) {
    this.sessionId = sessionId;
    this.timeout = timeout;
    // Note: this tickTime records the bucket the session is currently in
    // Its initial value 0 means the session is not in any bucket
    this.tickTime = expireTime;
    isClosing = false;
    }
    ...
    }
    
    // Put the session into a bucket; the return value indicates whether the session is valid
    synchronized public boolean touchSession(long sessionId, int timeout) {
    if (LOG.isTraceEnabled()) {
    ZooTrace.logTraceMessage(LOG,
    ZooTrace.CLIENT_PING_TRACE_MASK,
    "SessionTrackerImpl --- Touch session: 0x"
    + Long.toHexString(sessionId) + " with timeout " + timeout);
    }
    // Look up the Session in the cache map by sessionId
    SessionImpl s = sessionsById.get(sessionId);
    
    // Return false, if the session doesn't exists or marked as closing
    // Return false if the session does not exist or is closing
    if (s == null || s.isClosing()) {
    return false;
    }
    // Compute the bucket of the next timeout point after this interaction
    long expireTime = roundToInterval(Time.currentElapsedTime() + timeout);
    // s.tickTime records the session's current bucket
    // If s.tickTime < expireTime, the session must move to a new bucket
    // If s.tickTime == expireTime, it stays where it is
    // s.tickTime > expireTime is not expected to occur
    if (s.tickTime >= expireTime) {
    // Nothing needs to be done
    // No bucket change needed; nothing to do
    return true;
    }
    
    // Reaching here means s.tickTime < expireTime holds,
    // so the session moves to a new bucket
    // sessionSets is the bucket collection: key is the bucket boundary time, value is the SessionSet bucket
    SessionSet set = sessionSets.get(s.tickTime);
    // If the old bucket exists, remove the session from it
    if (set != null) {
    set.sessions.remove(s);
    }
    // Update the session's bucket id, i.e. the bucket boundary
    s.tickTime = expireTime;
    // Look up the new bucket in the bucket collection
    set = sessionSets.get(s.tickTime);
    // If the bucket does not exist, create it and add it to the bucket collection
    if (set == null) {
    set = new SessionSet();
    sessionSets.put(expireTime, set);
    }
    // Put the session into the new bucket
    set.sessions.add(s);
    return true;
    }
    
    // SessionSet is the bucket itself; it holds the Sessions
    static class SessionSet {
    HashSet<SessionImpl> sessions = new HashSet<SessionImpl>();
    }
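The core of touchSession is a two-map structure: one map records each session's current bucket, the other maps a bucket boundary to the sessions in it, and touching a session moves it only when its new bucket is later. A minimal sketch with hypothetical names; it uses session ids instead of SessionImpl objects, omits the isClosing check, and takes the current time as a parameter so it can be tested.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified sketch of the bucket move in SessionTrackerImpl.touchSession.
class BucketTracker {
    private final long expirationInterval;
    final Map<Long, Long> bucketOf = new HashMap<>();     // sessionId -> bucket (0 = none)
    final Map<Long, Set<Long>> buckets = new HashMap<>(); // bucket boundary -> session ids

    BucketTracker(long expirationInterval) {
        this.expirationInterval = expirationInterval;
    }

    private long roundToInterval(long time) {
        return (time / expirationInterval + 1) * expirationInterval;
    }

    void addSession(long id) {
        bucketOf.putIfAbsent(id, 0L); // a new session is not in any bucket yet
    }

    // Returns false for unknown sessions, like touchSession does for missing ones.
    boolean touch(long id, long timeout, long now) {
        Long current = bucketOf.get(id);
        if (current == null) {
            return false;
        }
        long expireTime = roundToInterval(now + timeout);
        if (current >= expireTime) {
            return true; // already in a late-enough bucket
        }
        Set<Long> old = buckets.get(current);
        if (old != null) {
            old.remove(id); // leave the old bucket
        }
        bucketOf.put(id, expireTime);
        buckets.computeIfAbsent(expireTime, k -> new HashSet<>()).add(id);
        return true;
    }
}
```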

    The touchSession call above happens while the sessionTracker is being created, but the method is also called in many other situations, for example:

    • When the session interacts with this server
      // Runs whenever the session interacts with this server
      void touch(ServerCnxn cnxn) throws MissingSessionException {
      if (cnxn == null) {
      return;
      }
      long id = cnxn.getSessionId();
      int to = cnxn.getSessionTimeout();
      // As noted above, touchSession's return value indicates whether the session is valid
      // If the session is invalid, throw; otherwise move it to a new bucket if needed
      if (!sessionTracker.touchSession(id, to)) {
      throw new MissingSessionException(
      "No session with sessionid 0x" + Long.toHexString(id)
      + " exists, probably expired and removed");
      }
      }
    • When the client reconnects after losing its connection
      // After a connection loss, when the client reconnects to the server, the session must be revalidated
      protected void revalidateSession(ServerCnxn cnxn, long sessionId,
      int sessionTimeout) throws IOException {
      // Check whether the session is still valid
      boolean rc = sessionTracker.touchSession(sessionId, sessionTimeout);
      if (LOG.isTraceEnabled()) {
      ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,
      "Session 0x" + Long.toHexString(sessionId) +
      " is valid: " + rc);
      }
      // Handle the session differently depending on rc
      finishSessionInit(cnxn, rc);
      }
      
      public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
      // register with JMX
      try {
      // If the session is still valid, register the connection
      if (valid) {
      serverCnxnFactory.registerConnection(cnxn);
      }
      } catch (Exception e) {
      LOG.warn("Failed to register with JMX", e);
      }
      
      try {
      // Create the connect response object
      ConnectResponse rsp = new ConnectResponse(0, valid ? cnxn.getSessionTimeout()
      : 0, valid ? cnxn.getSessionId() : 0, // send 0 if session is no
      // longer valid
      valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]);
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
      bos.writeInt(-1, "len");
      rsp.serialize(bos, "connect");
      if (!cnxn.isOldClient) {
      bos.writeBool(
      this instanceof ReadOnlyZooKeeperServer, "readOnly");
      }
      baos.close();
      ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
      bb.putInt(bb.remaining() - 4).rewind();
      cnxn.sendBuffer(bb);
      
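      // If the session is invalid, the response (timeout 0) tells the client it expired and the connection is closed; otherwise enable the connection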
      if (!valid) {
      LOG.info("Invalid session 0x"
      + Long.toHexString(cnxn.getSessionId())
      + " for client "
      + cnxn.getRemoteSocketAddress()
      + ", probably expired");
      cnxn.sendBuffer(ServerCnxnFactory.closeConn);
      } else {
      LOG.info("Established session 0x"
      + Long.toHexString(cnxn.getSessionId())
      + " with negotiated timeout " + cnxn.getSessionTimeout()
      + " for client "
      + cnxn.getRemoteSocketAddress());
      // Start receiving on the connection
      cnxn.enableRecv();
      }
      
      } catch (Exception e) {
      LOG.warn("Exception while establishing session, closing", e);
      cnxn.close();
      }
      }
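finishSessionInit frames the response with a 4-byte length prefix: it writes a placeholder int (-1), serializes the body, then overwrites the first four bytes with remaining() - 4, the size of everything after the prefix. A minimal sketch of the same write-placeholder-then-patch technique, using DataOutputStream instead of Jute's BinaryOutputArchive; LengthPrefixFrame and its body fields are hypothetical and do not reproduce the real ConnectResponse layout.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

// Hypothetical sketch of length-prefixed framing via a patched placeholder.
class LengthPrefixFrame {
    static ByteBuffer frame(int timeout, long sessionId, boolean readOnly) {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(baos);
            out.writeInt(-1);          // placeholder for the length prefix
            out.writeInt(timeout);     // hypothetical body fields
            out.writeLong(sessionId);
            out.writeBoolean(readOnly);
            out.flush();
            ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
            // putInt at position 0 overwrites the placeholder; rewind so readers start at the prefix.
            bb.putInt(bb.remaining() - 4).rewind();
            return bb;
        } catch (IOException e) {
            throw new IllegalStateException(e); // not expected with an in-memory stream
        }
    }
}
```

The receiver reads the first int to know how many bytes belong to this message, which is how a stream-oriented TCP connection is split back into discrete responses.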

    So far we have covered only creating buckets and moving sessions between them, not expiry. Expiry is handled in startSessionTracker().

    startSessionTracker();

    It starts the sessionTracker thread. Here is SessionTrackerImpl's run method:

    @Override
    synchronized public void run() {
    try {
    // The loop keeps running as long as the server is running
    while (running) {
    currentTime = Time.currentElapsedTime();
    // nextExpirationTime is the boundary of the bucket that is due next
    // It is initialized at startup
    // If the current time has not reached that boundary, the bucket has not expired yet, so wait
    if (nextExpirationTime > currentTime) {
    this.wait(nextExpirationTime - currentTime);
    continue;
    }
    // Reaching here means nextExpirationTime <= currentTime holds,
    // i.e. bucket nextExpirationTime has expired: remove it from the bucket collection
    // and close the sessions in it
    SessionSet set;
    // Remove the current bucket
    set = sessionSets.remove(nextExpirationTime);
    // Close all sessions in the bucket
    if (set != null) {
    for (SessionImpl s : set.sessions) {
    setSessionClosing(s.sessionId);
    expirer.expire(s);
    }
    }
    // Advance to the next bucket id
    nextExpirationTime += expirationInterval;
    }
    } catch (InterruptedException e) {
    handleException(this.getName(), e);
    }
    LOG.info("SessionTrackerImpl exited loop!");
    }
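The run loop only ever looks at one bucket at a time: it waits until nextExpirationTime, removes that whole bucket, expires everything still in it, and advances by one interval. A minimal single-threaded sketch of the sweep with hypothetical names; it takes the current time as a parameter instead of calling wait(), and the returned list stands in for setSessionClosing and expirer.expire.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the bucket sweep in SessionTrackerImpl.run().
class BucketSweeper {
    private final long expirationInterval;
    private long nextExpirationTime;
    final Map<Long, Set<Long>> buckets = new HashMap<>(); // bucket boundary -> session ids

    BucketSweeper(long expirationInterval, long startTime) {
        this.expirationInterval = expirationInterval;
        // Same rounding as roundToInterval: the first bucket boundary after startTime.
        this.nextExpirationTime = (startTime / expirationInterval + 1) * expirationInterval;
    }

    void put(long bucket, long sessionId) {
        buckets.computeIfAbsent(bucket, k -> new HashSet<>()).add(sessionId);
    }

    // Expire every bucket whose boundary is <= now; returns the expired session ids.
    List<Long> sweep(long now) {
        List<Long> expired = new ArrayList<>();
        while (nextExpirationTime <= now) {
            Set<Long> set = buckets.remove(nextExpirationTime);
            if (set != null) {
                expired.addAll(set); // stands in for setSessionClosing + expirer.expire
            }
            nextExpirationTime += expirationInterval;
        }
        return expired;
    }
}
```

The sweep finds buckets by exact key, which works only because every bucket boundary is a multiple of the interval; roundToInterval guarantees that for sessions placed by touchSession.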