集群与负载均衡系列(6)——消息队列之rabbitMQ+spring-boot+spring amqp发送可靠的消息
2017-05-22 10:24
936 查看
在开始文章之前,可以看一下RabbitMq的总结《rabbitMq的总结》,用的wps的“写得”总结的。后续的很多内容都会涉及到这些基础概念。
上一篇文中很简单的介绍了rabbitMQ与spring的整合,实际上还存在许多问题,因为消息发送是不可靠的。这篇文章将实现可靠消息的发送。
不要问我为什么字体为什么时大时小,CSDN的博客就是这么任性,我已经尽力了。
以下是文章的大纲
spring amqp api 简介
可靠消息的整体思路
通过确认机制、mandatory、缓存确保生产端消息的可靠性
生产端过滤攻击
通过死信确保消费端的消息可靠性,并弥补老版本的immediate
通过errorhandle防止致命错误
spring amqp则扩充了一些列类,而且优雅地提供了一些不错的机制。
建立连接
一样的,提供了一个连接工厂org.springframework.amqp.rabbit.connection.CachingConnectionFactory
不同的是:1、mandatory的指定
rabbitmq的客户端是在basicPublish指令中指定mandatory
spring amqp则是connectionFactory.setPublisherReturns
2、确认机制开启
rabbitmq的客户端是用confirmSelect指令
spring amqp则是connectionFactory.setPublisherConfirms
类的分解
spring amqp分出了不同的类来对应amqp协议的概念,而不是只有一个channel。
分出来的类有三种类型的exchange、queue、binding
操作类
和数据库一样,spring为操作封装了一个模板RabbitTemplate。其中包含了publish、get等命令的实现
消息对象
rabbitmq只允许发送byte数组,但是spring amqp在此基础上进行了扩展,可以发送任何的对象。
消息可以用Message封装交付因子。MessageProperties设置消息头。
消息和对象可以通过一系列的Conventer自动转换。
消费监听
spring amqp提供了使用@RabbitListener进行优雅的订阅消息。其主要基于SimpleRabbitListenerContainerFactory配置。另外,还有一种方式是通过SimpleMessageListenerContainer。
errorHandle
errorHandle机制是这样的,针对一些致命错误,比如使消费端瘫痪的错误,使用错误机制将使其自动nack,并且不论设置是否可重入队列,都将把消息丢弃。这样做的好处就是防止轮询到其他消费端,是大片消费端瘫痪。
不要问我中间的缓存处理为什么这么绕,要问就问rabbitmq自生的确认机制和mandatory的回调顺序为什么这么奇葩
生产端造成消息丢失点总共有
开启确认机制:spring-amqp:connectionFactory.setPublisherConfirms(rabbitmqProps.isPublisherConfirms());
客户端:channel.confirmSelect();
回调的设置:
设置方法:spring-amqp:rabbitTemplate.setMandatory(true);
客户端:channel.basicPublish的mandatory参数
回调的设置:
重发可以使用quartz、spring4的scheduler、甚至spring retry
持久化的设置:spring-amqp:Queue构造函数其中的durable参数
客户端:builder.deliveryMode(2);其中builder为AMQP.BasicProperties.Builder
重发的代码将在下一章节给出
a、如果是bug,那么该消息永远不可能处理成功,难道就任由它无限重发
b、如果是致命错误,会导致消费端瘫痪,那么重入队列,负载均衡到其它消费端后、造成其它消费端瘫痪
c、如果是队列已满,如何获知该原因并通知管理员
d、如果对不同错误需要不同的处理
e、如何提供一个统一的地方处理以上问题,而不是分散到每个消费端代码中
f、弥补immediate,由于有了死信,就不需要在没有消费端的时候回调了,因为消息会过期进入死信
通过死信,提供了处理以上问题的地方。对于b、d两点,spring amqp还提供了一个errorHandle来进行处理
在queue的arguments中设置,arguments.put("x-dead-letter-exchange", "dlxExchange");还可以设置一个死信路由键x-dead-letter-routing-key
设置queue的arguments,spring amqp和客户端的设置基本类似,无非是在生成队列时,一个是调用Queue的构造器传入arguments,一个是
channel.declareQueue传入arguments。
b、设置TTL
ttl存在两种:1、每个队列中消息的TTL 在queue的arguments中设置arguments.put("x-message-ttl",
6000);
2、每个消息的TTL,在消息头中设置
spring amqp:Message expirationMsg=new Message(message.getBytes(),
MessagePropertiesBuilder.newInstance()
.setExpiration(originalExpiration)
.build()
);
rabbitTemplate.send(exchange, key, expirationMsg, correlationId);
客户端:AMQP.BasicProperties properties = new
AMQP.BasicProperties();
properties.setExpiration("60000");
channel.basicPublish(exchangeName,routingKey,mandatory,properties,"ttlTestMessage".getBytes());
两种ttl的区别在于,判断是否过期的时候,在queue上设置的ttl只需要判断queue头信息即可,在message上设置的ttl需要判断每个消息头。
c、设置死信队列
设置一个默认的队列就可以
d、绑定
死信交换机和死信队列进行绑定,路由键位设置的x-dead-letter-routing-key,或者原队列的路由键
需要注意的是,死信队列上不会有DLX标识,而是在原队列上标注
死信交换机上也不会有DLX标识
先来看一下如何设置auto ack吧
spring amqp:simpleRabbitListenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.AUTO);
客户端:channel.basicConsumer中的参数
机制如下:
a、消费端出现异常将自动nack
b、simpleRabbitListenerContainerFactory.setDefaultRequeueRejected设置默认是否重入队列
该设置使得nack时是否重入队列,但是存在以下例外
Ⅰ、errorHandle中拦截的异常不会重入队列
Ⅱ、抛出AmqpRejectAndDontRequeueExceptiond异常不会重入队列
整个死信的消费端代码
spring amqp在ConditionalRejectingErrorHandler.DefaultExceptionStrategy类中默认封装了一系列异常,在新版本中,自己的异常只需要继承ConditionalRejectingErrorHandler.DefaultExceptionStrategy类,重载isFatal方法即可。老版本该类是私有的,只能复制代码。我这里做测试,引入了NullPonterException
上一篇文中很简单的介绍了rabbitMQ与spring的整合,实际上还存在许多问题,因为消息发送是不可靠的。这篇文章将实现可靠消息的发送。
不要问我为什么字体为什么时大时小,CSDN的博客就是这么任性,我已经尽力了。
以下是文章的大纲
spring amqp api 简介
可靠消息的整体思路
通过确认机制、mandatory、缓存确保生产端消息的可靠性
生产端过滤攻击
通过死信确保消费端的消息可靠性,并弥补老版本的immediate
通过errorhandle防止致命错误
¤spring amqp api 简介
rabbitmq的客户端所提供的api很简洁,ConnectionFactory->Connection->Channel,然后就可以通过channel调用各种命令了。spring amqp则扩充了一些列类,而且优雅地提供了一些不错的机制。
一样的,提供了一个连接工厂org.springframework.amqp.rabbit.connection.CachingConnectionFactory
不同的是:1、mandatory的指定
rabbitmq的客户端是在basicPublish指令中指定mandatory
spring amqp则是connectionFactory.setPublisherReturns
2、确认机制开启
rabbitmq的客户端是用confirmSelect指令
spring amqp则是connectionFactory.setPublisherConfirms
类的分解
spring amqp分出了不同的类来对应amqp协议的概念,而不是只有一个channel。
分出来的类有三种类型的exchange、queue、binding
操作类
和数据库一样,spring为操作封装了一个模板RabbitTemplate。其中包含了publish、get等命令的实现
消息对象
rabbitmq只允许发送byte数组,但是spring amqp在此基础上进行了扩展,可以发送任何的对象。
消息可以用Message封装交付因子。MessageProperties设置消息头。
消息和对象可以通过一系列的Conventer自动转换。
消费监听
spring amqp提供了使用@RabbitListener进行优雅的订阅消息。其主要基于SimpleRabbitListenerContainerFactory配置。另外,还有一种方式是通过SimpleMessageListenerContainer。
errorHandle
errorHandle机制是这样的,针对一些致命错误,比如使消费端瘫痪的错误,使用错误机制将使其自动nack,并且不论设置是否可重入队列,都将把消息丢弃。这样做的好处就是防止轮询到其他消费端,是大片消费端瘫痪。
¤可靠消息的整体思路
直接上图吧不要问我中间的缓存处理为什么这么绕,要问就问rabbitmq自生的确认机制和mandatory的回调顺序为什么这么奇葩
¤通过确认机制、mandatory、缓存确保生产端消息的可靠性
基本概念这里都不总结了,可以看这里《rabbitMq的总结》生产端造成消息丢失点总共有
1、找不到交换机
确认机制会在这个时候起效果。然后相应地处理缓存即可开启确认机制:spring-amqp:connectionFactory.setPublisherConfirms(rabbitmqProps.isPublisherConfirms());
客户端:channel.confirmSelect();
回调的设置:
//确认机制,监听无法到达交换机时的回调 template.setConfirmCallback((correlationData,ack,cause)->{ logger.debug("confirm回调!"); if (ack) { logger.debug(correlationData+"消息到达交换机!"); //从备份缓存中移除 if(correlationData instanceof CacheCorrelationData){ CacheCorrelationData cacheCorrelationData=(CacheCorrelationData) correlationData; String cacheName=cacheCorrelationData.getCacheName(); String cacheKey=cacheCorrelationData.getId(); Map<String,Object> copyCache=MessageCacheManager.instance().get("copy"); if(copyCache!=null){ MetaMessage metaMessage=(MetaMessage) copyCache.get(cacheKey); //没有找到任何队列 if(metaMessage!=null){ logger.debug("加入重发缓存!"); Optional.ofNullable(MessageCacheManager.instance().get("copy")).ifPresent(map->{ Optional.ofNullable(metaMessage).ifPresent(message->{ MessageCacheUtil.add(cacheName, correlationData.getId(), message); }); MessageCacheUtil.remove("copy", cacheKey); }); logger.debug("清除备份缓存!"); MessageCacheUtil.remove("copy", cacheKey); } //找到队列 else{ logger.debug("清除重发缓存!"); MessageCacheUtil.remove(cacheName, cacheKey); logger.debug("清除备份缓存!"); MessageCacheUtil.remove("copy", cacheKey); } } //找到队列 else{ logger.debug("清除重发缓存!"); MessageCacheUtil.remove(cacheName, cacheKey); } } else{ logger.info(correlationData+"没有使用缓存,消息将被丢弃,不会尝试重发!"); } } else { logger.info(correlationData+"消息没有找到交换机!" + cause); //清除备份缓存 if(correlationData instanceof CacheCorrelationData){ logger.debug("清除备份缓存!"); CacheCorrelationData cacheCorrelationData=(CacheCorrelationData) correlationData; String cacheName=cacheCorrelationData.getCacheName(); String cacheKey=cacheCorrelationData.getId(); //从copy缓存中获得return时备份的消息 Optional.ofNullable(MessageCacheManager.instance().get("copy")).ifPresent(map->{ MetaMessage metaMessage=(MetaMessage) map.get(cacheKey); Optional.ofNullable(metaMessage).ifPresent(message->{ MessageCacheUtil.add(cacheName, correlationData.getId(), message); }); MessageCacheUtil.remove("copy", cacheKey); }); } else{ logger.info(correlationData+"没有使用缓存,消息将被丢弃,不会尝试重发!"); } } });
2、不可路由
在交换机上 找不到任何路由键匹配的绑定队列的时候,设置mandatory为true,可以得到回掉。设置方法:spring-amqp:rabbitTemplate.setMandatory(true);
客户端:channel.basicPublish的mandatory参数
回调的设置:
//不可路由时回调mandatory template.setReturnCallback((message,replyCode,replyText,exchange,routingKey)->{ logger.debug("return回调!"); logger.info("没有找到任何匹配的队列!"+ "message:"+message+ ",replyCode:"+replyCode+ ",replyText:"+replyText+ ",exchange:"+exchange+ ",routingKey:"+routingKey); //从缓存中移除 logger.debug("清除重发缓存!"); String messageJsonstr=new String(message.getBody()); CacheMessage cacheMessage=JSON.parseObject(messageJsonstr,CacheMessage.class); String cacheName=cacheMessage.getCacheCorrelationData().getCacheName(); String cacheKey=cacheMessage.getCacheCorrelationData().getId(); MetaMessage metaMessage=(MetaMessage) MessageCacheManager.instance().get(cacheName).get(cacheKey); MessageCacheUtil.remove(cacheName,cacheKey); //由于amqp奇葩的协议规定,return比confirm先回调,所以放入一个备份缓存,以备confirm中还能找到该消息 logger.debug("加入备份缓存!"); MessageCacheUtil.add("copy", cacheMessage.getCacheCorrelationData().getId(), metaMessage); });
3、持久化之前服务挂了
每隔一段时间从重发缓存中取出消息重发重发可以使用quartz、spring4的scheduler、甚至spring retry
持久化的设置:spring-amqp:Queue构造函数其中的durable参数
客户端:builder.deliveryMode(2);其中builder为AMQP.BasicProperties.Builder
重发的代码将在下一章节给出
¤生产端过滤攻击
这里还有一个小问题,万一有人故意试探或者攻击服务器,这个时候要能辨别出来并且直接丢弃。可以预先把所有合法的路由键存起来,重发的时候进行比较。import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePropertiesBuilder; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import com.alibaba.fastjson.JSONObject; import com.wlf.demo.pojo.CacheMessage; import com.wlf.demo.pojo.MetaMessage; import com.wlf.demo.util.AttactMessageFilter; import com.wlf.demo.util.CacheCorrelationData; import com.wlf.demo.util.MessageCacheManager; import com.wlf.demo.util.MessageCacheUtil; /** * * 确认机制confirm返回nack后的信息在此统一重发 * * @author 吴林峰 * */ @Component public class MessageRetrySchedule { private static final Logger logger = LoggerFactory.getLogger(MessageRetrySchedule.class); @Autowired private RabbitTemplate rabbitTemplate; /** * * 重发 * */ @Scheduled(fixedRate = 10000) public void sendMessage() { System.out.println("go!"); Map<String,Map<String,Object>> caches=MessageCacheManager.instance().getAll(); caches.forEach((cacheName,cache)->{ cache.forEach((key,message)->{ MetaMessage metaMessage=(MetaMessage) message; if(AttactMessageFilter.instance().isAttact(metaMessage.getRoutingKey())){ logger.debug("不合法的路由键,可能是外部攻击!"+metaMessage.getRoutingKey()); MessageCacheUtil.remove(cacheName, key); return; } //重发尝试5次 if(metaMessage.getCount()==6){ logger.info("消息无法找到交换机,message:"+message+"exchange:"+metaMessage.getExchange()+",routingKeys:"+metaMessage.getRoutingKey()); logger.info("通知管理员并写入数据库(注意去重)"); MessageCacheUtil.remove(cacheName, key); return; } CacheCorrelationData correlationId = new CacheCorrelationData(metaMessage.getPayload().toString(),cacheName); logger.info("消息重发!exchange:"+metaMessage.getExchange()+",routingKeys:"+metaMessage.getRoutingKey()+",correlationData:"+correlationId.toString()); CacheMessage cacheMessage=new CacheMessage(); CacheCorrelationData correlationData = new CacheCorrelationData(key,cacheName); cacheMessage.setCacheCorrelationData(correlationData); cacheMessage.setPayload(metaMessage.getPayload()); Message msg=new Message(JSONObject.toJSONString(cacheMessage).getBytes(),MessagePropertiesBuilder.newInstance().setContentType("text/x-json").build()); rabbitTemplate.send(metaMessage.getExchange(), metaMessage.getRoutingKey(), msg, correlationId); metaMessage.setCount(metaMessage.getCount()+1); }); }); } }
¤通过死信确保消费端的消息可靠性,并弥补老版本的immediate
死信的基本概念也不简绍了,可以看这里《rabbitMq的总结》1、为什么引入死信
为什么引入死信,可能很多刚接触消息队列的人都会有这个疑惑吧。我谈谈我的看法。虽然有重新入队的机制,但是所有消费端未发送成功的消息都重入队真的就没什么问题?我先抛出几个问题。a、如果是bug,那么该消息永远不可能处理成功,难道就任由它无限重发
b、如果是致命错误,会导致消费端瘫痪,那么重入队列,负载均衡到其它消费端后、造成其它消费端瘫痪
c、如果是队列已满,如何获知该原因并通知管理员
d、如果对不同错误需要不同的处理
e、如何提供一个统一的地方处理以上问题,而不是分散到每个消费端代码中
f、弥补immediate,由于有了死信,就不需要在没有消费端的时候回调了,因为消息会过期进入死信
通过死信,提供了处理以上问题的地方。对于b、d两点,spring amqp还提供了一个errorHandle来进行处理
2、死信的设置
a、需要被重新路由到死信的队列上设置死信交换机在queue的arguments中设置,arguments.put("x-dead-letter-exchange", "dlxExchange");还可以设置一个死信路由键x-dead-letter-routing-key
设置queue的arguments,spring amqp和客户端的设置基本类似,无非是在生成队列时,一个是调用Queue的构造器传入arguments,一个是
channel.declareQueue传入arguments。
b、设置TTL
ttl存在两种:1、每个队列中消息的TTL 在queue的arguments中设置arguments.put("x-message-ttl",
6000);
2、每个消息的TTL,在消息头中设置
spring amqp:Message expirationMsg=new Message(message.getBytes(),
MessagePropertiesBuilder.newInstance()
.setExpiration(originalExpiration)
.build()
);
rabbitTemplate.send(exchange, key, expirationMsg, correlationId);
客户端:AMQP.BasicProperties properties = new
AMQP.BasicProperties();
properties.setExpiration("60000");
channel.basicPublish(exchangeName,routingKey,mandatory,properties,"ttlTestMessage".getBytes());
两种ttl的区别在于,判断是否过期的时候,在queue上设置的ttl只需要判断queue头信息即可,在message上设置的ttl需要判断每个消息头。
c、设置死信队列
设置一个默认的队列就可以
d、绑定
死信交换机和死信队列进行绑定,路由键位设置的x-dead-letter-routing-key,或者原队列的路由键
需要注意的是,死信队列上不会有DLX标识,而是在原队列上标注
死信交换机上也不会有DLX标识
3、spring amqp的ack规则
除了可以手动ack以外,rabbitMq提供了auto ack机制,它会在收到消息后自动ack。但是spring amqp中则又些不同。先来看一下如何设置auto ack吧
spring amqp:simpleRabbitListenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.AUTO);
客户端:channel.basicConsumer中的参数
机制如下:
a、消费端出现异常将自动nack
b、simpleRabbitListenerContainerFactory.setDefaultRequeueRejected设置默认是否重入队列
该设置使得nack时是否重入队列,但是存在以下例外
Ⅰ、errorHandle中拦截的异常不会重入队列
Ⅱ、抛出AmqpRejectAndDontRequeueExceptiond异常不会重入队列
整个死信的消费端代码
import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentSkipListMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.AmqpRejectAndDontRequeueException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePropertiesBuilder; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; import com.alibaba.fastjson.JSONObject; import com.rabbitmq.client.Channel; import com.wlf.demo.pojo.CacheMessage; import com.wlf.demo.util.CacheCorrelationData; /** * * 死信统一分情况处理消费端无法消费的消息,另外也处理老版本immediate的情况,即没有消费者的情况 * * @author 吴林峰 * */ @Component @RabbitListener(queues = "dlxQueue") public class DeadRecv { private static final Logger logger = LoggerFactory.getLogger(DeadRecv.class); private Map<String,Integer> expiredCounter=new ConcurrentSkipListMap<String,Integer>(); private final String EXPIRED="expired"; private final String REJECTED="rejected"; private final String MAXLEN="maxlen"; @Autowired private RabbitTemplate rabbitTemplate; @RabbitHandler public void receive(@Header("x-death") List<Map<String,Object>> ds,String message, Channel channel) { try{ logger.info("死信!"+message); CacheMessage cacheMessage=JSONObject.parseObject(message,CacheMessage.class); Map<String,Object> params=ds.stream() .findFirst() .get(); String reason=(String) params.get("reason"); String queue=(String) params.get("queue"); String originalExpiration=(String) params.get("original-expiration"); String exchange=(String) params.get("exchange"); List<String> routingKeys=(List<String>) params.get("routing-keys"); String key=routingKeys.stream().findFirst().get(); logger.debug("reason:"+reason+ ",queue:"+queue+ ",originalExpiration:"+originalExpiration+ ",exchange:"+exchange+ ",routingKeys:"+key); CacheCorrelationData correlationId = new CacheCorrelationData(message,cacheMessage.getCacheCorrelationData().getCacheName()); Message msg=new Message(message.getBytes(), MessagePropertiesBuilder.newInstance() .setContentType("text/x-json") .build() ); if(reason.equals(EXPIRED)){ //五次重发的机制,不是一味的重发 int count=Optional.ofNullable(expiredCounter.get(cacheMessage.getCacheCorrelationData().getId())).orElse(0); logger.debug("重发详情:key="+cacheMessage.getCacheCorrelationData().getId()+",count="+count); if(count==6){ expiredCounter.remove(cacheMessage.getCacheCorrelationData().getId()); logger.debug("消息"+message+"无法被消费端处理,queue:"+queue+"exchange:"+exchange+"routingKeys:"+key); logger.debug("通知管理员并写入数据库(注意去重)"); return; } count++; expiredCounter.put(cacheMessage.getCacheCorrelationData().getId(), count); //消息重发 Optional.ofNullable(originalExpiration).ifPresent(e->{ Message expirationMsg=new Message(message.getBytes(), MessagePropertiesBuilder.newInstance() .setExpiration(originalExpiration) .setContentType("text/x-json") .build() ); rabbitTemplate.send(exchange, key, expirationMsg, correlationId); }); rabbitTemplate.send(exchange, key, msg, correlationId); } else if(reason.equals(REJECTED)){ //由于致命错误,为了防止负载均衡的轮询策略,路由到其他消费者,造成大量消费者瘫痪,直接丢弃该消息 logger.info("消息被拒绝!"+message); logger.info("通知管理员并写入数据库(注意去重)"); } else if(reason.equals(MAXLEN)){ logger.info("队列已满,消息转发!"+queue); //利用负载均衡的轮询策略,重发消息 rabbitTemplate.send(exchange, key, msg, correlationId); } }catch(Exception e){ logger.info("死信处理出错!"+message); logger.info("通知管理员并写入数据库(注意去重)"); e.printStackTrace(); throw new AmqpRejectAndDontRequeueException(e.getMessage()); } } }
¤通过errorhandle防止致命错误
最后一个问题,消费端遇到致命的错误。这个问题,spring amqp提供了一个比较不错的机制errorhandle。在消费端抛出异常后,会在FatalExceptionStrategy的实现类中判断是否是致命异常,调用isFatal方法。如果是致命异常,消息将不会重新入队。spring amqp在ConditionalRejectingErrorHandler.DefaultExceptionStrategy类中默认封装了一系列异常,在新版本中,自己的异常只需要继承ConditionalRejectingErrorHandler.DefaultExceptionStrategy类,重载isFatal方法即可。老版本该类是私有的,只能复制代码。我这里做测试,引入了NullPonterException
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.listener.FatalExceptionStrategy; import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException; import org.springframework.amqp.support.converter.MessageConversionException; import org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException; import org.springframework.messaging.handler.annotation.support.MethodArgumentTypeMismatchException; public class MessageFatalExceptionStrategy implements FatalExceptionStrategy { private static final Logger logger = LoggerFactory.getLogger(MessageFatalExceptionStrategy.class); @Override public boolean isFatal(Throwable t) { logger.debug("isFatal"); if (t instanceof ListenerExecutionFailedException && causeIsFatal(t.getCause())) { logger.debug("通知管理员程序存在bug,导致"+t.getMessage()+",使得消费端瘫痪"); return true; } return false; } public static boolean causeIsFatal(Throwable cause) { return cause instanceof MessageConversionException || cause instanceof org.springframework.messaging.converter.MessageConversionException || cause instanceof MethodArgumentNotValidException || cause instanceof MethodArgumentTypeMismatchException || cause instanceof NullPointerException; } }git地址:https://github.com/wulinfeng2/spring-boot-rabbitmq
相关文章推荐
- 集群与负载均衡系列(5)——消息队列之spring-boot整合Rabbitmq
- spring boot Rabbitmq集成,延时消息队列实现
- Spring Boot + RabbitMQ 实现消息队列场景
- spring boot Rabbitmq集成,延时消息队列实现
- SpringBoot对消息队列(MQ)的支持
- Spring AMQP + Rabbit 配置多数据源消息队列
- spring boot rabbitmq 多MQ配置 自动 创建 队列 RPC
- Spring Boot RabbitMQ 延迟消息实现完整版示例
- 消息队列系列(三):.Rabbitmq Trace的使用
- SpringBoot+ActiveMQ多消息队列监听
- 集群与负载均衡系列(4)——消息队列之Rabbitmq的搭建
- MQ系列3 使用Spring发送,消费topic和queue消息 activeMQ
- spring boot 发送kafka消息队列
- 4 微服务实战系列 - SpringBoot RabbitMQ 实战解决项目中实践
- SpringBoot对消息队列(MQ)的支持
- RabbitMQ+Spring Quartz 实现消息的定时发送和接收
- SpringBoot的RabbitMQ消息队列: 一、消息发送接收第一印象
- 集群与负载均衡系列(7)——消息队列之分布式事务
- 详细介绍Spring Boot + RabbitMQ实现延迟队列
- spring整合activemq发送MQ消息[queue模式]实例