您的位置:首页 > 其它

RabbitMQ-如何保证消息在99.99%的情况下不丢失

2021-09-12 12:47 106 查看

1. 简介

MQ虽然帮我们解决了很多问题,但是也带来了很多问题,其中最麻烦的就是:如何保证消息的可靠性传输

我们在聊如何保证消息的可靠性传输之前,先考虑下哪些情况下会出现消息丢失的情况。

首先,上图中完整的展示了消息从生产到被消费的完整链路,我们通过图列举下各种情况。

  1. Producer
    在把
    Message
    发送到
    Broker
    的过程中,因为网络不可靠的原因,可能会出现
    Message
    还未发送到
    Broker
    就丢失,或者
    Message
    发送到了
    Broker
    ,但是由于某种原因,消息未保存到Broker。
  2. Broker
    接收到
    Message
    数据存储在内存,
    Consumer
    还没消费,
    Broker
    宕机了。
  3. Consumer
    接收到了
    Message
    Message
    相关业务还没来得及处理,程序报错或者宕机了,
    Broker
    会认为
    Consunmer
    消息正常消费了,就把当前消息从队列中移除了。这种情况也算是消息丢失。

从上述的问题中我们可以总结出想要消息被正常消费,就得保证:

  1. 消息成功被
    Broker
    接收到。
  2. 消息可以被
    Broker
    持久化。
  3. 消息成功被
    Consumer
    接收并且当消费失败时,消息可以重回队列。
  4. 要有相应的补偿机制。(当任何一个环节出错时,可以进行消息 补偿)。

2. 消息的可靠投递

我们在使用MQ的时候,为了避免消息丢失或者投递失败。RabbitMQ为我们提供了两种方式来控制消息的投递可靠性。

  1. confirm 确认模式
  2. return 退回模式

如图所示:

消息从 producer 到 exchange 则会返回一个confirmCallback 。 消息从 exchange 到 queue 投递失败则会返回一个 ReturnsCallback 信息,其内容为ReturnedMessage实例信息。 我们将利用这两个 callback 控制消息的可靠性投递。

2.1 confirm

2.1.1 引入所需依赖

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>

2.1.2 application.yaml

spring:
rabbitmq:
host: localhost
port: 5672
# rabbit 默认的虚拟主机
virtual-host: /
# rabbit 用户名密码
username: admin
password: admin123
# 开启消息发送确认功能
publisher-confirm-type: correlated
# 高版本已弃用
#    publisher-confirms: true

2.1.3 ConfirmCallBack

package com.ldx.rabbitmq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

/**
* 生产者消息确认回调方法
*
* @author ludangxin
* @date 2021/9/11
*/
@Slf4j
public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {

/**
*
* @param correlationData 相关配置信息
* @param ack   exchange交换机 是否成功收到了消息。true 成功,false代表失败
* @param cause 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("MsgSendConfirmCallBack , 回调id: {}", correlationData);
if(ack) {
log.info("消息发送成功");
}else {
log.info("消息发送失败: {}", cause);
}
}
}

2.1.3 RabbitConfig

package com.ldx.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig implements InitializingBean {

@Autowired
private RabbitTemplate rabbitTemplate;

/**
* 设置一个简单的队列
*/
@Bean(name = "durableQueue")
public Queue queue() {
/*
* 参数1:队列名称
* 参数2:是否定义持久化队列
* 参数3:是否独占本次连接
* 参数4:是否在不使用的时候自动删除队列
* 参数5:队列其它参数
*/
return new Queue("helloRabbitMQ", true, false, false, null);
}

/**
* bean 初始化后执行
*/
@Override
public void afterPropertiesSet() {
// 设置消息确认回调类
rabbitTemplate.setConfirmCallback(new MsgSendConfirmCallBack());
}
}

2.1.4 测试方法

这里两个测试方法,

sentMsg()
使用默认的
Exchange
,而
sentMsg2()
设置一个不存在的
Exchange
测试失败情况。

package com.ldx.rabbitmq;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.UUID;

@SpringBootTest
public class ProducerTest {

@Autowired
RabbitTemplate rabbitTemplate;

@Test
public void sentMsg(){
String uuid = UUID.randomUUID().toString();
CorrelationData correlationId = new CorrelationData(uuid);
rabbitTemplate.convertAndSend("", "helloRabbitMQ","Hello RabbitMQ ~ ", correlationId);
}

@Test
public void sentMsg2(){
String uuid = UUID.randomUUID().toString();
CorrelationData correlationId = new CorrelationData(uuid);
// 设置一个不存在的exchange 测试失败情况
rabbitTemplate.convertAndSend("abc", "helloRabbitMQ","Hello RabbitMQ ~ ", correlationId);
}
}

2.1.5 启动测试

sendMsg()
方法日志如下:

2021-09-11 21:30:38.336  INFO 63112 --- [nectionFactory1] c.l.r.config.MsgSendConfirmCallBack : MsgSendConfirmCallBack , 回调id: CorrelationData [id=8e9fc4b8-aa32-4e1b-a165-8a83457636ed]
2021-09-11 21:30:38.339  INFO 63112 --- [nectionFactory1] c.l.r.config.MsgSendConfirmCallBack : 消息发送成功

sendMsg2()
方法日志如下:

2021-09-11 21:32:27.377  INFO 63139 --- [nectionFactory2] c.l.r.config.MsgSendConfirmCallBack : MsgSendConfirmCallBack , 回调id: CorrelationData [id=399c8d85-f010-433f-946c-419d9b9396c2]
2021-09-11 21:32:27.379  INFO 63139 --- [nectionFactory2] c.l.r.config.MsgSendConfirmCallBack : 消息发送失败: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'abc' in vhost '/', class-id=60, method-id=40)

2.1.6 小结

  1. Confirm 确认模式
    是从
    Producer
    Exchange
  2. Producer
    发送的消息正常或失败时都会进入
    Confirm Callback
    方法。
  3. Producer
    发送消息的
    Exchange
    不存在时,
    Confirm Callback
    中的
    Ack
    为false且
    Cause
    为发送失败原因。

2.2 return

2.2.1 application.yaml

spring:
rabbitmq:
host: localhost
port: 5672
# rabbit 默认的虚拟主机
virtual-host: /
# rabbit 用户名密码
username: admin
password: admin123
# 开启消息发送确认功能
publisher-confirm-type: correlated
# 高版本已弃用
#    publisher-confirms: true# 开启失败退回功能
publisher-returns: true

2.2.2 ReturnCallback

这里注意下,网上很多提到的

ReturnCallback
(少了个s)接口已经弃用,注释中也提到了,弃用是为了更好的使用
ReturnedMessage
类,因为对象的方式可以更好的支持
lambda
表达式。

package com.ldx.rabbitmq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

/**
* 发生异常时的消息返回提醒
*
* @author ludangxin
* @date 2021/9/11
*/
@Slf4j
public class RabbitReturnCallback implements RabbitTemplate.ReturnsCallback {

/**
* Returned message callback.
*
* @param returned the returned message and metadata.
*/
@Override
public void returnedMessage(ReturnedMessage returned) {
log.info("消息主体: {}", returned.getMessage());
log.info("回复编码: {}", returned.getReplyCode());
log.info("回复内容: {}", returned.getReplyText());
log.info("交换器: {}", returned.getExchange());
log.info("路由键: {}", returned.getRoutingKey());
}
}

2.2.3 RabbitConfig

RabbitReturnCallback
设置到
RabbitTemplate
中。

/**
* bean 初始化后执行
*/
@Override
public void afterPropertiesSet() {
// 设置消息确认回调类
rabbitTemplate.setConfirmCallback(new MsgSendConfirmCallBack());
// 设置消息回退回调类
rabbitTemplate.setReturnsCallback(new RabbitReturnCallback());
}

2.2.4 测试方法

@Test
public void sentMsg3(){
String uuid = UUID.randomUUID().toString();
CorrelationData correlationId = new CorrelationData(uuid);
// 设置一个不存在的routingkey 测试失败情况
rabbitTemplate.convertAndSend("", "helloRabbitMQ1", "Hello RabbitMQ ~ ", correlationId);
}

2.2.5 启动测试

# sentMsg()
2021-09-11 22:12:24.079  INFO 63803 --- [nectionFactory1] c.l.r.config.MsgSendConfirmCallBack : MsgSendConfirmCallBack , 回调id: CorrelationData [id=fb471c69-6c7b-48bc-89aa-ae70ac1ed6f8]
2021-09-11 22:12:24.081  INFO 63803 --- [nectionFactory1] c.l.r.config.MsgSendConfirmCallBack : 消息发送成功
# sentMsg2()
2021-09-11 22:13:42.910  INFO 63825 --- [nectionFactory2] c.l.r.config.MsgSendConfirmCallBack : MsgSendConfirmCallBack , 回调id: CorrelationData [id=0e3211ee-a1ba-45e4-90f6-296be79def07]
2021-09-11 22:13:42.912  INFO 63825 --- [nectionFactory2] c.l.r.config.MsgSendConfirmCallBack : 消息发送失败: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'abc' in vhost '/', class-id=60, method-id=40)
# sentMsg3()
2021-09-11 22:14:23.600  INFO 63841 --- [nectionFactory1] c.l.r.config.RabbitReturnCallback : 消息主体: (Body:'Hello RabbitMQ ~ ' MessageProperties [headers={spring_returned_message_correlation=0a8db922-ff7c-4b13-86a3-04957a7359bc}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
2021-09-11 22:14:23.602  INFO 63841 --- [nectionFactory1] c.l.r.config.RabbitReturnCallback : 回复编码: 312
2021-09-11 22:14:23.603  INFO 63841 --- [nectionFactory1] c.l.r.config.RabbitReturnCallback : 回复内容: NO_ROUTE
2021-09-11 22:14:23.603  INFO 63841 --- [nectionFactory1] c.l.r.config.RabbitReturnCallback : 交换器:
2021-09-11 22:14:23.603  INFO 63841 --- [nectionFactory1] c.l.r.config.RabbitReturnCallback : 路由键: helloRabbitMQ1
2021-09-11 22:14:23.603  INFO 63841 --- [nectionFactory2] c.l.r.config.MsgSendConfirmCallBack : MsgSendConfirmCallBack , 回调id: CorrelationData [id=0a8db922-ff7c-4b13-86a3-04957a7359bc]
2021-09-11 22:14:23.603  INFO 63841 --- [nectionFactory2] c.l.r.config.MsgSendConfirmCallBack : 消息发送成功

2.2.6 小节

  1. Return 退回模式
    是从
    Exchange
    Queue
  2. Return
    给了
    Producer
  3. Producer
    发送的消息即使
    Routing Key
    不正确,当
    Exchange
    接收失败后直接触发
    Confirm Callback
    ,不会进入到
    Return Callback
    ,因为还没到
    Exchange
  4. Exchange
    正确接收消息,但是
    Routing Key
    设置错误, 触发
    Return Callback
    方法。

3. 消息的可靠消费

上文中我们提到了一种消息丢失的情况,即

Consumer
接收到了
Message
Message
相关业务还没来得及处理,程序报错或者宕机了,
Broker
会认为
Consunmer
消息正常消费了,就把当前消息从队列中移除了。这种情况也算是消息丢失。

那能不能消息消费成功后再将消息从queue中移除呢?

答案肯定是可以的。

3.1 ACK确认机制

ACK指Acknowledge,确认。 表示消费端收到消息后的确认方式。

  1. 作用:
  • 确认消息是否被消费者消费,消息通过ACK机制确认是否被正确接收,每个消息都要被确认。
  • 默认情况下,一个消息被消费者正确消费就会从队列中移除
  1. ACK确认模式
  • AcknowledgeMode.NONE :不确认
      默认所有消息消费成功,会不断的向消费者推送消息。
    1. 因为RabbitMQ认为所有推送的消息已被成功消费,所以推送出去的消息不会暂存在
      broker
      ,消息存在丢失的危险。
  • AcknowledgeMode.AUTO:自动确认
      由spring-rabbit依据消息处理逻辑是否抛出异常自动发送ack(无异常)或nack(异常)到
      broker
    1. 使用自动确认模式时,需要考虑的另一件事是消费者过载,因为
      broker
      会暂存没有收到
      ack
      的消息,等消费端
      ack
      后才会丢掉;如果收到消费端的
      nack
      (消费失败的标识)或
      connection
      断开没收到反馈,会将消息放回到原队列头部,导致消费者反复的在消费这条消息。
  • AcknowledgeMode.MANUAL:手动确认
      手动确认则当消费者调用
      ack
      nack
      reject
      几种方法进行确认,手动确认可以在业务失败后进行一些操作,如果消息未被 ACK 则会发送到下一个消费者。
    1. 手动确认模式可以使用 prefetch,限制通道上未完成的(“正在进行中的”)发送的数量。也就是
      Consumer
      一次可以从
      Broker
      取几条消息。
    2. 如果忘记进行ACK确认 忘记通过basicAck返回确认信息是常见的错误。这个错误非常严重,将导致消费者客户端退出或者关闭后,消息会被退回RabbitMQ服务器,这会使RabbitMQ服务器内存爆满,而且RabbitMQ也不会主动删除这些被退回的消息。只要程序还在运行,没确认的消息就一直是 Unacked 状态,无法被 RabbitMQ 重新投递。更厉害的是,RabbitMQ 消息消费并没有超时机制,也就是说,程序不重启,消息就永远是 Unacked 状态。处理运维事件时不要忘了这些 Unacked 状态的消息。当程序关闭时(实际只要 消费者 关闭就行),消息会恢复为 Ready 状态。

3.2 配置application.yaml

spring:
rabbitmq:
host: localhost
port: 5672
# rabbit 默认的虚拟主机
virtual-host: /
# rabbit 用户名密码
username: admin
password: admin123
listener:
simple:
# manual 手动确认
acknowledge-mode: manual

3.3 Consumer

package com.ldx.rabbitmq.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;

/**
* 消费者
*
* @author ludangxin
* @date 2021/9/12
*/
@Slf4j
@Component
public class RabbitMQListener {

@RabbitListener(queues = "helloRabbitMQ")
public void helloRabbitMq(Message message, Channel channel) throws IOException {
MessageProperties messageProperties = message.getMessageProperties();
log.info(messageProperties.toString());
try {
log.info(message.toString());
log.info(new String(message.getBody()));
int a = 1/0;
channel.basicAck(messageProperties.getDeliveryTag(), false);
} catch (Exception e) {
// 当前的消息是否重新投递的消息,也就是该消息是重新回到队列里的消息
if (messageProperties.getRedelivered()) {
log.info("消息已重复处理失败,拒绝再次接收...");
// 拒绝消息
channel.basicReject(messageProperties.getDeliveryTag(), false);
} else {
log.info("消息即将再次返回队列处理...");
channel.basicNack(messageProperties.getDeliveryTag(), false, true);
}
}
}
}

消费消息有三种回执方法,接下来先看下每个方法参数的含义。

3.3.1 basicAck

/**
* Acknowledge one or several received
* messages. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
* or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method
* containing the received message being acknowledged.
* @see com.rabbitmq.client.AMQP.Basic.Ack
* @param deliveryTag the tag from the received
* @param multiple true to acknowledge all messages up to and
* including the supplied delivery tag; false to acknowledge just
* the supplied delivery tag.
* @throws java.io.IOException if an error is encountered
*/
void basicAck(long deliveryTag, boolean multiple) throws IOException;

deliveryTag
:消息投递的标签号,每次消费消息或者消息重新投递后,
deliveryTag
都会增加。手动消息确认模式下,我们可以对指定
deliveryTag
的消息进行
ack
nack
reject
等操作。

multiple
:是否批量确认,值为
true
则会一次性
ack
所有小于当前消息
deliveryTag
的消息。

举个栗子: 假设我先发送三条消息

deliveryTag
分别是5、6、7,可它们都没有被确认,当我发第四条消息此时
deliveryTag
为8,
multiple
设置为 true,会将5、6、7、8的消息全部进行确认。

3.3.2 basicNack

/**
* Reject one or several received messages.
*
* Supply the <code>deliveryTag</code> from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
* or {@link com.rabbitmq.client.AMQP.Basic.GetOk} method containing the message to be rejected.
* @see com.rabbitmq.client.AMQP.Basic.Nack
* @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
* @param multiple true to reject all messages up to and including
* the supplied delivery tag; false to reject just the supplied
* delivery tag.
* @param requeue true if the rejected message(s) should be requeued rather
* than discarded/dead-lettered
* @throws java.io.IOException if an error is encountered
*/
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
throws IOException;

deliveryTag
:表示消息投递序号。

multiple
:是否批量确认。

requeue
:值为
true
消息将重新入队列。

3.3.3 basicReject

basicNack
:表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列。

/**
* Reject a message. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
* or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method
* containing the received message being rejected.
* @see com.rabbitmq.client.AMQP.Basic.Reject
* @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
* @param requeue true if the rejected message should be requeued rather than discarded/dead-lettered
* @throws java.io.IOException if an error is encountered
*/
void basicReject(long deliveryTag, boolean requeue) throws IOException;

deliveryTag
:表示消息投递序号。

requeue
:值为
true
消息将重新入队列。

3.4 启动测试

@Test
public void sentMsg() throws IOException {
String uuid = UUID.randomUUID().toString();
CorrelationData correlationId = new CorrelationData(uuid);
rabbitTemplate.convertAndSend("","helloRabbitMQ","Hello RabbitMQ111 ~ ", correlationId);
// 为了使进程阻塞
System.in.read();
}

在这里我们执行

sentMsg()
方法,输出日志如下:

从日志信息中我们可以看出,消息已成功被消费,并且当第一次消费失败后消息被重新放回了队列,并进行了再此消费,当再次失败后则放弃该条消息。

2021-09-12 00:47:03.451  INFO 66160 --- [nectionFactory1] c.l.r.config.MsgSendConfirmCallBack : MsgSendConfirmCallBack , 回调id: CorrelationData [id=eb06a986-0e51-464a-8b8c-d2a8271c0008]
2021-09-12 00:47:03.452  INFO 66160 --- [nectionFactory1] c.l.r.config.MsgSendConfirmCallBack : 消息发送成功
2021-09-12 00:47:04.142  INFO 66160 --- [ntContainer#3-1] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@75181b50: tags=[[amq.ctag-C1o5ZRm1g0fxX-Q53CCZcw]], channel=Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://admin@127.0.0.1:5672/,4), conn: Proxy@52f57666 Shared Rabbit Connection: SimpleConnection@3d96fa9e [delegate=amqp://admin@127.0.0.1:5672/, localPort= 58094], acknowledgeMode=AUTO local queue size=0
2021-09-12 00:47:04.157  INFO 66160 --- [ntContainer#3-2] c.l.rabbitmq.consumer.RabbitMQListener : MessageProperties [headers={spring_listener_return_correlation=7252a3e3-77d5-4985-a93c-0ee7a977d1a8, spring_returned_message_correlation=eb06a986-0e51-464a-8b8c-d2a8271c0008}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=helloRabbitMQ, deliveryTag=1, consumerTag=amq.ctag-GMJHJuVr22w1so4vhSp-dQ, consumerQueue=helloRabbitMQ]
2021-09-12 00:47:04.157  INFO 66160 --- [ntContainer#3-2] c.l.rabbitmq.consumer.RabbitMQListener : (Body:'Hello RabbitMQ111 ~ ' MessageProperties [headers={spring_listener_return_correlation=7252a3e3-77d5-4985-a93c-0ee7a977d1a8, spring_returned_message_correlation=eb06a986-0e51-464a-8b8c-d2a8271c0008}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=helloRabbitMQ, deliveryTag=1, consumerTag=amq.ctag-GMJHJuVr22w1so4vhSp-dQ, consumerQueue=helloRabbitMQ])
2021-09-12 00:47:04.158  INFO 66160 --- [ntContainer#3-2] c.l.rabbitmq.consumer.RabbitMQListener : Hello RabbitMQ111 ~
2021-09-12 00:47:04.158  INFO 66160 --- [ntContainer#3-2] c.l.rabbitmq.consumer.RabbitMQListener : 消息即将再次返回队列处理...
2021-09-12 00:47:04.162 ERROR 66160 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
2021-09-12 00:47:05.163  INFO 66160 --- [ntContainer#3-2] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@37695b29: tags=[[amq.ctag-GMJHJuVr22w1so4vhSp-dQ]], channel=Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://admin@127.0.0.1:5672/,8), conn: Proxy@52f57666 Shared Rabbit Connection: SimpleConnection@3d96fa9e [delegate=amqp://admin@127.0.0.1:5672/, localPort= 58094], acknowledgeMode=AUTO local queue size=0
2021-09-12 00:47:05.186  INFO 66160 --- [ntContainer#3-3] c.l.rabbitmq.consumer.RabbitMQListener : MessageProperties [headers={spring_listener_return_correlation=7252a3e3-77d5-4985-a93c-0ee7a977d1a8, spring_returned_message_correlation=eb06a986-0e51-464a-8b8c-d2a8271c0008}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=true, receivedExchange=, receivedRoutingKey=helloRabbitMQ, deliveryTag=1, consumerTag=amq.ctag-0XT90qJ0AYEzyDr-cztV8g, consumerQueue=helloRabbitMQ]
2021-09-12 00:47:05.186  INFO 66160 --- [ntContainer#3-3] c.l.rabbitmq.consumer.RabbitMQListener : (Body:'Hello RabbitMQ111 ~ ' MessageProperties [headers={spring_listener_return_correlation=7252a3e3-77d5-4985-a93c-0ee7a977d1a8, spring_returned_message_correlation=eb06a986-0e51-464a-8b8c-d2a8271c0008}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=true, receivedExchange=, receivedRoutingKey=helloRabbitMQ, deliveryTag=1, consumerTag=amq.ctag-0XT90qJ0AYEzyDr-cztV8g, consumerQueue=helloRabbitMQ])
2021-09-12 00:47:05.186  INFO 66160 --- [ntContainer#3-3] c.l.rabbitmq.consumer.RabbitMQListener : Hello RabbitMQ111 ~
2021-09-12 00:47:05.186  INFO 66160 --- [ntContainer#3-3] c.l.rabbitmq.consumer.RabbitMQListener : 消息已重复处理失败,拒绝再次接收...

3.5 小节

消费方的ACK机制可以有效的解决消息从

Broker
Consumer
丢失的问题。但也要注意一点:消息的无限消费。

3.6 消息无限消费

如果消费端代码就像下边这样写的,思路很简单:处理完业务逻辑后确认消息,

int a = 1 / 0
发生异常后将消息重新投入队列。

@RabbitHandler
public void processHandler(String msg, Channel channel, Message message) throws IOException {

try {
log.info("消费者 2 号收到:{}", msg);
int a = 1 / 0;
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}

但是有个问题是,业务代码一旦出现

bug
99.9%的情况是不会自动修复,一条消息会被无限投递进队列,消费端无限执行,导致了死循环,CPU被瞬间打满了,而且
rabbitmq management
只有一条未被确认的消息。

经过测试分析发现,当消息重新投递到消息队列时,这条消息不会回到队列尾部,仍是在队列头部。

消费者会立刻消费这条消息,业务处理再抛出异常,消息再重新入队,如此反复进行。导致消息队列处理出现阻塞,导致正常消息也无法运行,那该怎么处理呢?

**第一种方法:**是根据异常类型来选择是否重新放入队列。

第二种方法: 先将消息进行应答,此时消息队列会删除该条消息,然后通过**channel.basicPublish()**重新发布这个消息,异常消息就放在了消息队列尾部,,进而不会影响已经进入队列的消息处理。

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// 重新发送消息到队尾
channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,
JSON.toJSONBytes(msg));

但这种方法并没有解决根本问题,错误消息还是会时不时报错,后面优化设置了消息重试次数,达到了重试上限以后,手动确认,队列删除此消息,并将消息持久化入

MySQL
并推送报警,进行人工处理和定时任务做补偿。

4. 总结

4.1 持久化

  1. Exchange 要持久化 通过
    durable
    属性控制,true:持久化, 缺省:true。
  2. queue 要持久化 通过
    durable
    属性控制,true:持久化, 缺省:true。
  3. message 要持久化

在springboot环境下,message模式也是持久化。

4.2 生产方确认Confirm

4.3 消费方确认Ack

4.4 Broker 高可用

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: