您的位置:首页 > 其它

RabbitMQ一个简单可靠的方案(.Net Core实现)

2018-08-27 02:16 531 查看

前言

  最近需要使用到消息队列相关技术,于是重新接触RabbitMQ。其中遇到了不少可靠性方面的问题,归纳了一下,大概有以下几种:

  1. 临时异常,如数据库网络闪断、http请求临时失效等;

  2. 时序异常,如A任务依赖于B任务,但可能由于调度或消费者分配的原因,导致A任务先于B任务执行;

  3. 业务异常,由于系统测试不充分,上线后发现某几个或某几种消息无法正常处理;

  4. 系统异常,业务中间件无法正常操作,如网络中断、数据库宕机等;

  5. 非法异常,一些伪造、攻击类型的消息。

  针对这些异常,我采用了一种基于消息审计、消息重试、消息检索、消息重发的方案。

方案

_channel.BasicQos(0, 1, false);

_channel.ExchangeDeclare("Exchange", "direct");
_channel.QueueDeclare("QueueB", true, false, false);
_channel.QueueBind("QueueB", "Exchange", "RouteB");

var retryDic = new Dictionary<string, object>
{
{"x-dead-letter-exchange", "Exchange"},
{"x-dead-letter-routing-key", "RouteB"}
};

_channel.ExchangeDeclare("Exchange_Retry", "direct");
_channel.QueueDeclare("QueueB_Retry", true, false, false, retryDic);
_channel.QueueBind("QueueB_Retry", "Exchange_Retry", "RouteB_Retry");

var consumer = new EventingBasicConsumer(_channel);
consumer.Received += (model, ea) =>
{
//The QueueB is always failed.
bool canAck;
var retryCount = 0;
if (ea.BasicProperties.Headers != null && ea.BasicProperties.Headers.ContainsKey("retryCount"))
{
retryCount = (int)ea.BasicProperties.Headers["retryCount"];
_logger.LogWarning($"[{DateTime.Now:yyyy-MM-dd HH:mm:ss}]Message:{ea.BasicProperties.MessageId}, {++retryCount} retry started...");
}

try
{
Handle();
canAck = true;
}
catch (Exception ex)
{
_logger.LogCritical(ex, "Error!");
if (CanRetry(retryCount))
{
SetupRetry(retryCount, "Exchange_Retry", "RouteB_Retry", ea);
canAck = true;
}
else
{
canAck = false;
}
}

try
{
if (canAck)
{
_channel.BasicAck(ea.DeliveryTag, false);
}
else
{
_channel.BasicNack(ea.DeliveryTag, false, false);
}
}
catch (AlreadyClosedException ex)
{
_logger.LogCritical(ex, "RabbitMQ is closed!");
}
};

_channel.BasicConsume("QueueB", false, consumer);


View Code

审计消费者(Audit Comsumer)

  1. 声明Exchange和Queue

  _channel.ExchangeDeclare("Exchange", "direct");

  _channel.QueueDeclare("QueueAudit", true, false, false);
  _channel.QueueBind("QueueAudit", "Exchange", "RouteA");
  _channel.QueueBind("QueueAudit", "Exchange", "RouteB");


  2. 排除死信Exchange转发过来的重复消息

  if (ea.BasicProperties.Headers == null || !ea.BasicProperties.Headers.ContainsKey("x-death"))
  {
  ...
  }


  3. 生成消息实体

  var message = new Message
  {
  MessageId = ea.BasicProperties.MessageId,
  Body = ea.Body,
  Exchange = ea.Exchange,
  Route = ea.RoutingKey
  };


  4. RabbitMQ会用bytes来存储字符串,因此,要把头中bytes转回字符串

  if (ea.BasicProperties.Headers != null)
  {
  var headers = new Dictionary<string, object>();

  foreach (var header in ea.BasicProperties.Headers)
  {
  if (header.Value is byte[] bytes)
  {
  headers[header.Key] = Encoding.UTF8.GetString(bytes);
  }
  else
  {
  headers[header.Key] = header.Value;
  }
  }

  message.Headers = headers;
  }


  5. 把Unix格式的Timestamp转成UTC时间

  if (ea.BasicProperties.Timestamp.UnixTime > 0)
  {
  message.TimestampUnix = ea.BasicProperties.Timestamp.UnixTime;
  var offset = DateTimeOffset.FromUnixTimeMilliseconds(ea.BasicProperties.Timestamp.UnixTime);
  message.Timestamp = offset.UtcDateTime;
  }


  6. 消息存入MongoDB

  _mongoDbContext.Collection<Message>().InsertOne(message, cancellationToken: cancellationToken);


  MongoDB记录:

  


  重试记录:

  


消息检索及重发(WebApi)

  1. 通过消息Id检索消息

  


  2. 通过头消息检索消息

  


  


  3. 消息重发,会重新生成MessageId

  


  


Ack,Nack,Reject的关系

  1. 消息处理成功,执行Ack,RabbitMQ会把消息从队列中删除。

  2. 消息处理失败,执行Nack或者Reject:

  a) 当requeue=true时,消息会重新回到队列,然后当前消费者会马上再取回这条消息;

  b) 当requeue=false时,如果Exchange有设置Dead Letter Exchange,则消息会去到Dead Letter Exchange;

  c) 当requeue=false时,如果Exchange没设置Dead Letter Exchange,则消息从队列中删除,效果与Ack相同。

  3. Nack与Reject的区别在于:Nack可以批量操作,Reject只能单条操作。

  

RabbitMQ自动恢复

连接(Connection)恢复

  1. 重连(Reconnect)

  2. 恢复连接监听(Listeners)

  3. 重新打开通道(Channels)

  4. 恢复通道监听(Listeners)

  5. 恢复basic.qos,publisher confirms以及transaction设置

  

拓扑(Topology)恢复

  1. 重新声明交换机(Exchanges)

  2. 重新声明队列(Queues)

  3. 恢复所有绑定(Bindings)

  4. 恢复所有消费者(Consumers)

异常处理机制

  1. 临时异常,如数据库网络闪断、http请求临时失效等

  通过短时间重试(如1秒后)的方式处理,也可以考虑Nack/Reject来实现重试(时效性更高)。

  2. 时序异常,如A任务依赖于B任务,但可能由于调度或消费者分配的原因,导致A任务先于B任务执行

  通过长时间重试(如1分钟、30分钟、1小时、1天等),等待B任务先执行完的方式处理。

  

  3. 业务异常,由于系统测试不充分,上线后发现某几个或某几种消息无法正常处理

  等系统修正后,通过消息重发的方式处理。

  4. 系统异常,业务中间件无法正常操作,如网络中断、数据库宕机等

  等系统恢复后,通过消息重发的方式处理。

  5. 非法异常,一些伪造、攻击类型的消息

  多次重试失败后,消息从队列中被删除,也可以针对此业务做进一步处理。

源码地址

https://github.com/ErikXu/RabbitMesage
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: