RocketMQ(5)---RocketMQ重试机制
2019-07-02 00:03
766 查看
RocketMQ重试机制
消息重试分为两种:Producer发送消息的重试 和 Consumer消息消费的重试。
一、Producer端重试
Producer端重试是指: Producer往MQ上发消息没有发送成功,比如网络原因导致生产者发送消息到MQ失败。
看一下代码:
@Slf4j public class RocketMQTest { /** * 生产者组 */ private static String PRODUCE_RGROUP = "test_producer"; public static void main(String[] args) throws Exception { //1、创建生产者对象 DefaultMQProducer producer = new DefaultMQProducer(PRODUCE_RGROUP); //设置重试次数(默认2次) producer.setRetryTimesWhenSendFailed(3000); //绑定name server producer.setNamesrvAddr("74.49.203.55:9876"); producer.start(); //创建消息 Message message = new Message("topic_family", ("小小今年3岁" ).getBytes()); //发送 这里填写超时时间是5毫秒 所以每次都会发送失败 SendResult sendResult = producer.send(message,5); log.info("输出生产者信息={}",sendResult); } }
超时重试针对网上说的超时异常会重试的说法都是错误的,想想都觉得可怕,我查的所以文章都说超时异常都会重试,难道这么多人都没有去测试一下 或者去看个源码。
我发现这个问题,是因为我上面超时时间设置为5毫秒 ,按照正常肯定会报超时异常,但我设置1次重试和3000次的重试,虽然最终都会报下面异常,但输出错误时间报
显然不应该是一个级别。但测试发现无论我设置的多少次的重试次数,报异常的时间都差不多。
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
针对这个疑惑,我去看了源码之后,才恍然大悟。
/** * 说明 抽取部分代码 */ private SendResult sendDefaultImpl(Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout) { //1、获取当前时间 long beginTimestampFirst = System.currentTimeMillis(); long beginTimestampPrev ; //2、去服务器看下有没有主题消息 TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); if (topicPublishInfo != null && topicPublishInfo.ok()) { boolean callTimeout = false; //3、通过这里可以很明显看出 如果不是同步发送消息 那么消息重试只有1次 int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; //4、根据设置的重试次数,循环再去获取服务器主题消息 for (times = 0; times < timesTotal; times++) { MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); beginTimestampPrev = System.currentTimeMillis(); long costTime = beginTimestampPrev - beginTimestampFirst; //5、前后时间对比 如果前后时间差 大于 设置的等待时间 那么直接跳出for循环了 这就说明连接超时是不进行多次连接重试的 if (timeout < costTime) { callTimeout = true; break; } //6、如果超时直接报错 if (callTimeout) { throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout"); } } }
通过这段源码很明显可以看出以下几点
- 如果是
异步发送
那么重试次数只有1次 - 对于同步而言,
超时异常也是不会再去重试
。 - 如果发生重试是在一个for 循环里去重试,所以它是立即重试而不是隔一段时间去重试。
真是实践出真知!!!
二、 Consumer端重试
消费端比较有意思,而且在实际开发过程中,我们也更应该考虑的是消费端的重试。
消费者端的失败主要分为2种情况,
Exception和
Timeout。
1、Exception
@Slf4j @Component public class Consumer { /** * 消费者实体对象 */ private DefaultMQPushConsumer consumer; /** * 消费者组 */ public static final String CONSUMER_GROUP = "test_consumer"; /** * 通过构造函数 实例化对象 */ public Consumer() throws MQClientException { consumer = new DefaultMQPushConsumer(CONSUMER_GROUP); consumer.setNamesrvAddr("47.99.203.55:9876;47.99.203.55:9877"); //订阅topic和 tags( * 代表所有标签)下信息 consumer.subscribe("topic_family", "*"); //注册消费的监听 并在此监听中消费信息,并返回消费的状态信息 consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { //1、获取消息 Message msg = msgs.get(0); try { //2、消费者获取消息 String body = new String(msg.getBody(), "utf-8"); //3、获取重试次数 int count = ((MessageExt) msg).getReconsumeTimes(); log.info("当前消费重试次数为 = {}", count); //4、这里设置重试大于3次 那么通过保存数据库 人工来兜底 if (count >= 2) { log.info("该消息已经重试3次,保存数据库。topic={},keys={},msg={}", msg.getTopic(), msg.getKeys(), body); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } //直接抛出异常 throw new Exception("=======这里出错了============"); //return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } }); //启动监听 consumer.start(); } }
这里的代码意思很明显: 主动抛出一个异常,然后如果超过3次,那么就不继续重试下去,而是将该条记录保存到数据库由人工来兜底。
看下运行结果
注意消费者和生产者的重试还是有区别的,主要有两点
1、默认重试次数:Product默认是2次,而Consumer默认是16次。
2、重试时间间隔:Product是立刻重试,而Consumer是有一定时间间隔的。它照
1S,5S,10S,30S,1M,2M····2H进行重试。
2、Timeout
说明这里的超时异常并非真正意义上的超时,它指的是指获取消息后,因为某种原因没有给RocketMQ返回消费的状态,即没有
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS或
return ConsumeConcurrentlyStatus.RECONSUME_LATER。
那么 RocketMQ会认为该消息没有发送,会一直发送。因为它会认为该消息根本就没有发送给消费者,所以肯定没消费。
做这个测试很简单。
//1、消费者获得消息 String body = new String(msg.getBody(), "utf-8"); //2、获取重试次数 int count = ((MessageExt) msg).getReconsumeTimes(); log.info("当前消费重试次数为 = {}", count); //3、这里睡眠60秒 Thread.sleep(60000); log.info("休眠60秒 看还能不能走到这里。topic={},keys={},msg={}", msg.getTopic(), msg.getKeys(), body); //返回成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
当获得 当前消费重试次数为 = 0 后 , 关掉该进程。再重新启动该进程,那么依然能够获取该条消息
consumer消费者 当前消费重试次数为 = 0 休眠60秒 看还能不能走到这里。topic=topic_family,keys=1a2b3c4d5f,msg=小小今年3岁
只要自己变优秀了,其他的事情才会跟着好起来(上将2)
相关文章推荐
- 源码分析RocketMQ之消息消费重试机制
- RocketMQ 主从同步读写分离机制
- 【RocketMQ原理解析1.1】整体介绍&IDE编译并启动RocketMQ的第一个例子
- RocketMQ-消费重试机制
- 源码分析RocketMQ之消息ACK机制(消费进度)
- rocketmq 异常 com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <rocketmq-n
- 源码分析RocketMQ之CommitLog消息存储机制
- RocketMQ 菜鸟笔记 (二) RocketMQ 4.1.0 安装与入门实例
- 消息中间件 RocketMQ源码解析:定时消息与消息重试
- 源码分析RocketMQ之消费队列、Index索引文件存储结构与存储机制-上篇
- 分布式消息队列RocketMQ源码分析之3 -- Consumer负载均衡机制 -- Rebalance
- RocketMq学习一:RocketMq的安装
- 【RocketMQ源码深度解析】整体介绍&IDE编译并启动RocketMQ的第一个例子
- 分布式消息队列RocketMQ源码分析之3 -- Consumer负载均衡机制 -- Rebalance
- RocketMQ API使用简介、拉取机制
- 分布式消息队列RocketMQ源码分析之2 -- Broker与NameServer心跳机制
- RocketMQ(四)——消息重试
- 源码分析RocketMQ文件清除机制
- rocketmq学习笔记 五 源码之rocketmq-tools
- Rocket重试机制,消息模式,刷盘方式