大众点评Cat源码阅读(七)——客户端选择server的机制
2017-12-21 16:42
435 查看
一、概要思路
客户端跟服务端连接建立,分两步:初始ChannelMananger的时候 ;
ChannelManager异步线程,每隔10秒做一次检查。
1.1 初始ChannelMananger的时候
实例化ChannelManager的时候,根据配置的第一个server,从远程服务器读取服务器列表,如果能读取到,则顺序建立连接,直到建立成功为止;如果不能 读到,则根据本地配置的列表,逐个建立连接,直到成功为止。1.2 ChannelManager异步线程,每隔10秒做一次检查
1.2.1 检查server列表是否变更
每间隔10s,检查当前channelFuture是否活跃,活跃,则300s检查一次,不活跃,则执行检查。检查的逻辑是:比较本地server列表跟远程服务提供的列表是否相等,不相等则根据远程服务提供的server列表顺序的重新建立第一个能用的ChannelFuture1.2.2 查看当前客户端是否有积压,或者channelFuture是否被关闭
如果有积压,或者关闭 掉了,则关闭当前连接,将activeIndex=-1,表示当前连接不可用。1.2.3 重连默认server
从0到activeIndex中找一个能连接的server,中心建立一个连接。如果activeIndex为-1,则从整个的server列表中顺序的找一个可用的连接建立连接。二、ChannelManager 实例化,建立netty连接逻辑
客户端实例化DefaultTransportManager对象时,回按照如下流程先实例化m_tcpSocketSender,接着实例化ChannelManager。ChannelManager管理对服务端的netty连接。实例化流程如下:
![](https://static.oschina.net/uploads/img/201712/21171425_6VKO.png)
ChannelManager通过ChannelHolder把netty的ChannnelFuture封装起来。ChannnelFuture结构如下:
public static class ChannelHolder { /** * 当前活跃的channelFuture */ private ChannelFuture m_activeFuture; /** * 当前server在m_serverAddresses中的第几个 */ private int m_activeIndex = -1; /** * 当前活跃的ChannelFuture对应的配置 */ private String m_activeServerConfig; /** * 从配置文件中读取的服务端列表 */ private List<InetSocketAddress> m_serverAddresses; /** * 当前活跃的ChannelFutre对应的ip */ private String m_ip; /** * 连接从第一次初始化开始,是否发生过变更 */ private boolean m_connectChanged; //省略其它的代码 }
三、ChannelManager内部异步线程,动态切换netty连接逻辑
ChannelManager内部每隔10秒钟,检查netty连接。这部分代码如下:@Override public void run() { while (m_active) { /* * make save message id index asyc * 本地存储index,和 时间戳,防止重启,导致本地的消息id重了 */ m_idfactory.saveMark(); /** * 检查本地初始化的服务列表跟远程的服务列表是否有差异,如果有差异,则取远程第一个能建立连接的server,建立一个新的连接, * 关闭旧的连接 */ checkServerChanged(); ChannelFuture activeFuture = m_activeChannelHolder.getActiveFuture(); List<InetSocketAddress> serverAddresses = m_activeChannelHolder.getServerAddresses(); /** * 检查当前channelFuture是否有消息积压(本地队列长度超过4990),或者 channelFuture不是开的 * @param activeFuture */ doubleCheckActiveServer(activeFuture); /** * 从serverAddresses列表里面,从新顺序选一个,重新连接 */ reconnectDefaultServer(activeFuture, serverAddresses); try { Thread.sleep(10 * 1000L); // check every 10 seconds } catch (InterruptedException e) { // ignore } } }
总结:服务端没有做到负载均衡,连接会慢慢连接到server列表里面第一个可用的server上。
相关文章推荐
- 分布式文件系统KFS源码阅读与分析(三):RPC实现机制(MetaServer端)
- 分布式文件系统KFS源码阅读与分析(三):RPC实现机制(MetaServer端)
- 大众点评Cat源码阅读(九)——报表部分内存数据结构及数值计算公式
- zookeeper源码阅读分析笔记--客户端服务端通信机制以及session超时、过期处理
- daily news新闻阅读客户端应用源码(兼容iPhone和iPad)
- [分布式监控CAT] Server端源码解析——消息消费\报表处理\展示
- daily news新闻阅读客户端应用源码(兼容iPhone和iPad)
- Horizon 源码阅读(二)—— Horizon 模块注册机制
- daily news新闻阅读客户端应用源码(兼容iPhone和iPad)
- Zookeeper教程(三):ZooKeeper源码阅读之Worker机制及集群状态监控
- Hadoop RPC源码阅读-服务端Server
- [置顶] Redis客户端Jedis源码阅读及连接池分析
- [分布式监控CAT] Server端源码解析——初始化
- [ZeroMQ] libzmq 源码阅读 之 Reactor机制(mailbox, event)
- Hbase源码阅读(二) 客户端定位Region流程
- 开源项目源码阅读版本选择
- ASP.NET2.0实现无刷新客户端回调的Callback机制(示例源码)
- ZooKeeper源码阅读(二):客户端
- Gaea源码阅读(二):客户端流程
- Storm源码阅读(二):客户端