您的位置:首页 > 编程语言 > Java开发

SpringBoot | 第三十八章:基于RabbitMQ实现消息延迟队列方案

2019-07-21 22:36 1641 查看

前言

前段时间在编写通用的消息通知服务时,由于需要实现类似通知失败时,需要延后几分钟再次进行发送,进行多次尝试后,进入定时发送机制。此机制,在原先对接银联支付时,银联的异步通知也是类似的,在第一次通知失败后,支付标准服务会重发,最多发送五次,每次的间隔时间为1、4、8、16分钟等。本文就简单讲解下使用RabbitMQ实现延时消息队列功能。

  • SpringBoot集成RabbitMQ实现延时队列实战
  • 一些最佳实践
  • 参考资料
  • 总结
  • 最后
  • 老生常谈
  • 一点知识

    在此之前,简单说明下基于RabbitMQ实现延时队列的相关知识及说明下延时队列的使用场景。

    延时队列使用场景

    在很多的业务场景中,延时队列可以实现很多功能,此类业务中,一般上是非实时的,需要延迟处理的,需要进行重试补偿的。

    1. 订单超时关闭:在支付场景中,一般上订单在创建后30分钟或1小时内未支付的,会自动取消订单。
    2. 短信或者邮件通知:在一些注册或者下单业务时,需要在1分钟或者特定时间后进行短信或者邮件发送相关资料的。本身此类业务于主业务是无关联性的,一般上的做法是进行异步发送。
    3. 重试场景:比如消息通知,在第一次通知出现异常时,会在隔几分钟之后进行再次重试发送。

    RabbitMQ实现延时队列

    本身在

    RabbitMQ
    中是未直接提供延时队列功能的,但可以使用
    TTL(Time-To-Live,存活时间)
    DLX(Dead-Letter-Exchange,死信队列交换机)
    的特性实现延时队列的功能。

    存活时间(Time-To-Live 简称 TTL)

    RabbitMQ
    中可以对队列和消息分别设置TTL,TTL表明了一条消息可在队列中存活的最大时间。当某条消息被设置了TTL或者当某条消息进入了设置了TTL的队列时,这条消息会在TTL时间后
    死亡
    成为
    Dead Letter
    。如果既配置了消息的TTL,又配置了队列的TTL,那么较小的那个值会被取用。

    死信交换(Dead Letter Exchanges 简称 DLX)

    上个知识点也提到了,设置了

    TTL
    的消息或队列最终会成为
    Dead Letter
    ,当消息在一个队列中变成死信之后,它能被重新发送到另一个交换机中,这个交换机就是DLX,绑定此DLX的队列就是死信队列。

    一个消息变成死信一般上是由于以下几种情况;

    1. 消息被拒绝
    2. 消息过期
    3. 队列达到了最大长度。

    所以,通过

    TTL
    DLX
    的特性可以模拟实现延时队列的功能。当队列中的消息超时成为死信后,会把消息死信重新发送到配置好的交换机中,然后分发到真实的消费队列。故简单来说,我们可以创建2个队列,一个队列用于发送消息,一个队列用于消息过期后的转发的目标队列。

    SpringBoot集成RabbitMQ实现延时队列实战

    以下使用

    SpringBoot
    集成
    RabbitMQ
    进行实战说明,在进行
    http
    消息通知时,若通知失败(地址不可用或者连接超时)时,将此消息转入延时队列中,待特定时间后进行重新发送。

    0.引入pom依赖

    <!-- rabbit -->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <!-- 简化http操作 -->
    <dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-http</artifactId>
    <version>4.5.16</version>
    </dependency>
    <dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-json</artifactId>
    <version>4.5.16</version>
    </dependency>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    1.编写

    rabbitmq
    配置文件(关键配置)
    RabbitConfig.java

    /**
    *
    * @ClassName   类名:RabbitConfig
    * @Description 功能说明:
    * <p>
    * TODO
    *</p>
    ************************************************************************
    * @date        创建日期:2019年7月17日
    * @author      创建人:oKong
    * @version     版本号:V1.0
    *<p>
    ***************************修订记录*************************************
    *
    *   2019年7月17日   oKong   创建该类功能。
    *
    ***********************************************************************
    *</p>
    */
    @Configuration
    public class RabbitConfig {
    
    @Autowired
    ConnectionFactory connectionFactory;
    
    /**
    * 消费者线程数 设置大点 大概率是能通知到的
    */
    @Value("${http.notify.concurrency:50}")
    int concurrency;
    
    /**
    * 延迟队列的消费者线程数 可设置小点
    */
    @Value("${http.notify.delay.concurrency:20}")
    int delayConcurrency;
    
    @Bean
    public RabbitAdmin rabbitAdmin() {
    return new RabbitAdmin(connectionFactory);
    }
    
    @Bean
    public DirectExchange httpMessageNotifyDirectExchange(RabbitAdmin rabbitAdmin) {
    //durable 是否持久化
    //autoDelete 是否自动删除,即服务端或者客服端下线后 交换机自动删除
    DirectExchange directExchange = new DirectExchange(ApplicationConstant.HTTP_MESSAGE_EXCHANGE,true,false);
    directExchange.setAdminsThatShouldDeclare(rabbitAdmin);
    return directExchange;
    }
    
    //设置消息队列
    @Bean
    public Queue httpMessageStartQueue(RabbitAdmin rabbitAdmin) {
    
    /*
    创建接收队列,4个参数
    name - 队列名称
    durable - false,不进行持有化
    exclusive - true,独占性
    autoDelete - true,自动删除*/
    Queue queue = new Queue(ApplicationConstant.HTTP_MESSAGE_START_QUEUE_NAME, true, false, false);
    queue.setAdminsThatShouldDeclare(rabbitAdmin);
    return queue;
    }
    
    //队列绑定交换机
    @Bean
    public Binding bindingStartQuene(RabbitAdmin rabbitAdmin,DirectExchange httpMessageNotifyDirectExchange, Queue httpMessageStartQueue) {
    Binding binding = BindingBuilder.bind(httpMessageStartQueue).to(httpMessageNotifyDirectExchange).with(ApplicationConstant.HTTP_MESSAGE_START_RK);
    binding.setAdminsThatShouldDeclare(rabbitAdmin);
    return binding;
    }
    
    @Bean
    public Queue httpMessageOneQueue(RabbitAdmin rabbitAdmin) {
    Queue queue = new Queue(ApplicationConstant.HTTP_MESSAGE_ONE_QUEUE_NAME, true, false, false);
    queue.setAdminsThatShouldDeclare(rabbitAdmin);
    return queue;
    }
    
    @Bean
    public Binding bindingOneQuene(RabbitAdmin rabbitAdmin,DirectExchange httpMessageNotifyDirectExchange, Queue httpMessageOneQueue) {
    Binding binding = BindingBuilder.bind(httpMessageOneQueue).to(httpMessageNotifyDirectExchange).with(ApplicationConstant.HTTP_MESSAGE_ONE_RK);
    binding.setAdminsThatShouldDeclare(rabbitAdmin);
    return binding;
    }
    
    //-------------设置延迟队列--开始--------------------
    @Bean
    public Queue httpDelayOneQueue() {
    //name - 队列名称
    //durable - true
    //exclusive - false
    //autoDelete - false
    return QueueBuilder.durable("http.message.dlx.one")
    //以下是重点:当变成死信队列时,会转发至 路由为x-dead-letter-exchange及x-dead-letter-routing-key的队列中
    .withArgument("x-dead-letter-exchange", ApplicationConstant.HTTP_MESSAGE_EXCHANGE)
    .withArgument("x-dead-letter-routing-key", ApplicationConstant.HTTP_MESSAGE_ONE_RK)
    .withArgument("x-message-ttl", 1*60*1000)//1分钟 过期时间(单位:毫秒),当过期后 会变成死信队列,之后进行转发
    .build();
    }
    //绑定到交换机上
    @Bean
    public Binding bindingDelayOneQuene(RabbitAdmin rabbitAdmin, DirectExchange httpMessageNotifyDirectExchange, Queue httpDelayOneQueue) {
    Binding binding = BindingBuilder.bind(httpDelayOneQueue).to(httpMessageNotifyDirectExchange).with("delay.one");
    binding.setAdminsThatShouldDeclare(rabbitAdmin);
    return binding;
    }
    //-------------设置延迟队列--结束--------------------
    
    //建议将正常的队列和延迟处理的队列分开
    //设置监听容器
    @Bean("notifyListenerContainer")
    public SimpleRabbitListenerContainerFactory httpNotifyListenerContainer() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);// 手动ack
    factory.setConnectionFactory(connectionFactory);
    factory.setPrefetchCount(1);
    factory.setConcurrentConsumers(concurrency);
    return factory;
    }
    
    // 设置监听容器
    @Bean("delayNotifyListenerContainer")
    public SimpleRabbitListenerContainerFactory httpDelayNotifyListenerContainer() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);// 手动ack
    factory.setConnectionFactory(connectionFactory);
    factory.setPrefetchCount(1);
    factory.setConcurrentConsumers(delayConcurrency);
    return factory;
    }
    }

    ApplicationConstant.java

    public class ApplicationConstant {
    
    /**
    * 发送http通知的 exchange 队列
    */
    public static final String HTTP_MESSAGE_EXCHANGE = "http.message.exchange";
    
    /**
    * 配置消息队列和路由key值
    */
    public static final String HTTP_MESSAGE_START_QUEUE_NAME = "http.message.start";
    public static final String HTTP_MESSAGE_START_RK = "rk.start";
    
    public static final String HTTP_MESSAGE_ONE_QUEUE_NAME = "http.message.one";
    public static final String HTTP_MESSAGE_ONE_RK = "rk.one";
    
    /**
    * 通知队列对应的延迟队列关系,即过期队列之后发送到下一个的队列信息,可以根据实际情况添加,当然也可以根据一定规则自动生成
    */
    public static final Map<String,String> delayRefMap = new HashMap<String, String>() {
    /**
    *
    */
    private static final long serialVersionUID = -779823216035682493L;
    
    {
    put(HTTP_MESSAGE_START_QUEUE_NAME, "delay.one");
    }
    };
    }

    简单来说,就是创建一个正常消息发送队列,用于接收http消息请求的参数,同时进行http请求。同时,创建一个延时队列,设置其

    x-dead-letter-exchange
    x-dead-letter-routing-key
    x-message-ttl
    值,将其转发到正常的队列中。使用一个map对象维护一个关系,当正常消息异常时,需要发送的延时队列的队列名称,当然时间场景汇总,根据需要可以进行动态配置或者根据一定规则进行动态映射。

    2.创建监听类,用于消息的消费操作,此处使用

    @RabbitListener
    来消费消息(当然也可以使用
    SimpleMessageListenerContainer
    进行消息配置的),创建了一个正常消息监听和延时队列监听,由于一般上异常通知是低概率事件,可根据不同的监听容器进行差异化配置。

    /**
    *
    * @ClassName   类名:HttpMessagerLister
    * @Description 功能说明:http通知消费监听接口
    * <p>
    * TODO
    *</p>
    ************************************************************************
    * @date        创建日期:2019年7月17日
    * @author      创建人:oKong
    * @version     版本号:V1.0
    *<p>
    ***************************修订记录*************************************
    *
    *   2019年7月17日   oKong   创建该类功能。
    *
    ***********************************************************************
    *</p>
    */
    @Component
    @Slf4j
    public class HttpMessagerLister {
    
    @Autowired
    HttpMessagerService messagerService;
    
    @RabbitListener(id = "httpMessageNotifyConsumer", queues = {ApplicationConstant.HTTP_MESSAGE_START_QUEUE_NAME}, containerFactory = "notifyListenerContainer")
    public void httpMessageNotifyConsumer(Message message, Channel channel) throws Exception {
    doHandler(message, channel);
    }
    
    @RabbitListener(id= "httpDelayMessageNotifyConsumer", queues = {
    ApplicationConstant.HTTP_MESSAGE_ONE_QUEUE_NAME,}, containerFactory = "delayNotifyListenerContainer")
    public void httpDelayMessageNotifyConsumer(Message message, Channel channel) throws Exception {
    doHandler(message, channel);
    }
    
    private void doHandler(Message message, Channel channel) throws Exception {
    String body = new String(message.getBody(),"utf-8");
    String queue = message.getMessageProperties().getConsumerQueue();
    log.info("接收到通知请求:{},队列名:{}",body, queue);
    //消息对象转换
    try {
    HttpEntity httpNotifyDto = JSONUtil.toBean(body, HttpEntity.class);
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    //发送通知
    messagerService.notify(queue, httpNotifyDto);
    } catch(Exception e) {
    log.error(e.getMessage());
    //ack
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
    }
    }

    HttpMessagerService.java
    :消息真正处理的类,此类是关键,这里未进行日志记录,真实场景中,强烈建议进行消息通知的日志存储,防止日后信息的查看,同时也能通过发送状态,在重试次数都失败后,进行定时再次发送功能,同时也有据可查。

    @Component
    @Slf4j
    public class HttpMessagerService {
    
    @Autowired
    AmqpTemplate mqTemplate;
    
    public void notify(String queue,HttpEntity httpEntity) {
    //发起请求
    log.info("开始发起http请求:{}", httpEntity);
    try {
    switch(httpEntity.getMethod().toLowerCase()) {
    case "POST":
    HttpUtil.post(httpEntity.getUrl(), httpEntity.getParams());
    break;
    case "GET":
    default:
    HttpUtil.get(httpEntity.getUrl(), httpEntity.getParams());
    }
    } catch (Exception e) {
    //发生异常,放入延迟队列中
    String nextRk = ApplicationConstant.delayRefMap.get(queue);
    if(ApplicationConstant.HTTP_MESSAGE_ONE_QUEUE_NAME.equals(queue)) {
    //若已经是最后一个延迟队列的消息队列了,则后续可直接放入数据库中 待后续定时策略进行再次发送
    log.warn("http通知已经通知N次失败,进入定时进行发起通知,url={}", httpEntity.getUrl());
    } else {
    log.warn("http重新发送通知:{}, 通知队列rk为:{}, 原队列:{}", httpEntity.getUrl(), nextRk, queue);
    mqTemplate.convertAndSend(ApplicationConstant.HTTP_MESSAGE_EXCHANGE, nextRk, cn.hutool.json.JSONUtil.toJsonStr(httpEntity));
    }
    }
    }
    }

    3.创建控制层服务(真实场景中,如

    SpringCloud
    微服务中,一般上是创建个api接口,供其他服务进行调用)

    @Slf4j
    @RestController
    @Api(tags = "http测试接口")
    public class HttpDemoController {
    
    @Autowired
    AmqpTemplate mqTemplate;
    
    @PostMapping("/send")
    @ApiOperation(value="send",notes = "发送http测试")
    public String sendHttp(@RequestBody HttpEntity httpEntity) {
    //发送http请求
    log.info("开始发起http请求,发布异步消息:{}", httpEntity);
    mqTemplate.convertAndSend(ApplicationConstant.HTTP_MESSAGE_EXCHANGE, ApplicationConstant.HTTP_MESSAGE_START_RK, cn.hutool.json.JSONUtil.toJsonStr(httpEntity));
    return "发送成功:url=" + httpEntity.getUrl();
    }
    }

    4.配置文件添加

    RabbitMQ
    相关配置信息

    spring.rabbitmq.host=127.0.0.1
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    spring.rabbitmq.virtual-host=/
    
    # 通知-消费者线程数 设置大点 大概率是能通知到的
    http.notify.concurrency=150
    # 延迟队列的消费者线程数 可设置小点
    http.notify.delay.concurrency=10

    5.编写启动类。

    @SpringBootApplication
    @Slf4j
    public class DelayQueueApplication {
    
    public static void main(String[] args) throws Exception {
    SpringApplication.run(DelayQueueApplication.class, args);
    log.info("spring-boot-rabbitmq-delay-queue-chapter38服务启动!");
    }
    }

    6.启动服务。使用

    swagger
    进行简单调用测试。

    • 正常通知:

    2019-07-20 23:52:23.792  INFO 65216 --- [nio-8080-exec-1] c.l.l.s.c.controller.HttpDemoController  : 开始发起http请求,发布异步消息:HttpEntity(url=www.baidu.com, params={a=1}, method=get)
    2019-07-20 23:52:23.794  INFO 65216 --- [TaskExecutor-97] c.l.l.s.chapter38.mq.HttpMessagerLister  : 接收到通知请求:{"method":"get","params":{"a":1},"url":"www.baidu.com"},队列名:http.message.start
    2019-07-20 23:52:23.794  INFO 65216 --- [TaskExecutor-97] c.l.l.s.c.service.HttpMessagerService    : 开始发起http请求:HttpEntity(url=www.baidu.com, params={a=1}, method=get)
    • 异常通知:访问一个不存在的地址

    2019-07-20 23:53:14.699  INFO 65216 --- [nio-8080-exec-4] c.l.l.s.c.controller.HttpDemoController  : 开始发起http请求,发布异步消息:HttpEntity(url=www.baidu.com1, params={a=1}, method=get)
    2019-07-20 23:53:14.705  INFO 65216 --- [TaskExecutor-84] c.l.l.s.chapter38.mq.HttpMessagerLister  : 接收到通知请求:{"method":"get","params":{"a":1},"url":"www.baidu.com1"},队列名:http.message.start
    2019-07-20 23:53:14.705  INFO 65216 --- [TaskExecutor-84] c.l.l.s.c.service.HttpMessagerService    : 开始发起http请求:HttpEntity(url=www.baidu.com1, params={a=1}, method=get)
    2019-07-20 23:53:14.706  WARN 65216 --- [TaskExecutor-84] c.l.l.s.c.service.HttpMessagerService    : http重新发送通知:www.baidu.com1, 通知队列rk为:delay.one, 原队列:http.message.start

    RabbitMQ
    后台中,可以看见
    http.message.dlx.one
    队列中存在这需要延时处理的消息,在一分钟后会转发至
    http.message.one
    队列中。

    在一分钟后,可以看见消息本再次消费了。

    2019-07-20 23:54:14.722  INFO 65216 --- [TaskExecutor-16] c.l.l.s.chapter38.mq.HttpMessagerLister  : 接收到通知请求:{"method":"get","params":{"a":1},"url":"www.baidu.com1"},队列名:http.message.one
    2019-07-20 23:54:14.723  INFO 65216 --- [TaskExecutor-16] c.l.l.s.c.service.HttpMessagerService    : 开始发起http请求:HttpEntity(url=www.baidu.com1, params={a=1}, method=get)
    2019-07-20 23:54:14.723  WARN 65216 --- [TaskExecutor-16] c.l.l.s.c.service.HttpMessagerService    : http通知已经通知N次失败,进入定时进行发起通知,url=www.baidu.com1

    一些最佳实践

    在正式场景中,一般上补偿或者重试机制大概率是不会发送的,倘若发生时,一般上是第三方业务系统出现了问题,故一般上在进行补充时,应该在非高峰期进行操作,故应该对延时监听器,应该在高峰期时停止消费,在非高峰期时进行消费。同时,还可以根据不同的通知类型,放入不一样的延时队列中,保障业务的正常。这里简单说明下,动态停止或者启动演示监听器的方式。一般上是使用

    RabbitListenerEndpointRegistry
    对象获取延时监听器,之后进行动态停止或者启用。可设置
    @RabbitListener
    的id属性,直接进行获取,当然也可以直接获取所有的监听器,进行自定义判断了。

    @Autowired
    RabbitListenerEndpointRegistry registry;
    
    @GetMapping("/set")
    @ApiOperation(value = "set", notes = "设置消息监听器的状态")
    public String setSimpleMessageListenerContainer(String status) {
    if("1".equals(status)) {
    registry.getListenerContainer("httpDelayMessageNotifyConsumer").start();
    } else {
    registry.getListenerContainer("httpDelayMessageNotifyConsumer").stop();
    }
    return status;
    }

    这里,只是简单进行演示说明,在真实场景下,可以使用定时器,判断当前是否为高峰期,进而进行动态设置监听器的状态。

    参考资料

    1. https://www.rabbitmq.com/admin-guide.html
    2. https://www.rabbitmq.com/ttl.html

    总结

    本文主要简单介绍了基于

    RabbitMQ
    实现延时队列的功能。对于需要实现更加灵活的配置及功能时,如可自定义配置通知次数等,大家可根据自己的需求进行添加,可以使用动态创建队列的方式。当然使用延时队列的方式还有很多,比如可以使用
    redis
    的key值过期回调机制使用,也可以使用定时机制。另,发现好久没有写文章了,感觉写的有点乱,还望见谅呀~

    最后

    目前互联网上很多大佬都有

    SpringBoot
    系列教程,如有雷同,请多多包涵了。原创不易,码字不易,还希望大家多多支持。若文中有所错误之处,还望提出,谢谢。

    老生常谈

    • 个人QQ:
      499452441
    • 微信公众号:
      lqdevOps

    个人博客:http://blog.lqdev.cn
    完整示例:基于RabbitMQ实现消息延迟队列方案
    原文地址:https://blog.lqdev.cn/2019/07/21/springboot/chapter-thirty-eight/

    内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
    标签: