您的位置:首页 > 其它

RabbitMQ-TTL-死信队列_DLX

2021-09-16 00:06 671 查看

1. 简介

死信队列,简称:

DLX
Dead Letter Exchange
(死信交换机),当消息成为
Dead message
后,可以被重新发送到另外一个交换机,这个交换机就是
DLX

(一般会将DLX和与其binding 的 Queue,一并称为死信队列或DLX,习惯而已,不必纠结)

那么什么情况下会成为

Dead message

  1. 队列的长度达到阈值。
  2. 消费者拒接消费消息,
    basicNack/basicReject
    ,并且不把消息重新放入原目标队列,
    requeue=false
  3. 原队列存在消息过期设置,消息到达超时时间未被消费。

流程讲解,如图所示(以第三种情况为例):

  1. Producer
    发送一条消息到
    Exchange
    并路由到设有过期时间(假设30分钟)的
    Queue
    中。
  2. 当消息的存活时间超过了30分钟后,
    Queue
    会将消息转发给
    DLX
  3. DLX
    接收到
    Dead message
    后,将
    Dead message
    路由到与其绑定的
    Queue
    中。
  4. 此时消费者监听此死信队列并消费此消息。

死信队列有什么用呢?

  1. 取消订单(比如下单30分钟后未付款,则取消订单,回滚库存),或者新用户注册,隔段时间进行短信问候等。
  2. 将消费者拒绝的消息发送到死信队列,然后将消息进行持久化,后续可以做业务分析或者处理。

2. TTL

因为要实现延迟消息,我们先得知道如何设置过期时间。这里指演示

TTL
Time To Live
(存活时间/过期时间),当消息到达存活时间后,还没有被消费,会被自动清除。

RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。

  • 设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。

  • 设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期。 例:现在有两条消息,第一条消息过期时间为30s,而第二条消息过期时间为15s,当过了15秒后,第二条消息不会立即过期,而是要等第一条消息被消费后,第二条消息被消费时,才会判断是否过期,所以当所有消息的过期时间一致时(比如30m后过期),最好给队列设置过期时间,而不是消息。但是有的情况确实每个消息的过期时间不一致,比如海底捞预约,每个人预约的时间段不一致,有个可能一个小时后,有的可能三个小时等,当快到预约时间点需要给用户进行短信通知,这就有问题了,不可能设置那么多的队列,这时就需要使用

    延迟队列
    来实现这个功能(下篇博文会讲到)。

  • 如果两者都进行了设置,以时间短的为准。

2.1 队列设置TTL

2.1.1 引入所需依赖

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>

2.1.2 application.yaml

spring:
rabbitmq:
host: localhost
port: 5672
# rabbit 默认的虚拟主机
virtual-host: /
# rabbit 用户名密码
username: admin
password: admin123

2.1.3 RabbitConfig

  1. 声明一个过期时间为30s的
    Queue
  2. 声明一个交换机(这里声明的是主题交换机,交换机类型无所谓,只要消息能路由到
    Queue
    即可)。
  3. 设置绑定关系。
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* 设置过期队列
*
* @author ludangxin
* @date 2021/9/15
*/
@Configuration
public class RabbitTtlConfig {
public static final String EXCHANGE_NAME = "TTL_EXCHANGE";
public static final String QUEUE_NAME = "TTL_QUEUE";

@Bean(QUEUE_NAME)
public Queue queue() {
return QueueBuilder.durable(QUEUE_NAME).ttl(30000).build();
}

@Bean(EXCHANGE_NAME)
public Exchange exchange() {
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}

@Bean
public Binding binding(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("ttl.#").noargs();
}
}

2.1.4 Producer

import com.ldx.rabbitmq.config.RabbitTtlConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
* 具有过期时间的消息 生产者
*
* @author ludangxin
* @date 2021/9/9
*/
@Component
public class TtlProducer {

@Autowired
private RabbitTemplate rabbitTemplate;

public void sendMsg() {
rabbitTemplate.convertAndSend(RabbitTtlConfig.EXCHANGE_NAME, "ttl.user", "这是一条有生命周期的消息。");
}
}

2.1.5 测试代码

@Autowired
private TtlProducer ttlProducer;

@Test
public void sendMsg() {
ttlProducer.sendMsg();
}

2.1.6 启动测试

运行测试代码后,到RabbitMQ 控制台中查看队列即消息情况。

如图所示,消息存活30s未被消费后,消息被遗弃。

2.2 消息设置TTL

2.2.1 Producer

我们将Producer代码稍加修改,给消息设置10s的过期时间,观察消息到底是存活30s还是10s。

import com.ldx.rabbitmq.config.RabbitTtlConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
* 具有过期时间的消息 生产者
*
* @author ludangxin
* @date 2021/9/9
*/
@Component
public class TtlProducer {

@Autowired
private RabbitTemplate rabbitTemplate;

public void sendMsg() {
MessageProperties mp = new MessageProperties();
mp.setExpiration("10000");
Message message = new Message("这是一条有生命周期的消息。".getBytes(), mp);
rabbitTemplate.convertAndSend(RabbitTtlConfig.EXCHANGE_NAME, "ttl.user", message);
}
}

2.2.2 启动测试

如图所示,消息只存活了10s。

我们将过期时间设置成40s后,但消息还是只存活了30s。说明当同时设置了过期时间时,是以时间短的为准

3. TTL + DLX

接下来我们通过设置过期时间和死信队列来实现

延迟队列
的功能。

首先罗列下实现步骤:

  1. 声明一个
    Exchange
    TTl Queue
    ,并且绑定关系,实现生成死信的逻辑。
  2. 声明一个
    DLX
    Queue
    ,此步骤的
    Queue
    是为了接收死信并让
    Consumer
    进行监听消费的。
  3. TTl Queue
    DLX
    进行绑定,使消息成为死信后能转发给
    DLX

3.1 RabbitConfig

其实DLX与普通的Exchange没有什么区别,只不过是“生产”死信的

Queue
指定了消息成为死信后转发到DLX。

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* 死信队列配置
*
* @author ludangxin
* @date 2021/9/15
*/
@Configuration
public class RabbitDeadLetterConfig {

public static final String QUEUE_NAME_TTL = "QUEUE_NAME_TTL_1";
public static final String EXCHANGE_NAME_TTL = "EXCHANGE_NAME_TTL_1";
public static final String QUEUE_NAME_DEAD_LETTER = "QUEUE_NAME_DEAD_LETTER";
public static final String EXCHANGE_NAME_DLX = "EXCHANGE_NAME_DLX";
public static final String ROUTING_KEY_DLX = "EXPIRE.#";
public static final String ROUTING_KEY_DEAD_LETTER = "EXPIRE.10";
public static final String ROUTING_KEY_TTL = "EXPIRE_TTL_10";

/**
* 1. Queue 队列
*/
@Bean(QUEUE_NAME_TTL)
public Queue queue() {
/*
* 1. 设置队列的过期时间 30s
* 2. 绑定DLX
* 3. 设置routing key(注意:这里设置的是路由到死信Queue的路由,并不是设置binding关系的路由)
*/
return QueueBuilder.durable(QUEUE_NAME_TTL).ttl(10000).deadLetterExchange(EXCHANGE_NAME_DLX).deadLetterRoutingKey(ROUTING_KEY_DEAD_LETTER).build();
}

/**
* 2. exchange
*/
@Bean(EXCHANGE_NAME_TTL)
public Exchange exchange() {
return ExchangeBuilder.directExchange(EXCHANGE_NAME_TTL).durable(true).build();
}

/**
* 3. 队列和交互机绑定关系 Binding
*/
@Bean
public Binding bindExchange(@Qualifier(QUEUE_NAME_TTL) Queue queue, @Qualifier(EXCHANGE_NAME_TTL) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_TTL).noargs();
}

/**
* 4. 死信队列
*/
@Bean(QUEUE_NAME_DEAD_LETTER)
public Queue deadLetterQueue() {
return QueueBuilder.durable(QUEUE_NAME_DEAD_LETTER).build();
}

/**
* 5. dlx
*/
@Bean(EXCHANGE_NAME_DLX)
public Exchange exchangeDlx() {
return ExchangeBuilder.topicExchange(EXCHANGE_NAME_DLX).durable(true).build();
}

/**
* 6. 队列和交互机绑定关系 Binding
*/
@Bean
public Binding bindDlxExchange(@Qualifier(QUEUE_NAME_DEAD_LETTER) Queue queue, @Qualifier(EXCHANGE_NAME_DLX) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_DLX).noargs();
}

}

3.2 Producer

import com.ldx.rabbitmq.config.RabbitDeadLetterConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
* 延迟消息生产者
*
* @author ludangxin
* @date 2021/9/9
*/
@Component
public class DelayProducer {

@Autowired
private RabbitTemplate rabbitTemplate;

public void sendMsg() {
String msg = "这是一条有生命周期的消息,发送时间为:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Message message = new Message(msg.getBytes());
rabbitTemplate.convertAndSend(RabbitDeadLetterConfig.EXCHANGE_NAME_TTL, RabbitDeadLetterConfig.ROUTING_KEY_TTL, message);
}
}

3.3 Consumer

import com.ldx.rabbitmq.config.RabbitDeadLetterConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
* 延迟消息消费者
*
* @author ludangxin
* @date 2021/9/9
*/
@Slf4j
@Component
public class DelayConsumer {

@RabbitListener(queues = {RabbitDeadLetterConfig.QUEUE_NAME_DEAD_LETTER})
public void dlxQueue(Message message){
log.info(new String(message.getBody()) + ",消息接收时间为:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
}
}

3.4 测试代码

@Autowired
private DelayProducer delayProducer;

@Test
@SneakyThrows
public void sendDlxMsg() {
delayProducer.sendMsg();
// 使进程阻塞,方便Consumer监听输出Message
System.in.read();
}

3.5 启动测试

输出日志内容如下:

2021-09-15 23:51:22.795  INFO 8122 --- [ntContainer#0-1] com.ldx.rabbitmq.consumer.DelayConsumer  : 这是一条有生命周期的消息,发送时间为:2021-09-15 23:51:12,消息接收时间为:2021-09-15 23:51:22
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: