RabbitMQ一个简单可靠的方案(.Net Core实现)
2018-08-27 02:16
531 查看
前言
最近需要使用到消息队列相关技术,于是重新接触RabbitMQ。其中遇到了不少可靠性方面的问题,归纳了一下,大概有以下几种:1. 临时异常,如数据库网络闪断、http请求临时失效等;
2. 时序异常,如A任务依赖于B任务,但可能由于调度或消费者分配的原因,导致A任务先于B任务执行;
3. 业务异常,由于系统测试不充分,上线后发现某几个或某几种消息无法正常处理;
4. 系统异常,业务中间件无法正常操作,如网络中断、数据库宕机等;
5. 非法异常,一些伪造、攻击类型的消息。
针对这些异常,我采用了一种基于消息审计、消息重试、消息检索、消息重发的方案。
方案
_channel.BasicQos(0, 1, false); _channel.ExchangeDeclare("Exchange", "direct"); _channel.QueueDeclare("QueueB", true, false, false); _channel.QueueBind("QueueB", "Exchange", "RouteB"); var retryDic = new Dictionary<string, object> { {"x-dead-letter-exchange", "Exchange"}, {"x-dead-letter-routing-key", "RouteB"} }; _channel.ExchangeDeclare("Exchange_Retry", "direct"); _channel.QueueDeclare("QueueB_Retry", true, false, false, retryDic); _channel.QueueBind("QueueB_Retry", "Exchange_Retry", "RouteB_Retry"); var consumer = new EventingBasicConsumer(_channel); consumer.Received += (model, ea) => { //The QueueB is always failed. bool canAck; var retryCount = 0; if (ea.BasicProperties.Headers != null && ea.BasicProperties.Headers.ContainsKey("retryCount")) { retryCount = (int)ea.BasicProperties.Headers["retryCount"]; _logger.LogWarning($"[{DateTime.Now:yyyy-MM-dd HH:mm:ss}]Message:{ea.BasicProperties.MessageId}, {++retryCount} retry started..."); } try { Handle(); canAck = true; } catch (Exception ex) { _logger.LogCritical(ex, "Error!"); if (CanRetry(retryCount)) { SetupRetry(retryCount, "Exchange_Retry", "RouteB_Retry", ea); canAck = true; } else { canAck = false; } } try { if (canAck) { _channel.BasicAck(ea.DeliveryTag, false); } else { _channel.BasicNack(ea.DeliveryTag, false, false); } } catch (AlreadyClosedException ex) { _logger.LogCritical(ex, "RabbitMQ is closed!"); } }; _channel.BasicConsume("QueueB", false, consumer);
View Code
审计消费者(Audit Comsumer)
1. 声明Exchange和Queue_channel.ExchangeDeclare("Exchange", "direct"); _channel.QueueDeclare("QueueAudit", true, false, false); _channel.QueueBind("QueueAudit", "Exchange", "RouteA"); _channel.QueueBind("QueueAudit", "Exchange", "RouteB");
2. 排除死信Exchange转发过来的重复消息
if (ea.BasicProperties.Headers == null || !ea.BasicProperties.Headers.ContainsKey("x-death")) { ... }
3. 生成消息实体
var message = new Message { MessageId = ea.BasicProperties.MessageId, Body = ea.Body, Exchange = ea.Exchange, Route = ea.RoutingKey };
4. RabbitMQ会用bytes来存储字符串,因此,要把头中bytes转回字符串
if (ea.BasicProperties.Headers != null) { var headers = new Dictionary<string, object>(); foreach (var header in ea.BasicProperties.Headers) { if (header.Value is byte[] bytes) { headers[header.Key] = Encoding.UTF8.GetString(bytes); } else { headers[header.Key] = header.Value; } } message.Headers = headers; }
5. 把Unix格式的Timestamp转成UTC时间
if (ea.BasicProperties.Timestamp.UnixTime > 0) { message.TimestampUnix = ea.BasicProperties.Timestamp.UnixTime; var offset = DateTimeOffset.FromUnixTimeMilliseconds(ea.BasicProperties.Timestamp.UnixTime); message.Timestamp = offset.UtcDateTime; }
6. 消息存入MongoDB
_mongoDbContext.Collection<Message>().InsertOne(message, cancellationToken: cancellationToken);
MongoDB记录:
重试记录:
消息检索及重发(WebApi)
1. 通过消息Id检索消息2. 通过头消息检索消息
3. 消息重发,会重新生成MessageId
Ack,Nack,Reject的关系
1. 消息处理成功,执行Ack,RabbitMQ会把消息从队列中删除。2. 消息处理失败,执行Nack或者Reject:
a) 当requeue=true时,消息会重新回到队列,然后当前消费者会马上再取回这条消息;
b) 当requeue=false时,如果Exchange有设置Dead Letter Exchange,则消息会去到Dead Letter Exchange;
c) 当requeue=false时,如果Exchange没设置Dead Letter Exchange,则消息从队列中删除,效果与Ack相同。
3. Nack与Reject的区别在于:Nack可以批量操作,Reject只能单条操作。
RabbitMQ自动恢复
连接(Connection)恢复
1. 重连(Reconnect)2. 恢复连接监听(Listeners)
3. 重新打开通道(Channels)
4. 恢复通道监听(Listeners)
5. 恢复basic.qos,publisher confirms以及transaction设置
拓扑(Topology)恢复
1. 重新声明交换机(Exchanges)2. 重新声明队列(Queues)
3. 恢复所有绑定(Bindings)
4. 恢复所有消费者(Consumers)
异常处理机制
1. 临时异常,如数据库网络闪断、http请求临时失效等通过短时间重试(如1秒后)的方式处理,也可以考虑Nack/Reject来实现重试(时效性更高)。
2. 时序异常,如A任务依赖于B任务,但可能由于调度或消费者分配的原因,导致A任务先于B任务执行
通过长时间重试(如1分钟、30分钟、1小时、1天等),等待B任务先执行完的方式处理。
3. 业务异常,由于系统测试不充分,上线后发现某几个或某几种消息无法正常处理
等系统修正后,通过消息重发的方式处理。
4. 系统异常,业务中间件无法正常操作,如网络中断、数据库宕机等
等系统恢复后,通过消息重发的方式处理。
5. 非法异常,一些伪造、攻击类型的消息
多次重试失败后,消息从队列中被删除,也可以针对此业务做进一步处理。
源码地址
https://github.com/ErikXu/RabbitMesage相关文章推荐
- 实现一个简单的服务端推送方案-实例篇Polling
- 自行控制loadrunner的socket协议性能测试 (转) 一前言 二任务的提出 三实现方案讨论 四技术要点讲解 如何开始录制一个最简单的收发数据包脚本 写日志文件 一行一行读数据包文件 字
- 防止mdf文件被非法附加后修改的一个简单实现方案
- 实现一个简单的服务端推方案
- 实现一个简单的服务端推方案
- 实现一个简单的服务端推送方案
- 实现一个简单的服务端推送方案-实例篇Push
- 用UDP实现可靠文件传输,如何利用UDX创建一个简单的WIN32程序
- 一个简单的大数实现方案,计算斐波纳契数列
- .NET Core微服务之路:利用DotNetty实现一个简单的通信过程
- 实现一个简单的服务端推送方案-实例篇Push
- .NET Core微服务之路:利用DotNetty实现一个简单的通信过程
- 实现一个简单的服务端推送方案-实例篇Polling(服务端阻塞读)
- 一个简单的文件服务器实现方案
- 用UDP实现可靠文件传输,如何利用UDX创建一个简单的WIN32程序
- 一个简单的文件服务器实现方案
- 一个简单的分布式锁实现方案
- 实现一个简单的服务端推送方案-实例篇Polling(服务端阻塞读,SLEEP方式读MYSQL)
- 转:实现一个简单的服务端推送方案
- 使用ListView实现一个简单的学生信息展示效果