基于Docker构建RocketMQ4.4.0集群,以及RocketMQ的一些基本使用
文章目录
RocketMQ
docket环境下安装单机RocketMQ
-
#拉取rocketMQ镜像 docker pull rocketmqinc/rocketmq:4.4.0
-
运行rocketMQ
首先运行NameServer
docker run -d --restart=always -p 9876:9876 -v /home/rocketMQ/data/namesrv/logs:/root/logs -v /home/rocketMQ/data/namesrv/store:/root/store --name RocketMQNSrv -e "JAVA_OPT_EXT=-server -Xms256m -Xmx256m" rocketmqinc/rocketmq:4.4.0 sh mqnamesrv
-
运行broker服务器
#1.首先进入/home/rocketMQ/conf目录下创建broker.conf 文件 cd /home/rocketMQ mkdir conf cd conf vi broker.conf #2.往conf文件中写入以下内容 brokerClusterName = DefaultCluster #broker集群名 brokerName = broker-a #当前broker节点名 brokerId = 0 #broker角色,0-master,>0-slave deleteWhen = 04 #每天消息清理时机:默认凌晨4点 fileReservedTime = 48 #文件保留时长,默认72小时 brokerRole = ASYNC_MASTER #Broker 的角色: ASYNC_MASTER 异步复制Master ; SYNC_MASTER 同步双写Master; SLAVE flushDiskType = ASYNC_FLUSH #刷盘方式 ASYNC_FLUSH 异步刷盘; SYNC_FLUSH 同步刷盘 brokerIP1 = {本地外网 IP} #docker中需要指定宿主机的IP #3.启动broker服务器 docker run -d --restart=always -p 10911:10911 -p 10909:10909 -v /home/rocketMQ/data/broker/logs:/root/logs -v /home/rocketMQ/data/broker/store:/root/store -v /home/rocketMQ/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf --name RocketMQBroker --link RocketMQNSrv:namesrv -e "NAMESRV_ADDR=namesrv:9876" -e "JAVA_OPT_EXT=-server -Xms256m -Xmx256m" rocketmqinc/rocketmq:4.4.0 sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf
-
安装 rocketmq 控制台
#1.拉取镜像 docker pull pangliang/rocketmq-console-ng #2.运行 docker run -d --restart=always -e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.56.101:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8088:8080 -t pangliang/rocketmq-console-ng
Drocketmq.namesrv.addr=192.168.56.101:9876
如果有多个nameServer则分号分隔,如192.168.56.101:9876;192.168.56.3:9876
broker.conf配置详解
更多配置信息可以参考:
#所属集群名字 borkerClusterName=rocketmq-cluster #broker名字,注意此处不同的配置文件填写的不一样 brokerName=broker-a #0 表示Master, >0 表示Slave brokerId=0 #nameServer地址,分号分割 namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876 #在发消息时,自动创建服务器不存在的topic,默认创建的队列数 defaultTopicQueueNums=4 #是否允许Broker自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否允许Broker自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true #Broker 对外服务的监听端口 listenPort=10915 #删除文件时间点,默认凌晨4点 deleteWhen=04 #文件保留时间,默认48小时 fileReservedTime=120 #commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每个文件默认存30W条,根据业务情况调整 mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 #存储路径 storePathRootDir=/usr/local/middle/rocketmq/data #commitLog存储路径 storePathCommitLog=/usr/local/middle/rocketmq/data/commitlog #消费队列存储路径 storePathConsumeQueue=/usr/local/middle/rocketmq/data/consumequeue #消息索引存储路径 storePathIndex=/usr/local/middle/rocketmq/data/index #checkpoint 文件存储路径 storeCheckPoint=/usr/local/middle/rocketmq/data/checkpoint #abort 文件存储路径 abortFile=/usr/local/middle/rocketmq/data/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 异步复制Master #- SYNC_MASTER 同步双写Master #- SLAVE brokerRole=SYNC_MASTER #刷盘方式 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘 flushDiskType=SYNC_FLUSH #开启属性过滤 enablePropertyFilter=true #checkTransactionMessageEnable=false #发消息线程池数量 #sendMessageThreadPoolNums=128 #拉消息线程池数量 #pullMessageThreadPoolNums=128 #docker中需要指定宿主机的IP brokerIP1={本地外网 IP}
docker环境下rocketMQ集群
此处使用2个nameServer,2个master2个slave
服务器1:master1、slave2
服务器2:master2、slave1
集群主要是broker集群,每个broker映射一个
broker.conf文件
rocke有9876
非vip通道端口:10911
vip通道端口:10909
10909是VIP通道对应的端口,在JAVA中的消费者对象或者是生产者对象中关闭VIP通道即可无需开放10909端口#关闭VIP通道 rocketmq.config.isVIPChannel=false如果是broker集群的话,还要开放10912,否则master的消息将无法复制到slave节点,会报SLAVE_NOT_AVAILABLE异常
安装nameServer
同上面单机装配一样
服务器1的配置文件
broker-master1.conf
# nameServer地址,如果nameserver是多台集群的话,就用分号分割 namesrvAddr=192.168.56.101:9876;192.168.56.3:9876 # 所属集群名字 brokerClusterName=rocketmq-cluster # broker名字,注意此处不同的配置文件填写的不一样 例如:在a.properties 文件中写 broker-a 在b.properties 文件中写 broker-b brokerName=broker-a # 0 表示 Master,>0 表示 Slave brokerId=0 # Broker 对外服务的监听端口 listenPort=10911 # 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数。由于是4个broker节点,所以设置为4 # defaultTopicQueueNums=4 # 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true # 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true # commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 # ConsumeQueue每个文件默认存30W条,根据业务情况调整 mapedFileSizeConsumeQueue=300000 # 检测可用的磁盘空间大小,当磁盘被占用超过90%,消息写入会直接报错 diskMaxUsedSpaceRatio=90 # Broker 的角色: ASYNC_MASTER 异步复制Master ; SYNC_MASTER 同步双写Master; SLAVE brokerRole=SYNC_MASTER # 刷盘方式 ASYNC_FLUSH 异步刷盘; SYNC_FLUSH 同步刷盘 flushDiskType=ASYNC_FLUSH #开启属性过滤 enablePropertyFilter=true #docker中需要指定宿主机的IP brokerIP1=192.168.56.101 brokerIP2=192.168.56.101
#运行master1 docker run -d --restart=always -p 10911:10911 -p 10909:10909 -p 10912:10912 -v /home/rocketMQ/data/broker/logs:/root/logs -v /home/rocketMQ/data/broker/store:/root/store -v /home/rocketMQ/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf --name RocketMQBroker -e "JAVA_OPT_EXT=-server -Xms256m -Xmx256m" rocketmqinc/rocketmq:4.4.0 sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf
broker-slave2.conf
# nameServer地址,如果nameserver是多台集群的话,就用分号分割 namesrvAddr=192.168.56.101:9876;192.168.56.3:9876 # 所属集群名字 brokerClusterName=rocketmq-cluster # broker名字(同一主从下:Master和slave名称要一致),注意此处不同的配置文件填写的不一样 例如:在a.properties 文件中写 broker-a 在b.properties 文件中写 broker-b #*****注意,主从一致 brokerName=broker-b # 0 表示 Master,>0 表示 Slave brokerId=1 # Broker 对外服务的监听端口 listenPort=20911 # Master监听端口,从服务器连接该端口,默认为10912 haListenPort=20912 # VIP通道端口 fastListenPort=20909 # Master服务器IP地址与端口号(指定HA连接的master地址) haMasterAddress=192.168.56.3:10912 # 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数。由于是4个broker节点,所以设置为4 # defaultTopicQueueNums=4 # 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true # 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true # commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 # ConsumeQueue每个文件默认存30W条,根据业务情况调整 mapedFileSizeConsumeQueue=300000 # 检测可用的磁盘空间大小,当磁盘被占用超过90%,消息写入会直接报错 diskMaxUsedSpaceRatio=90 # Broker 的角色: ASYNC_MASTER 异步复制Master ; SYNC_MASTER 同步双写Master; SLAVE brokerRole=SLAVE # 刷盘方式 ASYNC_FLUSH 异步刷盘; SYNC_FLUSH 同步刷盘 flushDiskType=ASYNC_FLUSH #开启属性过滤 enablePropertyFilter=true #docker中需要指定宿主机的IP brokerIP1=192.168.56.101
*注:docker中必须指定
haMasterAddress或者在master中指定brokerIP2,否则发送消息可能会报SLAVE_NOT_AVAILABLE(不知道大家是不是也会跟我一样有这种情况,可能也只是个例),这边因为是双主机双主双从,所以修改端口号防止冲突Master监听端口,从服务器连接该端口,默认为10912
haListenPort=20912VIP通道端口,默认10909
fastListenPort=20909Master服务器IP地址与端口号(指定HA连接的master地址)
haMasterAddress=192.168.56.101:10912
#运行slave2 docker run -d --restart=always -p 20911:20911 -p 20909:20909 -p 20912:20912 -v /home/rocketMQ/data/broker-slave2/logs:/root/logs -v /home/rocketMQ/data/broker-slave2/store:/root/store -v /home/rocketMQ/conf/broker-slave2.conf:/opt/rocketmq-4.4.0/conf/broker.conf --name Broker-slave2 -e "JAVA_OPT_EXT=-server -Xms256m -Xmx256m" rocketmqinc/rocketmq:4.4.0 sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf
服务器2的配置文件
broker-master2.conf
# nameServer地址,如果nameserver是多台集群的话,就用分号分割 namesrvAddr=192.168.56.101:9876;192.168.56.3:9876 # 所属集群名字 brokerClusterName=rocketmq-cluster # broker名字,注意此处不同的配置文件填写的不一样 例如:在a.properties 文件中写 broker-a 在b.properties 文件中写 broker-b brokerName=broker-b # 0 表示 Master,>0 表示 Slave brokerId=0 # Broker 对外服务的监听端口 listenPort=10911 # 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数。由于是4个broker节点,所以设置为4 # defaultTopicQueueNums=4 # 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true # 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true # commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 # ConsumeQueue每个文件默认存30W条,根据业务情况调整 mapedFileSizeConsumeQueue=300000 # 检测可用的磁盘空间大小,当磁盘被占用超过90%,消息写入会直接报错 diskMaxUsedSpaceRatio=90 # Broker 的角色: ASYNC_MASTER 异步复制Master ; SYNC_MASTER 同步双写Master; SLAVE brokerRole=SYNC_MASTER # 刷盘方式 ASYNC_FLUSH 异步刷盘; SYNC_FLUSH 同步刷盘 flushDiskType=ASYNC_FLUSH #开启属性过滤 enablePropertyFilter=true #docker中需要指定宿主机的IP brokerIP1=192.168.56.3 brokerIP2=192.168.56.3
#运行master2 docker run -d --restart=always -p 10911:10911 -p 10909:10909 -p 10912:10912 -v /home/rocketMQ/data/broker/logs:/root/logs -v /home/rocketMQ/data/broker/store:/root/store -v /home/rocketMQ/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf --name RocketMQBroker -e "JAVA_OPT_EXT=-server -Xms256m -Xmx256m" rocketmqinc/rocketmq:4.4.0 sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf
broker-slave1.conf
# nameServer地址,如果nameserver是多台集群的话,就用分号分割 namesrvAddr=192.168.56.101:9876;192.168.56.3:9876 # 所属集群名字 brokerClusterName=rocketmq-cluster # broker名字(同一主从下:Master和slave名称要一致),注意此处不同的配置文件填写的不一样 例如:在a.properties 文件中写 broker-a 在b.properties 文件中写 broker-b #*****注意,主从一致 brokerName=broker-a # 0 表示 Master,>0 表示 Slave brokerId=1 # Broker 对外服务的监听端口 listenPort=20911 # Master监听端口,从服务器连接该端口,默认为10912 haListenPort=20912 # VIP通道端口 fastListenPort=20909 # Master服务器IP地址与端口号(指定HA连接的master地址) haMasterAddress=192.168.56.101:10912 # 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数。由于是4个broker节点,所以设置为4 # defaultTopicQueueNums=4 # 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true # 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true # commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 # ConsumeQueue每个文件默认存30W条,根据业务情况调整 mapedFileSizeConsumeQueue=300000 # 检测可用的磁盘空间大小,当磁盘被占用超过90%,消息写入会直接报错 diskMaxUsedSpaceRatio=90 # Broker 的角色: ASYNC_MASTER 异步复制Master ; SYNC_MASTER 同步双写Master; SLAVE brokerRole=SLAVE # 刷盘方式 ASYNC_FLUSH 异步刷盘; SYNC_FLUSH 同步刷盘 flushDiskType=ASYNC_FLUSH #开启属性过滤 enablePropertyFilter=true #docker中需要指定宿主机的IP brokerIP1=192.168.56.3
*注:同broker-slave2注释
Master监听端口,从服务器连接该端口,默认为10912
haListenPort=20912VIP通道端口,默认10909
fastListenPort=20909Master服务器IP地址与端口号(指定HA连接的master地址)
haMasterAddress=192.168.56.101:10912
#运行slave1 docker run -d --restart=always -p 20911:20911 -p 20909:20909 -p 20912:20912 -v /home/rocketMQ/data/broker-slave1/logs:/root/logs -v /home/rocketMQ/data/broker-slave1/store:/root/store -v /home/rocketMQ/conf/broker-slave1.conf:/opt/rocketmq-4.4.0/conf/broker.conf --name Broker-slave1 -e "JAVA_OPT_EXT=-server -Xms256m -Xmx256m" rocketmqinc/rocketmq:4.4.0 sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf
RocketMQ基本使用
消息发送步骤
- 创建生产者producer,并指定生产者组名
- 指定NameServer(生产者需要把提供的消息注册到nameServer,方便消费端查找和订阅)
- 启动producer
- 创建消息对象,指定主题topic,tag和消息体
- 发送消息
- 关闭producer
消息消费步骤
- 创建消费者consumer,指定消费组名
- 指定NameServer
- 订阅主题topic和tag
- 注册消息监听器,设置回调函数,处理消息
- 启动消费者consumer
首先导入maven依赖(请自行选择对应的版本)
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.4.0</version> </dependency>
消息发送结果
- SEND_OK:消息发送成功
- FLUSH_DISK_TIMEOUT:消息发送成功,但是服务器刷盘超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
- FLUSH_SLAVE_TIMEOUT:消息发送成功,但是服务器同步到 Slave 时超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
- SLAVE_NOT_AVAILABLE:消息发送成功,但是此时 Slave 不可用,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
1. 基础消息
1.1 发送消息
1.1.1 同步消息
需要接到消息结果之后再发送下一个消息
这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
producer.send(message);
public static void main(String[] args) throws Exception { //1.- 创建生产者producer,并指定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("group-1"); //2.- 指定NameServer(生产者需要把提供的消息注册到nameServer,方便消费端查找和订阅) producer.setNamesrvAddr("192.168.56.101:9876"); //3.- 启动producer producer.start(); //4.- 创建消息对象,指定主题topic,tag和消息体 for (int i = 0; i < 10; i++) { /** * 参数一:消息主题 topic * 参数二:消息tag * 参数三:消息内容 */ Message message = new Message("base", "tag_sync", ("hello musha" + i).getBytes()); //5.- 发送消息 SendResult result = producer.send(message); //发送状态 SendStatus sendStatus = result.getSendStatus(); //消息ID String msgId = result.getMsgId(); //消息队列ID int queueId = result.getMessageQueue().getQueueId(); // System.out.println("发送状态:" + sendStatus); // System.out.println("消息ID:" + msgId); // System.out.println("消息队列ID:" + queueId); System.out.println("发送结果:" + result); //线程sleep 1s TimeUnit.SECONDS.sleep(1); } //6.- 关闭producer producer.shutdown(); }
1.1.2 异步消息
不必等待返回结果,立即发送下一个消息,可以通过
send(Message msg, SendCallback sendCallback)中的回调函数,对返回结果进行处理。异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
producer.send(message, new SendCallback())
public static void main(String[] args) throws Exception { //1.- 创建生产者producer,并指定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("group-1"); //2.- 指定NameServer(生产者需要把提供的消息注册到nameServer,方便消费端查找和订阅) producer.setNamesrvAddr("192.168.56.101:9876"); //3.- 启动producer producer.start(); //4.- 创建消息对象,指定主题topic,tag和消息体 for (int i = 0; i < 10; i++) { /** * 参数一:消息主题 topic * 参数二:消息tag * 参数三:消息内容 */ Message message = new Message("base", "tag_async", ("hello musha" + i).getBytes()); //5.- 发送消息 *主要是这里不一样 producer.send(message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { //发送状态 //消息ID String msgId = sendResult.getMsgId(); //消息队列ID // int queueId = sendResult.getMessageQueue().getQueueId(); System.out.println("发送结果:" + sendResult); } @Override public void onException(Throwable throwable) { } }); //线程sleep 1s TimeUnit.SECONDS.sleep(1); } //6.- 关闭producer producer.shutdown(); }
1.1.3 单向消息
通俗来说,就是发送消息不必等待返回结果,也无需执行回调函数。
这种方式主要用在不特别关心发送结果的场景,例如日志发送。
producer.sendOneway(message);
public static void main(String[] args) throws Exception { //1.- 创建生产者producer,并指定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("group-1"); //2.- 指定NameServer(生产者需要把提供的消息注册到nameServer,方便消费端查找和订阅) producer.setNamesrvAddr("192.168.56.101:9876"); //3.- 启动producer producer.start(); //4.- 创建消息对象,指定主题topic,tag和消息体 for (int i = 0; i < 10; i++) { /** * 参数一:消息主题 topic * 参数二:消息tag * 参数三:消息内容 */ Message message = new Message("base", "tag_oneway", ("hello musha" + i).getBytes()); //5.- 发送消息 *主要是这里不一样 producer.sendOneway(message); //线程sleep 1s TimeUnit.SECONDS.sleep(1); } //6.- 关闭producer producer.shutdown(); }
1.2 消费消息
1.2.1 负载均衡
默认即为负载均衡
//若不进行该属性设置,默认也是负载均衡策略 consumer.setMessageModel(MessageModel.CLUSTERING);
public static void main(String[] args) throws Exception { //1- 创建消费者consumer,指定消费组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2- 指定NameServer consumer.setNamesrvAddr("192.168.56.101:9876;192.168.56.3:9876"); //3- 订阅主题topic和tag // 参数一:主题topic // 参数二:tag consumer.subscribe("base", "tag_sync || tag_async"); //**设置消费模式-负载均衡 consumer.setMessageModel(MessageModel.CLUSTERING); //4- 设置回调函数,处理消息 consumer.registerMessageListener(new MessageListenerConcurrently() { //接受消息内容 @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { list.forEach(msg -> { System.out.println(new String(msg.getBody())); }); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //5- 启动消费者consumer consumer.start(); }
1.2.2 广播消费
一个消息,所有的消费者都能进行消费
consumer.setMessageModel(MessageModel.BROADCASTING);
public static void main(String[] args) throws Exception { //1- 创建消费者consumer,指定消费组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2- 指定NameServer consumer.setNamesrvAddr("192.168.56.101:9876;192.168.56.3:9876"); //3- 订阅主题topic和tag // 参数一:主题topic // 参数二:tag consumer.subscribe("base", "tag_sync || tag_async"); //**设置消费模式 - 广播模式 consumer.setMessageModel(MessageModel.BROADCASTING); //4- 设置回调函数,处理消息 consumer.registerMessageListener(new MessageListenerConcurrently() { //接受消息内容 @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { list.forEach(msg -> { System.out.println(new String(msg.getBody())); }); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //5- 启动消费者consumer consumer.start(); }
2. 顺序消息
由于目前部署了双主双从模式
发送消息时,会创建一个topic在2个broker上,每个broker各自创建4个队列。
消息并不会发送至同一个队列中,会导致无法保证顺序消费(如果需要顺序消费的话)。
保证消息的顺序性
全局顺序消息
一般通过建立单队列的方式,消息全在一个队列里,保证消息的顺序性(FIFO)
局部顺序消息
不需要限定单个队列,将需要按序消费的消息,放入到同一个队列中即可。
比如:顺序消息A1,B2,C3,D4,都放入到队列1中,就能保证消息的顺序性。
2.1 顺序消费的实体类
/** * @ClassName OrderStep * @Desc 顺序消息-数据类 * @Author musha * @Date 2020/3/15 13:49 * @Version 1.0 */ @Data @AllArgsConstructor public class OrderStep { private String orderId; private String desc; /** * @param * @Description 用来创建订单集合 * @return: java.util.List<com.musha.mq.rocket.demo.order.OrderStep> * @Author musha * @Date 2020/3/15 13:51 */ public static List<OrderStep> getOrders() { List<OrderStep> orderList = new ArrayList<>(); //创建订单ID,此处模拟3个订单 // 订单A: 创建->付款->物流->完成 // 订单B: 创建->付款 // 订单C: 创建->付款->物流 String orderIdA = UUID.randomUUID().toString().substring(0, 10); String orderIdB = UUID.randomUUID().toString().substring(0, 10); String orderIdC = UUID.randomUUID().toString().substring(0, 10); orderList.add(new OrderStep(orderIdA, "A创建订单")); orderList.add(new OrderStep(orderIdB, "B创建订单")); orderList.add(new OrderStep(orderIdA, "A付款")); orderList.add(new OrderStep(orderIdC, "C创建订单")); orderList.add(new OrderStep(orderIdC, "C付款")); orderList.add(new OrderStep(orderIdA, "A物流")); orderList.add(new OrderStep(orderIdB, "B付款")); orderList.add(new OrderStep(orderIdA, "A完成")); orderList.add(new OrderStep(orderIdC, "C物流")); return orderList; } }
2.2 生产者
需要将同一个业务标识(同一个订单号)订单的消息,放置到同一个队列中,保证其有序性
send(Message msg, MessageQueueSelector selector, Object arg)msg:消息对象
selector:消息队列的选择器,主要在这里进行消息队列的选择
arg:业务标识(这里为订单号)
select(List<MessageQueue> list, Message message, Object o)@param list 消息队列
@param message 消息对象(即上面传递过来的message
@param o 业务标识的参数(即传递过来的orderId)
/** * @ClassName OrderProducer * @Desc 顺序消费 - 生产者 * @Author musha * @Date 2020/3/15 14:01 * @Version 1.0 */ public class OrderProducer { public static void main(String[] args) throws Exception { //1.创建消息生产者,指定组名 DefaultMQProducer producer = new DefaultMQProducer("orderMQ"); //2.指定NameServer producer.setNamesrvAddr("192.168.56.101:9876;192.168.56.3:9876"); //3.启动producer producer.start(); //4.获取订单集合信息 List<OrderStep> orderList = OrderStep.getOrders(); //发送消息 int i = 0; for (OrderStep orderStep : orderList) { //5.创建消息 Message message = new Message("OrderTopic", "OrderTag", "i:" + i++, (orderStep + "").getBytes()); //6.发送消息 // 参数一:消息对象 // 参数二:消息队列的选择器 // 参数三:选择队列的业务标识(此处为订单ID) // 其实就是:使用业务标识,进行一定的规则,选出该标识对应存储的队列 SendResult sendResult = producer.send(message, new MessageQueueSelector() { /** * @param list 消息队列 * @param message 消息对象(即上面传递过来的message) * @param o 业务标识的参数(即传递过来的orderId) * @return: org.apache.rocketmq.common.message.MessageQueue */ @Override public MessageQueue select(List<MessageQueue> list, Message message, Object o) { String orderId = (String) o; //计算当前id存储的队列索引(取绝对值,因为hashCode可能为负数),这里ID只是做测试用,正式情况请 int index = Math.abs(orderId.hashCode()) % (list.size() - 1); return list.get(index); } }, orderStep.getOrderId()); System.out.println("发送结果:" + sendResult); } producer.shutdown(); } }
2.3 消费者
consumer注册消息监听器
MessageListenerOrderlyMessageListenerOrderly:使用该监听器时,只启用一个线程对一个队列进行消费即:一个队列只会被一个线程取到,第二个线程无法访问这个队列(对消费队列上锁,在消费消息之前,先去获取对应队列对应的锁,保证同一个队列不会被并发消费)
*同一个topic下的不同消息队列可以被并发消费
/** * @ClassName OrderConsumer * @Desc 顺序消息 - 消费者 * @Author musha * @Date 2020/3/15 14:22 * @Version 1.0 */ public class OrderConsumer { public static void main(String[] args) throws Exception { //1.创建消费者,指定组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderMQ"); //2.指定nameServer地址 consumer.setNamesrvAddr("192.168.56.101:9876;192.168.56.3:9876"); //3.订阅主题topic consumer.subscribe("OrderTopic", "OrderTag"); //4.注册消息监听器 // 顺序消息监听 - MessageListenerOrderly 使用该监听器时,只启用一个线程对一个队列进行消费 // 即:一个队列只会被一个线程取到,第二个线程无法访问这个队列(对消费队列上锁,在消费消息之前,先去获取对应队列对应的锁,保证同一个队列不会被并发消费) consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) { list.forEach(msg -> { System.out.println(Thread.currentThread().getName() + "\t" + LocalDateTime.now() + "\t\t消费消息:" + new String(msg.getBody())); }); return ConsumeOrderlyStatus.SUCCESS; } }); //5.启动消费者 consumer.start(); System.out.println("消费者启动"); } }
3. 延时消息
在message中设置延时时间
message.setDelayTimeLevel(3);延时等级分为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
从等级1开始
3.1 生产者
/** * @ClassName DelayProducer * @Desc 延时消息 - 生产者 * @Author musha * @Date 2020/3/15 15:42 * @Version 1.0 */ public class DelayProducer { public static void main(String[] args) throws Exception { //1.创建生产者,指定组 DefaultMQProducer producer = new DefaultMQProducer("gourp_delay"); //2.设置nameServer producer.setNamesrvAddr("192.168.56.101:9876;192.168.56.3:9876"); //3.启动producer producer.start(); for (int i = 0; i < 10; i++) { //4.创建消息 Message message = new Message("topic_delay", "tag_delay", "key_delay" + i, "musha真帅".getBytes()); //5.==*延时消息*== 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h // 3对应的10s message.setDelayTimeLevel(3); //6.发送消息 SendResult sendResult = producer.send(message); System.out.println("发送结果:" + sendResult); } producer.shutdown(); } }
3.2 消费者
/** * @ClassName DelayConsumer * @Desc 延时消息 - 消费者 * @Author musha * @Date 2020/3/15 15:42 * @Version 1.0 */ public class DelayConsumer { public static void main(String[] args) throws Exception { //1.创建消费者,指定组 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_delay"); //2.设置nameServer consumer.setNamesrvAddr("192.168.56.101:9876;192.168.56.3:9876"); //3.订阅主题 consumer.subscribe("topic_delay", "tag_delay"); //4.注册消息监听器 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { list.forEach(messageExt -> { //getStoreTimestamp()获取消息入队的时间 System.out.println("消息ID:" + messageExt.getMsgId() + "\t延时时间:" + (System.currentTimeMillis() - messageExt.getStoreTimestamp()) + "\t消费消息:" + new String(messageExt.getBody())); }); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //5.启动消费者 consumer.start(); System.out.println("消费者启动"); } }
4. 批量消息
前面发送的消息都是一条一条发送,批量消息则是一次发送多条消息,这一批消息总大小(无论是单条消息还是消息总大小)不应该超过4MB(默认大小)
批量消息一般有两种使用情况:
- 达到一定消息数量之后发送
- 一定时间发送(比如3s发送一次)
4.1 生产者
主要就是创建一个消息集合,然后发送消息集合
List<Message> messageList = new ArrayList<>();SendResult sendResult = producer.send(messageList);
/** * @ClassName BatchProducer * @Desc 批量消息 - 生产者 * @Author musha * @Date 2020/3/16 10:57 * @Version 1.0 */ public class BatchProducer { public static void main(String[] args) throws Exception { //1.创建生产者,指定组 DefaultMQProducer producer = new DefaultMQProducer("group1"); //2.绑定nameServer producer.setNamesrvAddr("192.168.56.101:9876;192.168.56.3:9876"); //3.启动生产者 producer.start(); //4.创建消息集合 List<Message> messageList = new ArrayList<>(); messageList.add(new Message("topic_batch", "tag_batch", "musha真帅1".getBytes())); messageList.add(new Message("topic_batch", "tag_batch", "musha真帅2".getBytes())); messageList.add(new Message("topic_batch", "tag_batch", "musha真帅3".getBytes())); //5.发送消息 SendResult sendResult = producer.send(messageList); System.out.println("发送结果:" + sendResult); //6.关闭消费者 producer.shutdown(); } }
4.2 消费者
与正常消费一致
/** * @ClassName BatchConsumer * @Desc 批量消息 - 消费者 * @Author musha * @Date 2020/3/16 11:17 * @Version 1.0 */ public class BatchConsumer { public static void main(String[] args) throws Exception { //1.创建消费者,指定组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2.设置nameServer consumer.setNamesrvAddr("192.168.56.101:9876;192.168.56.3:9876"); //3.订阅消息 consumer.subscribe("topic_batch", "tag_batch"); //4.注册消息监听器 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { list.forEach(messageExt -> { System.out.println("消费消息:" + new String(messageExt.getBody())); }); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //5.启动消费者 consumer.start(); System.out.println("消费者启动"); } }
4.3 消息分割
如果发送的消息集合超过了4MB,则需要对消息进行分割,rocketmq建议每次批量消息大小大概在1MB。
如果超过4MB,会出现如下异常:
Exception in thread "main" org.apache.rocketmq.client.exception.MQClientException: CODE: 13 DESC: the message body size over max value, MAX: 4194304
4.3.1 生产者
主要就是创建一个消息分割器类
MessageSplitter,该类需要实现迭代器接口Iterator然后将原本的消息集合进行迭代生成不超出规定大小的子集合,分批次进行批量消息的发送
/** * @ClassName BatchProducer * @Desc 批量消息 - 生产者 * @Author musha * @Date 2020/3/16 10:57 * @Version 1.0 */ public class BatchProducer { public static void main(String[] args) throws Exception { //1.创建生产者,指定组 DefaultMQProducer producer = new DefaultMQProducer("group1"); //2.绑定nameServer producer.setNamesrvAddr("192.168.56.101:9876;192.168.56.3:9876"); //3.启动生产者 producer.start(); //4.创建消息集合 List<Message> messageList = new ArrayList<>(); //每个byte数组为0.5MB byte[] bytes = new byte[(int) (1024 * 1024 * 0.2)]; for (int i = 0; i < 20; i++) { byte[] resourceBytes = ("musha太帅了").getBytes(); //arraycopy实现数组之间的复制 /** * @param src the source array. 源数组 * @param srcPos starting position in the source array. 源数组的起始位置 * @param dest the destination array. 目标数组 * @param destPos starting position in the destination data. 目标数组的起始位置 * @param length the number of array elements to be copied. 复制的长度 */ System.arraycopy(resourceBytes, 0, bytes, 0, resourceBytes.length); messageList.add(new Message("topic_batch", "tag_batch_" + i, bytes)); } MessageSplitter messageSplitter = new MessageSplitter(messageList); int count = 0; while (messageSplitter.hasNext()) { List<Message> list = messageSplitter.next(); //5.发送消息 SendResult sendResult = producer.send(list); System.out.println(++count + "次发送结果:" + sendResult); } //6.关闭消费者 producer.shutdown(); } } /** * @Description 消息分割器 * @Author musha * @Date 2020/3/16 21:43 */ class MessageSplitter implements Iterator<List<Message>> { //消息大小控制在1MB private final int SIZE_LIMIT = 1024 * 1024 * 1; //消息集合 private final List<Message> messageList; //当前索引(游标) private int currentIndex; public MessageSplitter(List<Message> messageList) { this.messageList = messageList; } /** * @Description 当前迭代器中是否还有元素 * @return: boolean * @Author musha * @Date 2020/3/16 22:00 */ @Override public boolean hasNext() { return this.currentIndex < this.messageList.size(); } /** * @Description 对消息集合进行拆分,并返回子集合 * @return: java.util.List<org.apache.rocketmq.common.message.Message> * @Author musha * @Date 2020/3/16 22:03 */ @Override public List<Message> next() { int nextIndex = this.currentIndex; int totalSize = 0; //遍历消息进行拆分 for (; nextIndex < this.messageList.size(); nextIndex++) { //获取消息 Message message = messageList.get(nextIndex); //获取消息大小 int tmpSize = message.getTopic().length() + message.getBody().length; //遍历msg中的属性,并将属性和属性值大小计算到消息大小中 for (Map.Entry<String, String> entry : message.getProperties().entrySet()) { tmpSize += entry.getKey().length() + entry.getValue().length(); } //增加日志的开销20字节 tmpSize += 20; //如果当前消息大小超过了限制大小 if (tmpSize > SIZE_LIMIT) { //nextIndex - currentIndex == 0代表本次遍历刚开始,子集合(subList)中没有值,则将当前消息添加到子集合中,然后结束循环 //如果不等于0,则表示当前子集合不为空,则将该消息留到下一个子集合中放置 if (nextIndex - currentIndex == 0) { nextIndex++; } //结束循环 break; } //如果当前已经计算的消息集合大小+本次遍历的消息超出了限制大小,则直接结束循环 if (tmpSize + totalSize > SIZE_LIMIT) { break; } else { totalSize += tmpSize; } } //截取出未超出限制大小的子集合 List<Message> subList = this.messageList.subList(currentIndex, nextIndex); //更新游标 currentIndex = nextIndex; return subList; } }
4.3.2 消费者
依旧没什么区别
/** * @ClassName BatchConsumer * @Desc 批量消息 - 消费者 * @Author musha * @Date 2020/3/16 11:17 * @Version 1.0 */ public class BatchConsumer { public static void main(String[] args) throws Exception { //1.创建消费者,指定组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2.设置nameServer consumer.setNamesrvAddr("192.168.56.101:9876;192.168.56.3:9876"); //3.订阅消息 consumer.subscribe("topic_batch", "*"); //4.注册消息监听器 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { list.forEach(messageExt -> { System.out.println("消费消息:" + messageExt.getTags() + "\t" + new String(messageExt.getBody())); }); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //5.启动消费者 consumer.start(); System.out.println("消费者启动"); } }
5. 过滤消息
消费时进行消息筛选
5.1 生产者
使用发送的消息tag进行区分,用于消费者tag过滤
message可以使用putUserProperty进行自定义属性绑定,供消费者筛选
message.putUserProperty("i", i + "");
/** * @ClassName FilterProducer * @Desc 过滤消息 - 生产者 * @Author musha * @Date 2020/3/16 16:18 * @Version 1.0 */ public class FilterProducer { public static void main(String[] args) throws Exception { //1.创建生产者,指定组名 DefaultMQProducer producer = new DefaultMQProducer("group1"); //2.设置nameServer producer.setNamesrvAddr("192.168.56.101:9876;192.168.56.3:9876"); //3.启动生产者 producer.start(); for (int i = 0; i < 10; i++) { //4.创建消息 Message message = new Message("topic_filter", "tag_filter_" + i, ("musha真帅" + i).getBytes()); //**绑定自定义属性** message.putUserProperty("i", i + ""); //5.发送消息 SendResult sendResult = producer.send(message); System.out.println("发送结果:" + sendResult); } //6.关闭生产者 producer.shutdown(); } }
5.2 消费者
- tag过滤
consumer.subscribe("topic_filter", "*");此处指定
tag:*,订阅topic_filter主题下的所有tag消息也可以:
tag_filter_1 || tag_filter_2进行指定tag的过滤
- sql语法过滤
MessageSelector.bySql()consumer.subscribe("topic_filter", MessageSelector.bySql("i>=5"));//通过sql过滤消息,只要i>=5的消息*注:如果出现以下异常,则需要在broker.conf中开启
enablePropertyFilter=trueException in thread "main" org.apache.rocketmq.client.exception.MQClientException: CODE: 1 DESC: The broker does not support consumer to filter message by SQL92
/** * @ClassName FilterConsumer * @Desc 过滤消息 - 消费者 * @Author musha * @Date 2020/3/16 16:18 * @Version 1.0 */ public class FilterConsumer { public static void main(String[] args) throws Exception { //1.创建消费者,指定组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2.设置nameServer consumer.setNamesrvAddr("192.168.56.101:9876;192.168.56.3:9876"); //3.订阅消息 //consumer.subscribe("topic_filter", "*");//通过tag过滤消息 consumer.subscribe("topic_filter", MessageSelector.bySql("i>=5"));//通过sql过滤消息,只要i>=5的消息//4.注册监听器 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { list.forEach(messageExt -> { System.out.println("消费消息:" + new String(messageExt.getBody())); }); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //5.启动消费者 consumer.start(); System.out.println("消费者启动"); } }
6. 事务消息
上图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交(正常的提交流程)、事务消息的补偿流程(提交失败时的流程)。
事务消息发送及提交
发送消息(half消息)。
服务端响应消息写入结果。
根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。
根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)。
事务补偿
对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次”回查“(check back)
Producer收到回查消息,检查回查消息对应的本地事务的状态
根据本地事务状态,重新Commit或者Rollback
其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。
事务消息状态
事务消息共有三种状态,提交状态、回滚状态、中间状态:
TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。(这时候需要进行check back获取消息状态)
6.1 生产者
***注:**由于发送消息完需要进行监听回查,所以不需要关闭生产者
创建生产者时,使用事务消息生产者
TransactionMQProducer为生产者添加事务监听器
setTransactionListener(TransactionListener transactionListener)发送事务消息
sendMessageInTransaction(Message msg, Object arg)其中
msg为发送的消息,arg为附加参数,可用于事务监听器中执行本地事务时的获取参数另外,事务监听器中有两个实现方法:
executeLocalTransaction(Message message, Object arg)执行本地事务其中
message为发送的消息,arg为附加参数,是发送事务消息时添加的参数 checkLocalTransaction(MessageExt messageExt)事务状态回查这两个方法的返回值可参见枚举类
LocalTransactionState,其中参数有COMMIT_MESSAGE提交消息 ROLLBACK_MESSAGE回滚消息(抛弃该消息) UNKNOW未知状态,需要进行状态回查
/** * @ClassName TransactionProducer * @Desc 事务消息 - 生产者 * @Author musha * @Date 2020/3/16 19:39 * @Version 1.0 */ public class TransactionProducer { public static void main(String[] args) throws Exception { //1.**创建事务消息生产者,指定组名 TransactionMQProducer producer = new TransactionMQProducer("group_transaction"); //2.设置nameServer producer.setNamesrvAddr("192.168.56.101:9876;192.168.56.3:9876"); //3.**添加事务监听器,用来监听事务消息返回的状态 producer.setTransactionListener(new TransactionListener() { /** * @Description 在该方法中执行本地事务 * @param message 回传的消息,利用transactionId即可获取到该消息的唯一Id * @param o 调用send方法时传递的参数,当send时候若有额外的参数可以传递到send方法中,这里能获取到 * @return: org.apache.rocketmq.client.producer.LocalTransactionState * 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调 * @Author musha * @Date 2020/3/16 19:55 */ @Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { //这边可以创建一个全局共享变量concurrentHashMap,用来存储transactionId以及对应的值(比如正常执行1,异常执行-1等),便于回查时进行判断,这里就不赘述了 int i = (int) o; if (i % 3 == 0) { //提交消息 return LocalTransactionState.COMMIT_MESSAGE; } else if (i % 3 == 1) { //回滚消息 return LocalTransactionState.ROLLBACK_MESSAGE; } //消息回查 return LocalTransactionState.UNKNOW; } /** * @Description 事务消息状态回查 * @param messageExt 通过获取transactionId来判断这条消息的本地事务执行状态 * @return: org.apache.rocketmq.client.producer.LocalTransactionState * 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调 * @Author musha * @Date 2020/3/16 19:55 */ @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { System.out.println("消息回查:" + messageExt.getTransactionId() + "\t" + new String(messageExt.getBody())); //可以根据消息体中的信息去数据库中查询该消息是否已经被执行 //或者根据上方执行本地事务时concurrentHashMap中存储的transactionId对应的值进行判断,返回对应的操作值 //这里演示就直接提交了 return LocalTransactionState.COMMIT_MESSAGE; } }); //4.启动生产者 producer.start(); for (int i = 0; i < 10; i++) { //5.创建消息 Message message = new Message("topic_trans", "tag_trans", ("musha太帅啦" + i).getBytes()); //6.**发送事务消息** // 参数一:消息对象 // 参数二:附加参数,可用于事务监听器中执行本地事务时的获取参数 SendResult sendResult = producer.sendMessageInTransaction(message, i); System.out.println("发送消息:" + sendResult); } //7.由于MQ要回查生产者,所以不需要关闭生产者 // producer.shutdown(); } }
6.2 消费者
/** * @ClassName TransactionConsumer * @Desc 事务消息 - 消费者 * @Author musha * @Date 2020/3/16 19:39 * @Version 1.0 */ public class TransactionConsumer { public static void main(String[] args) throws Exception { //1.创建消费者,指定组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_transaction"); //2.设置nameServer consumer.setNamesrvAddr("192.168.56.101:9876;192.168.56.3:9876"); //3.订阅消息 consumer.subscribe("topic_trans", "tag_trans"); //4.注册消息监听器 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { list.forEach(messageExt -> { System.out.println("消费消息:" + new String(messageExt.getBody())); }); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //5.启动消费者 consumer.start(); System.out.println("消费者启动"); } }
6.3 使用限制
- 事务消息不支持延时消息和批量消息。
- 为了避免单个消息被检查太多次而导致半队列消息累积,默认单个消息的检查次数限制为 15 次,但是可以通过 Broker 配置文件的
transactionCheckMax
参数来修改此限制。如果已经检查某条消息超过 N 次的话( N =transactionCheckMax
) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。可以通过重写AbstractTransactionCheckListener
类来修改这个行为。 - 事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于
transactionMsgTimeout
参数。 - 事务性消息可能不止一次被检查或消费。即使说,一条消息可能因为网络延迟等原因,会造成多次检查,就有可能会出现多次提交,消费端就会进行多次消费,所以消费端需要进行幂等性检查。
- 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
- 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。
- 点赞
- 收藏
- 分享
- 文章举报
- 使用jenkins、docker、consul、nginx搭建支持自动化构建部署以及弹性伸缩的集群系统详细教程
- 基于kubernetes构建Docker集群管理详解 不指定
- 使用Docker构建持续集成与自动部署的Docker集群
- RabbitMQ(四):使用Docker构建RabbitMQ高可用负载均衡集群
- 使用Docker构建持续集成与自动部署的Docker集群
- 使用Docker swarm构建wordpress集群 推荐
- 使用Docker构建redis集群
- Docker学习系列(三):Ubuntu下使用Docker的基本指令记录及一些注意事项
- 使用express.js框架一步步实现基本应用以及构建可扩展的web应用
- 使用docker selenium 构建web自动化分布式测试环境时,遇到的一些问题
- Docker笔记三:基于LVS DR模式构建WEB服务集群
- centos7下安装Docker,以及Docker的基本使用
- 一起学习Silverlight企业应用2:开始使用Silverlight,以及一些基本概念
- Java 中使用 MySql以及一些Mysql 基本的命令
- 基于kubernetes构建Docker集群管理详解
- 基于Kubernetes构建Docker集群管理详解
- 基于SP(SharedPreferences)的基本使用以及实际应用介绍
- 简单两步快速实现shiro的配置和使用,包含登录验证、角色验证、权限验证以及shiro登录注销流程(基于spring的方式,使用maven构建)
- 使用Docker Compose部署基于Sentinel的高可用Redis集群
- 使用Docker构建redis集群