Rocket MQ consumer 源码分析(绝对干货)
2017-05-04 12:17
1521 查看
在使用rocket mq 的时候,接触最多的还是consumer端。在实际使用的时候,踩过不少坑,如订阅关系不一致、广播消息重复消费等,因此,是时候对rocket mq的源码进行一下分析了。
consumer.subscribe(“TopicTest”, “*”);
实际上,源码里只是将订阅消息放到rebalanceImpl对象的subscriptionInner(map类型)里。该map的key为topic。因此,如果一个comsumer需要订阅多个topic,只需要连续调用subscribe。
例如:
consumer.subscribe(“TopicTest1”, “*”);
consumer.subscribe(“TopicTest2”, “*”);
MQClientInstance定期updateTopicRouteInfoFromNameServer,从nameserver获取broker信息。更新brokerAddrTable,更新updateTopicPublishInfo,最后更新topicRouteTable。
MQClientInstanc定期 persistConsumerOffset,从rebalanceImpl.getProcessQueueTable() 获取消费进度,然后执行offsetStore.persistAll()。其中集群消费为RemoteBrokerOffsetStore,他会同步消费进度到broker,广播消费为LocalFileOffsetStore,他只会将消费进度保存到隐藏本地文件中。
RebalanceService比较复杂。RebalanceService如果服务不停,周期的doRebalance –> rebalanceByTopic -> 从topicSubscribeInfoTable查找topic对应得queue,根据mQClientFactory.findConsumerIdList 获取comsumer list,并负载均衡。然后updateProcessQueueTableInRebalance。调用computePullFromWhere 获取offset,最后调用dispatchPullRequest,将信息加入到阻塞队列pullRequestQueue。
PullMessageService 如果服务不停,不断的从阻塞队列pullRequestQueue获取消息,然后基于该信息从broker拉取消息。
2、MQClientInstance定期updateTopicRouteInfoFromNameServer(关键方法),从通过getTopicRouteInfoFromNameServer 从nameserver获取TopicRouteData信息。然后判断是否有变化,有变化则更新topicRouteTable 里的TopicRouteData
某个topic对应的TopicRouteData内容如下,主要包括queueDatas和brokerDatas:
3、通过topicRouteData2TopicSubscribeInfo 将topic 和TopicRouteData转换为Set《MessageQueue》,MessageQueue里维护topic、brokerName和queueId的信息,并同步到rebalanceImpl 的 topicSubscribeInfoTable里,如下:
4、更新brokerAddrTable
5、RebalanceService 依次调用RebalanceService doRebalance ->rebalanceByTopic ->findConsumerIdList ->updateTopicRouteInfoFromNameServer 触发上面流程。另一方面,RebalanceService周期的doRebalance –> rebalanceByTopic -> 从topicSubscribeInfoTable查找topic对应得queue,根据mQClientFactory.findConsumerIdList 获取comsumer list,并负载均衡。然后updateProcessQueueTableInRebalance。然后调用computePullFromWhere 获取offset,最后调用dispatchPullRequest,将PullRequest信息加入到阻塞队列pullRequestQueue
6、PullMessageService不断的从阻塞队列pullRequestQueue获取消息。然后依次调用mQClientFactory.getMQClientAPIImpl().pullMessage和pullAPIWrapper.pullKernelImp**l,根据message queue和offset拉取消息,成功后通过, 异步回调 **PullCallback.onSuccess,返回pullResult,里面包括获取的msgFoundList。然后依次调用pullAPIWrapper.processPullResult和consumeMessageService.submitConsumeRequest,构造ConsumeRequest对象并提交到consumeExecutor线程池上。
7、在consumeExecutor线程池上,ConsumeRequest类的run方法从processQueue获取待处理的消息
并调用listener.consumeMessage(用户注册的handler)消费消息。整个消费流程完成。
一、消息订阅
当consumer订阅topic时,例如consumer.subscribe(“TopicTest”, “*”);
实际上,源码里只是将订阅消息放到rebalanceImpl对象的subscriptionInner(map类型)里。该map的key为topic。因此,如果一个comsumer需要订阅多个topic,只需要连续调用subscribe。
例如:
consumer.subscribe(“TopicTest1”, “*”);
consumer.subscribe(“TopicTest2”, “*”);
二、主要线程分析
MQClientInstance 定期fetchNameServerAddr,通过http请求获取nameserver的最新地址并保存。MQClientInstance定期updateTopicRouteInfoFromNameServer,从nameserver获取broker信息。更新brokerAddrTable,更新updateTopicPublishInfo,最后更新topicRouteTable。
MQClientInstanc定期 persistConsumerOffset,从rebalanceImpl.getProcessQueueTable() 获取消费进度,然后执行offsetStore.persistAll()。其中集群消费为RemoteBrokerOffsetStore,他会同步消费进度到broker,广播消费为LocalFileOffsetStore,他只会将消费进度保存到隐藏本地文件中。
RebalanceService比较复杂。RebalanceService如果服务不停,周期的doRebalance –> rebalanceByTopic -> 从topicSubscribeInfoTable查找topic对应得queue,根据mQClientFactory.findConsumerIdList 获取comsumer list,并负载均衡。然后updateProcessQueueTableInRebalance。调用computePullFromWhere 获取offset,最后调用dispatchPullRequest,将信息加入到阻塞队列pullRequestQueue。
PullMessageService 如果服务不停,不断的从阻塞队列pullRequestQueue获取消息,然后基于该信息从broker拉取消息。
三、消息消费流程分析
1、MQClientInstance 定期fetchNameServerAddr,通过http请求获取nameserver的最新地址并保存。2、MQClientInstance定期updateTopicRouteInfoFromNameServer(关键方法),从通过getTopicRouteInfoFromNameServer 从nameserver获取TopicRouteData信息。然后判断是否有变化,有变化则更新topicRouteTable 里的TopicRouteData
某个topic对应的TopicRouteData内容如下,主要包括queueDatas和brokerDatas:
TopicRouteData [orderTopicConf=null, queueDatas=[QueueData [brokerName=taobaodaily-01, readQueueNums=1, writeQueueNums=1, perm=6, topicSynFlag=0], QueueData [brokerName=taobaodaily-03, readQueueNums=1, writeQueueNums=1, perm=6, topicSynFlag=0]], brokerDatas=[BrokerData [brokerName=taobaodaily-03, brokerAddrs={0=100.81.165.119:10911}], BrokerData [brokerName=taobaodaily-01, brokerAddrs={0=10.218.141.54:10911}]], filterServerTable={}]
3、通过topicRouteData2TopicSubscribeInfo 将topic 和TopicRouteData转换为Set《MessageQueue》,MessageQueue里维护topic、brokerName和queueId的信息,并同步到rebalanceImpl 的 topicSubscribeInfoTable里,如下:
[MessageQueue [topic=TBW102, brokerName=taobaodaily-03, queueId=0]
4、更新brokerAddrTable
{taobaodaily-01={0=10.218.141.54:10911}, taobaodaily-03={0=100.81.165.119:10911}}
5、RebalanceService 依次调用RebalanceService doRebalance ->rebalanceByTopic ->findConsumerIdList ->updateTopicRouteInfoFromNameServer 触发上面流程。另一方面,RebalanceService周期的doRebalance –> rebalanceByTopic -> 从topicSubscribeInfoTable查找topic对应得queue,根据mQClientFactory.findConsumerIdList 获取comsumer list,并负载均衡。然后updateProcessQueueTableInRebalance。然后调用computePullFromWhere 获取offset,最后调用dispatchPullRequest,将PullRequest信息加入到阻塞队列pullRequestQueue
6、PullMessageService不断的从阻塞队列pullRequestQueue获取消息。然后依次调用mQClientFactory.getMQClientAPIImpl().pullMessage和pullAPIWrapper.pullKernelImp**l,根据message queue和offset拉取消息,成功后通过, 异步回调 **PullCallback.onSuccess,返回pullResult,里面包括获取的msgFoundList。然后依次调用pullAPIWrapper.processPullResult和consumeMessageService.submitConsumeRequest,构造ConsumeRequest对象并提交到consumeExecutor线程池上。
7、在consumeExecutor线程池上,ConsumeRequest类的run方法从processQueue获取待处理的消息
并调用listener.consumeMessage(用户注册的handler)消费消息。整个消费流程完成。
相关文章推荐
- 分布式消息队列RocketMQ源码分析之4 -- Consumer负载均衡与Kafka的Consumer负载均衡之不同点
- 分布式消息队列RocketMQ源码分析之2 -- Broker与NameServer心跳机制
- RocketMQ源码分析之Broker概述与同步消息发送原理与高可用设计及思考
- 分布式消息队列RocketMQ源码分析之3 -- Consumer负载均衡机制 -- Rebalance
- RocketMQ源码 — 四、 Consumer 接收消息过程
- RocketMQ源码分析----Broker处理发送请求
- 分布式消息队列RocketMQ源码分析之3 -- Consumer负载均衡机制 -- Rebalance
- 源码分析RocketMQ之消息ACK机制(消费进度)
- 分布式消息队列RocketMQ源码分析之1 -- Topic路由数据结构解析 -- topicRoute与topicPublishInfo与queueId
- RocketMQ源码:Producer启动分析
- 查看RocketMQ的broker启动部分源码分析总结
- 源码分析RocketMQ之消息消费重试机制
- 分布式消息队列RocketMQ源码分析之1 -- Topic路由数据结构解析 -- topicRoute与topicPublishInfo与queueId
- 源码分析RocketMQ文件清除机制
- 源码分析RocketMQ之CommitLog消息存储机制
- 源码分析RocketMQ之消费队列、Index索引文件存储结构与存储机制-上篇
- 分布式消息队列RocketMQ源码分析之1 -- Topic路由数据结构解析 -- topicRoute与topicPublishInfo与queueId
- RocketMQ源码深度解析五之Consumer篇
- RocketMQ源码解析-Consumer启动(2)
- RocketMQ源码:有序消息分析