您的位置:首页 > 其它

大众点评Cat源码阅读(七)——客户端选择server的机制

2017-12-21 16:42 435 查看

一、概要思路

客户端跟服务端连接建立,分两步:

初始ChannelMananger的时候 ;

ChannelManager异步线程,每隔10秒做一次检查。

1.1 初始ChannelMananger的时候

实例化ChannelManager的时候,根据配置的第一个server,从远程服务器读取服务器列表,如果能读取到,则顺序建立连接,直到建立成功为止;如果不能 读到,则根据本地配置的列表,逐个建立连接,直到成功为止。

1.2 ChannelManager异步线程,每隔10秒做一次检查

1.2.1 检查server列表是否变更

每间隔10s,检查当前channelFuture是否活跃,活跃,则300s检查一次,不活跃,则执行检查。检查的逻辑是:比较本地server列表跟远程服务提供的列表是否相等,不相等则根据远程服务提供的server列表顺序的重新建立第一个能用的ChannelFuture

1.2.2 查看当前客户端是否有积压,或者channelFuture是否被关闭

如果有积压,或者关闭 掉了,则关闭当前连接,将activeIndex=-1,表示当前连接不可用。

1.2.3 重连默认server

从0到activeIndex中找一个能连接的server,中心建立一个连接。如果activeIndex为-1,则从整个的server列表中顺序的找一个可用的连接建立连接。

二、ChannelManager 实例化,建立netty连接逻辑

客户端实例化DefaultTransportManager对象时,回按照如下流程先实例化m_tcpSocketSender,接着实例化ChannelManager。ChannelManager管理对服务端的netty连接。
实例化流程如下:



ChannelManager通过ChannelHolder把netty的ChannnelFuture封装起来。ChannnelFuture结构如下:

public static class ChannelHolder {
/**
* 当前活跃的channelFuture
*/
private ChannelFuture m_activeFuture;

/**
* 当前server在m_serverAddresses中的第几个
*/
private int m_activeIndex = -1;

/**
* 当前活跃的ChannelFuture对应的配置
*/
private String m_activeServerConfig;

/**
* 从配置文件中读取的服务端列表
*/
private List<InetSocketAddress> m_serverAddresses;

/**
* 当前活跃的ChannelFutre对应的ip
*/
private String m_ip;

/**
* 连接从第一次初始化开始,是否发生过变更
*/
private boolean m_connectChanged;

//省略其它的代码
}

三、ChannelManager内部异步线程,动态切换netty连接逻辑

ChannelManager内部每隔10秒钟,检查netty连接。这部分代码如下:

@Override
public void run() {
while (m_active) {
/*
* make save message id index asyc
* 本地存储index,和 时间戳,防止重启,导致本地的消息id重了
*/
m_idfactory.saveMark();

/**
* 检查本地初始化的服务列表跟远程的服务列表是否有差异,如果有差异,则取远程第一个能建立连接的server,建立一个新的连接,
* 关闭旧的连接
*/
checkServerChanged();

ChannelFuture activeFuture = m_activeChannelHolder.getActiveFuture();
List<InetSocketAddress> serverAddresses = m_activeChannelHolder.getServerAddresses();

/**
* 检查当前channelFuture是否有消息积压(本地队列长度超过4990),或者 channelFuture不是开的
* @param activeFuture
*/
doubleCheckActiveServer(activeFuture);
/**
* 从serverAddresses列表里面,从新顺序选一个,重新连接
*/
reconnectDefaultServer(activeFuture, serverAddresses);

try {
Thread.sleep(10 * 1000L); // check every 10 seconds
} catch (InterruptedException e) {
// ignore
}
}
}

总结:服务端没有做到负载均衡,连接会慢慢连接到server列表里面第一个可用的server上。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Netty