您的位置:首页 > 编程语言 > Java开发

Springboot集成rabbitmq实现延时队列

2020-01-15 11:00 477 查看

Springboot集成rabbitmq实现延时队列

  • RabbitMQ-延时任务
  • RabbitMQ TTL和DXL
  • 延时队列实现方案:
  • 什么是延时队列?

    列举几个使用场景:

    1. 定时发公告;
    2. 用户下单30分钟后未付款自动关闭订单;
    3. 用户下单后延时短信提醒;
    4. 延时关闭空闲客户端连接;
    常见的种类有:
    1. 任务队列
    2. 消息队列
    3. 请求队列
    延时任务-实现方式:
    • 定期轮询(数据库等)
    • DelayQueue
    • Timer
    • ScheduledExecutorService
    • 时间轮(kafka)
    • RabbitMQ
    • Quartz
    • Redis Zset
    • Koala
    • JCronTab
    • SchedulerX(阿里)
    • 有赞延迟队列
    详细信息:https://www.cnblogs.com/JonaLin/p/11550427.html.

    RabbitMQ-延时任务

    RabbitMQ可以针对Queue和Message设置 x-message-tt,来控制消息的生存时间,如果超时,则消息变为dead letter
    RabbitMQ针对队列中的消息过期时间有两种方法可以设置。
    A: 通过队列属性设置,队列中所有消息都有相同的过期时间。
    B: 对消息进行单独设置,每条消息TTL可以不同。

    Rabbitmq实现延时队列一般而言有两种形式:
    第一种方式:利用两个特性: Time To Live(TTL)、Dead Letter Exchanges(DLX)
    第二种方式:利用rabbitmq中的插件x-delay-message

    RabbitMQ TTL和DXL

    AMQP和RabbitMQ本身没有直接支持延迟队列功能,但是可以通过以下特性模拟出延迟队列的功能。
    但是我们可以通过RabbitMQ的两个特性来曲线实现延迟队列:

    • Time To Live(TTL).
      RabbitMQ可以针对Queue和Message设置 x-message-tt,来控制消息的生存时间,如果超时,则消息变为dead letter
      RabbitMQ针对队列中的消息过期时间有两种方法可以设置。 A: 通过队列属性设置,队列中所有消息都有相同的过期时间。
    • B: 对消息进行单独设置,每条消息TTL可以不同。
      如果同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就成为dead letter
  • Dead Letter Exchanges(DLX).
    RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由
      x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange
    • x-dead-letter-routing-key:指定routing-key发送
    • 队列出现dead letter的情况有: 消息或者队列的TTL过期
    • 队列达到最大长度
    • 消息被消费端拒绝(basic.reject or basic.nack)并且requeue=false

    利用DLX,当消息在一个队列中变成死信后,它能被重新publish到另一个Exchange。这时候消息就可以重新被消费。

    延时队列实现方案:

    在 pom.xml 中添加 spring-boot-starter-amqp的依赖

    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    在 application.yml文件中配置rabbitmq相关内容

    spring:
    rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

    消息实体:

    import lombok.Data;
    
    import java.io.Serializable;
    
    @Data
    public class Order implements Serializable {
    private static final long serialVersionUID = -2221214252163879885L;
    
    private String orderId; // 订单id
    
    private Integer orderStatus; // 订单状态 0:未支付,1:已支付,2:订单已取消
    
    private String orderName; // 订单名字
    
    }

    配置队列:

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @Configuration
    @Slf4j
    public class DelayRabbitConfig {
    /**
    * 延迟队列 TTL 名称
    */
    private static final String ORDER_DELAY_QUEUE = "user.order.delay.queue";
    /**
    * DLX,dead letter发送到的 exchange
    * 延时消息就是发送到该交换机的
    */
    public static final String ORDER_DELAY_EXCHANGE = "user.order.delay.exchange";
    /**
    * routing key 名称
    * 具体消息发送在该 routingKey 的
    */
    public static final String ORDER_DELAY_ROUTING_KEY = "order_delay";
    
    public static final String ORDER_QUEUE_NAME = "user.order.queue";
    public static final String ORDER_EXCHANGE_NAME = "user.order.exchange";
    public static final String ORDER_ROUTING_KEY = "order";
    
    /**
    * 延迟队列配置
    * <p>
    * 1、params.put("x-message-ttl", 5 * 1000);
    * 第一种方式是直接设置 Queue 延迟时间 但如果直接给队列设置过期时间,这种做法不是很灵活,(当然二者是兼容的,默认是时间小的优先)
    * 2、rabbitTemplate.convertAndSend(book, message -> {
    * message.getMessageProperties().setExpiration(2 * 1000 + "");
    * return message;
    * });
    * 第二种就是每次发送消息动态设置延迟时间,这样我们可以灵活控制
    **/
    @Bean
    public Queue delayOrderQueue() {
    Map<String, Object> params = new HashMap<>();
    // x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称,
    params.put("x-dead-letter-exchange", ORDER_EXCHANGE_NAME);
    // x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。
    params.put("x-dead-letter-routing-key", ORDER_ROUTING_KEY);
    return new Queue(ORDER_DELAY_QUEUE, true, false, false, params);
    }
    /**
    * 需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。
    * 这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,
    * 不会转发dog.puppy,也不会转发dog.guard,只会转发dog。
    * @return DirectExchange
    */
    @Bean
    public DirectExchange orderDelayExchange() {
    return new DirectExchange(ORDER_DELAY_EXCHANGE);
    }
    @Bean
    public Binding dlxBinding() {
    return BindingBuilder.bind(delayOrderQueue()).to(orderDelayExchange()).with(ORDER_DELAY_ROUTING_KEY);
    }
    
    @Bean
    public Queue orderQueue() {
    return new Queue(ORDER_QUEUE_NAME, true);
    }
    /**
    * 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。
    * 符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”。
    **/
    @Bean
    public TopicExchange orderTopicExchange() {
    return new TopicExchange(ORDER_EXCHANGE_NAME);
    }
    
    @Bean
    public Binding orderBinding() {
    // TODO 如果要让延迟队列之间有关联,这里的 routingKey 和 绑定的交换机很关键
    return BindingBuilder.bind(orderQueue()).to(orderTopicExchange()).with(ORDER_ROUTING_KEY);
    }
    
    }

    生产者生产消息:

    import com.example.ktoa.pojo.Order;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.util.Date;
    
    @Component
    @Slf4j
    public class DelaySender {
    
    @Autowired
    private AmqpTemplate amqpTemplate;
    
    public void sendDelay(Order order) {
    log.info("【订单生成时间】" + new Date().toString() +"【1分钟后检查订单是否已经支付】" + order.toString() );
    this.amqpTemplate.convertAndSend(DelayRabbitConfig.ORDER_DELAY_EXCHANGE, DelayRabbitConfig.ORDER_DELAY_ROUTING_KEY, order, message -> {
    // 如果配置了 params.put("x-message-ttl", 5 * 1000); 那么这一句也可以省略,具体根据业务需要是声明 Queue 的时候就指定好延迟时间还是在发送自己控制时间
    message.getMessageProperties().setExpiration(1 * 1000 * 60 + "");
    return message;
    });
    }
    
    }

    消费者消费消息:

    @Component
    @Slf4j
    public class DelayReceiver {
    @RabbitListener(queues = {DelayRabbitConfig.ORDER_QUEUE_NAME})
    public void orderDelayQueue(Order order, Message message, Channel channel) {
    log.info("###########################################");
    log.info("【orderDelayQueue 监听的消息】 - 【消费时间】 - [{}]- 【订单内容】 - [{}]",  new Date(), order.toString());
    if(order.getOrderStatus() == 0) {
    order.setOrderStatus(2);
    log.info("【该订单未支付,取消订单】" + order.toString());
    } else if(order.getOrderStatus() == 1) {
    log.info("【该订单已完成支付】");
    } else if(order.getOrderStatus() == 2) {
    log.info("【该订单已取消】");
    }
    log.info("###########################################");
    }
    
    }

    Controller层(访问http://localhost:8080/请求路径)

    import com.example.ktoa.config.DelaySender;
    import com.example.ktoa.pojo.Order;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    public class TestController {
    @Autowired
    private DelaySender delaySender;
    
    @GetMapping("/请求路径")
    public Object sendDelay() {
    Order order1 = new Order();
    order1.setOrderStatus(0);
    order1.setOrderId("123456");
    order1.setOrderName("小米6");
    
    Order order2 = new Order();
    order2.setOrderStatus(1);
    order2.setOrderId("456789");
    order2.setOrderName("小米8");
    
    delaySender.sendDelay(order1);
    delaySender.sendDelay(order2);
    return "ok";
    }
    
    }
    源码地址:https://github.com/923226145/SpringBoot_RabbitMQ 延时任务队列的原理与实现总结:https://www.jianshu.com/p/a8c1458998aa
    • 点赞
    • 收藏
    • 分享
    • 文章举报
    BlondeKid 发布了3 篇原创文章 · 获赞 1 · 访问量 59 私信 关注
  • 内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
    标签: