您的位置:首页 > 编程语言 > Java开发

集群与负载均衡系列(6)——消息队列之rabbitMQ+spring-boot+spring amqp发送可靠的消息

2017-05-22 10:24 936 查看
          在开始文章之前,可以看一下RabbitMq的总结《rabbitMq的总结》,用的wps的“写得”总结的。后续的很多内容都会涉及到这些基础概念。

       上一篇文中很简单的介绍了rabbitMQ与spring的整合,实际上还存在许多问题,因为消息发送是不可靠的。这篇文章将实现可靠消息的发送。

          不要问我为什么字体为什么时大时小,CSDN的博客就是这么任性,我已经尽力了。

       以下是文章的大纲

spring amqp api 简介
可靠消息的整体思路
通过确认机制、mandatory、缓存确保生产端消息的可靠性
生产端过滤攻击
通过死信确保消费端的消息可靠性,并弥补老版本的immediate
通过errorhandle防止致命错误

¤spring amqp api 简介

              rabbitmq的客户端所提供的api很简洁,ConnectionFactory->Connection->Channel,然后就可以通过channel调用各种命令了。

        spring amqp则扩充了一些列类,而且优雅地提供了一些不错的机制。


        建立连接

一样的,提供了一个连接工厂org.springframework.amqp.rabbit.connection.CachingConnectionFactory

        不同的是:1、mandatory的指定
                                        rabbitmq的客户端是在basicPublish指令中指定mandatory

              spring amqp则是connectionFactory.setPublisherReturns
                          2、确认机制开启
                              rabbitmq的客户端是用confirmSelect指令
                              spring amqp则是connectionFactory.setPublisherConfirms


       
类的分解
         spring amqp分出了不同的类来对应amqp协议的概念,而不是只有一个channel。
          分出来的类有三种类型的exchange、queue、binding

       操作类
           和数据库一样,spring为操作封装了一个模板RabbitTemplate。其中包含了publish、get等命令的实现

       消息对象
           rabbitmq只允许发送byte数组,但是spring amqp在此基础上进行了扩展,可以发送任何的对象。

           消息可以用Message封装交付因子。MessageProperties设置消息头。
          消息和对象可以通过一系列的Conventer自动转换。

     消费监听
           spring amqp提供了使用@RabbitListener进行优雅的订阅消息。其主要基于SimpleRabbitListenerContainerFactory配置。另外,还有一种方式是通过SimpleMessageListenerContainer。

   errorHandle
    errorHandle机制是这样的,针对一些致命错误,比如使消费端瘫痪的错误,使用错误机制将使其自动nack,并且不论设置是否可重入队列,都将把消息丢弃。这样做的好处就是防止轮询到其他消费端,是大片消费端瘫痪。

   

¤可靠消息的整体思路

直接上图吧



不要问我中间的缓存处理为什么这么绕,要问就问rabbitmq自生的确认机制和mandatory的回调顺序为什么这么奇葩




¤通过确认机制、mandatory、缓存确保生产端消息的可靠性

        基本概念这里都不总结了,可以看这里《rabbitMq的总结》

        生产端造成消息丢失点总共有

            1、找不到交换机

                  确认机制会在这个时候起效果。然后相应地处理缓存即可

                  开启确认机制:spring-amqp:connectionFactory.setPublisherConfirms(rabbitmqProps.isPublisherConfirms());

                                              客户端:channel.confirmSelect();

                  回调的设置:
//确认机制,监听无法到达交换机时的回调
template.setConfirmCallback((correlationData,ack,cause)->{
logger.debug("confirm回调!");
if (ack) {
logger.debug(correlationData+"消息到达交换机!");
//从备份缓存中移除
if(correlationData instanceof CacheCorrelationData){
CacheCorrelationData cacheCorrelationData=(CacheCorrelationData) correlationData;
String cacheName=cacheCorrelationData.getCacheName();
String cacheKey=cacheCorrelationData.getId();

Map<String,Object> copyCache=MessageCacheManager.instance().get("copy");
if(copyCache!=null){
MetaMessage metaMessage=(MetaMessage) copyCache.get(cacheKey);
//没有找到任何队列
if(metaMessage!=null){
logger.debug("加入重发缓存!");
Optional.ofNullable(MessageCacheManager.instance().get("copy")).ifPresent(map->{
Optional.ofNullable(metaMessage).ifPresent(message->{
MessageCacheUtil.add(cacheName,
correlationData.getId(),
message);
});
MessageCacheUtil.remove("copy", cacheKey);
});

logger.debug("清除备份缓存!");
MessageCacheUtil.remove("copy", cacheKey);
}
//找到队列
else{
logger.debug("清除重发缓存!");
MessageCacheUtil.remove(cacheName, cacheKey);

logger.debug("清除备份缓存!");
MessageCacheUtil.remove("copy", cacheKey);
}
}
//找到队列
else{
logger.debug("清除重发缓存!");
MessageCacheUtil.remove(cacheName, cacheKey);
}

}
else{
logger.info(correlationData+"没有使用缓存,消息将被丢弃,不会尝试重发!");
}

} else {
logger.info(correlationData+"消息没有找到交换机!" + cause);
//清除备份缓存
if(correlationData instanceof CacheCorrelationData){
logger.debug("清除备份缓存!");
CacheCorrelationData cacheCorrelationData=(CacheCorrelationData) correlationData;
String cacheName=cacheCorrelationData.getCacheName();
String cacheKey=cacheCorrelationData.getId();
//从copy缓存中获得return时备份的消息
Optional.ofNullable(MessageCacheManager.instance().get("copy")).ifPresent(map->{
MetaMessage metaMessage=(MetaMessage) map.get(cacheKey);
Optional.ofNullable(metaMessage).ifPresent(message->{
MessageCacheUtil.add(cacheName,
correlationData.getId(),
message);
});
MessageCacheUtil.remove("copy", cacheKey);
});

}
else{
logger.info(correlationData+"没有使用缓存,消息将被丢弃,不会尝试重发!");
}
}
});

            

     2、不可路由

                   在交换机上 找不到任何路由键匹配的绑定队列的时候,设置mandatory为true,可以得到回掉。

                   设置方法:spring-amqp:rabbitTemplate.setMandatory(true);

                                       客户端:channel.basicPublish的mandatory参数

                   回调的设置:
//不可路由时回调mandatory
template.setReturnCallback((message,replyCode,replyText,exchange,routingKey)->{
logger.debug("return回调!");
logger.info("没有找到任何匹配的队列!"+
"message:"+message+
",replyCode:"+replyCode+
",replyText:"+replyText+
",exchange:"+exchange+
",routingKey:"+routingKey);

//从缓存中移除
logger.debug("清除重发缓存!");
String messageJsonstr=new String(message.getBody());
CacheMessage cacheMessage=JSON.parseObject(messageJsonstr,CacheMessage.class);
String cacheName=cacheMessage.getCacheCorrelationData().getCacheName();
String cacheKey=cacheMessage.getCacheCorrelationData().getId();
MetaMessage metaMessage=(MetaMessage) MessageCacheManager.instance().get(cacheName).get(cacheKey);
MessageCacheUtil.remove(cacheName,cacheKey);

//由于amqp奇葩的协议规定,return比confirm先回调,所以放入一个备份缓存,以备confirm中还能找到该消息
logger.debug("加入备份缓存!");
MessageCacheUtil.add("copy", cacheMessage.getCacheCorrelationData().getId(), metaMessage);
});

             3、持久化之前服务挂了

                     每隔一段时间从重发缓存中取出消息重发

                     重发可以使用quartz、spring4的scheduler、甚至spring retry

                     持久化的设置:spring-amqp:Queue构造函数其中的durable参数

                                                  客户端:builder.deliveryMode(2);其中builder为AMQP.BasicProperties.Builder 
                     重发的代码将在下一章节给出

¤生产端过滤攻击

       这里还有一个小问题,万一有人故意试探或者攻击服务器,这个时候要能辨别出来并且直接丢弃。可以预先把所有合法的路由键存起来,重发的时候进行比较。
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePropertiesBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSONObject;
import com.wlf.demo.pojo.CacheMessage;
import com.wlf.demo.pojo.MetaMessage;
import com.wlf.demo.util.AttactMessageFilter;
import com.wlf.demo.util.CacheCorrelationData;
import com.wlf.demo.util.MessageCacheManager;
import com.wlf.demo.util.MessageCacheUtil;

/**
*
* 确认机制confirm返回nack后的信息在此统一重发
*
* @author 吴林峰
*
*/
@Component
public class MessageRetrySchedule {

private static final Logger logger = LoggerFactory.getLogger(MessageRetrySchedule.class);

@Autowired
private RabbitTemplate rabbitTemplate;

/**
*
* 重发
*
*/
@Scheduled(fixedRate = 10000)
public void sendMessage() {
System.out.println("go!");
Map<String,Map<String,Object>> caches=MessageCacheManager.instance().getAll();
caches.forEach((cacheName,cache)->{
cache.forEach((key,message)->{
MetaMessage metaMessage=(MetaMessage) message;
if(AttactMessageFilter.instance().isAttact(metaMessage.getRoutingKey())){
logger.debug("不合法的路由键,可能是外部攻击!"+metaMessage.getRoutingKey());
MessageCacheUtil.remove(cacheName, key);
return;
}
//重发尝试5次
if(metaMessage.getCount()==6){
logger.info("消息无法找到交换机,message:"+message+"exchange:"+metaMessage.getExchange()+",routingKeys:"+metaMessage.getRoutingKey());
logger.info("通知管理员并写入数据库(注意去重)");
MessageCacheUtil.remove(cacheName, key);
return;
}
CacheCorrelationData correlationId = new CacheCorrelationData(metaMessage.getPayload().toString(),cacheName);
logger.info("消息重发!exchange:"+metaMessage.getExchange()+",routingKeys:"+metaMessage.getRoutingKey()+",correlationData:"+correlationId.toString());
CacheMessage cacheMessage=new CacheMessage();
CacheCorrelationData correlationData = new CacheCorrelationData(key,cacheName);
cacheMessage.setCacheCorrelationData(correlationData);
cacheMessage.setPayload(metaMessage.getPayload());
Message msg=new Message(JSONObject.toJSONString(cacheMessage).getBytes(),MessagePropertiesBuilder.newInstance().setContentType("text/x-json").build());
rabbitTemplate.send(metaMessage.getExchange(), metaMessage.getRoutingKey(), msg, correlationId);
metaMessage.setCount(metaMessage.getCount()+1);
});
});
}

}

¤通过死信确保消费端的消息可靠性,并弥补老版本的immediate

           死信的基本概念也不简绍了,可以看这里《rabbitMq的总结》
           

1、为什么引入死信

           为什么引入死信,可能很多刚接触消息队列的人都会有这个疑惑吧。我谈谈我的看法。虽然有重新入队的机制,但是所有消费端未发送成功的消息都重入队真的就没什么问题?我先抛出几个问题。
            a、如果是bug,那么该消息永远不可能处理成功,难道就任由它无限重发
            b、如果是致命错误,会导致消费端瘫痪,那么重入队列,负载均衡到其它消费端后、造成其它消费端瘫痪
            c、如果是队列已满,如何获知该原因并通知管理员
            d、如果对不同错误需要不同的处理
            e、如何提供一个统一的地方处理以上问题,而不是分散到每个消费端代码中
            f、弥补immediate,由于有了死信,就不需要在没有消费端的时候回调了,因为消息会过期进入死信

            通过死信,提供了处理以上问题的地方。对于b、d两点,spring amqp还提供了一个errorHandle来进行处理

         

       2、死信的设置

          a、需要被重新路由到死信的队列上设置死信交换机
          在queue的arguments中设置,arguments.put("x-dead-letter-exchange", "dlxExchange");还可以设置一个死信路由键x-dead-letter-routing-key
          设置queue的arguments,spring amqp和客户端的设置基本类似,无非是在生成队列时,一个是调用Queue的构造器传入arguments,一个是 
          channel.declareQueue传入arguments。
          b、设置TTL
          ttl存在两种:1、每个队列中消息的TTL 在queue的arguments中设置arguments.put("x-message-ttl",
6000);
                            2、每个消息的TTL,在消息头中设置
                                 spring amqp:Message expirationMsg=new Message(message.getBytes(),

                                                                                                                 MessagePropertiesBuilder.newInstance()

                                                                                                                                                        .setExpiration(originalExpiration)

                                                                                                                                                        .build()

                                                                                             );

                                                       rabbitTemplate.send(exchange, key, expirationMsg, correlationId);
                                 客户端:AMQP.BasicProperties properties = new
AMQP.BasicProperties();

                                             properties.setExpiration("60000");

                                             channel.basicPublish(exchangeName,routingKey,mandatory,properties,"ttlTestMessage".getBytes());
            两种ttl的区别在于,判断是否过期的时候,在queue上设置的ttl只需要判断queue头信息即可,在message上设置的ttl需要判断每个消息头。
            c、设置死信队列
                 设置一个默认的队列就可以
            d、绑定
                 死信交换机和死信队列进行绑定,路由键位设置的x-dead-letter-routing-key,或者原队列的路由键

            需要注意的是,死信队列上不会有DLX标识,而是在原队列上标注
            


             死信交换机上也不会有DLX标识
             


       3、spring amqp的ack规则

             除了可以手动ack以外,rabbitMq提供了auto ack机制,它会在收到消息后自动ack。但是spring amqp中则又些不同。

             先来看一下如何设置auto ack吧
             spring amqp:simpleRabbitListenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.AUTO);
             客户端:channel.basicConsumer中的参数
          
          机制如下:
          a、消费端出现异常将自动nack
         b、simpleRabbitListenerContainerFactory.setDefaultRequeueRejected设置默认是否重入队列
               该设置使得nack时是否重入队列,但是存在以下例外
               Ⅰ、errorHandle中拦截的异常不会重入队列
               Ⅱ、抛出AmqpRejectAndDontRequeueExceptiond异常不会重入队列

          整个死信的消费端代码
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentSkipListMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePropertiesBuilder;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.wlf.demo.pojo.CacheMessage;
import com.wlf.demo.util.CacheCorrelationData;

/**
*
* 死信统一分情况处理消费端无法消费的消息,另外也处理老版本immediate的情况,即没有消费者的情况
*
* @author 吴林峰
*
*/
@Component
@RabbitListener(queues = "dlxQueue")
public class DeadRecv {

private static final Logger logger = LoggerFactory.getLogger(DeadRecv.class);

private Map<String,Integer> expiredCounter=new ConcurrentSkipListMap<String,Integer>();

private final String EXPIRED="expired";
private final String REJECTED="rejected";
private final String MAXLEN="maxlen";

@Autowired
private RabbitTemplate rabbitTemplate;

@RabbitHandler
public void receive(@Header("x-death") List<Map<String,Object>> ds,String message, Channel channel) {
try{
logger.info("死信!"+message);
CacheMessage cacheMessage=JSONObject.parseObject(message,CacheMessage.class);
Map<String,Object> params=ds.stream()
.findFirst()
.get();
String reason=(String) params.get("reason");
String queue=(String) params.get("queue");
String originalExpiration=(String) params.get("original-expiration");
String exchange=(String) params.get("exchange");
List<String> routingKeys=(List<String>) params.get("routing-keys");
String key=routingKeys.stream().findFirst().get();
logger.debug("reason:"+reason+
",queue:"+queue+
",originalExpiration:"+originalExpiration+
",exchange:"+exchange+
",routingKeys:"+key);

CacheCorrelationData correlationId = new CacheCorrelationData(message,cacheMessage.getCacheCorrelationData().getCacheName());
Message msg=new Message(message.getBytes(),
MessagePropertiesBuilder.newInstance()
.setContentType("text/x-json")
.build()
);
if(reason.equals(EXPIRED)){
//五次重发的机制,不是一味的重发
int count=Optional.ofNullable(expiredCounter.get(cacheMessage.getCacheCorrelationData().getId())).orElse(0);
logger.debug("重发详情:key="+cacheMessage.getCacheCorrelationData().getId()+",count="+count);
if(count==6){
expiredCounter.remove(cacheMessage.getCacheCorrelationData().getId());
logger.debug("消息"+message+"无法被消费端处理,queue:"+queue+"exchange:"+exchange+"routingKeys:"+key);
logger.debug("通知管理员并写入数据库(注意去重)");
return;
}
count++;
expiredCounter.put(cacheMessage.getCacheCorrelationData().getId(), count);

//消息重发
Optional.ofNullable(originalExpiration).ifPresent(e->{
Message expirationMsg=new Message(message.getBytes(),
MessagePropertiesBuilder.newInstance()
.setExpiration(originalExpiration)
.setContentType("text/x-json")
.build()
);
rabbitTemplate.send(exchange, key, expirationMsg, correlationId);
});
rabbitTemplate.send(exchange, key, msg, correlationId);
}
else if(reason.equals(REJECTED)){
//由于致命错误,为了防止负载均衡的轮询策略,路由到其他消费者,造成大量消费者瘫痪,直接丢弃该消息
logger.info("消息被拒绝!"+message);
logger.info("通知管理员并写入数据库(注意去重)");
}
else if(reason.equals(MAXLEN)){
logger.info("队列已满,消息转发!"+queue);
//利用负载均衡的轮询策略,重发消息
rabbitTemplate.send(exchange, key, msg, correlationId);
}
}catch(Exception e){
logger.info("死信处理出错!"+message);
logger.info("通知管理员并写入数据库(注意去重)");
e.printStackTrace();
throw new AmqpRejectAndDontRequeueException(e.getMessage());
}
}

}
           

¤通过errorhandle防止致命错误

        最后一个问题,消费端遇到致命的错误。这个问题,spring amqp提供了一个比较不错的机制errorhandle。在消费端抛出异常后,会在FatalExceptionStrategy的实现类中判断是否是致命异常,调用isFatal方法。如果是致命异常,消息将不会重新入队。
        spring amqp在ConditionalRejectingErrorHandler.DefaultExceptionStrategy类中默认封装了一系列异常,在新版本中,自己的异常只需要继承ConditionalRejectingErrorHandler.DefaultExceptionStrategy类,重载isFatal方法即可。老版本该类是私有的,只能复制代码。我这里做测试,引入了NullPonterException
        
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.listener.FatalExceptionStrategy;
import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException;
import org.springframework.messaging.handler.annotation.support.MethodArgumentTypeMismatchException;

public class MessageFatalExceptionStrategy implements FatalExceptionStrategy {

private static final Logger logger = LoggerFactory.getLogger(MessageFatalExceptionStrategy.class);

@Override
public boolean isFatal(Throwable t) {
logger.debug("isFatal");
if (t instanceof ListenerExecutionFailedException && causeIsFatal(t.getCause())) {
logger.debug("通知管理员程序存在bug,导致"+t.getMessage()+",使得消费端瘫痪");
return true;
}
return false;
}

public static boolean causeIsFatal(Throwable cause) {
return cause instanceof MessageConversionException
|| cause instanceof org.springframework.messaging.converter.MessageConversionException
|| cause instanceof MethodArgumentNotValidException
|| cause instanceof MethodArgumentTypeMismatchException
|| cause instanceof NullPointerException;
}

}
             git地址:https://github.com/wulinfeng2/spring-boot-rabbitmq
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息