RabbitMQ-如何保证消息在99.99%的情况下不丢失
1. 简介
MQ虽然帮我们解决了很多问题,但是也带来了很多问题,其中最麻烦的就是:如何保证消息的可靠性传输。
我们在聊如何保证消息的可靠性传输之前,先考虑下哪些情况下会出现消息丢失的情况。
首先,上图中完整的展示了消息从生产到被消费的完整链路,我们通过图列举下各种情况。
Producer
在把Message
发送到Broker
的过程中,因为网络不可靠的原因,可能会出现Message
还未发送到Broker
就丢失,或者Message
发送到了Broker
,但是由于某种原因,消息未保存到Broker。Broker
接收到Message
数据存储在内存,Consumer
还没消费,Broker
宕机了。Consumer
接收到了Message
,Message
相关业务还没来得及处理,程序报错或者宕机了,Broker
会认为Consunmer
消息正常消费了,就把当前消息从队列中移除了。这种情况也算是消息丢失。
从上述的问题中我们可以总结出想要消息被正常消费,就得保证:
- 消息成功被
Broker
接收到。 - 消息可以被
Broker
持久化。 - 消息成功被
Consumer
接收并且当消费失败时,消息可以重回队列。 - 要有相应的补偿机制。(当任何一个环节出错时,可以进行消息 补偿)。
2. 消息的可靠投递
我们在使用MQ的时候,为了避免消息丢失或者投递失败。RabbitMQ为我们提供了两种方式来控制消息的投递可靠性。
- confirm 确认模式
- return 退回模式
如图所示:
消息从 producer 到 exchange 则会返回一个confirmCallback 。 消息从 exchange 到 queue 投递失败则会返回一个 ReturnsCallback 信息,其内容为ReturnedMessage实例信息。 我们将利用这两个 callback 控制消息的可靠性投递。
2.1 confirm
2.1.1 引入所需依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency>
2.1.2 application.yaml
spring: rabbitmq: host: localhost port: 5672 # rabbit 默认的虚拟主机 virtual-host: / # rabbit 用户名密码 username: admin password: admin123 # 开启消息发送确认功能 publisher-confirm-type: correlated # 高版本已弃用 # publisher-confirms: true
2.1.3 ConfirmCallBack
package com.ldx.rabbitmq.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; /** * 生产者消息确认回调方法 * * @author ludangxin * @date 2021/9/11 */ @Slf4j public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback { /** * * @param correlationData 相关配置信息 * @param ack exchange交换机 是否成功收到了消息。true 成功,false代表失败 * @param cause 失败原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("MsgSendConfirmCallBack , 回调id: {}", correlationData); if(ack) { log.info("消息发送成功"); }else { log.info("消息发送失败: {}", cause); } } }
2.1.3 RabbitConfig
package com.ldx.rabbitmq.config; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig implements InitializingBean { @Autowired private RabbitTemplate rabbitTemplate; /** * 设置一个简单的队列 */ @Bean(name = "durableQueue") public Queue queue() { /* * 参数1:队列名称 * 参数2:是否定义持久化队列 * 参数3:是否独占本次连接 * 参数4:是否在不使用的时候自动删除队列 * 参数5:队列其它参数 */ return new Queue("helloRabbitMQ", true, false, false, null); } /** * bean 初始化后执行 */ @Override public void afterPropertiesSet() { // 设置消息确认回调类 rabbitTemplate.setConfirmCallback(new MsgSendConfirmCallBack()); } }
2.1.4 测试方法
这里两个测试方法,
sentMsg()使用默认的
Exchange,而
sentMsg2()设置一个不存在的
Exchange测试失败情况。
package com.ldx.rabbitmq; import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import java.util.UUID; @SpringBootTest public class ProducerTest { @Autowired RabbitTemplate rabbitTemplate; @Test public void sentMsg(){ String uuid = UUID.randomUUID().toString(); CorrelationData correlationId = new CorrelationData(uuid); rabbitTemplate.convertAndSend("", "helloRabbitMQ","Hello RabbitMQ ~ ", correlationId); } @Test public void sentMsg2(){ String uuid = UUID.randomUUID().toString(); CorrelationData correlationId = new CorrelationData(uuid); // 设置一个不存在的exchange 测试失败情况 rabbitTemplate.convertAndSend("abc", "helloRabbitMQ","Hello RabbitMQ ~ ", correlationId); } }
2.1.5 启动测试
sendMsg()方法日志如下:
2021-09-11 21:30:38.336 INFO 63112 --- [nectionFactory1] c.l.r.config.MsgSendConfirmCallBack : MsgSendConfirmCallBack , 回调id: CorrelationData [id=8e9fc4b8-aa32-4e1b-a165-8a83457636ed] 2021-09-11 21:30:38.339 INFO 63112 --- [nectionFactory1] c.l.r.config.MsgSendConfirmCallBack : 消息发送成功
sendMsg2()方法日志如下:
2021-09-11 21:32:27.377 INFO 63139 --- [nectionFactory2] c.l.r.config.MsgSendConfirmCallBack : MsgSendConfirmCallBack , 回调id: CorrelationData [id=399c8d85-f010-433f-946c-419d9b9396c2] 2021-09-11 21:32:27.379 INFO 63139 --- [nectionFactory2] c.l.r.config.MsgSendConfirmCallBack : 消息发送失败: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'abc' in vhost '/', class-id=60, method-id=40)
2.1.6 小结
Confirm 确认模式
是从Producer
到Exchange
。Producer
发送的消息正常或失败时都会进入Confirm Callback
方法。Producer
发送消息的Exchange
不存在时,Confirm Callback
中的Ack
为false且Cause
为发送失败原因。
2.2 return
2.2.1 application.yaml
spring: rabbitmq: host: localhost port: 5672 # rabbit 默认的虚拟主机 virtual-host: / # rabbit 用户名密码 username: admin password: admin123 # 开启消息发送确认功能 publisher-confirm-type: correlated # 高版本已弃用 # publisher-confirms: true# 开启失败退回功能 publisher-returns: true
2.2.2 ReturnCallback
这里注意下,网上很多提到的
ReturnCallback(少了个s)接口已经弃用,注释中也提到了,弃用是为了更好的使用
ReturnedMessage类,因为对象的方式可以更好的支持
lambda表达式。
package com.ldx.rabbitmq.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.core.RabbitTemplate; /** * 发生异常时的消息返回提醒 * * @author ludangxin * @date 2021/9/11 */ @Slf4j public class RabbitReturnCallback implements RabbitTemplate.ReturnsCallback { /** * Returned message callback. * * @param returned the returned message and metadata. */ @Override public void returnedMessage(ReturnedMessage returned) { log.info("消息主体: {}", returned.getMessage()); log.info("回复编码: {}", returned.getReplyCode()); log.info("回复内容: {}", returned.getReplyText()); log.info("交换器: {}", returned.getExchange()); log.info("路由键: {}", returned.getRoutingKey()); } }
2.2.3 RabbitConfig
将
RabbitReturnCallback设置到
RabbitTemplate中。
/** * bean 初始化后执行 */ @Override public void afterPropertiesSet() { // 设置消息确认回调类 rabbitTemplate.setConfirmCallback(new MsgSendConfirmCallBack()); // 设置消息回退回调类 rabbitTemplate.setReturnsCallback(new RabbitReturnCallback()); }
2.2.4 测试方法
@Test public void sentMsg3(){ String uuid = UUID.randomUUID().toString(); CorrelationData correlationId = new CorrelationData(uuid); // 设置一个不存在的routingkey 测试失败情况 rabbitTemplate.convertAndSend("", "helloRabbitMQ1", "Hello RabbitMQ ~ ", correlationId); }
2.2.5 启动测试
# sentMsg() 2021-09-11 22:12:24.079 INFO 63803 --- [nectionFactory1] c.l.r.config.MsgSendConfirmCallBack : MsgSendConfirmCallBack , 回调id: CorrelationData [id=fb471c69-6c7b-48bc-89aa-ae70ac1ed6f8] 2021-09-11 22:12:24.081 INFO 63803 --- [nectionFactory1] c.l.r.config.MsgSendConfirmCallBack : 消息发送成功 # sentMsg2() 2021-09-11 22:13:42.910 INFO 63825 --- [nectionFactory2] c.l.r.config.MsgSendConfirmCallBack : MsgSendConfirmCallBack , 回调id: CorrelationData [id=0e3211ee-a1ba-45e4-90f6-296be79def07] 2021-09-11 22:13:42.912 INFO 63825 --- [nectionFactory2] c.l.r.config.MsgSendConfirmCallBack : 消息发送失败: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'abc' in vhost '/', class-id=60, method-id=40) # sentMsg3() 2021-09-11 22:14:23.600 INFO 63841 --- [nectionFactory1] c.l.r.config.RabbitReturnCallback : 消息主体: (Body:'Hello RabbitMQ ~ ' MessageProperties [headers={spring_returned_message_correlation=0a8db922-ff7c-4b13-86a3-04957a7359bc}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]) 2021-09-11 22:14:23.602 INFO 63841 --- [nectionFactory1] c.l.r.config.RabbitReturnCallback : 回复编码: 312 2021-09-11 22:14:23.603 INFO 63841 --- [nectionFactory1] c.l.r.config.RabbitReturnCallback : 回复内容: NO_ROUTE 2021-09-11 22:14:23.603 INFO 63841 --- [nectionFactory1] c.l.r.config.RabbitReturnCallback : 交换器: 2021-09-11 22:14:23.603 INFO 63841 --- [nectionFactory1] c.l.r.config.RabbitReturnCallback : 路由键: helloRabbitMQ1 2021-09-11 22:14:23.603 INFO 63841 --- [nectionFactory2] c.l.r.config.MsgSendConfirmCallBack : MsgSendConfirmCallBack , 回调id: CorrelationData [id=0a8db922-ff7c-4b13-86a3-04957a7359bc] 2021-09-11 22:14:23.603 INFO 63841 --- [nectionFactory2] c.l.r.config.MsgSendConfirmCallBack : 消息发送成功
2.2.6 小节
Return 退回模式
是从Exchange
到Queue
。Return
给了Producer
。Producer
发送的消息即使Routing Key
不正确,当Exchange
接收失败后直接触发Confirm Callback
,不会进入到Return Callback
,因为还没到Exchange
。- 当
Exchange
正确接收消息,但是Routing Key
设置错误, 触发Return Callback
方法。
3. 消息的可靠消费
上文中我们提到了一种消息丢失的情况,即
Consumer接收到了
Message,
Message相关业务还没来得及处理,程序报错或者宕机了,
Broker会认为
Consunmer消息正常消费了,就把当前消息从队列中移除了。这种情况也算是消息丢失。
那能不能消息消费成功后再将消息从queue中移除呢?
答案肯定是可以的。
3.1 ACK确认机制
ACK指Acknowledge,确认。 表示消费端收到消息后的确认方式。
- 作用:
- 确认消息是否被消费者消费,消息通过ACK机制确认是否被正确接收,每个消息都要被确认。
- 默认情况下,一个消息被消费者正确消费就会从队列中移除
- ACK确认模式
- AcknowledgeMode.NONE :不确认
-
默认所有消息消费成功,会不断的向消费者推送消息。
- 因为RabbitMQ认为所有推送的消息已被成功消费,所以推送出去的消息不会暂存在
broker
,消息存在丢失的危险。 - AcknowledgeMode.AUTO:自动确认
-
由spring-rabbit依据消息处理逻辑是否抛出异常自动发送ack(无异常)或nack(异常)到
broker
。 - 使用自动确认模式时,需要考虑的另一件事是消费者过载,因为
broker
会暂存没有收到ack
的消息,等消费端ack
后才会丢掉;如果收到消费端的nack
(消费失败的标识)或connection
断开没收到反馈,会将消息放回到原队列头部,导致消费者反复的在消费这条消息。 - AcknowledgeMode.MANUAL:手动确认
-
手动确认则当消费者调用
ack
、nack
、reject
几种方法进行确认,手动确认可以在业务失败后进行一些操作,如果消息未被 ACK 则会发送到下一个消费者。 - 手动确认模式可以使用 prefetch,限制通道上未完成的(“正在进行中的”)发送的数量。也就是
Consumer
一次可以从Broker
取几条消息。 - 如果忘记进行ACK确认 忘记通过basicAck返回确认信息是常见的错误。这个错误非常严重,将导致消费者客户端退出或者关闭后,消息会被退回RabbitMQ服务器,这会使RabbitMQ服务器内存爆满,而且RabbitMQ也不会主动删除这些被退回的消息。只要程序还在运行,没确认的消息就一直是 Unacked 状态,无法被 RabbitMQ 重新投递。更厉害的是,RabbitMQ 消息消费并没有超时机制,也就是说,程序不重启,消息就永远是 Unacked 状态。处理运维事件时不要忘了这些 Unacked 状态的消息。当程序关闭时(实际只要 消费者 关闭就行),消息会恢复为 Ready 状态。
3.2 配置application.yaml
spring: rabbitmq: host: localhost port: 5672 # rabbit 默认的虚拟主机 virtual-host: / # rabbit 用户名密码 username: admin password: admin123 listener: simple: # manual 手动确认 acknowledge-mode: manual
3.3 Consumer
package com.ldx.rabbitmq.consumer; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; /** * 消费者 * * @author ludangxin * @date 2021/9/12 */ @Slf4j @Component public class RabbitMQListener { @RabbitListener(queues = "helloRabbitMQ") public void helloRabbitMq(Message message, Channel channel) throws IOException { MessageProperties messageProperties = message.getMessageProperties(); log.info(messageProperties.toString()); try { log.info(message.toString()); log.info(new String(message.getBody())); int a = 1/0; channel.basicAck(messageProperties.getDeliveryTag(), false); } catch (Exception e) { // 当前的消息是否重新投递的消息,也就是该消息是重新回到队列里的消息 if (messageProperties.getRedelivered()) { log.info("消息已重复处理失败,拒绝再次接收..."); // 拒绝消息 channel.basicReject(messageProperties.getDeliveryTag(), false); } else { log.info("消息即将再次返回队列处理..."); channel.basicNack(messageProperties.getDeliveryTag(), false, true); } } } }
消费消息有三种回执方法,接下来先看下每个方法参数的含义。
3.3.1 basicAck
/** * Acknowledge one or several received * messages. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk} * or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method * containing the received message being acknowledged. * @see com.rabbitmq.client.AMQP.Basic.Ack * @param deliveryTag the tag from the received * @param multiple true to acknowledge all messages up to and * including the supplied delivery tag; false to acknowledge just * the supplied delivery tag. * @throws java.io.IOException if an error is encountered */ void basicAck(long deliveryTag, boolean multiple) throws IOException;
deliveryTag:消息投递的标签号,每次消费消息或者消息重新投递后,
deliveryTag都会增加。手动消息确认模式下,我们可以对指定
deliveryTag的消息进行
ack、
nack、
reject等操作。
multiple:是否批量确认,值为
true则会一次性
ack所有小于当前消息
deliveryTag的消息。
举个栗子: 假设我先发送三条消息
deliveryTag分别是5、6、7,可它们都没有被确认,当我发第四条消息此时
deliveryTag为8,
multiple设置为 true,会将5、6、7、8的消息全部进行确认。
3.3.2 basicNack
/** * Reject one or several received messages. * * Supply the <code>deliveryTag</code> from the {@link com.rabbitmq.client.AMQP.Basic.GetOk} * or {@link com.rabbitmq.client.AMQP.Basic.GetOk} method containing the message to be rejected. * @see com.rabbitmq.client.AMQP.Basic.Nack * @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver} * @param multiple true to reject all messages up to and including * the supplied delivery tag; false to reject just the supplied * delivery tag. * @param requeue true if the rejected message(s) should be requeued rather * than discarded/dead-lettered * @throws java.io.IOException if an error is encountered */ void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
deliveryTag:表示消息投递序号。
multiple:是否批量确认。
requeue:值为
true消息将重新入队列。
3.3.3 basicReject
basicNack:表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列。
/** * Reject a message. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk} * or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method * containing the received message being rejected. * @see com.rabbitmq.client.AMQP.Basic.Reject * @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver} * @param requeue true if the rejected message should be requeued rather than discarded/dead-lettered * @throws java.io.IOException if an error is encountered */ void basicReject(long deliveryTag, boolean requeue) throws IOException;
deliveryTag:表示消息投递序号。
requeue:值为
true消息将重新入队列。
3.4 启动测试
@Test public void sentMsg() throws IOException { String uuid = UUID.randomUUID().toString(); CorrelationData correlationId = new CorrelationData(uuid); rabbitTemplate.convertAndSend("","helloRabbitMQ","Hello RabbitMQ111 ~ ", correlationId); // 为了使进程阻塞 System.in.read(); }
在这里我们执行
sentMsg()方法,输出日志如下:
从日志信息中我们可以看出,消息已成功被消费,并且当第一次消费失败后消息被重新放回了队列,并进行了再此消费,当再次失败后则放弃该条消息。
2021-09-12 00:47:03.451 INFO 66160 --- [nectionFactory1] c.l.r.config.MsgSendConfirmCallBack : MsgSendConfirmCallBack , 回调id: CorrelationData [id=eb06a986-0e51-464a-8b8c-d2a8271c0008] 2021-09-12 00:47:03.452 INFO 66160 --- [nectionFactory1] c.l.r.config.MsgSendConfirmCallBack : 消息发送成功 2021-09-12 00:47:04.142 INFO 66160 --- [ntContainer#3-1] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@75181b50: tags=[[amq.ctag-C1o5ZRm1g0fxX-Q53CCZcw]], channel=Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://admin@127.0.0.1:5672/,4), conn: Proxy@52f57666 Shared Rabbit Connection: SimpleConnection@3d96fa9e [delegate=amqp://admin@127.0.0.1:5672/, localPort= 58094], acknowledgeMode=AUTO local queue size=0 2021-09-12 00:47:04.157 INFO 66160 --- [ntContainer#3-2] c.l.rabbitmq.consumer.RabbitMQListener : MessageProperties [headers={spring_listener_return_correlation=7252a3e3-77d5-4985-a93c-0ee7a977d1a8, spring_returned_message_correlation=eb06a986-0e51-464a-8b8c-d2a8271c0008}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=helloRabbitMQ, deliveryTag=1, consumerTag=amq.ctag-GMJHJuVr22w1so4vhSp-dQ, consumerQueue=helloRabbitMQ] 2021-09-12 00:47:04.157 INFO 66160 --- [ntContainer#3-2] c.l.rabbitmq.consumer.RabbitMQListener : (Body:'Hello RabbitMQ111 ~ ' MessageProperties [headers={spring_listener_return_correlation=7252a3e3-77d5-4985-a93c-0ee7a977d1a8, spring_returned_message_correlation=eb06a986-0e51-464a-8b8c-d2a8271c0008}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=helloRabbitMQ, deliveryTag=1, consumerTag=amq.ctag-GMJHJuVr22w1so4vhSp-dQ, consumerQueue=helloRabbitMQ]) 2021-09-12 00:47:04.158 INFO 66160 --- [ntContainer#3-2] c.l.rabbitmq.consumer.RabbitMQListener : Hello RabbitMQ111 ~ 2021-09-12 00:47:04.158 INFO 66160 --- [ntContainer#3-2] c.l.rabbitmq.consumer.RabbitMQListener : 消息即将再次返回队列处理... 2021-09-12 00:47:04.162 ERROR 66160 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80) 2021-09-12 00:47:05.163 INFO 66160 --- [ntContainer#3-2] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@37695b29: tags=[[amq.ctag-GMJHJuVr22w1so4vhSp-dQ]], channel=Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://admin@127.0.0.1:5672/,8), conn: Proxy@52f57666 Shared Rabbit Connection: SimpleConnection@3d96fa9e [delegate=amqp://admin@127.0.0.1:5672/, localPort= 58094], acknowledgeMode=AUTO local queue size=0 2021-09-12 00:47:05.186 INFO 66160 --- [ntContainer#3-3] c.l.rabbitmq.consumer.RabbitMQListener : MessageProperties [headers={spring_listener_return_correlation=7252a3e3-77d5-4985-a93c-0ee7a977d1a8, spring_returned_message_correlation=eb06a986-0e51-464a-8b8c-d2a8271c0008}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=true, receivedExchange=, receivedRoutingKey=helloRabbitMQ, deliveryTag=1, consumerTag=amq.ctag-0XT90qJ0AYEzyDr-cztV8g, consumerQueue=helloRabbitMQ] 2021-09-12 00:47:05.186 INFO 66160 --- [ntContainer#3-3] c.l.rabbitmq.consumer.RabbitMQListener : (Body:'Hello RabbitMQ111 ~ ' MessageProperties [headers={spring_listener_return_correlation=7252a3e3-77d5-4985-a93c-0ee7a977d1a8, spring_returned_message_correlation=eb06a986-0e51-464a-8b8c-d2a8271c0008}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=true, receivedExchange=, receivedRoutingKey=helloRabbitMQ, deliveryTag=1, consumerTag=amq.ctag-0XT90qJ0AYEzyDr-cztV8g, consumerQueue=helloRabbitMQ]) 2021-09-12 00:47:05.186 INFO 66160 --- [ntContainer#3-3] c.l.rabbitmq.consumer.RabbitMQListener : Hello RabbitMQ111 ~ 2021-09-12 00:47:05.186 INFO 66160 --- [ntContainer#3-3] c.l.rabbitmq.consumer.RabbitMQListener : 消息已重复处理失败,拒绝再次接收...
3.5 小节
消费方的ACK机制可以有效的解决消息从
Broker到
Consumer丢失的问题。但也要注意一点:消息的无限消费。
3.6 消息无限消费
如果消费端代码就像下边这样写的,思路很简单:处理完业务逻辑后确认消息,
int a = 1 / 0发生异常后将消息重新投入队列。
@RabbitHandler public void processHandler(String msg, Channel channel, Message message) throws IOException { try { log.info("消费者 2 号收到:{}", msg); int a = 1 / 0; channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } }
但是有个问题是,业务代码一旦出现
bug99.9%的情况是不会自动修复,一条消息会被无限投递进队列,消费端无限执行,导致了死循环,CPU被瞬间打满了,而且
rabbitmq management只有一条未被确认的消息。
经过测试分析发现,当消息重新投递到消息队列时,这条消息不会回到队列尾部,仍是在队列头部。
消费者会立刻消费这条消息,业务处理再抛出异常,消息再重新入队,如此反复进行。导致消息队列处理出现阻塞,导致正常消息也无法运行,那该怎么处理呢?
**第一种方法:**是根据异常类型来选择是否重新放入队列。
第二种方法: 先将消息进行应答,此时消息队列会删除该条消息,然后通过**channel.basicPublish()**重新发布这个消息,异常消息就放在了消息队列尾部,,进而不会影响已经进入队列的消息处理。
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 重新发送消息到队尾 channel.basicPublish(message.getMessageProperties().getReceivedExchange(), message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN, JSON.toJSONBytes(msg));
但这种方法并没有解决根本问题,错误消息还是会时不时报错,后面优化设置了消息重试次数,达到了重试上限以后,手动确认,队列删除此消息,并将消息持久化入
MySQL并推送报警,进行人工处理和定时任务做补偿。
4. 总结
4.1 持久化
- Exchange 要持久化 通过
durable
属性控制,true:持久化, 缺省:true。 - queue 要持久化 通过
durable
属性控制,true:持久化, 缺省:true。 - message 要持久化
在springboot环境下,message模式也是持久化。
4.2 生产方确认Confirm
4.3 消费方确认Ack
4.4 Broker 高可用
- RabbitMQ使用教程(四)如何通过持久化保证消息99.99%不丢失?
- RabbitMQ如何通过持久化保证消息99.99%不丢失?
- RabbitMQ如何通过持久化保证消息99.99%不丢失?
- RabbitMQ使用教程(三)如何保证消息99.99%被发送成功?
- RabbitMQ如何保证消息99.99%被发送成功?
- Rabbitmq在多实例的情况下,如何保证只有一个实例接受到消息
- RabbitMQ如何保证消息99.99%被发送成功?
- RabbitMQ如何保证队列里的消息99.99%被消费?
- RabbitMQ使用教程(五)如何保证队列里的消息99.99%被消费?
- 如何保证RabbitMQ的消息不丢失及其背后的原理
- 在高并发的情况下如何保证消息的可靠性?消息丢失如何解决?
- RabbitMQ如何保证队列里的消息99.99%被消费?
- RabbitMQ如何保证消息99.99%被发送成功?
- RabbitMQ如何保证队列里的消息99.99%被消费?
- RabbitMQ 如何保证消息不丢失?
- Twitter Storm如何保证消息不丢失
- RabbitMQ如何保证发送端消息的可靠投递
- Storm入门(五)Twitter Storm如何保证消息不丢失
- Storm如何保证消息不丢失 (Guaranteeing-message-processing)
- 《RabbitMQ》如何保证消息不被重复消费