您的位置:首页 > 运维架构 > Docker

基于Docker构建RocketMQ4.4.0集群,以及RocketMQ的一些基本使用

2020-03-30 19:13 2231 查看

文章目录

  • docker环境下rocketMQ集群
  • RocketMQ基本使用
  • 1.2 消费消息
  • 2. 顺序消息
  • 3. 延时消息
  • 4. 批量消息
  • 5. 过滤消息
  • 6. 事务消息
  • RocketMQ

    docket环境下安装单机RocketMQ

    1. #拉取rocketMQ镜像
      docker pull rocketmqinc/rocketmq:4.4.0
    2. 运行rocketMQ

        首先运行NameServer

        docker run -d --restart=always -p 9876:9876 -v /home/rocketMQ/data/namesrv/logs:/root/logs -v /home/rocketMQ/data/namesrv/store:/root/store --name RocketMQNSrv -e "JAVA_OPT_EXT=-server -Xms256m -Xmx256m" rocketmqinc/rocketmq:4.4.0 sh mqnamesrv
      • 运行broker服务器

        #1.首先进入/home/rocketMQ/conf目录下创建broker.conf 文件
        cd /home/rocketMQ
        mkdir conf
        cd conf
        vi broker.conf
        #2.往conf文件中写入以下内容
        brokerClusterName = DefaultCluster #broker集群名
        brokerName = broker-a	#当前broker节点名
        brokerId = 0	#broker角色,0-master,>0-slave
        deleteWhen = 04	#每天消息清理时机:默认凌晨4点
        fileReservedTime = 48	#文件保留时长,默认72小时
        brokerRole = ASYNC_MASTER #Broker 的角色: ASYNC_MASTER 异步复制Master ; SYNC_MASTER 同步双写Master; SLAVE
        flushDiskType = ASYNC_FLUSH	#刷盘方式 ASYNC_FLUSH 异步刷盘; SYNC_FLUSH 同步刷盘
        brokerIP1 = {本地外网 IP}	#docker中需要指定宿主机的IP
        
        #3.启动broker服务器
        docker run -d --restart=always -p 10911:10911 -p 10909:10909 -v  /home/rocketMQ/data/broker/logs:/root/logs -v  /home/rocketMQ/data/broker/store:/root/store -v  /home/rocketMQ/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf --name RocketMQBroker --link RocketMQNSrv:namesrv -e "NAMESRV_ADDR=namesrv:9876" -e "JAVA_OPT_EXT=-server -Xms256m -Xmx256m" rocketmqinc/rocketmq:4.4.0 sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf
    3. 安装 rocketmq 控制台

      #1.拉取镜像
      docker pull pangliang/rocketmq-console-ng
      
      #2.运行
      docker run -d --restart=always -e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.56.101:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8088:8080 -t pangliang/rocketmq-console-ng

      Drocketmq.namesrv.addr=192.168.56.101:9876

      如果有多个nameServer则分号分隔,如192.168.56.101:9876;192.168.56.3:9876

    broker.conf配置详解

    更多配置信息可以参考:

    RocketMQ4.3.x 史上配置最全详解,没有之一

    rocketmq配置

    #所属集群名字
    borkerClusterName=rocketmq-cluster
    #broker名字,注意此处不同的配置文件填写的不一样
    brokerName=broker-a
    #0 表示Master, >0 表示Slave
    brokerId=0
    #nameServer地址,分号分割
    namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
    #在发消息时,自动创建服务器不存在的topic,默认创建的队列数
    defaultTopicQueueNums=4
    #是否允许Broker自动创建Topic,建议线下开启,线上关闭
    autoCreateTopicEnable=true
    #是否允许Broker自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=true
    #Broker 对外服务的监听端口
    listenPort=10915
    #删除文件时间点,默认凌晨4点
    deleteWhen=04
    #文件保留时间,默认48小时
    fileReservedTime=120
    #commitLog每个文件的大小默认1G
    mapedFileSizeCommitLog=1073741824
    #ConsumeQueue每个文件默认存30W条,根据业务情况调整
    mapedFileSizeConsumeQueue=300000
    #destroyMapedFileIntervalForcibly=120000
    #redeleteHangedFileInterval=120000
    #检测物理文件磁盘空间
    diskMaxUsedSpaceRatio=88
    #存储路径
    storePathRootDir=/usr/local/middle/rocketmq/data
    #commitLog存储路径
    storePathCommitLog=/usr/local/middle/rocketmq/data/commitlog
    #消费队列存储路径
    storePathConsumeQueue=/usr/local/middle/rocketmq/data/consumequeue
    #消息索引存储路径
    storePathIndex=/usr/local/middle/rocketmq/data/index
    #checkpoint 文件存储路径
    storeCheckPoint=/usr/local/middle/rocketmq/data/checkpoint
    #abort 文件存储路径
    abortFile=/usr/local/middle/rocketmq/data/abort
    #限制的消息大小
    maxMessageSize=65536
    
    #flushCommitLogLeastPages=4
    #flushConsumeQueueLeastPages=2
    #flushCommitLogThoroughInterval=10000
    #flushConsumeQueueThoroughInterval=60000
    
    #Broker 的角色
    #- ASYNC_MASTER 异步复制Master
    #- SYNC_MASTER 同步双写Master
    #- SLAVE
    brokerRole=SYNC_MASTER
    
    #刷盘方式
    #- ASYNC_FLUSH  异步刷盘
    #- SYNC_FLUSH 同步刷盘
    flushDiskType=SYNC_FLUSH
    #开启属性过滤
    enablePropertyFilter=true
    
    #checkTransactionMessageEnable=false
    #发消息线程池数量
    #sendMessageThreadPoolNums=128
    #拉消息线程池数量
    #pullMessageThreadPoolNums=128
    
    #docker中需要指定宿主机的IP
    brokerIP1={本地外网 IP}

    docker环境下rocketMQ集群

    此处使用2个nameServer,2个master2个slave

    服务器1:master1、slave2

    服务器2:master2、slave1

    集群主要是broker集群,每个broker映射一个

    broker.conf
    文件

    rocke有9876
    非vip通道端口:10911
    vip通道端口:10909
    10909是VIP通道对应的端口,在JAVA中的消费者对象或者是生产者对象中关闭VIP通道即可无需开放10909端口

    #关闭VIP通道
    rocketmq.config.isVIPChannel=false

    如果是broker集群的话,还要开放10912,否则master的消息将无法复制到slave节点,会报SLAVE_NOT_AVAILABLE异常

    安装nameServer

    同上面单机装配一样

    服务器1的配置文件

    broker-master1.conf

    # nameServer地址,如果nameserver是多台集群的话,就用分号分割
    namesrvAddr=192.168.56.101:9876;192.168.56.3:9876
    # 所属集群名字
    brokerClusterName=rocketmq-cluster
    # broker名字,注意此处不同的配置文件填写的不一样  例如:在a.properties 文件中写 broker-a  在b.properties 文件中写 broker-b
    brokerName=broker-a
    # 0 表示 Master,>0 表示 Slave
    brokerId=0
    # Broker 对外服务的监听端口
    listenPort=10911
    # 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数。由于是4个broker节点,所以设置为4
    # defaultTopicQueueNums=4
    # 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
    autoCreateTopicEnable=true
    # 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=true
    # commitLog每个文件的大小默认1G
    mapedFileSizeCommitLog=1073741824
    # ConsumeQueue每个文件默认存30W条,根据业务情况调整
    mapedFileSizeConsumeQueue=300000
    # 检测可用的磁盘空间大小,当磁盘被占用超过90%,消息写入会直接报错
    diskMaxUsedSpaceRatio=90
    # Broker 的角色: ASYNC_MASTER 异步复制Master ; SYNC_MASTER 同步双写Master; SLAVE
    brokerRole=SYNC_MASTER
    # 刷盘方式 ASYNC_FLUSH 异步刷盘; SYNC_FLUSH 同步刷盘
    flushDiskType=ASYNC_FLUSH
    #开启属性过滤
    enablePropertyFilter=true
    #docker中需要指定宿主机的IP
    brokerIP1=192.168.56.101
    brokerIP2=192.168.56.101
    #运行master1
    docker run -d --restart=always -p 10911:10911 -p 10909:10909 -p 10912:10912 -v  /home/rocketMQ/data/broker/logs:/root/logs -v  /home/rocketMQ/data/broker/store:/root/store -v  /home/rocketMQ/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf --name RocketMQBroker -e "JAVA_OPT_EXT=-server -Xms256m -Xmx256m" rocketmqinc/rocketmq:4.4.0 sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf

    broker-slave2.conf

    # nameServer地址,如果nameserver是多台集群的话,就用分号分割
    namesrvAddr=192.168.56.101:9876;192.168.56.3:9876
    # 所属集群名字
    brokerClusterName=rocketmq-cluster
    # broker名字(同一主从下:Master和slave名称要一致),注意此处不同的配置文件填写的不一样  例如:在a.properties 文件中写 broker-a  在b.properties 文件中写 broker-b
    #*****注意,主从一致
    brokerName=broker-b
    # 0 表示 Master,>0 表示 Slave
    brokerId=1
    # Broker 对外服务的监听端口
    listenPort=20911
    # Master监听端口,从服务器连接该端口,默认为10912
    haListenPort=20912
    # VIP通道端口
    fastListenPort=20909
    # Master服务器IP地址与端口号(指定HA连接的master地址)
    haMasterAddress=192.168.56.3:10912
    # 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数。由于是4个broker节点,所以设置为4
    # defaultTopicQueueNums=4
    # 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
    autoCreateTopicEnable=true
    # 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=true
    # commitLog每个文件的大小默认1G
    mapedFileSizeCommitLog=1073741824
    # ConsumeQueue每个文件默认存30W条,根据业务情况调整
    mapedFileSizeConsumeQueue=300000
    # 检测可用的磁盘空间大小,当磁盘被占用超过90%,消息写入会直接报错
    diskMaxUsedSpaceRatio=90
    # Broker 的角色: ASYNC_MASTER 异步复制Master ; SYNC_MASTER 同步双写Master; SLAVE
    brokerRole=SLAVE
    # 刷盘方式 ASYNC_FLUSH 异步刷盘; SYNC_FLUSH 同步刷盘
    flushDiskType=ASYNC_FLUSH
    #开启属性过滤
    enablePropertyFilter=true
    #docker中需要指定宿主机的IP
    brokerIP1=192.168.56.101

    *:docker中必须指定

    haMasterAddress
    或者在master中指定
    brokerIP2
    ,否则发送消息可能会报
    SLAVE_NOT_AVAILABLE
    (不知道大家是不是也会跟我一样有这种情况,可能也只是个例),这边因为是双主机双主双从,所以修改端口号防止冲突

    Master监听端口,从服务器连接该端口,默认为10912

    haListenPort=20912

    VIP通道端口,默认10909

    fastListenPort=20909

    Master服务器IP地址与端口号(指定HA连接的master地址)

    haMasterAddress=192.168.56.101:10912

    #运行slave2
    docker run -d --restart=always -p 20911:20911 -p 20909:20909 -p 20912:20912 -v  /home/rocketMQ/data/broker-slave2/logs:/root/logs -v  /home/rocketMQ/data/broker-slave2/store:/root/store -v  /home/rocketMQ/conf/broker-slave2.conf:/opt/rocketmq-4.4.0/conf/broker.conf --name Broker-slave2 -e "JAVA_OPT_EXT=-server -Xms256m -Xmx256m" rocketmqinc/rocketmq:4.4.0 sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf

    服务器2的配置文件

    broker-master2.conf

    # nameServer地址,如果nameserver是多台集群的话,就用分号分割
    namesrvAddr=192.168.56.101:9876;192.168.56.3:9876
    # 所属集群名字
    brokerClusterName=rocketmq-cluster
    # broker名字,注意此处不同的配置文件填写的不一样  例如:在a.properties 文件中写 broker-a  在b.properties 文件中写 broker-b
    brokerName=broker-b
    # 0 表示 Master,>0 表示 Slave
    brokerId=0
    # Broker 对外服务的监听端口
    listenPort=10911
    # 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数。由于是4个broker节点,所以设置为4
    # defaultTopicQueueNums=4
    # 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
    autoCreateTopicEnable=true
    # 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=true
    # commitLog每个文件的大小默认1G
    mapedFileSizeCommitLog=1073741824
    # ConsumeQueue每个文件默认存30W条,根据业务情况调整
    mapedFileSizeConsumeQueue=300000
    # 检测可用的磁盘空间大小,当磁盘被占用超过90%,消息写入会直接报错
    diskMaxUsedSpaceRatio=90
    # Broker 的角色: ASYNC_MASTER 异步复制Master ; SYNC_MASTER 同步双写Master; SLAVE
    brokerRole=SYNC_MASTER
    # 刷盘方式 ASYNC_FLUSH 异步刷盘; SYNC_FLUSH 同步刷盘
    flushDiskType=ASYNC_FLUSH
    #开启属性过滤
    enablePropertyFilter=true
    #docker中需要指定宿主机的IP
    brokerIP1=192.168.56.3
    brokerIP2=192.168.56.3
    #运行master2
    docker run -d --restart=always -p 10911:10911 -p 10909:10909 -p 10912:10912 -v  /home/rocketMQ/data/broker/logs:/root/logs -v  /home/rocketMQ/data/broker/store:/root/store -v  /home/rocketMQ/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf --name RocketMQBroker -e "JAVA_OPT_EXT=-server -Xms256m -Xmx256m" rocketmqinc/rocketmq:4.4.0 sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf

    broker-slave1.conf

    # nameServer地址,如果nameserver是多台集群的话,就用分号分割
    namesrvAddr=192.168.56.101:9876;192.168.56.3:9876
    # 所属集群名字
    brokerClusterName=rocketmq-cluster
    # broker名字(同一主从下:Master和slave名称要一致),注意此处不同的配置文件填写的不一样  例如:在a.properties 文件中写 broker-a  在b.properties 文件中写 broker-b
    #*****注意,主从一致
    brokerName=broker-a
    # 0 表示 Master,>0 表示 Slave
    brokerId=1
    # Broker 对外服务的监听端口
    listenPort=20911
    # Master监听端口,从服务器连接该端口,默认为10912
    haListenPort=20912
    # VIP通道端口
    fastListenPort=20909
    # Master服务器IP地址与端口号(指定HA连接的master地址)
    haMasterAddress=192.168.56.101:10912
    # 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数。由于是4个broker节点,所以设置为4
    # defaultTopicQueueNums=4
    # 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
    autoCreateTopicEnable=true
    # 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=true
    # commitLog每个文件的大小默认1G
    mapedFileSizeCommitLog=1073741824
    # ConsumeQueue每个文件默认存30W条,根据业务情况调整
    mapedFileSizeConsumeQueue=300000
    # 检测可用的磁盘空间大小,当磁盘被占用超过90%,消息写入会直接报错
    diskMaxUsedSpaceRatio=90
    # Broker 的角色: ASYNC_MASTER 异步复制Master ; SYNC_MASTER 同步双写Master; SLAVE
    brokerRole=SLAVE
    # 刷盘方式 ASYNC_FLUSH 异步刷盘; SYNC_FLUSH 同步刷盘
    flushDiskType=ASYNC_FLUSH
    #开启属性过滤
    enablePropertyFilter=true
    #docker中需要指定宿主机的IP
    brokerIP1=192.168.56.3

    *:同broker-slave2注释

    Master监听端口,从服务器连接该端口,默认为10912

    haListenPort=20912

    VIP通道端口,默认10909

    fastListenPort=20909

    Master服务器IP地址与端口号(指定HA连接的master地址)

    haMasterAddress=192.168.56.101:10912

    #运行slave1
    docker run -d --restart=always -p 20911:20911 -p 20909:20909 -p 20912:20912 -v  /home/rocketMQ/data/broker-slave1/logs:/root/logs -v  /home/rocketMQ/data/broker-slave1/store:/root/store -v  /home/rocketMQ/conf/broker-slave1.conf:/opt/rocketmq-4.4.0/conf/broker.conf --name Broker-slave1 -e "JAVA_OPT_EXT=-server -Xms256m -Xmx256m" rocketmqinc/rocketmq:4.4.0 sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf

    RocketMQ基本使用

    消息发送步骤

    • 创建生产者producer,并指定生产者组名
    • 指定NameServer(生产者需要把提供的消息注册到nameServer,方便消费端查找和订阅)
    • 启动producer
    • 创建消息对象,指定主题topic,tag和消息体
    • 发送消息
    • 关闭producer

    消息消费步骤

    • 创建消费者consumer,指定消费组名
    • 指定NameServer
    • 订阅主题topic和tag
    • 注册消息监听器,设置回调函数,处理消息
    • 启动消费者consumer

    首先导入maven依赖(请自行选择对应的版本)

    <dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.4.0</version>
    </dependency>

    消息发送结果

    • SEND_OK:消息发送成功
    • FLUSH_DISK_TIMEOUT:消息发送成功,但是服务器刷盘超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
    • FLUSH_SLAVE_TIMEOUT:消息发送成功,但是服务器同步到 Slave 时超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
    • SLAVE_NOT_AVAILABLE:消息发送成功,但是此时 Slave 不可用,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失

    1. 基础消息

    1.1 发送消息

    1.1.1 同步消息

    需要接到消息结果之后再发送下一个消息

    这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。

    producer.send(message);

    public static void main(String[] args) throws Exception {
    //1.- 创建生产者producer,并指定生产者组名
    DefaultMQProducer producer = new DefaultMQProducer("group-1");
    //2.- 指定NameServer(生产者需要把提供的消息注册到nameServer,方便消费端查找和订阅)
    producer.setNamesrvAddr("192.168.56.101:9876");
    //3.- 启动producer
    producer.start();
    //4.- 创建消息对象,指定主题topic,tag和消息体
    for (int i = 0; i < 10; i++) {
    /**
    * 参数一:消息主题 topic
    * 参数二:消息tag
    * 参数三:消息内容
    */
    Message message = new Message("base", "tag_sync", ("hello musha" + i).getBytes());
    //5.- 发送消息
    SendResult result = producer.send(message);
    //发送状态
    SendStatus sendStatus = result.getSendStatus();
    //消息ID
    String msgId = result.getMsgId();
    //消息队列ID
    int queueId = result.getMessageQueue().getQueueId();
    //            System.out.println("发送状态:" + sendStatus);
    //            System.out.println("消息ID:" + msgId);
    //            System.out.println("消息队列ID:" + queueId);
    System.out.println("发送结果:" + result);
    //线程sleep 1s
    TimeUnit.SECONDS.sleep(1);
    }
    //6.- 关闭producer
    producer.shutdown();
    }
    1.1.2 异步消息

    不必等待返回结果,立即发送下一个消息,可以通过

    send(Message msg, SendCallback sendCallback)
    中的回调函数,对返回结果进行处理。

    异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。

    producer.send(message, new SendCallback())

    public static void main(String[] args) throws Exception {
    //1.- 创建生产者producer,并指定生产者组名
    DefaultMQProducer producer = new DefaultMQProducer("group-1");
    //2.- 指定NameServer(生产者需要把提供的消息注册到nameServer,方便消费端查找和订阅)
    producer.setNamesrvAddr("192.168.56.101:9876");
    //3.- 启动producer
    producer.start();
    //4.- 创建消息对象,指定主题topic,tag和消息体
    for (int i = 0; i < 10; i++) {
    /**
    * 参数一:消息主题 topic
    * 参数二:消息tag
    * 参数三:消息内容
    */
    Message message = new Message("base", "tag_async", ("hello musha" + i).getBytes());
    //5.- 发送消息 *主要是这里不一样
    producer.send(message, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
    //发送状态
    //消息ID
    String msgId = sendResult.getMsgId();
    //消息队列ID
    //                    int queueId = sendResult.getMessageQueue().getQueueId();
    System.out.println("发送结果:" + sendResult);
    }
    
    @Override
    public void onException(Throwable throwable) {
    
    }
    });
    
    //线程sleep 1s
    TimeUnit.SECONDS.sleep(1);
    }
    //6.- 关闭producer
    producer.shutdown();
    }
    1.1.3 单向消息

    通俗来说,就是发送消息不必等待返回结果,也无需执行回调函数。

    这种方式主要用在不特别关心发送结果的场景,例如日志发送。

    producer.sendOneway(message);

    public static void main(String[] args) throws Exception {
    //1.- 创建生产者producer,并指定生产者组名
    DefaultMQProducer producer = new DefaultMQProducer("group-1");
    //2.- 指定NameServer(生产者需要把提供的消息注册到nameServer,方便消费端查找和订阅)
    producer.setNamesrvAddr("192.168.56.101:9876");
    //3.- 启动producer
    producer.start();
    //4.- 创建消息对象,指定主题topic,tag和消息体
    for (int i = 0; i < 10; i++) {
    /**
    * 参数一:消息主题 topic
    * 参数二:消息tag
    * 参数三:消息内容
    */
    Message message = new Message("base", "tag_oneway", ("hello musha" + i).getBytes());
    //5.- 发送消息 *主要是这里不一样
    producer.sendOneway(message);
    //线程sleep 1s
    TimeUnit.SECONDS.sleep(1);
    }
    //6.- 关闭producer
    producer.shutdown();
    }

    1.2 消费消息

    1.2.1 负载均衡

    默认即为负载均衡

    //若不进行该属性设置,默认也是负载均衡策略
    consumer.setMessageModel(MessageModel.CLUSTERING);
    public static void main(String[] args) throws Exception {
    //1- 创建消费者consumer,指定消费组名
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
    //2- 指定NameServer
    consumer.setNamesrvAddr("192.168.56.101:9876;192.168.56.3:9876");
    //3- 订阅主题topic和tag
    //  参数一:主题topic
    //  参数二:tag
    consumer.subscribe("base", "tag_sync || tag_async");
    
    //**设置消费模式-负载均衡
    consumer.setMessageModel(MessageModel.CLUSTERING);
    
    //4- 设置回调函数,处理消息
    consumer.registerMessageListener(new MessageListenerConcurrently() {
    //接受消息内容
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
    list.forEach(msg -> {
    System.out.println(new String(msg.getBody()));
    });
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
    });
    //5- 启动消费者consumer
    consumer.start();
    }
    1.2.2 广播消费

    一个消息,所有的消费者都能进行消费

    consumer.setMessageModel(MessageModel.BROADCASTING);
    public static void main(String[] args) throws Exception {
    //1- 创建消费者consumer,指定消费组名
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
    //2- 指定NameServer
    consumer.setNamesrvAddr("192.168.56.101:9876;192.168.56.3:9876");
    //3- 订阅主题topic和tag
    //  参数一:主题topic
    //  参数二:tag
    consumer.subscribe("base", "tag_sync || tag_async");
    
    //**设置消费模式 - 广播模式
    consumer.setMessageModel(MessageModel.BROADCASTING);
    //4- 设置回调函数,处理消息
    consumer.registerMessageListener(new MessageListenerConcurrently() {
    //接受消息内容
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
    list.forEach(msg -> {
    System.out.println(new String(msg.getBody()));
    });
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
    });
    //5- 启动消费者consumer
    consumer.start();
    }
    

    2. 顺序消息

    由于目前部署了双主双从模式

    发送消息时,会创建一个topic在2个broker上,每个broker各自创建4个队列。

    消息并不会发送至同一个队列中,会导致无法保证顺序消费(如果需要顺序消费的话)。

    保证消息的顺序性

    • 全局顺序消息

      一般通过建立单队列的方式,消息全在一个队列里,保证消息的顺序性(FIFO)

    • 局部顺序消息

      不需要限定单个队列,将需要按序消费的消息,放入到同一个队列中即可。

      比如:顺序消息A1,B2,C3,D4,都放入到队列1中,就能保证消息的顺序性。

    2.1 顺序消费的实体类

    /**
    * @ClassName OrderStep
    * @Desc 顺序消息-数据类
    * @Author musha
    * @Date 2020/3/15 13:49
    * @Version 1.0
    */
    @Data
    @AllArgsConstructor
    public class OrderStep {
    private String orderId;
    private String desc;
    
    /**
    * @param
    * @Description 用来创建订单集合
    * @return: java.util.List<com.musha.mq.rocket.demo.order.OrderStep>
    * @Author musha
    * @Date 2020/3/15 13:51
    */
    public static List<OrderStep> getOrders() {
    List<OrderStep> orderList = new ArrayList<>();
    //创建订单ID,此处模拟3个订单
    // 订单A: 创建->付款->物流->完成
    // 订单B: 创建->付款
    // 订单C: 创建->付款->物流
    String orderIdA = UUID.randomUUID().toString().substring(0, 10);
    String orderIdB = UUID.randomUUID().toString().substring(0, 10);
    String orderIdC = UUID.randomUUID().toString().substring(0, 10);
    
    orderList.add(new OrderStep(orderIdA, "A创建订单"));
    orderList.add(new OrderStep(orderIdB, "B创建订单"));
    orderList.add(new OrderStep(orderIdA, "A付款"));
    orderList.add(new OrderStep(orderIdC, "C创建订单"));
    orderList.add(new OrderStep(orderIdC, "C付款"));
    orderList.add(new OrderStep(orderIdA, "A物流"));
    orderList.add(new OrderStep(orderIdB, "B付款"));
    orderList.add(new OrderStep(orderIdA, "A完成"));
    orderList.add(new OrderStep(orderIdC, "C物流"));
    
    return orderList;
    }
    }

    2.2 生产者

    需要将同一个业务标识(同一个订单号)订单的消息,放置到同一个队列中,保证其有序性

    send(Message msg, MessageQueueSelector selector, Object arg)

    msg:消息对象

    selector:消息队列的选择器,主要在这里进行消息队列的选择

    arg:业务标识(这里为订单号)

    select(List<MessageQueue> list, Message message, Object o)

    @param list 消息队列
    @param message 消息对象(即上面传递过来的message
    @param o 业务标识的参数(即传递过来的orderId)

    /**
    * @ClassName OrderProducer
    * @Desc 顺序消费 - 生产者
    * @Author musha
    * @Date 2020/3/15 14:01
    * @Version 1.0
    */
    public class OrderProducer {
    public static void main(String[] args) throws Exception {
    //1.创建消息生产者,指定组名
    DefaultMQProducer producer = new DefaultMQProducer("orderMQ");
    //2.指定NameServer
    producer.setNamesrvAddr("192.168.56.101:9876;192.168.56.3:9876");
    //3.启动producer
    producer.start();
    
    //4.获取订单集合信息
    List<OrderStep> orderList = OrderStep.getOrders();
    
    //发送消息
    int i = 0;
    for (OrderStep orderStep : orderList) {
    //5.创建消息
    Message message = new Message("OrderTopic", "OrderTag", "i:" + i++, (orderStep + "").getBytes());
    
    //6.发送消息
    // 参数一:消息对象
    // 参数二:消息队列的选择器
    // 参数三:选择队列的业务标识(此处为订单ID)
    // 其实就是:使用业务标识,进行一定的规则,选出该标识对应存储的队列
    SendResult sendResult = producer.send(message, new MessageQueueSelector() {
    /**
    * @param list    消息队列
    * @param message 消息对象(即上面传递过来的message)
    * @param o       业务标识的参数(即传递过来的orderId)
    * @return: org.apache.rocketmq.common.message.MessageQueue
    */
    @Override
    public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
    String orderId = (String) o;
    //计算当前id存储的队列索引(取绝对值,因为hashCode可能为负数),这里ID只是做测试用,正式情况请
    int index = Math.abs(orderId.hashCode()) % (list.size() - 1);
    return list.get(index);
    }
    }, orderStep.getOrderId());
    System.out.println("发送结果:" + sendResult);
    }
    producer.shutdown();
    }
    
    }

    2.3 消费者

    consumer注册消息监听器

    MessageListenerOrderly

    MessageListenerOrderly
    :使用该监听器时,只启用一个线程对一个队列进行消费

    即:一个队列只会被一个线程取到,第二个线程无法访问这个队列(对消费队列上锁,在消费消息之前,先去获取对应队列对应的锁,保证同一个队列不会被并发消费)

    *同一个topic下的不同消息队列可以被并发消费

    /**
    * @ClassName OrderConsumer
    * @Desc 顺序消息 - 消费者
    * @Author musha
    * @Date 2020/3/15 14:22
    * @Version 1.0
    */
    public class OrderConsumer {
    public static void main(String[] args) throws Exception {
    //1.创建消费者,指定组名
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderMQ");
    //2.指定nameServer地址
    consumer.setNamesrvAddr("192.168.56.101:9876;192.168.56.3:9876");
    //3.订阅主题topic
    consumer.subscribe("OrderTopic", "OrderTag");
    
    //4.注册消息监听器
    // 顺序消息监听 - MessageListenerOrderly 使用该监听器时,只启用一个线程对一个队列进行消费
    // 即:一个队列只会被一个线程取到,第二个线程无法访问这个队列(对消费队列上锁,在消费消息之前,先去获取对应队列对应的锁,保证同一个队列不会被并发消费)
    consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
    list.forEach(msg -> {
    System.out.println(Thread.currentThread().getName() + "\t" + LocalDateTime.now() + "\t\t消费消息:" + new String(msg.getBody()));
    });
    return ConsumeOrderlyStatus.SUCCESS;
    }
    });
    
    //5.启动消费者
    consumer.start();
    System.out.println("消费者启动");
    }
    }

    3. 延时消息

    在message中设置延时时间

    message.setDelayTimeLevel(3);

    延时等级分为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

    从等级1开始

    3.1 生产者

    /**
    * @ClassName DelayProducer
    * @Desc 延时消息 - 生产者
    * @Author musha
    * @Date 2020/3/15 15:42
    * @Version 1.0
    */
    public class DelayProducer {
    public static void main(String[] args) throws Exception {
    //1.创建生产者,指定组
    DefaultMQProducer producer = new DefaultMQProducer("gourp_delay");
    //2.设置nameServer
    producer.setNamesrvAddr("192.168.56.101:9876;192.168.56.3:9876");
    //3.启动producer
    producer.start();
    
    for (int i = 0; i < 10; i++) {
    //4.创建消息
    Message message = new Message("topic_delay", "tag_delay", "key_delay" + i, "musha真帅".getBytes());
    //5.==*延时消息*== 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
    //  3对应的10s
    message.setDelayTimeLevel(3);
    //6.发送消息
    SendResult sendResult = producer.send(message);
    System.out.println("发送结果:" + sendResult);
    }
    producer.shutdown();
    }
    }

    3.2 消费者

    /**
    * @ClassName DelayConsumer
    * @Desc 延时消息 - 消费者
    * @Author musha
    * @Date 2020/3/15 15:42
    * @Version 1.0
    */
    public class DelayConsumer {
    public static void main(String[] args) throws Exception {
    //1.创建消费者,指定组
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_delay");
    //2.设置nameServer
    consumer.setNamesrvAddr("192.168.56.101:9876;192.168.56.3:9876");
    //3.订阅主题
    consumer.subscribe("topic_delay", "tag_delay");
    //4.注册消息监听器
    consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
    list.forEach(messageExt -> {
    //getStoreTimestamp()获取消息入队的时间
    System.out.println("消息ID:" + messageExt.getMsgId() + "\t延时时间:" + (System.currentTimeMillis() - messageExt.getStoreTimestamp()) + "\t消费消息:" + new String(messageExt.getBody()));
    });
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
    });
    //5.启动消费者
    consumer.start();
    System.out.println("消费者启动");
    }
    }

    4. 批量消息

    前面发送的消息都是一条一条发送,批量消息则是一次发送多条消息,这一批消息总大小(无论是单条消息还是消息总大小)不应该超过4MB(默认大小)

    批量消息一般有两种使用情况:

    1. 达到一定消息数量之后发送
    2. 一定时间发送(比如3s发送一次)

    4.1 生产者

    主要就是创建一个消息集合,然后发送消息集合

    List<Message> messageList = new ArrayList<>();

    SendResult sendResult = producer.send(messageList);

    /**
    * @ClassName BatchProducer
    * @Desc 批量消息 - 生产者
    * @Author musha
    * @Date 2020/3/16 10:57
    * @Version 1.0
    */
    public class BatchProducer {
    public static void main(String[] args) throws Exception {
    //1.创建生产者,指定组
    DefaultMQProducer producer = new DefaultMQProducer("group1");
    //2.绑定nameServer
    producer.setNamesrvAddr("192.168.56.101:9876;192.168.56.3:9876");
    //3.启动生产者
    producer.start();
    //4.创建消息集合
    List<Message> messageList = new ArrayList<>();
    messageList.add(new Message("topic_batch", "tag_batch", "musha真帅1".getBytes()));
    messageList.add(new Message("topic_batch", "tag_batch", "musha真帅2".getBytes()));
    messageList.add(new Message("topic_batch", "tag_batch", "musha真帅3".getBytes()));
    //5.发送消息
    SendResult sendResult = producer.send(messageList);
    System.out.println("发送结果:" + sendResult);
    //6.关闭消费者
    producer.shutdown();
    }
    }

    4.2 消费者

    与正常消费一致

    /**
    * @ClassName BatchConsumer
    * @Desc 批量消息 - 消费者
    * @Author musha
    * @Date 2020/3/16 11:17
    * @Version 1.0
    */
    public class BatchConsumer {
    public static void main(String[] args) throws Exception {
    //1.创建消费者,指定组名
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
    //2.设置nameServer
    consumer.setNamesrvAddr("192.168.56.101:9876;192.168.56.3:9876");
    //3.订阅消息
    consumer.subscribe("topic_batch", "tag_batch");
    //4.注册消息监听器
    consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
    list.forEach(messageExt -> {
    System.out.println("消费消息:" + new String(messageExt.getBody()));
    });
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
    });
    //5.启动消费者
    consumer.start();
    System.out.println("消费者启动");
    }
    }

    4.3 消息分割

    如果发送的消息集合超过了4MB,则需要对消息进行分割,rocketmq建议每次批量消息大小大概在1MB。

    如果超过4MB,会出现如下异常:

    Exception in thread "main" org.apache.rocketmq.client.exception.MQClientException: CODE: 13  DESC: the message body size over max value, MAX: 4194304

    4.3.1 生产者

    主要就是创建一个消息分割器类

    MessageSplitter
    ,该类需要实现迭代器接口
    Iterator

    然后将原本的消息集合进行迭代生成不超出规定大小的子集合,分批次进行批量消息的发送

    /**
    * @ClassName BatchProducer
    * @Desc 批量消息 - 生产者
    * @Author musha
    * @Date 2020/3/16 10:57
    * @Version 1.0
    */
    public class BatchProducer {
    public static void main(String[] args) throws Exception {
    //1.创建生产者,指定组
    DefaultMQProducer producer = new DefaultMQProducer("group1");
    //2.绑定nameServer
    producer.setNamesrvAddr("192.168.56.101:9876;192.168.56.3:9876");
    //3.启动生产者
    producer.start();
    //4.创建消息集合
    List<Message> messageList = new ArrayList<>();
    //每个byte数组为0.5MB
    byte[] bytes = new byte[(int) (1024 * 1024 * 0.2)];
    for (int i = 0; i < 20; i++) {
    byte[] resourceBytes = ("musha太帅了").getBytes();
    //arraycopy实现数组之间的复制
    /**
    * @param      src      the source array. 源数组
    * @param      srcPos   starting position in the source array. 源数组的起始位置
    * @param      dest     the destination array. 目标数组
    * @param      destPos  starting position in the destination data. 目标数组的起始位置
    * @param      length   the number of array elements to be copied. 复制的长度
    */
    System.arraycopy(resourceBytes, 0, bytes, 0, resourceBytes.length);
    messageList.add(new Message("topic_batch", "tag_batch_" + i, bytes));
    }
    MessageSplitter messageSplitter = new MessageSplitter(messageList);
    int count = 0;
    while (messageSplitter.hasNext()) {
    List<Message> list = messageSplitter.next();
    //5.发送消息
    SendResult sendResult = producer.send(list);
    System.out.println(++count + "次发送结果:" + sendResult);
    }
    //6.关闭消费者
    producer.shutdown();
    }
    }
    
    /**
    * @Description 消息分割器
    * @Author musha
    * @Date 2020/3/16 21:43
    */
    class MessageSplitter implements Iterator<List<Message>> {
    
    //消息大小控制在1MB
    private final int SIZE_LIMIT = 1024 * 1024 * 1;
    //消息集合
    private final List<Message> messageList;
    //当前索引(游标)
    private int currentIndex;
    
    public MessageSplitter(List<Message> messageList) {
    this.messageList = messageList;
    }
    
    /**
    * @Description 当前迭代器中是否还有元素
    * @return: boolean
    * @Author musha
    * @Date 2020/3/16 22:00
    */
    @Override
    public boolean hasNext() {
    return this.currentIndex < this.messageList.size();
    }
    
    /**
    * @Description 对消息集合进行拆分,并返回子集合
    * @return: java.util.List<org.apache.rocketmq.common.message.Message>
    * @Author musha
    * @Date 2020/3/16 22:03
    */
    @Override
    public List<Message> next() {
    int nextIndex = this.currentIndex;
    int totalSize = 0;
    //遍历消息进行拆分
    for (; nextIndex < this.messageList.size(); nextIndex++) {
    //获取消息
    Message message = messageList.get(nextIndex);
    //获取消息大小
    int tmpSize = message.getTopic().length() + message.getBody().length;
    //遍历msg中的属性,并将属性和属性值大小计算到消息大小中
    for (Map.Entry<String, String> entry : message.getProperties().entrySet()) {
    tmpSize += entry.getKey().length() + entry.getValue().length();
    }
    //增加日志的开销20字节
    tmpSize += 20;
    //如果当前消息大小超过了限制大小
    if (tmpSize > SIZE_LIMIT) {
    //nextIndex - currentIndex == 0代表本次遍历刚开始,子集合(subList)中没有值,则将当前消息添加到子集合中,然后结束循环
    //如果不等于0,则表示当前子集合不为空,则将该消息留到下一个子集合中放置
    if (nextIndex - currentIndex == 0) {
    nextIndex++;
    }
    //结束循环
    break;
    }
    //如果当前已经计算的消息集合大小+本次遍历的消息超出了限制大小,则直接结束循环
    if (tmpSize + totalSize > SIZE_LIMIT) {
    break;
    } else {
    totalSize += tmpSize;
    }
    }
    //截取出未超出限制大小的子集合
    List<Message> subList = this.messageList.subList(currentIndex, nextIndex);
    //更新游标
    currentIndex = nextIndex;
    return subList;
    }
    }
    4.3.2 消费者

    依旧没什么区别

    /**
    * @ClassName BatchConsumer
    * @Desc 批量消息 - 消费者
    * @Author musha
    * @Date 2020/3/16 11:17
    * @Version 1.0
    */
    public class BatchConsumer {
    public static void main(String[] args) throws Exception {
    //1.创建消费者,指定组名
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
    //2.设置nameServer
    consumer.setNamesrvAddr("192.168.56.101:9876;192.168.56.3:9876");
    //3.订阅消息
    consumer.subscribe("topic_batch", "*");
    //4.注册消息监听器
    consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
    list.forEach(messageExt -> {
    System.out.println("消费消息:" + messageExt.getTags() + "\t" + new String(messageExt.getBody()));
    });
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
    });
    //5.启动消费者
    consumer.start();
    System.out.println("消费者启动");
    }
    }

    5. 过滤消息

    消费时进行消息筛选

    5.1 生产者

    • 使用发送的消息tag进行区分,用于消费者tag过滤

    • message可以使用putUserProperty进行自定义属性绑定,供消费者筛选

    message.putUserProperty("i", i + "");

    /**
    * @ClassName FilterProducer
    * @Desc 过滤消息 - 生产者
    * @Author musha
    * @Date 2020/3/16 16:18
    * @Version 1.0
    */
    public class FilterProducer {
    public static void main(String[] args) throws Exception {
    //1.创建生产者,指定组名
    DefaultMQProducer producer = new DefaultMQProducer("group1");
    //2.设置nameServer
    producer.setNamesrvAddr("192.168.56.101:9876;192.168.56.3:9876");
    //3.启动生产者
    producer.start();
    
    for (int i = 0; i < 10; i++) {
    //4.创建消息
    Message message = new Message("topic_filter", "tag_filter_" + i, ("musha真帅" + i).getBytes());
    //**绑定自定义属性**
    message.putUserProperty("i", i + "");
    
    //5.发送消息
    SendResult sendResult = producer.send(message);
    System.out.println("发送结果:" + sendResult);
    }
    //6.关闭生产者
    producer.shutdown();
    }
    }

    5.2 消费者

    • tag过滤

    consumer.subscribe("topic_filter", "*");

    此处指定

    tag
    :*,订阅topic_filter主题下的所有tag消息

    也可以:

    tag_filter_1 || tag_filter_2
    进行指定tag的过滤

    • sql语法过滤

    MessageSelector.bySql()

    consumer.subscribe("topic_filter", MessageSelector.bySql("i>=5"));//通过sql过滤消息,只要i>=5的消息

    *注:如果出现以下异常,则需要在broker.conf中开启

    enablePropertyFilter=true

    Exception in thread "main" org.apache.rocketmq.client.exception.MQClientException: CODE: 1  DESC: The broker does not support consumer to filter message by SQL92
    /**
    * @ClassName FilterConsumer
    * @Desc 过滤消息 - 消费者
    * @Author musha
    * @Date 2020/3/16 16:18
    * @Version 1.0
    */
    public class FilterConsumer {
    public static void main(String[] args) throws Exception {
    //1.创建消费者,指定组名
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
    //2.设置nameServer
    consumer.setNamesrvAddr("192.168.56.101:9876;192.168.56.3:9876");
    //3.订阅消息
    //consumer.subscribe("topic_filter", "*");//通过tag过滤消息
    consumer.subscribe("topic_filter", MessageSelector.bySql("i>=5"));//通过sql过滤消息,只要i>=5的消息//4.注册监听器
    consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
    list.forEach(messageExt -> {
    System.out.println("消费消息:" + new String(messageExt.getBody()));
    });
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
    });
    //5.启动消费者
    consumer.start();
    System.out.println("消费者启动");
    }
    }
    

    6. 事务消息

    上图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交(正常的提交流程)、事务消息的补偿流程(提交失败时的流程)。

    • 事务消息发送及提交

        发送消息(half消息)。

      1. 服务端响应消息写入结果。

      2. 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。

      3. 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)。

    • 事务补偿

        对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次”回查“(check back)

      1. Producer收到回查消息,检查回查消息对应的本地事务的状态

      2. 根据本地事务状态,重新Commit或者Rollback

      其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。

    • 事务消息状态

      事务消息共有三种状态,提交状态、回滚状态、中间状态

      TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。

    • TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。

    • TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。(这时候需要进行check back获取消息状态)

    6.1 生产者

    ***注:**由于发送消息完需要进行监听回查,所以不需要关闭生产者

    • 创建生产者时,使用事务消息生产者

      TransactionMQProducer

    • 为生产者添加事务监听器

      setTransactionListener(TransactionListener transactionListener)

    • 发送事务消息

      sendMessageInTransaction(Message msg, Object arg)

      其中

      msg
      为发送的消息,
      arg
      为附加参数,可用于事务监听器中执行本地事务时的获取参数

    另外,事务监听器中有两个实现方法:

    • executeLocalTransaction(Message message, Object arg)
      执行本地事务

      其中

      message
      为发送的消息,
      arg
      为附加参数,是发送事务消息时添加的参数

    • checkLocalTransaction(MessageExt messageExt)
      事务状态回查

    • 这两个方法的返回值可参见枚举类

      LocalTransactionState
      ,其中参数有

      COMMIT_MESSAGE
      提交消息
    • ROLLBACK_MESSAGE
      回滚消息(抛弃该消息)
    • UNKNOW
      未知状态,需要进行状态回查
    /**
    * @ClassName TransactionProducer
    * @Desc 事务消息 - 生产者
    * @Author musha
    * @Date 2020/3/16 19:39
    * @Version 1.0
    */
    public class TransactionProducer {
    public static void main(String[] args) throws Exception {
    //1.**创建事务消息生产者,指定组名
    TransactionMQProducer producer = new TransactionMQProducer("group_transaction");
    //2.设置nameServer
    producer.setNamesrvAddr("192.168.56.101:9876;192.168.56.3:9876");
    //3.**添加事务监听器,用来监听事务消息返回的状态
    producer.setTransactionListener(new TransactionListener() {
    /**
    * @Description 在该方法中执行本地事务
    * @param message   回传的消息,利用transactionId即可获取到该消息的唯一Id
    * @param o         调用send方法时传递的参数,当send时候若有额外的参数可以传递到send方法中,这里能获取到
    * @return: org.apache.rocketmq.client.producer.LocalTransactionState
    *          返回事务状态,COMMIT:提交  ROLLBACK:回滚  UNKNOW:回调
    * @Author musha
    * @Date 2020/3/16 19:55
    */
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
    //这边可以创建一个全局共享变量concurrentHashMap,用来存储transactionId以及对应的值(比如正常执行1,异常执行-1等),便于回查时进行判断,这里就不赘述了
    int i = (int) o;
    if (i % 3 == 0) {
    //提交消息
    return LocalTransactionState.COMMIT_MESSAGE;
    } else if (i % 3 == 1) {
    //回滚消息
    return LocalTransactionState.ROLLBACK_MESSAGE;
    }
    //消息回查
    return LocalTransactionState.UNKNOW;
    }
    
    /**
    * @Description 事务消息状态回查
    * @param messageExt    通过获取transactionId来判断这条消息的本地事务执行状态
    * @return: org.apache.rocketmq.client.producer.LocalTransactionState
    *          返回事务状态,COMMIT:提交  ROLLBACK:回滚  UNKNOW:回调
    * @Author musha
    * @Date 2020/3/16 19:55
    */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
    System.out.println("消息回查:" + messageExt.getTransactionId() + "\t" + new String(messageExt.getBody()));
    //可以根据消息体中的信息去数据库中查询该消息是否已经被执行
    //或者根据上方执行本地事务时concurrentHashMap中存储的transactionId对应的值进行判断,返回对应的操作值
    
    //这里演示就直接提交了
    return LocalTransactionState.COMMIT_MESSAGE;
    }
    });
    //4.启动生产者
    producer.start();
    
    for (int i = 0; i < 10; i++) {
    //5.创建消息
    Message message = new Message("topic_trans", "tag_trans", ("musha太帅啦" + i).getBytes());
    //6.**发送事务消息**
    //  参数一:消息对象
    //  参数二:附加参数,可用于事务监听器中执行本地事务时的获取参数
    SendResult sendResult = producer.sendMessageInTransaction(message, i);
    System.out.println("发送消息:" + sendResult);
    }
    //7.由于MQ要回查生产者,所以不需要关闭生产者
    //        producer.shutdown();
    }
    }

    6.2 消费者

    /**
    * @ClassName TransactionConsumer
    * @Desc 事务消息 - 消费者
    * @Author musha
    * @Date 2020/3/16 19:39
    * @Version 1.0
    */
    public class TransactionConsumer {
    public static void main(String[] args) throws Exception {
    //1.创建消费者,指定组名
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_transaction");
    //2.设置nameServer
    consumer.setNamesrvAddr("192.168.56.101:9876;192.168.56.3:9876");
    //3.订阅消息
    consumer.subscribe("topic_trans", "tag_trans");
    //4.注册消息监听器
    consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
    list.forEach(messageExt -> {
    System.out.println("消费消息:" + new String(messageExt.getBody()));
    });
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
    });
    //5.启动消费者
    consumer.start();
    System.out.println("消费者启动");
    }
    }

    6.3 使用限制

    1. 事务消息不支持延时消息和批量消息。
    2. 为了避免单个消息被检查太多次而导致半队列消息累积,默认单个消息的检查次数限制为 15 次,但是可以通过 Broker 配置文件的
      transactionCheckMax
      参数来修改此限制。如果已经检查某条消息超过 N 次的话( N =
      transactionCheckMax
      ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。可以通过重写
      AbstractTransactionCheckListener
      类来修改这个行为。
    3. 事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于
      transactionMsgTimeout
      参数。
    4. 事务性消息可能不止一次被检查或消费。即使说,一条消息可能因为网络延迟等原因,会造成多次检查,就有可能会出现多次提交,消费端就会进行多次消费,所以消费端需要进行幂等性检查
    5. 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
    6. 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。
    • 点赞
    • 收藏
    • 分享
    • 文章举报
    目沙 发布了3 篇原创文章 · 获赞 0 · 访问量 157 私信 关注
    内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
    标签: