
Distributed Message Queue RocketMQ Source Code Analysis, Part 1 -- The Topic Routing Data Structures -- topicRoute, topicPublishInfo, and queueId

2017-04-16 16:49
In the previous article on the architectural differences between RocketMQ and Kafka, we discussed the huge difference in how the two systems route topics, that is, how topics/partitions map to physical machines. This difference is one of the biggest architectural divergences between the two, and an important reason why RocketMQ can do without a ZooKeeper dependency.

This article continues that thread and digs into the topic routing data structures at the source-code level. The reason "topic routing" opens this source-code series is that it ties together five roles: NameServer, Broker, Producer, Consumer, and Admin.

Once the topic routing structure is fully understood, we will have a solid grasp of how these roles communicate with one another, and thus of RocketMQ's overall architecture.


How the Admin Side Creates a Topic

To understand the topic routing structure, we first need to know how a topic is created.

In RocketMQ, topics are created from the Admin management side, concretely via the command-line tool. In the source, this lives in rocketmq-tools/UpdateTopicSubCommand.java, which implements the mqadmin updateTopic command.

Creating a topic involves the following three steps:



Step 1: GetMasterListByClusterName. The admin tool asks the NameServer cluster for the list of Master machines in the cluster where the topic will live.

Step 2: iterate over this Master list and send each Master an UPDATE_AND_CREATE_TOPIC command.

Step 3: each Master that receives the command sends a RegisterBroker request to the NameServer cluster. Concretely, it iterates over all NameServers and sends a RegisterBroker request to each of them.

On receiving these RegisterBroker requests, the NameServers build the routing structure for each topic and store it in RouteInfoManager.
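Before reading the tool's source, here is a minimal sketch of the same three-step flow driven programmatically through DefaultMQAdminExt. The two calls, fetchMasterAddrByClusterName and createAndUpdateTopicConfig, are the ones the tool itself uses below; the NameServer address, cluster name, topic name, and queue counts are invented example values, and the package names assume the Apache RocketMQ 4.x layout.

// A hedged sketch, not the tool itself: drive the three-step flow programmatically.
// Package names assume Apache RocketMQ 4.x; adjust for older com.alibaba builds.
import java.util.Set;

import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;

public class CreateTopicSketch {
    public static void main(String[] args) throws Exception {
        DefaultMQAdminExt adminExt = new DefaultMQAdminExt();
        adminExt.setNamesrvAddr("127.0.0.1:9876"); // assumption: a local NameServer

        TopicConfig topicConfig = new TopicConfig("TopicTest"); // example topic name
        topicConfig.setReadQueueNums(8);
        topicConfig.setWriteQueueNums(8);

        adminExt.start();
        try {
            // Step 1: ask the NameServer for the Master addresses of the cluster
            Set<String> masterSet =
                CommandUtil.fetchMasterAddrByClusterName(adminExt, "DefaultCluster");

            // Step 2: send UPDATE_AND_CREATE_TOPIC to every Master; each Master then
            // performs Step 3 itself (RegisterBroker to all NameServers)
            for (String addr : masterSet) {
                adminExt.createAndUpdateTopicConfig(addr, topicConfig);
                System.out.printf("create topic to %s success.%n", addr);
            }
        } finally {
            adminExt.shutdown();
        }
    }
}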

Now let's look at the topic-creation source:


Step 1

// UpdateTopicSubCommand.java
public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) {
    ...
    else if (commandLine.hasOption('c')) {
        String clusterName = commandLine.getOptionValue('c').trim();

        defaultMQAdminExt.start();

        Set<String> masterSet =
            CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName); // step 1: fetch the Master address list by clusterName

        for (String addr : masterSet) {
            defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig); // step 2: iterate over all Masters, sending each the CreateTopic command
            System.out.printf("create topic to %s success.%n", addr);
        }

        ...
}

// MQClientAPIImpl.java
public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final long timeoutMillis)
    throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
    CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
    requestHeader.setTopic(topicConfig.getTopicName());
    requestHeader.setDefaultTopic(defaultTopic);
    requestHeader.setReadQueueNums(topicConfig.getReadQueueNums());
    requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums());
    requestHeader.setPerm(topicConfig.getPerm());
    requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name());
    requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());
    requestHeader.setOrder(topicConfig.isOrder());

    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);

    RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
        request, timeoutMillis); // this is where the CreateTopic command is actually sent
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            return;
        }
        default:
            break;
    }

    throw new MQClientException(response.getCode(), response.getRemark());
}


Step 2

Next, how does a Master Broker handle this command?
// Inside the broker, AdminBrokerProcessor handles requests coming from the Admin side
public class AdminBrokerProcessor implements NettyRequestProcessor {

    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        switch (request.getCode()) {
            case RequestCode.UPDATE_AND_CREATE_TOPIC:
                return this.updateAndCreateTopic(ctx, request);
            ...
    }

    private RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        ...

        this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig); // update the local topicConfig
        this.brokerController.registerBrokerAll(false, true); // send RegisterBroker requests to the NameServers
        return null;
    }

// BrokerOuterAPI.java
public RegisterBrokerResult registerBrokerAll(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final boolean oneway,
    final int timeoutMills) {
    RegisterBrokerResult registerBrokerResult = null;

    List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList(); // iterate over all NameServers, sending each a RegisterBroker request
    if (nameServerAddressList != null) {
        for (String namesrvAddr : nameServerAddressList) {
            try {
                RegisterBrokerResult result = this.registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId,
                    haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills);
                if (result != null) {
                    registerBrokerResult = result;
                }

                log.info("register broker to name server {} OK", namesrvAddr);
            } catch (Exception e) {
                log.warn("registerBroker Exception, " + namesrvAddr, e);
            }
        }
    }

    return registerBrokerResult;
}


Step 3

Now let's see how the NameServer handles the RegisterBroker request:
// Inside the NameServer, DefaultRequestProcessor handles all incoming requests
public class DefaultRequestProcessor implements NettyRequestProcessor {
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        if (log.isDebugEnabled()) {
            log.debug("receive request, {} {} {}",
                request.getCode(),
                RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                request);
        }

        switch (request.getCode()) {
            ...
            case RequestCode.REGISTER_BROKER:
                Version brokerVersion = MQVersion.value2Version(request.getVersion());
                if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                    return this.registerBrokerWithFilterServer(ctx, request);
                } else {
                    return this.registerBroker(ctx, request);
                }

// The core logic is RouteInfoManager.registerBroker below; this is where the topic routing structure gets built
public RegisterBrokerResult registerBroker(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final Channel channel) {
    RegisterBrokerResult result = new RegisterBrokerResult();
    try {
        try {
            this.lock.writeLock().lockInterruptibly();

            Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
            if (null == brokerNames) {
                brokerNames = new HashSet<String>();
                this.clusterAddrTable.put(clusterName, brokerNames);
            }
            brokerNames.add(brokerName);

            boolean registerFirst = false;

            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            if (null == brokerData) {
                registerFirst = true;
                brokerData = new BrokerData();
                brokerData.setBrokerName(brokerName);
                HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
                brokerData.setBrokerAddrs(brokerAddrs);

                this.brokerAddrTable.put(brokerName, brokerData);
            }
            String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
            registerFirst = registerFirst || (null == oldAddr);

            if (null != topicConfigWrapper //
                && MixAll.MASTER_ID == brokerId) {
                if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())//
                    || registerFirst) {
                    ConcurrentHashMap<String, TopicConfig> tcTable =
                        topicConfigWrapper.getTopicConfigTable();
                    if (tcTable != null) {
                        for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                            this.createAndUpdateQueueData(brokerName, entry.getValue());
                        }
                    }
                }
            }

            BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                new BrokerLiveInfo(
                    System.currentTimeMillis(),
                    topicConfigWrapper.getDataVersion(),
                    channel,
                    haServerAddr));
            if (null == prevBrokerLiveInfo) {
                log.info("new broker registerd, {} HAServer: {}", brokerAddr, haServerAddr);
            }

            if (filterServerList != null) {
                if (filterServerList.isEmpty()) {
                    this.filterServerTable.remove(brokerAddr);
                } else {
                    this.filterServerTable.put(brokerAddr, filterServerList);
                }
            }

            if (MixAll.MASTER_ID != brokerId) {
                String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                if (masterAddr != null) {
                    BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                    if (brokerLiveInfo != null) {
                        result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                        result.setMasterAddr(masterAddr);
                    }
                }
            }
        } finally {
            this.lock.writeLock().unlock();
        }
    } catch (Exception e) {
        log.error("registerBroker Exception", e);
    }

    return result;
}

private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
    QueueData queueData = new QueueData();
    queueData.setBrokerName(brokerName);
    queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());
    queueData.setReadQueueNums(topicConfig.getReadQueueNums());
    queueData.setPerm(topicConfig.getPerm());
    queueData.setTopicSynFlag(topicConfig.getTopicSysFlag());

    List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName());
    if (null == queueDataList) {
        queueDataList = new LinkedList<QueueData>();
        queueDataList.add(queueData);
        this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);
        log.info("new topic registerd, {} {}", topicConfig.getTopicName(), queueData);
    } else {
        boolean addNewOne = true;

        Iterator<QueueData> it = queueDataList.iterator();
        while (it.hasNext()) {
            QueueData qd = it.next();
            if (qd.getBrokerName().equals(brokerName)) {
                if (qd.equals(queueData)) {
                    addNewOne = false;
                } else {
                    log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd,
                        queueData);
                    it.remove();
                }
            }
        }

        if (addNewOne) {
            queueDataList.add(queueData);
        }
    }
}

The code above looks complicated, but what it does is simple: for each Master, create one QueueData object. When a topic is newly created, a new QueueData is added; when a topic is modified, the old QueueData is removed and the new one is added.

Suppose a topic lives on 3 Masters. The NameServer then receives 3 RegisterBroker requests, and accordingly that topic's QueueData list contains 3 QueueData objects.
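To make this concrete, here is a small self-contained sketch of what topicQueueTable holds after those 3 RegisterBroker requests. Plain JDK collections and a local QueueDataSketch class stand in for RouteInfoManager's fields and the real QueueData; broker names and queue counts are invented.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the NameServer's state after 3 Masters register "TopicTest".
public class TopicQueueTableSketch {
    static class QueueDataSketch { // stand-in for the real QueueData
        final String brokerName;
        final int writeQueueNums;
        QueueDataSketch(String brokerName, int writeQueueNums) {
            this.brokerName = brokerName;
            this.writeQueueNums = writeQueueNums;
        }
        @Override public String toString() {
            return "QueueData{broker=" + brokerName + ", writeQueueNums=" + writeQueueNums + "}";
        }
    }

    public static void main(String[] args) {
        Map<String, List<QueueDataSketch>> topicQueueTable = new HashMap<>();
        List<QueueDataSketch> qds = new ArrayList<>();
        // one QueueData per Master that registered the topic
        qds.add(new QueueDataSketch("broker-a", 8));
        qds.add(new QueueDataSketch("broker-b", 8));
        qds.add(new QueueDataSketch("broker-c", 8));
        topicQueueTable.put("TopicTest", qds);

        // 3 Masters => 3 QueueData objects for the topic
        System.out.println(topicQueueTable.get("TopicTest"));
    }
}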


Summary

Key point: each QueueData object corresponds to exactly one Master machine!

Next, let's dig deeper into the topic routing structure and this QueueData.


The TopicRoute Structure on the NameServer

On the NameServer there is a RouteInfoManager object, which maintains the mapping between all topics and all machines, i.e. the topic routing structure.

This is also the single most central structure in all of RocketMQ, so let's dissect it.
public class RouteInfoManager {
    ...
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;                     // core map 1: topic -> QueueData list
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;                     // core map 2: physical machines, brokerName -> Master/Slave address list
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;  // core map 3: physical machines, one cluster -> multiple brokers

    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;                 // covered in a later article
}

public class QueueData implements Comparable<QueueData> {
    private String brokerName;
    private int readQueueNums;
    private int writeQueueNums;
    private int perm;
    private int topicSynFlag;
    ...
}

public class BrokerData implements Comparable<BrokerData> {
    private String cluster;
    private String brokerName;
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs; // key point: one broker = 1 Master + several Slaves; the Master's brokerId = 0, a Slave's brokerId > 0
    ...
}

The data structures above have the following key points:

(1) Each broker consists of multiple physical machines; concretely, 1 Master + several Slaves. This corresponds to:
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;

(2) Multiple brokers form a cluster. By default there is exactly one, DefaultCluster. This corresponds to:
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;  // core map 3: one cluster -> multiple brokers

Key point: structures (1) and (2) are "static": they are determined when the cluster is configured, read from the brokers' configuration files.

(3) A topic has multiple QueueData objects, and each QueueData carries a brokerName field. As stated above, each Master corresponds to one QueueData object.
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;   // core map 1: topic -> QueueData list

Structures (1) and (3) are joined through brokerName.

Unlike (1) and (2), which are "static configuration", structure (3) is dynamic: it is built at CreateTopic time, as described above. A sketch of how the three maps join follows.
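The join on brokerName can be shown with a tiny self-contained sketch, using plain JDK types (all names and addresses are invented): resolving a topic to machines means going topicQueueTable -> brokerName -> brokerAddrTable.

import java.util.*;

// Sketch of how the three core maps join on brokerName (values invented).
public class RouteJoinSketch {
    public static void main(String[] args) {
        // core map 3: cluster -> brokerNames (static, from broker config files)
        Map<String, Set<String>> clusterAddrTable = new HashMap<>();
        clusterAddrTable.put("DefaultCluster", new HashSet<>(Arrays.asList("broker-a", "broker-b")));

        // core map 2: brokerName -> (brokerId -> address); brokerId 0 is the Master (static)
        Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>();
        brokerAddrTable.put("broker-a", Map.of(0L, "10.0.0.1:10911", 1L, "10.0.0.2:10911"));
        brokerAddrTable.put("broker-b", Map.of(0L, "10.0.0.3:10911", 1L, "10.0.0.4:10911"));

        // core map 1: topic -> brokerNames carrying it (built at CreateTopic time;
        // a real QueueData also carries read/write queue counts and perm)
        Map<String, List<String>> topicQueueTable = new HashMap<>();
        topicQueueTable.put("TopicTest", Arrays.asList("broker-a", "broker-b"));

        // resolving a topic to Master addresses = join (1) and (3) on brokerName
        for (String brokerName : topicQueueTable.get("TopicTest")) {
            String masterAddr = brokerAddrTable.get(brokerName).get(0L);
            System.out.println("TopicTest -> " + brokerName + " -> master " + masterAddr);
        }
    }
}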


Key Question: How Is queueId Generated?

Examining the code above, we find that QueueData has no queueId field. Yet when a Producer sends a Message, every Message carries a topic + queueId.

So where does this queueId come from? Is it global, or does each Master number its queues starting from 0?

This question is the essence of this article: the mapping between topic/queue and Broker.

Let's trace the Producer's message-sending code to see how queueId is actually produced.


How the Producer Sends a Message

The Producer code is wrapped in many layers; here we skip the outer layers and go straight to the core routing logic that runs just before a message is sent.
// DefaultMQProducerImpl
private SendResult sendDefaultImpl(//
    Message msg, //
    final CommunicationMode communicationMode, //
    final SendCallback sendCallback, //
    final long timeout//
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    this.makeSureStateOK();
    Validators.checkMessage(msg, this.defaultMQProducer);

    final long invokeID = random.nextLong();
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    long endTimestamp = beginTimestampFirst;
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); // key step 1: fetch the topic's route and convert topicRoute into topicPublishInfo
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        MessageQueue mq = null;
        Exception exception = null;
        SendResult sendResult = null;
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        String[] brokersSent = new String[timesTotal];
        for (; times < timesTotal; times++) {
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); // step 2: pick one MessageQueue from topicPublishInfo and send to it
            ...

Note that a new object appears on the Producer side: MessageQueue. How does it differ from the QueueData held by the NameServer?
public class MessageQueue implements Comparable<MessageQueue>, Serializable {
    private static final long serialVersionUID = 6191200464116433425L;
    private String topic;
    private String brokerName;
    private int queueId;
    ...
}

And here queueId finally appears! It is along the code path above that QueueData is converted into MessageQueue.
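In other words, a MessageQueue is just a (topic, brokerName, queueId) triple. A hedged illustration (the import path assumes the Apache RocketMQ 4.x layout; the topic, broker name, and queue count are invented):

import org.apache.rocketmq.common.message.MessageQueue;

// For a topic on "broker-a" with writeQueueNums = 4, the producer ends up with
// four MessageQueue triples for that broker (values here are invented):
public class MessageQueueSketch {
    public static void main(String[] args) {
        for (int queueId = 0; queueId < 4; queueId++) {
            MessageQueue mq = new MessageQueue("TopicTest", "broker-a", queueId);
            System.out.println(mq); // prints topic, brokerName, queueId
        }
    }
}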

Let's look at this conversion logic:
// MQClientInstance.java
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) {
    try {
        if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                TopicRouteData topicRouteData;
                if (isDefault && defaultMQProducer != null) {
                    topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                        1000 * 3); // fetch the topicRoute from the NameServer, i.e. the maps held by RouteInfoManager above
                    if (topicRouteData != null) {
                        for (QueueData data : topicRouteData.getQueueDatas()) {
                            int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                            data.setReadQueueNums(queueNums);
                            data.setWriteQueueNums(queueNums);
                        }
                    }
                } else {
                    topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                }
                if (topicRouteData != null) {
                    TopicRouteData old = this.topicRouteTable.get(topic);
                    boolean changed = topicRouteDataIsChange(old, topicRouteData);
                    if (!changed) {
                        changed = this.isNeedUpdateTopicRouteInfo(topic);
                    } else {
                        log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
                    }

                    if (changed) {
                        TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();

                        for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                            this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                        }

                        // Update Pub info
                        {
                            TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData); // the key call: convert topicRoute into topicPublishInfo
                            publishInfo.setHaveTopicRouterInfo(true);
                            Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                            while (it.hasNext()) {
                                Entry<String, MQProducerInner> entry = it.next();
                                MQProducerInner impl = entry.getValue();
                                if (impl != null) {
                                    impl.updateTopicPublishInfo(topic, publishInfo);
                                }
                            }
                            ...
// The key function: topicRouteData2TopicPublishInfo
public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
    TopicPublishInfo info = new TopicPublishInfo();
    info.setTopicRouteData(route);
    if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
        ...
    } else {
        List<QueueData> qds = route.getQueueDatas(); // iterate over all QueueData of this topic
        Collections.sort(qds);
        for (QueueData qd : qds) {
            if (PermName.isWriteable(qd.getPerm())) {
                BrokerData brokerData = null;
                for (BrokerData bd : route.getBrokerDatas()) {
                    if (bd.getBrokerName().equals(qd.getBrokerName())) { // find the matching Master
                        brokerData = bd;
                        break;
                    }
                }

                if (null == brokerData) {
                    continue;
                }

                if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
                    continue;
                }

                for (int i = 0; i < qd.getWriteQueueNums(); i++) { // loop over writeQueueNums
                    MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i); // create one MessageQueue per write queue; queueId = i
                    info.getMessageQueueList().add(mq);
                }
            }
        }

        info.setOrderTopic(false);
    }

    return info;
}


The Key Formula

From the code above, we can extract a key formula:
number of MessageQueues for a topic = number of QueueData objects for the topic * writeQueueNums = number of Masters in the topic's cluster * writeQueueNums

By default, all brokers belong to the same cluster, DefaultCluster.

This answers the earlier question: queueId is local. For a given topic, the queueIds on each Master are numbered starting from 0.

Suppose a topic has 3 Masters and writeQueueNums = 8. Then each Master hosts 8 queues with queueId 0 through 7, giving 24 MessageQueues in total.
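Here is a self-contained sketch of that arithmetic, mirroring the nested loop in topicRouteData2TopicPublishInfo above; the broker names are invented.

import java.util.ArrayList;
import java.util.List;

// Mirrors the loop in topicRouteData2TopicPublishInfo: one MessageQueue per
// (Master, writeQueue index). 3 Masters * writeQueueNums 8 => 24 queues, ids 0-7 per Master.
public class QueueCountSketch {
    public static void main(String[] args) {
        String[] masters = {"broker-a", "broker-b", "broker-c"}; // invented names
        int writeQueueNums = 8;

        List<String> messageQueues = new ArrayList<>();
        for (String brokerName : masters) {
            for (int queueId = 0; queueId < writeQueueNums; queueId++) { // queueId restarts at 0 on each Master
                messageQueues.add(brokerName + "#" + queueId);
            }
        }
        System.out.println(messageQueues.size()); // 24 = 3 * 8
    }
}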


Summary

With the analysis above, and building on the previous article, we now understand far more concretely how Kafka and RocketMQ differ in their topic routing structures.

Kafka: one topic has multiple partitions, and each partition has 1 Master + several Slaves.



RocketMQ: one topic spans multiple brokers (each broker = 1 Master + several Slaves), and every such broker hosts writeQueueNums queues.



So the total number of queues for a topic is not writeQueueNums, but writeQueueNums * the number of Masters.

And precisely because one topic has multiple Masters, RocketMQ does not need master election the way Kafka does: in Kafka a partition has only one master, so when that master dies a new one must be elected from the slaves.