您的位置:首页 > 编程语言 > Java开发

最新SpringBoot使用RabbitMQ延时队列小白必备

2020-01-12 20:15 316 查看

idea最新简单SpringBoot使用RabbitMQ延时队列小白必备

了解RabbitMQ

1.什么是MQ

  • MQ,是一种跨进程的通信机制,用于上下游传递消息。

    在互联网架构中,MQ是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。

    使用了MQ之后,消息发送上游只需要依赖MQ,不用依赖其他服务。

为什么会产生消息列队?

  • 不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个;
  • 不同进程(process)之间传递消息时,为了实现标准化,将消息的格式规范化了,并且,某一个进程接受的消息太多,一下子无法处理完,并且也有先后顺序,必须对收到的消息进行排队,因此诞生了事实上的消息队列;

延时列队的使用场景?

  1. 订单业务:在淘宝或者京东购买东西,用户下单后未付款则30分钟后取消订单。
  2. 短信通知:手机用户交完话费后,几分钟之内将会收到缴费信息

2.什么是RabbitMQ(这里就做了一下简单介绍)

  • RabbitMQ是一种消息队列 ,用于常见的进程通信。支持点对点,请求应答和发布订阅模式 并且提供多种语言的支持。常见的java,c#,php都支持。

    常被用在异步处理,应用解耦。流量消锋等复杂的业务场景中。和java的kafka一样都属于消息中间件。

    下载地址:

  • https://www.rabbitmq.com/download.html

进入RabbitMQ官网

1.第一步

  1. 第二步

  2. 下载好后不要着急安装RabbitMQ,我们这里还需要安装Erlang

    [ol]

    下载地址:http://www.erlang.org/download/otp_win64_17.3.exe

  3. 安装步骤

    步骤一
  4. 步骤二
  5. 步骤三
  6. 步骤四
  7. 安装完成
  • 现在安装RabbitMQ

      步骤一
    1. 步骤二
    2. 步骤三
    3. 安装完成
  • 启动RabbitMQ管理工具

      开始菜单 — 最新添加 — 展开 — 选中双击

    1. 输入命令:rabbitmq-plugins enable rabbitmq_management

      效果如果图

    2. 在浏览器中输入地址查看:http://127.0.0.1:15672/


      出现次页面代表成功,默认用户和密码都是guest/ guest

    3. 若不出现此页面,就是安装失败了,不要慌,多半问题在系统用户名必须是中文(放心有解决办法):https://blog.csdn.net/weixin_41758046/article/details/89489141

    [/ol]

    SpringBoot整合RabbitMQ

    1.添加依赖

    • pom.xml中添加 spring-boot-starter-amqp的依赖

      <!-- spring-boot-starter-amqp的依赖 -->
      <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
      </dependency>
    • 其他依赖

      <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
      </dependency>
      
      <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <optional>true</optional>
      </dependency>
      
      <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
      <exclusions>
      <exclusion>
      <groupId>org.junit.vintage</groupId>
      <artifactId>junit-vintage-engine</artifactId>
      </exclusion>
      </exclusions>
      </dependency>
      
      <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
      </dependency>
    • application.yml文件中配置rabbitmq相关内容

      spring:
      rabbitmq:
      host: localhost
      port: 5672
      username: guest
      password: guest

    这里我们环境就搭建起来了

    2.具体编码实现

    1. 配置列队

      package com.example.spring_boot_rabbitmq;
      
      import lombok.extern.slf4j.Slf4j;
      import org.springframework.amqp.core.*;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      
      import java.util.HashMap;
      import java.util.Map;
      
      /**
      * @author:zq
      * @date: Greated in 2019/12/19 11:46
      * 配置队列
      */
      
      @Configuration
      @Slf4j
      public class DelayRabbitConfig {
      
      /**
      * 延迟队列 TTL 名称
      */
      private static final String ORDER_DELAY_QUEUE = "user.order.delay.queue";
      /**
      * DLX,dead letter发送到的 exchange
      * 延时消息就是发送到该交换机的
      */
      public static final String ORDER_DELAY_EXCHANGE = "user.order.delay.exchange";
      /**
      * routing key 名称
      * 具体消息发送在该 routingKey 的
      */
      public static final String ORDER_DELAY_ROUTING_KEY = "order_delay";
      
      public static final String ORDER_QUEUE_NAME = "user.order.queue";
      public static final String ORDER_EXCHANGE_NAME = "user.order.exchange";
      public static final String ORDER_ROUTING_KEY = "order";
      
      /**
      * 延迟队列配置
      * <p>
      * 1、params.put("x-message-ttl", 5 * 1000);
      * 第一种方式是直接设置 Queue 延迟时间 但如果直接给队列设置过期时间,这种做法不是很灵活,(当然二者是兼容的,默认是时间小的优先)
      * 2、rabbitTemplate.convertAndSend(book, message -> {
      * message.getMessageProperties().setExpiration(2 * 1000 + "");
      * return message;
      * });
      * 第二种就是每次发送消息动态设置延迟时间,这样我们可以灵活控制
      **/
      @Bean
      public Queue delayOrderQueue() {
      Map<String, Object> params = new HashMap<>();
      // x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称,
      params.put("x-dead-letter-exchange", ORDER_EXCHANGE_NAME);
      // x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。
      params.put("x-dead-letter-routing-key", ORDER_ROUTING_KEY);
      return new Queue(ORDER_DELAY_QUEUE, true, false, false, params);
      }
      /**
      * 需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。
      * 这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,
      * 不会转发dog.puppy,也不会转发dog.guard,只会转发dog。
      * @return DirectExchange
      */
      @Bean
      public DirectExchange orderDelayExchange() {
      return new DirectExchange(ORDER_DELAY_EXCHANGE);
      }
      @Bean
      public Binding dlxBinding() {
      return BindingBuilder.bind(delayOrderQueue()).to(orderDelayExchange()).with(ORDER_DELAY_ROUTING_KEY);
      }
      
      @Bean
      public Queue orderQueue() {
      return new Queue(ORDER_QUEUE_NAME, true);
      }
      /**
      * 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。
      * 符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”。
      **/
      @Bean
      public TopicExchange orderTopicExchange() {
      return new TopicExchange(ORDER_EXCHANGE_NAME);
      }
      
      @Bean
      public Binding orderBinding() {
      // TODO 如果要让延迟队列之间有关联,这里的 routingKey 和 绑定的交换机很关键
      return BindingBuilder.bind(orderQueue()).to(orderTopicExchange()).with(ORDER_ROUTING_KEY);
      }
      
      }
    2. 创建一个Order实体类

      package com.example.spring_boot_rabbitmq.pojo;
      
      import lombok.Data;
      
      import java.io.Serializable;
      
      /**
      * @author:zq
      * @date: Greated in 2019/12/19 11:49
      */
      @Data
      public class Order implements Serializable {
      private static final long serialVersionUID = -2221214252163879885L;
      
      private String orderId; // 订单id
      
      private Integer orderStatus; // 订单状态 0:未支付,1:已支付,2:订单已取消
      
      private String orderName; // 订单名字
      
      }
    3. 接收者

      package com.example.spring_boot_rabbitmq;
      
      import com.example.spring_boot_rabbitmq.pojo.Order;
      import com.rabbitmq.client.Channel;
      import lombok.extern.slf4j.Slf4j;
      import org.springframework.amqp.core.Message;
      import org.springframework.amqp.rabbit.annotation.RabbitListener;
      import org.springframework.stereotype.Component;
      
      import java.util.Date;
      
      /**
      * @author:zq
      * @date: Greated in 2019/12/19 11:53
      * 接收者
      */
      
      @Component
      @Slf4j
      public class DelayReceiver {
      @RabbitListener(queues = {DelayRabbitConfig.ORDER_QUEUE_NAME})
      public void orderDelayQueue(Order order, Message message, Channel channel) {
      log.info("###########################################");
      log.info("【orderDelayQueue 监听的消息】 - 【消费时间】 - [{}]- 【订单内容】 - [{}]",  new Date(), order.toString());
      if(order.getOrderStatus() == 0) {
      order.setOrderStatus(2);
      log.info("【该订单未支付,取消订单】" + order.toString());
      } else if(order.getOrderStatus() == 1) {
      log.info("【该订单已完成支付】");
      } else if(order.getOrderStatus() == 2) {
      log.info("【该订单已取消】");
      }
      log.info("###########################################");
      }
      
      }
    4. 发送者

      package com.example.spring_boot_rabbitmq;
      
      import com.example.spring_boot_rabbitmq.pojo.Order;
      import lombok.extern.slf4j.Slf4j;
      import org.springframework.amqp.core.AmqpTemplate;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.stereotype.Component;
      
      import java.util.Date;
      
      /**
      * @author:zq
      * @date: Greated in 2019/12/19 11:55
      * 发送者
      */
      @Component
      @Slf4j
      public class DelaySender {
      @Autowired
      private AmqpTemplate amqpTemplate;
      
      public void sendDelay(Order order) {
      log.info("【订单生成时间】" + new Date().toString() +"【1分钟后检查订单是否已经支付】" + order.toString() );
      this.amqpTemplate.convertAndSend(DelayRabbitConfig.ORDER_DELAY_EXCHANGE, DelayRabbitConfig.ORDER_DELAY_ROUTING_KEY, order, message -> {
      // 如果配置了 params.put("x-message-ttl", 5 * 1000); 那么这一句也可以省略,具体根据业务需要是声明 Queue 的时候就指定好延迟时间还是在发送自己控制时间
      message.getMessageProperties().setExpiration(1 * 1000 * 60 + "");
      return message;
      });
      }
      
      }
    5. 测试,访问http://localhost:8080/sendDelay查看日志输出

      package com.example.spring_boot_rabbitmq;
      
      import com.example.spring_boot_rabbitmq.pojo.Order;
      import org.springframework.web.bind.annotation.RestController;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.web.bind.annotation.GetMapping;
      
      /**
      * @author:zq
      * @date: Greated in 2019/12/19 11:57
      * 测试
      */
      
      @RestController
      public class TestController {
      @Autowired
      private DelaySender delaySender;
      
      @GetMapping("/sendDelay")
      public Object sendDelay() {
      Order order1 = new Order();
      order1.setOrderStatus(0);
      order1.setOrderId("123456");
      order1.setOrderName("小米6");
      
      Order order2 = new Order();
      order2.setOrderStatus(1);
      order2.setOrderId("456789");
      order2.setOrderName("小米8");
      
      delaySender.sendDelay(order1);
      delaySender.sendDelay(order2);
      return "ok";
      }
      
      }
    6. 输出

      到此已经SpringBoot使用RabbitMQ延时队列已经完成,希望对你有所帮助,若有地方不理解或者有更好的办法请留言,谢谢。

    • 点赞 9
    • 收藏
    • 分享
    • 文章举报
    一元情书 释 发布了6 篇原创文章 · 获赞 15 · 访问量 2465 私信 关注
  • 内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
    标签: