您的位置:首页 > 其它

rocketmq学习笔记 五 源码之rocketmq-tools

2016-12-08 22:19 489 查看
前面分析了rocketmq-namesrv的源码

其实很简单,lock+map 把注册中心的事情做了。  因为nameSrv 之间不用保证数据一致性。 每个节点都是独立的

本文接着学习 rocketmq-tools, 这个包主要实现了对mqadmin的支持



1.admin

admin 也是一个client端

DefaultMQAdminExt 和 DefaultMQAdminExtImpl  封装了一些mqadmin命令实现



2.command

把命令行的操作转换成java类,基于commons-cli 来做的一些封装

/**
* @author shijia.wxr
*/
public interface SubCommand {
public String commandName();

public String commandDesc();

public Options buildCommandlineOptions(final Options options);

public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook);
}


MQAdminStartup

public static void initCommand() {
initCommand(new UpdateTopicSubCommand());
initCommand(new DeleteTopicSubCommand());
initCommand(new UpdateSubGroupSubCommand());
initCommand(new DeleteSubscriptionGroupCommand());
initCommand(new UpdateBrokerConfigSubCommand());
initCommand(new UpdateTopicPermSubCommand());

initCommand(new TopicRouteSubCommand());
initCommand(new TopicStatusSubCommand());
initCommand(new TopicClusterSubCommand());

initCommand(new BrokerStatusSubCommand());
initCommand(new QueryMsgByIdSubCommand());
initCommand(new QueryMsgByKeySubCommand());
initCommand(new QueryMsgByUniqueKeySubCommand());
initCommand(new QueryMsgByOffsetSubCommand());
initCommand(new QueryMsgByUniqueKeySubCommand());
initCommand(new PrintMessageSubCommand());
initCommand(new SendMsgStatusCommand());
initCommand(new BrokerConsumeStatsSubCommad());

initCommand(new ProducerConnectionSubCommand());
initCommand(new ConsumerConnectionSubCommand());
initCommand(new ConsumerProgressSubCommand());
initCommand(new ConsumerStatusSubCommand());
initCommand(new CloneGroupOffsetCommand());

initCommand(new ClusterListSubCommand());
initCommand(new TopicListSubCommand());

initCommand(new UpdateKvConfigCommand());
initCommand(new DeleteKvConfigCommand());

initCommand(new WipeWritePermSubCommand());
initCommand(new ResetOffsetByTimeCommand());

initCommand(new UpdateOrderConfCommand());
initCommand(new CleanExpiredCQSubCommand());
initCommand(new CleanUnusedTopicCommand());

initCommand(new StartMonitoringSubCommand());
initCommand(new StatsAllSubCommand());

initCommand(new SyncDocsToGithubSubCommand());
initCommand(new AllocateMQSubCommand());

initCommand(new CheckMsgSendRTCommand());
initCommand(new CLusterSendMsgRTCommand());

}




3.github

上传wiki文件

4.monitor 

public void start() throws MQClientException {
this.defaultMQPullConsumer.start();
this.defaultMQAdminExt.start();
this.defaultMQPushConsumer.start();
this.startScheduleTask();
}


private void startScheduleTask() {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MonitorService.this.doMonitorWork();
} catch (Exception e) {
log.error("doMonitorWork Exception", e);
}
}
}, 1000 * 20, this.monitorConfig.getRoundInterval(), TimeUnit.MILLISECONDS);
}

public void doMonitorWork() throws RemotingException, MQClientException, InterruptedException {
long beginTime = System.currentTimeMillis();
this.monitorListener.beginRound();

TopicList topicList = defaultMQAdminExt.fetchAllTopicList();
for (String topic : topicList.getTopicList()) {
if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String consumerGroup = topic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());

try {
this.reportUndoneMsgs(consumerGroup);
} catch (Exception e) {
// log.error("reportUndoneMsgs Exception", e);
}

try {
this.reportConsumerRunningInfo(consumerGroup);
} catch (Exception e) {
// log.error("reportConsumerRunningInfo Exception", e);
}
}
}
this.monitorListener.endRound();
long spentTimeMills = System.currentTimeMillis() - beginTime;
log.info("Execute one round monitor work, spent timemills: {}", spentTimeMills);

}

reportUndoneMsgs

private void reportUndoneMsgs(final String consumerGroup) {
ConsumeStats cs = null;
try {
cs = defaultMQAdminExt.examineConsumeStats(consumerGroup);
} catch (Exception e) {
return;
}

ConsumerConnection cc = null;
try {
cc = defaultMQAdminExt.examineConsumerConnectionInfo(consumerGroup);
} catch (Exception e) {
return;
}

if (cs != null) {

HashMap<String/* Topic */, ConsumeStats> csByTopic = new HashMap<String, ConsumeStats>();
{
Iterator<Entry<MessageQueue, OffsetWrapper>> it = cs.getOffsetTable().entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, OffsetWrapper> next = it.next();
MessageQueue mq = next.getKey();
OffsetWrapper ow = next.getValue();
ConsumeStats csTmp = csByTopic.get(mq.getTopic());
if (null == csTmp) {
csTmp = new ConsumeStats();
csByTopic.put(mq.getTopic(), csTmp);
}

csTmp.getOffsetTable().put(mq, ow);
}
}

{
Iterator<Entry<String, ConsumeStats>> it = csByTopic.entrySet().iterator();
while (it.hasNext()) {
Entry<String, ConsumeStats> next = it.next();
UndoneMsgs undoneMsgs = new UndoneMsgs();
undoneMsgs.setConsumerGroup(consumerGroup);
undoneMsgs.setTopic(next.getKey());
this.computeUndoneMsgs(undoneMsgs, next.getValue());
this.monitorListener.reportUndoneMsgs(undoneMsgs);
this.reportFailedMsgs(consumerGroup, next.getKey());
}
}
}
}



reportConsumerRunningInfo

public void reportConsumerRunningInfo(final String consumerGroup) throws InterruptedException,
MQBrokerException, RemotingException, MQClientException {
ConsumerConnection cc = defaultMQAdminExt.examineConsumerConnectionInfo(consumerGroup);
TreeMap<String, ConsumerRunningInfo> infoMap = new TreeMap<String, ConsumerRunningInfo>();
for (Connection c : cc.getConnectionSet()) {
String clientId = c.getClientId();

if (c.getVersion() < MQVersion.Version.V3_1_8_SNAPSHOT.ordinal()) {
continue;
}

try {
ConsumerRunningInfo info =
defaultMQAdminExt.getConsumerRunningInfo(consumerGroup, clientId, false);
infoMap.put(clientId, info);
} catch (Exception e) {
}
}

if (!infoMap.isEmpty()) {
this.monitorListener.reportConsumerRunningInfo(infoMap);
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: