您的位置:首页 > 其它

rockeMq-nameSrv原理源码分析

2018-05-06 13:27 113 查看

namesrv源码结构如下图


大家都知道rocketMq的nameSrv是做路由服务的,那他到底会维护那些路由信息呢?我们把这个问题搞清楚了,可能就能在整体上更好的把握他。路由信息都是保存在RouteInfoManager中的,如下

private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
我们一个一个看,这些都是用来存储什么信息的:1、topicQueueTable,其实根据官方原代码的注释,很多东西我们已经能猜出来了,不过验证一下肯定更准确了,看下QueueData的定义
private String brokerName;  // broker的名称
private int readQueueNums;  // 读队列数量
private int writeQueueNums; // 写队列数量
private int perm;           // 读写权限
private int topicSynFlag;   // 同步复制还是异步复制标记
所以,现在我们可以知道topicQueueTable维护的就是形如摸个topic在哪些broker上,以及他们的读队列数量、写队列数量、读写权限、同步复制还是异步复制标记。2、brokerAddrTable,此hashMap的key存放的是brokerName,主要看下BrokerData
private String cluster;    //集群名称
private String brokerName; //broker名
private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;  //brokerId对应的地址
所以,我们现在可以知道brokerAddrTable维护的就是形如某个brokerName所属的集群名称,以及他的主从broker的地址。3、clusterAddrTable,此hashMap的key是保存的是集群名称,value存的是他所包含的broker名的set集合4、brokerLiveTable,此hashMap的key存的是broker的地址,主要看下BrokerLiveInfo
private long lastUpdateTimestamp;   //最后一次心跳或更新的时间戳
private DataVersion dataVersion;    //()
private Channel channel;            //通信通道
private String haServerAddr;        //高可用地址,主从复制的端口
所以,我们可以知道brokerLiveTable是维护每个存活的broker的以上存活信息的5、filterServerTable维护的就是每个broker的文件过滤集合。

下面看看源码结构上各个部分是干嘛的:

KVConfigManager:
作用即加载namesrvController指定的kvConfig配置文件(常为xxx/kvConfig.json)到内存读取或增加,删除kvConfig记录将内存记录的配置,持久化到文件打印所有kvConfig配置。

方法 :构造函数------------传入一个NamesrvController,目的是为了后面获取到kvConfig的配置路径。load:加载配置文件,读取到内存的configTable中,步骤是根据kvConfigPath得到文件内容,以json格式解析得到KVConfigSerializeWrapper对象。putKVConfig:添加一条记录到HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable  (作用待补充)。deleteKVConfig:删除一条记录。persist:将内存记录的configTable持久化到配置文件。getKVListByNamespace:拿到configTable对应namespace的所有记录。getKVConfig:获取configTable中namespace,key对应的一条记录。printAllPeriodically:打印configTable所有配置,被周期性的调用。几乎所有的方法都用读写锁ReentrantReadWriteLock进行了同步,保证并发情况下的安全。对于load的方法的调用链  :NamesrvStartup.main()->NamesrvStartup.main0()->NamesrvStartup.main0().NamesrvController.initialize()→NamesrvController.KVConfigManager.load()。所以大致可以理解为,在启动namesrv的时候,通过层层调用链,最终将namesrv在文件的各项配置读取到内存最终。文件位置为:private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";,而putKVConfig和deleteKVConfig方法则是修改namesrv的配置文件,并持久化到文件中。
DefaultRequestProcessor:      DefaultRequestProcessor是NameServer的默认请求处理器,其中最重要得方法就是public RemotingCommand processRequest(ChannelHandlerContext ctx,RemotingCommand request),他处理了定义在rocketmq-common模块中RequestCode定义的部分请求,比如注册broker、注销broker、获取topic路由、删除topic、获取broker的topic权限、获取NameServer的所有topic等。
      例如接受到注册broker请求时的处理流程如下:调用该方法传入的request.getCode为RequestCode.REGISTER_BROKER,即代表该请求为注册broker。首先会获取发出请求的MQ的版本,版本号 >= V3_0_11与 < V3_0_11会调用不同的注册方法,例如我们正在使用的V3_5_8注册broker则调用registerBrokerWithFilterServer方法进行注册。大致流程是从requestHeader获取mq的ClusterName、brokerAddr、brokeName、brokerId,haServerAddr(master用来监听slave复制数据的端口)。从requestBody里获topictConfig、filterServerList(消息过滤服务列表)(待确认)。然后调用namesrvController.getRouteInfoManager().registerBroker方法。即向HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable里put一个broker信息。如果消息过滤服务列表不为空,则向HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable里put一条记录,filterServerTable.put(brokerAddr, filterServerList)。关于filter会在filtersrv里详细描述。注册完成后返回给Broker端主Broker的地址,以及该用Broker的haServerAddr地址。
ClusterTestRequestProcessor:      ClusterTestRequestProcessor继承DefaultRequestProcessor,仅重写了 getRouteInfoByTopic()方法。 在重写的方法中仅加入了一行代码:      topicRouteData = adminExt.examineTopicRouteInfo(requestHeader.getTopic());         就是在本机上获取不到topic的信息会尝试去其他的namesvr上获取,这正是namesrv分布式实现的一个体现。 跟入代码会发现最终是调用了NettyRemotingClient.invokeSync()  进而调用NettyRemotingAbstract.invokeSyncImpl(),使用netty通信,获取其他namesrv上的该topic信息。 NettyRemotingClient中的namesrvAddrList 存储着所有的namesrv的地址。
BrokerHousekeepingService:    这个接口会监听网络层的请求,如果有关闭连接,异常,空闲等请求,直接从brokerLiveTable中移除该数据;如果是正常保持连接的状态的请求,则什么也不做。从调用链来看,主要心跳检测时会发出这些请求。BrokerLiveInfo   broker存活状态,主要包括broker的版本号,channel,和最近心跳时间和haServerAddr。                                                                                                                                                       RouteInfoManager:主要维护以下路由信息
HashMap<String/* topic */, List<QueueData>> topicQueueTable;
HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
以及一些我们常用的mq命令对应的方法:    例如执行命令 mqadmin clusterList -n 10.3.254.52:9876 就是查看集群信息,对应RouteInfoManager的
public byte[] getAllClusterInfo() {
ClusterInfo clusterInfoSerializeWrapper = new ClusterInfo();
clusterInfoSerializeWrapper.setBrokerAddrTable(this.brokerAddrTable);
clusterInfoSerializeWrapper.setClusterAddrTable(this.clusterAddrTable);
return clusterInfoSerializeWrapper.encode();
}
例如执行命令mqadmin deleteTopic -n 10.3.254.52:9876 -c StockMqCluster  -t MyTopic 就是删除指定topic信息。对应的RouteInfoManager的
public void deleteTopic(final String topic) {
try {
try {
this.lock.writeLock().lockInterruptibly();
this.topicQueueTable.remove(topic);
} finally {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("deleteTopic Exception", e);
}
}
分析到这里,namesrv的大致结构已经很清晰了,现在通过namesrvStartup和namesrvController将这个几个部分串联起来。      当我们启动rocketMq的nameserver时,实际调用了namesrvStartup.main(),进而调用了namesrvStartup.main0(),该方法会实例化一个NamesrvConfig并设置监听端口为9876和一个NettyServerConfig。 然后将namesrvConfig和nettyServerConfig 作为参数实例化一个NamesrvController 。     下一步通过调用controller.initialize()方法执行以下操作,将kvConfig从文件加载到内存,实例化一个NettyRemotingServer,调用RemotingServer.registerDefaultProcess.(new DefaultRequestProcessor(this),this.remotingExecutor);注册请求处理器,这里分为两种情况:注册ClusterTestRequestProcessor或者DefaultRequestProcessor。实际情况下都是注册后者。紧接着启动两个线程池分别用于对broker进行心跳检测(10秒一次),在超过2分钟(private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;)没收到心跳信息的broker进行删除和周期性的打印kvConfig配置信息。最后一步调用controller.start();nameserver端netty启动。大致流程如下,省略了非关键代码:
final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();nettyServerConfig.setListenPort(9876);
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
boolean initResult = controller.initialize();
controller.start();
整理一下namesrv启动的调用链
namesrvStartup.main(){
namesrvStartup.main0(){
NamesrvController .initialize(){
kvConfigManager.load();
RemotingServer.registerDefaultProcess();
跑两个线程;
}
NamesrvController .start();
}
后续继续补充完善 阅读更多
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: