rockeMq-nameSrv原理源码分析
2018-05-06 13:27
113 查看
namesrv源码结构如下图
大家都知道rocketMq的nameSrv是做路由服务的,那他到底会维护那些路由信息呢?我们把这个问题搞清楚了,可能就能在整体上更好的把握他。路由信息都是保存在RouteInfoManager中的,如下
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable; private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable; private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable; private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;我们一个一个看,这些都是用来存储什么信息的:1、topicQueueTable,其实根据官方原代码的注释,很多东西我们已经能猜出来了,不过验证一下肯定更准确了,看下QueueData的定义
private String brokerName; // broker的名称 private int readQueueNums; // 读队列数量 private int writeQueueNums; // 写队列数量 private int perm; // 读写权限 private int topicSynFlag; // 同步复制还是异步复制标记所以,现在我们可以知道topicQueueTable维护的就是形如摸个topic在哪些broker上,以及他们的读队列数量、写队列数量、读写权限、同步复制还是异步复制标记。2、brokerAddrTable,此hashMap的key存放的是brokerName,主要看下BrokerData
private String cluster; //集群名称 private String brokerName; //broker名 private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs; //brokerId对应的地址所以,我们现在可以知道brokerAddrTable维护的就是形如某个brokerName所属的集群名称,以及他的主从broker的地址。3、clusterAddrTable,此hashMap的key是保存的是集群名称,value存的是他所包含的broker名的set集合4、brokerLiveTable,此hashMap的key存的是broker的地址,主要看下BrokerLiveInfo
private long lastUpdateTimestamp; //最后一次心跳或更新的时间戳 private DataVersion dataVersion; //() private Channel channel; //通信通道 private String haServerAddr; //高可用地址,主从复制的端口所以,我们可以知道brokerLiveTable是维护每个存活的broker的以上存活信息的5、filterServerTable维护的就是每个broker的文件过滤集合。
下面看看源码结构上各个部分是干嘛的:
KVConfigManager:
作用即加载namesrvController指定的kvConfig配置文件(常为xxx/kvConfig.json)到内存读取或增加,删除kvConfig记录将内存记录的配置,持久化到文件打印所有kvConfig配置。
DefaultRequestProcessor: DefaultRequestProcessor是NameServer的默认请求处理器,其中最重要得方法就是public RemotingCommand processRequest(ChannelHandlerContext ctx,RemotingCommand request),他处理了定义在rocketmq-common模块中RequestCode定义的部分请求,比如注册broker、注销broker、获取topic路由、删除topic、获取broker的topic权限、获取NameServer的所有topic等。
例如接受到注册broker请求时的处理流程如下:调用该方法传入的request.getCode为RequestCode.REGISTER_BROKER,即代表该请求为注册broker。首先会获取发出请求的MQ的版本,版本号 >= V3_0_11与 < V3_0_11会调用不同的注册方法,例如我们正在使用的V3_5_8注册broker则调用registerBrokerWithFilterServer方法进行注册。大致流程是从requestHeader获取mq的ClusterName、brokerAddr、brokeName、brokerId,haServerAddr(master用来监听slave复制数据的端口)。从requestBody里获topictConfig、filterServerList(消息过滤服务列表)(待确认)。然后调用namesrvController.getRouteInfoManager().registerBroker方法。即向HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable里put一个broker信息。如果消息过滤服务列表不为空,则向HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable里put一条记录,filterServerTable.put(brokerAddr, filterServerList)。关于filter会在filtersrv里详细描述。注册完成后返回给Broker端主Broker的地址,以及该用Broker的haServerAddr地址。
ClusterTestRequestProcessor: ClusterTestRequestProcessor继承DefaultRequestProcessor,仅重写了 getRouteInfoByTopic()方法。 在重写的方法中仅加入了一行代码: topicRouteData = adminExt.examineTopicRouteInfo(requestHeader.getTopic()); 就是在本机上获取不到topic的信息会尝试去其他的namesvr上获取,这正是namesrv分布式实现的一个体现。 跟入代码会发现最终是调用了NettyRemotingClient.invokeSync() 进而调用NettyRemotingAbstract.invokeSyncImpl(),使用netty通信,获取其他namesrv上的该topic信息。 NettyRemotingClient中的namesrvAddrList 存储着所有的namesrv的地址。
BrokerHousekeepingService: 这个接口会监听网络层的请求,如果有关闭连接,异常,空闲等请求,直接从brokerLiveTable中移除该数据;如果是正常保持连接的状态的请求,则什么也不做。从调用链来看,主要心跳检测时会发出这些请求。BrokerLiveInfo broker存活状态,主要包括broker的版本号,channel,和最近心跳时间和haServerAddr。 RouteInfoManager:主要维护以下路由信息
HashMap<String/* topic */, List<QueueData>> topicQueueTable; HashMap<String/* brokerName */, BrokerData> brokerAddrTable; HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable; HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;以及一些我们常用的mq命令对应的方法: 例如执行命令 mqadmin clusterList -n 10.3.254.52:9876 就是查看集群信息,对应RouteInfoManager的
public byte[] getAllClusterInfo() { ClusterInfo clusterInfoSerializeWrapper = new ClusterInfo(); clusterInfoSerializeWrapper.setBrokerAddrTable(this.brokerAddrTable); clusterInfoSerializeWrapper.setClusterAddrTable(this.clusterAddrTable); return clusterInfoSerializeWrapper.encode(); }例如执行命令mqadmin deleteTopic -n 10.3.254.52:9876 -c StockMqCluster -t MyTopic 就是删除指定topic信息。对应的RouteInfoManager的
public void deleteTopic(final String topic) { try { try { this.lock.writeLock().lockInterruptibly(); this.topicQueueTable.remove(topic); } finally { this.lock.writeLock().unlock(); } } catch (Exception e) { log.error("deleteTopic Exception", e); } }分析到这里,namesrv的大致结构已经很清晰了,现在通过namesrvStartup和namesrvController将这个几个部分串联起来。 当我们启动rocketMq的nameserver时,实际调用了namesrvStartup.main(),进而调用了namesrvStartup.main0(),该方法会实例化一个NamesrvConfig并设置监听端口为9876和一个NettyServerConfig。 然后将namesrvConfig和nettyServerConfig 作为参数实例化一个NamesrvController 。 下一步通过调用controller.initialize()方法执行以下操作,将kvConfig从文件加载到内存,实例化一个NettyRemotingServer,调用RemotingServer.registerDefaultProcess.(new DefaultRequestProcessor(this),this.remotingExecutor);注册请求处理器,这里分为两种情况:注册ClusterTestRequestProcessor或者DefaultRequestProcessor。实际情况下都是注册后者。紧接着启动两个线程池分别用于对broker进行心跳检测(10秒一次),在超过2分钟(private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;)没收到心跳信息的broker进行删除和周期性的打印kvConfig配置信息。最后一步调用controller.start();nameserver端netty启动。大致流程如下,省略了非关键代码:
final NamesrvConfig namesrvConfig = new NamesrvConfig(); final NettyServerConfig nettyServerConfig = new NettyServerConfig();nettyServerConfig.setListenPort(9876); final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig); boolean initResult = controller.initialize(); controller.start();整理一下namesrv启动的调用链
namesrvStartup.main(){ namesrvStartup.main0(){ NamesrvController .initialize(){ kvConfigManager.load(); RemotingServer.registerDefaultProcess(); 跑两个线程; } NamesrvController .start(); }后续继续补充完善 阅读更多
相关文章推荐
- Android应用开发原理之从源码分析看Linearlayout、Relativelayout,Framelayout的布局差别(Framelayout分析)
- HashMap实现原理及源码分析
- Android底层原理之从Service_manager源码分析Android进程间通信过程
- 深入理解Spark 2.1 Core (十一):Shuffle Reduce 端的原理与源码分析
- cocos2d-x3.0中图片分辨率自动适配设置以及源码,原理分析。
- Spring源码解析之四 ------ AOP原理和源码分析
- 【mybatis源码分析】原理分析之三:初始化(配置文件读取和解析)
- softirq原理以及源码分析
- 深度理解Android InstantRun原理以及源码分析
- Java并发框架Disruptor实现原理与源码分析(一) 认识Disruptor
- hive原理与源码分析-物理执行计划与执行引擎(六)
- Tomcat7.0源码分析——请求原理分析(中)
- PHP strtotime函数用法、实现原理和源码分析
- 【Java集合学习系列】HashMap实现原理及源码分析
- Android AsyncTask运作原理和源码分析
- SharePreferences源码分析(commit与apply的区别以及原理)
- [Android实例] Scroll原理-附ScrollView源码分析 (转载)
- Java基于微信公众号接口实现授权登录源码及原理分析
- quartz2.2源码分析1-使用和原理
- Struts2原理及源码分析