rocketmq学习笔记 五 源码之rocketmq-tools
2016-12-08 22:19
489 查看
前面分析了rocketmq-namesrv的源码
其实很简单,lock+map 把注册中心的事情做了。 因为nameSrv 之间不用保证数据一致性。 每个节点都是独立的
本文接着学习 rocketmq-tools, 这个包主要实现了对mqadmin的支持
admin 也是一个client端
DefaultMQAdminExt 和 DefaultMQAdminExtImpl 封装了一些mqadmin命令实现
把命令行的操作转换成java类,基于commons-cli 来做的一些封装
MQAdminStartup
上传wiki文件
其实很简单,lock+map 把注册中心的事情做了。 因为nameSrv 之间不用保证数据一致性。 每个节点都是独立的
本文接着学习 rocketmq-tools, 这个包主要实现了对mqadmin的支持
1.admin
admin 也是一个client端DefaultMQAdminExt 和 DefaultMQAdminExtImpl 封装了一些mqadmin命令实现
2.command
把命令行的操作转换成java类,基于commons-cli 来做的一些封装/** * @author shijia.wxr */ public interface SubCommand { public String commandName(); public String commandDesc(); public Options buildCommandlineOptions(final Options options); public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook); }
MQAdminStartup
public static void initCommand() { initCommand(new UpdateTopicSubCommand()); initCommand(new DeleteTopicSubCommand()); initCommand(new UpdateSubGroupSubCommand()); initCommand(new DeleteSubscriptionGroupCommand()); initCommand(new UpdateBrokerConfigSubCommand()); initCommand(new UpdateTopicPermSubCommand()); initCommand(new TopicRouteSubCommand()); initCommand(new TopicStatusSubCommand()); initCommand(new TopicClusterSubCommand()); initCommand(new BrokerStatusSubCommand()); initCommand(new QueryMsgByIdSubCommand()); initCommand(new QueryMsgByKeySubCommand()); initCommand(new QueryMsgByUniqueKeySubCommand()); initCommand(new QueryMsgByOffsetSubCommand()); initCommand(new QueryMsgByUniqueKeySubCommand()); initCommand(new PrintMessageSubCommand()); initCommand(new SendMsgStatusCommand()); initCommand(new BrokerConsumeStatsSubCommad()); initCommand(new ProducerConnectionSubCommand()); initCommand(new ConsumerConnectionSubCommand()); initCommand(new ConsumerProgressSubCommand()); initCommand(new ConsumerStatusSubCommand()); initCommand(new CloneGroupOffsetCommand()); initCommand(new ClusterListSubCommand()); initCommand(new TopicListSubCommand()); initCommand(new UpdateKvConfigCommand()); initCommand(new DeleteKvConfigCommand()); initCommand(new WipeWritePermSubCommand()); initCommand(new ResetOffsetByTimeCommand()); initCommand(new UpdateOrderConfCommand()); initCommand(new CleanExpiredCQSubCommand()); initCommand(new CleanUnusedTopicCommand()); initCommand(new StartMonitoringSubCommand()); initCommand(new StatsAllSubCommand()); initCommand(new SyncDocsToGithubSubCommand()); initCommand(new AllocateMQSubCommand()); initCommand(new CheckMsgSendRTCommand()); initCommand(new CLusterSendMsgRTCommand()); }
3.github
上传wiki文件4.monitor
public void start() throws MQClientException { this.defaultMQPullConsumer.start(); this.defaultMQAdminExt.start(); this.defaultMQPushConsumer.start(); this.startScheduleTask(); }
private void startScheduleTask() { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MonitorService.this.doMonitorWork(); } catch (Exception e) { log.error("doMonitorWork Exception", e); } } }, 1000 * 20, this.monitorConfig.getRoundInterval(), TimeUnit.MILLISECONDS); } public void doMonitorWork() throws RemotingException, MQClientException, InterruptedException { long beginTime = System.currentTimeMillis(); this.monitorListener.beginRound(); TopicList topicList = defaultMQAdminExt.fetchAllTopicList(); for (String topic : topicList.getTopicList()) { if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { String consumerGroup = topic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length()); try { this.reportUndoneMsgs(consumerGroup); } catch (Exception e) { // log.error("reportUndoneMsgs Exception", e); } try { this.reportConsumerRunningInfo(consumerGroup); } catch (Exception e) { // log.error("reportConsumerRunningInfo Exception", e); } } } this.monitorListener.endRound(); long spentTimeMills = System.currentTimeMillis() - beginTime; log.info("Execute one round monitor work, spent timemills: {}", spentTimeMills);
}
reportUndoneMsgs
private void reportUndoneMsgs(final String consumerGroup) { ConsumeStats cs = null; try { cs = defaultMQAdminExt.examineConsumeStats(consumerGroup); } catch (Exception e) { return; } ConsumerConnection cc = null; try { cc = defaultMQAdminExt.examineConsumerConnectionInfo(consumerGroup); } catch (Exception e) { return; } if (cs != null) { HashMap<String/* Topic */, ConsumeStats> csByTopic = new HashMap<String, ConsumeStats>(); { Iterator<Entry<MessageQueue, OffsetWrapper>> it = cs.getOffsetTable().entrySet().iterator(); while (it.hasNext()) { Entry<MessageQueue, OffsetWrapper> next = it.next(); MessageQueue mq = next.getKey(); OffsetWrapper ow = next.getValue(); ConsumeStats csTmp = csByTopic.get(mq.getTopic()); if (null == csTmp) { csTmp = new ConsumeStats(); csByTopic.put(mq.getTopic(), csTmp); } csTmp.getOffsetTable().put(mq, ow); } } { Iterator<Entry<String, ConsumeStats>> it = csByTopic.entrySet().iterator(); while (it.hasNext()) { Entry<String, ConsumeStats> next = it.next(); UndoneMsgs undoneMsgs = new UndoneMsgs(); undoneMsgs.setConsumerGroup(consumerGroup); undoneMsgs.setTopic(next.getKey()); this.computeUndoneMsgs(undoneMsgs, next.getValue()); this.monitorListener.reportUndoneMsgs(undoneMsgs); this.reportFailedMsgs(consumerGroup, next.getKey()); } } } }
reportConsumerRunningInfo
public void reportConsumerRunningInfo(final String consumerGroup) throws InterruptedException, MQBrokerException, RemotingException, MQClientException { ConsumerConnection cc = defaultMQAdminExt.examineConsumerConnectionInfo(consumerGroup); TreeMap<String, ConsumerRunningInfo> infoMap = new TreeMap<String, ConsumerRunningInfo>(); for (Connection c : cc.getConnectionSet()) { String clientId = c.getClientId(); if (c.getVersion() < MQVersion.Version.V3_1_8_SNAPSHOT.ordinal()) { continue; } try { ConsumerRunningInfo info = defaultMQAdminExt.getConsumerRunningInfo(consumerGroup, clientId, false); infoMap.put(clientId, info); } catch (Exception e) { } } if (!infoMap.isEmpty()) { this.monitorListener.reportConsumerRunningInfo(infoMap); } }
相关文章推荐
- rocketmq学习笔记 五 源码之rocketmq-filtersrv
- rocketmq学习笔记 五 源码之rocketmq-broker
- rocketmq学习笔记 六 流程之拉消息
- rocketmq学习笔记 六 流程之取消息
- RocketMQ 学习笔记
- rocketmq学习笔记 五 源码之rocketmq-remoting
- rocketmq学习笔记 四 rocketmq运行架构
- RocketMQ源码学习--消息存储篇
- (转)RocketMQ源码学习--消息存储篇
- rocketmq学习笔记 五 源码之rocketmq-namesrv
- rocketMq源码学习 -- rocketmq源码学习计划
- 【RocketMQ源码深度解析】整体介绍&IDE编译并启动RocketMQ的第一个例子
- RocketMQ 菜鸟笔记 (二) RocketMQ 4.1.0 安装与入门实例
- rocketmq学习笔记 一 hello world
- RocketMQ源码学习---网络通信篇
- rocketmq学习笔记 五 源码之rocketmq-store
- rocketmq学习笔记 二 官方实例<消息过滤>
- ASP.NET MVC学习笔记-MVC运行机制之源码剖析
- 学习笔记:解读CppUnit源码7
- jQuery源码学习笔记五(转)