C# RabbitMq (基于消费者事件实现)
2017-12-08 13:30
337 查看
源码源码源码源码
源码源码源码源码
一:服务方法
public class RabbitEventService: BaseService { #region 构造函数 /// <summary> /// 构造函数 /// </summary> /// <param name="config"></param> public RabbitEventService(RabbitMqConfigModel config) : base(config) { } #endregion #region 方法 #region 接收消息 /// <summary> /// 接收消息 /// </summary> /// <param name="method"></param> public void Receive(Func<string, bool> method) { try { using (var channel = _connection.CreateModel()) { //申明队列 channel.QueueDeclare(RabbitConfig.QueueName, RabbitConfig.DurableQueue, false, false, null); if (!string.IsNullOrEmpty(RabbitConfig.ExchangeName)) //若交换机不为空 { //申明路由 channel.ExchangeDeclare(RabbitConfig.ExchangeName, RabbitConfig.ExchangeType.ToString(), RabbitConfig.DurableQueue); //队列和交换机绑定 channel.QueueBind(RabbitConfig.QueueName, RabbitConfig.ExchangeName, RabbitConfig.RoutingKey); } //channel.BasicQos(0, 1, false); var consumer = new EventingBasicConsumer(channel); //注册接收事件,一旦创建连接就去拉取消息 consumer.Received += (model, ea) => { //阻塞函数,获取队列中的消息 ProcessingResultsEnum processingResult = ProcessingResultsEnum.Retry; try { var body = ea.Body; string message = Encoding.UTF8.GetString(body); method(message); processingResult = ProcessingResultsEnum.Accept; } catch(Exception) { processingResult = ProcessingResultsEnum.Reject; //系统无法处理的错误 } finally { switch (processingResult) { case ProcessingResultsEnum.Accept: //回复确认处理成功 channel.BasicAck(ea.DeliveryTag, false);//处理单挑信息 break; case ProcessingResultsEnum.Retry: b3c9 //发生错误了,但是还可以重新提交给队列重新分配 channel.BasicNack(ea.DeliveryTag, false, true); break; case ProcessingResultsEnum.Reject: //发生严重错误,无法继续进行,这种情况应该写日志或者是发送消息通知管理员 channel.BasicNack(ea.DeliveryTag, false, false); //写日志 break; } } }; channel.BasicConsume(RabbitConfig.QueueName, false,//和tcp协议的ack一样,为false则服务端必须在收到客户端的回执(ack)后才能删除本条消息 consumer); System.Threading.Thread.Sleep(-1); } } catch (Exception) { } } #endregion #endregion }
二:配置
public class ConfigHelper { public static RabbitEventService CreateDefaultInstance() { return new RabbitEventService(new RabbitMqConfigModel() { IP = "127.0.0.1", UserName = "guest", Password = "guest", Port = 15672, VirtualHost = "kevinHost", DurableQueue = true, QueueName = "hello", ExchangeName = "hello_ex1", ExchangeType = ExchangeTypeEnum.direct, DurableMessage = false, RoutingKey = "bug" }); } }
三:消息推送
static void Main(string[] args) { RabbitEventService mq = RabbitMq.Consumer.Config.ConfigHelper.CreateDefaultInstance(); string message = "Hello World!";//待发送的消息 string errMsg = ""; for (int i = 0; i < 10000000; i++) { string body = string.Format(" {0} Sent: {1}", i, message); Thread.Sleep(10); mq.Send<string>(body, ref errMsg); Console.WriteLine(body); } Console.ReadLine(); }
四:消费端
static void Main(string[] args) { RabbitEventService mq = RabbitMq.Consumer.Config.ConfigHelper.CreateDefaultInstance(); mq.Receive(Write); } public static bool Write(string message) { Console.WriteLine("Received {0}", message); return new Random().Next(0, 1) == 0 ? false : true; }
源码源码源码源码
相关文章推荐
- Go/Python/Erlang编程语言对比分析及示例 基于RabbitMQ.Client组件实现RabbitMQ可复用的 ConnectionPool(连接池) 封装一个基于NLog+NLog.Mongo的日志记录工具类LogUtil 分享基于MemoryCache(内存缓存)的缓存工具类,C# B/S 、C/S项目均可以使用!
- 重温.NET下Assembly的加载过程 ASP.NET Core Web API下事件驱动型架构的实现(三):基于RabbitMQ的事件总线
- Redis总结(五)缓存雪崩和缓存穿透等问题 Web API系列(三)统一异常处理 C#总结(一)AutoResetEvent的使用介绍(用AutoResetEvent实现同步) C#总结(二)事件Event 介绍总结 C#总结(三)DataGridView增加全选列 Web API系列(二)接口安全和参数校验 RabbitMQ学习系列(六): RabbitMQ 高可用集群
- ASP.NET Core Web API下事件驱动型架构的实现(三):基于RabbitMQ的事件总线
- Java基于Spring Boot、RabbitMQ实现事件驱动模式
- ASP.NET Core Web API下事件驱动型架构的实现(三):基于RabbitMQ的事件总线
- C#里的委托和事件实现 (转)
- C#WinForm中实现基于角色的权限菜单--C#-程序设计-计算机学习网-育龙网
- C#中的异步调用及异步设计模式(三)——基于事件的异步模式
- linux网络编程之System V 信号量(三):基于生产者-消费者模型实现先进先出的共享内存段
- DotNet(C#)实现事件远程注册和触发
- windows下(互斥量和事件对象)实现简单象消费者和生产者线程
- C#基于事件驱动的多串口多线程串口通讯软件架构设计
- 理解Node.js的事件循环(代码是异步单线程,内部实现用的还是进程和线程,基于池化的线程实现异步)
- 《设计模式--基于C#的工程化实现及扩展》 Security Design Pattern 系列 3 检查点模式(Check Point)
- 用ASP和SQL实现基于Web的事件日历
- c#中异步基于消息通信的完成端口的TCP/IP协议的组件实现(源代码) 客户端
- 基于C#的socket编程的TCP异步的实现代码
- 利用边缘改进全局阈值处理-c#实现-基于EmguCv