RabbitMQ .NET消息队列使用入门(二)【多个队列间消息传输】
2017-03-14 15:28
696 查看
孤独将会是人生中遇见的最大困难。
实体类:
DocumentType.cs
public enum DocumentType { //日志 Journal = 1, //论文 Thesis = 2, //会议文件 Meeting = 3 }
MessageModel.cs
public class MessageModel { public string Title { get; set; } public string Author { get; set; } public DocumentType DocType { get; set; } public override string ToString() { return Title; } /// <summary> /// 验证消息,Title与Author不能为空 /// </summary> /// <returns></returns> public bool IsVlid() { return !string.IsNullOrWhiteSpace(Title) && !string.IsNullOrWhiteSpace(Author); } }
异常代码类MessageException.cs:
public class MessageException : Exception { public MessageException(string message) : base(message) { } public MessageException(string message, Exception innerException) : base(message, innerException) { } } public class NoRPCConsumeException : Exception { public NoRPCConsumeException(string message) : base(message) { } public NoRPCConsumeException(string message, Exception innerException) : base(message, innerException) { } }
①第一个控制台应用程序
namespace PCApp { class Program { private static IConnection _senderConnection; private static IModel _channel; private static int _interval = 1; //消息发送间隔 private static bool isExit; private static void Main(string[] args) { Setup(); Console.WriteLine("准备发送消息到extractQueue队列:"); Send(); WaitCommand(); } /// <summary> /// 初始化 /// </summary> private static void Setup() { var factory = new ConnectionFactory { HostName = "localhost", UserName = "guest", Password = "guest", // virtual host只是起到一个命名空间的作用,所以可以多个user共同使用一个virtual host, //vritual_host= '/',这个是系统默认的,就是说当我们创建一个到rabbitmq的connection时候,它的命名空间是'/',需要注意的是不同的命名空间之间的资源是不能访问的,比如 exchang,queue ,bingding等 //VirtualHost = "test", AutomaticRecoveryEnabled = true, TopologyRecoveryEnabled = true }; try { _senderConnection = factory.CreateConnection(); _senderConnection.ConnectionShutdown += _senderConnection_ConnectionShutdown; _channel = _senderConnection.CreateModel(); _channel.QueueDeclare("extractQueue", false, false, false, null); } catch (BrokerUnreachableException ex) { Console.WriteLine("ERROR: RabbitMQ服务器未启动!"); Thread.Sleep(2000); isExit = true; } } private static void _senderConnection_ConnectionShutdown(object sender, ShutdownEventArgs e) { Console.WriteLine("连接已关闭. " + e.ReplyText); } /// <summary> /// 等待接收指令 /// </summary> private static void WaitCommand() { while (!isExit) { string line = Console.ReadLine().ToLower().Trim(); string[] arr = line.Split(new[] {' '}); string cmd = arr[0]; switch (cmd) { case "exit": Close(); isExit = true; break; case "go": int count = 10; if (arr.Length > 1) { int.TryParse(arr[1], out count); } Send(count); break; case "interval": int.TryParse(arr[1], out _interval); break; case "clear": Console.Clear(); break; default: break; } } Console.WriteLine("Goodbye!"); } public static void Send(int msgCount = 10) { Console.WriteLine("---------- 开始发送------------"); for (int i = 1; i <= msgCount; i++) { string title = "测试文档" + i; string author = "lexworld" + i; int docType = i%2 + 1; string jsonFormat = "{{\"Title\":\"{0}\",\"Author\":\"{1}\",\"DocType\":{2}}}"; string message = string.Format(jsonFormat, title, author, docType); byte[] body = Encoding.UTF8.GetBytes(message); try { _channel.BasicPublish("", "extractQueue", null, body); } catch (AlreadyClosedException ex) { Console.WriteLine("ERROR: " + ex.Message); break; } Console.WriteLine("Time:" + DateTime.Now + " MSG:" + title); if (_interval > 0) { Thread.Sleep(_interval*1000); } } Console.WriteLine("---------- 结束 ------------"); } private static string GetMessage() { string argLine = string.Join(" ", Environment.GetCommandLineArgs()); string args = argLine.Substring(argLine.IndexOf(" ") + 1); Console.WriteLine("args:" + args); string[] arr = args.Split(new[] {','}); string jsonFormat = "{{\"Title\":\"{0}\",\"Author\":\"{1}\",\"DocType\":{2}}}"; return string.Format(jsonFormat, arr[0], arr[1], arr[2]); } private static void Close() { if (_channel != null && _channel.IsOpen) { _channel.Close(); } if (_senderConnection != null && _senderConnection.IsOpen) { _senderConnection.Close(); } } } }
②第二个控制台应用程序
class Program { private static IConnection _senderConn; private static IConnection _recvConn; //private static IModel _senderChannel; 多线程情况下,每个线程需要独立的channel来发送消息 private static IModel _recvChannel; private static bool isExit; private static void Main(string[] args) { Setup(); Console.WriteLine("开始消费extractQueue队列里的消息(同时推送到checkQueue检查队列):"); WaitCommand(); } /// <summary> /// 初始化 /// </summary> private static void Setup() { var factory = new ConnectionFactory { HostName = "localhost", UserName = "guest", Password = "guest", //VirtualHost = "test", //TopologyRecoveryEnabled = true, //默认为true,如果设置为false,则重连后不会重建相关实体,如:exchange,queue,binding AutomaticRecoveryEnabled = true //自动重连 }; try { _recvConn = factory.CreateConnection(); _recvConn.ConnectionShutdown += ConnectionShutdown; _recvChannel = _recvConn.CreateModel(); _recvChannel.QueueDeclare("extractQueue", false, false, false, null); _recvChannel.BasicQos(0, 10, false); var consumer = new EventingBasicConsumer(_recvChannel); consumer.Received += consumer_Received; _recvChannel.BasicConsume("extractQueue", false, consumer); _senderConn = factory.CreateConnection(); IModel channel = _senderConn.CreateModel(); channel.QueueDeclare("checkQueue", false, false, false, null); //channel.Close(); //这里如果关闭channel的话,自动重连的时候无法恢复checkQueue队列,因为checkQueue是使用channel创建的,恢复的时候还要使用channel,必须保持该信道不关闭 } catch (BrokerUnreachableException ex) { Console.WriteLine("ERROR: RabbitMQ服务器未启动!"); Thread.Sleep(2000); isExit = true; } } private static void ConnectionShutdown(object sender, ShutdownEventArgs e) { Console.WriteLine("Connection has already closed."); } /// <summary> /// 等待接收指令 /// </summary> private static void WaitCommand() { while (!isExit) { string line = Console.ReadLine().ToLower().Trim(); switch (line) { case "exit": Close(); isExit = true; break; case "clear": Console.Clear(); break; default: break; } } Console.WriteLine("Goodbye!"); } private static void Close() { if (_recvChannel != null && _recvChannel.IsOpen) { _recvChannel.Close(); } if (_recvConn != null && _recvConn.IsOpen) { _recvConn.Close(); } if (_senderConn != null && _senderConn.IsOpen) { _senderConn.Close(); } } #region 异步消息处理,客户端发送完消息后不再等待 /// <summary> /// 消息接收处理事件,多线程处理消息 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private static void consumer_Received(object sender, BasicDeliverEventArgs e) { byte[] body = e.Body; //bool isSuccess = false; Task.Run(() => HandlingMessage(body, e)); } /// <summary> /// 消息处理 /// </summary> /// <param name="msgModel"></param> /// <param name="e"></param> private static async void HandlingMessage(byte[] body, BasicDeliverEventArgs e) { bool isSuccess = false; string message = Encoding.UTF8.GetString(body); IModel _senderChannel = _senderConn.CreateModel(); //多线程中每个线程使用独立的信道 try { var msgModel = JsonConvert.DeserializeObject<MessageModel>(message); if (msgModel == null || !msgModel.IsVlid()) //解析失败或消息格式不正确,拒绝处理 { throw new MessageException("消息解析失败"); } var random = new Random(); int num = random.Next(0, 4); //模拟处理失败 if (random.Next(0, 11) == 4) { throw new Exception("处理失败", null); } //模拟解析失败 if (random.Next(0, 11) == 8) { throw new MessageException("消息解析失败"); } await Task.Delay(num*1000); //这里简单处理,仅格式化输出消息内容 Console.WriteLine("Time:" + DateTime.Now + " ThreadID:" + Thread.CurrentThread.ManagedThreadId + " Used: " + num + "s MSG:" + msgModel); isSuccess = true; } catch (MessageException msgEx) { Console.WriteLine("Time:" + DateTime.Now + " ThreadID:" + Thread.CurrentThread.ManagedThreadId + " ERROR:" + msgEx.Message + " MSG:" + message); _recvChannel.BasicReject(e.DeliveryTag, false); //不再重新分发 return; } catch (Exception ex) { Console.WriteLine("Time:" + DateTime.Now + " ThreadID:" + Thread.CurrentThread.ManagedThreadId + " ERROR:" + ex.Message + " MSG:" + message); } if (isSuccess) { try { _senderChannel.BasicPublish("", "checkQueue", null, body); //发送消息到内容检查队列 _recvChannel.BasicAck(e.DeliveryTag, false); //确认处理成功 } catch (AlreadyClosedException acEx) { Console.WriteLine("ERROR:连接已关闭"); } } else { _recvChannel.BasicReject(e.DeliveryTag, true); //处理失败,重新分发 } _senderChannel.Close(); } #endregion #region 同步消息处理(RPC) #endregion }
③第三个控制台应用程序
class Program { private static IConnection _recvConn; private static IConnection _senderConn; private static IModel _recvChannel; private static bool isExit; private static void Main(string[] args) { Setup(); Console.WriteLine("开始使用checkQueue队列里的消息(同时推送到reportQueue报告队列):"); WaitCommand(); } /// <summary> /// 初始化 /// </summary> private static void Setup() { var factory = new ConnectionFactory { HostName = "localhost", UserName = "guest", Password = "guest", //VirtualHost = "test", TopologyRecoveryEnabled = true, AutomaticRecoveryEnabled = true }; try { _recvConn = factory.CreateConnection(); _recvConn.ConnectionShutdown += ConnectionShutdown; _recvChannel = _recvConn.CreateModel(); _recvChannel.QueueDeclare("checkQueue", false, false, false, null); _recvChannel.BasicQos(0, 10, false); var consumer = new EventingBasicConsumer(_recvChannel); consumer.Received += consumer_Received; _recvChannel.BasicConsume("checkQueue", false, consumer); _senderConn = factory.CreateConnection(); IModel channel = _senderConn.CreateModel(); channel.QueueDeclare("reportQueue", false, false, false, null); //channel.Close(); //这里如果关闭channel的话,自动重连的时候无法恢复reportQueue队列, //因为reportQueue是使用channel创建的,恢复的时候还要使用channel,必须保持该信道不关闭 } catch (BrokerUnreachableException ex) { Console.WriteLine("ERROR: RabbitMQ服务器未启动!"); Thread.Sleep(2000); isExit = true; } } private static void ConnectionShutdown(object sender, ShutdownEventArgs e) { Console.WriteLine("连接已关闭."); } /// <summary> /// 等待接收指令 /// </summary> private static void WaitCommand() { while (!isExit) { string line = Console.ReadLine().ToLower().Trim(); switch (line) { case "exit": Close(); isExit = true; break; case "clear": Console.Clear(); break; default: break; } } Console.WriteLine("Goodbye!"); } private static void Close() { if (_recvChannel != null && _recvChannel.IsOpen) { _recvChannel.Close(); } if (_recvConn != null && _recvConn.IsOpen) { _recvConn.Close(); } if (_senderConn != null && _senderConn.IsOpen) { _senderConn.Close(); } } #region 异步消息处理,客户端发送完消息后不再等待 /// <summary> /// 消息接收处理事件,多线程处理消息 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private static void consumer_Received(object sender, BasicDeliverEventArgs e) { byte[] body = e.Body; Task.Run(() => HandlingMessage(body, e)); } /// <summary> /// 消息处理 /// </summary> /// <param name="msgModel"></param> /// <param name="e"></param> private static async void HandlingMessage(byte[] body, BasicDeliverEventArgs e) { bool isSuccess = false; string message = Encoding.UTF8.GetString(body); IModel _senderChannel = _senderConn.CreateModel(); //多线程中每个线程使用独立的信道 try { var msgModel = JsonConvert.DeserializeObject<MessageModel>(message); if (msgModel == null || !msgModel.IsVlid()) //解析失败或消息格式不正确,拒绝处理 { throw new MessageException("消息解析失败"); } var random = new Random(); int num = random.Next(0, 4); //模拟处理失败 if (random.Next(0, 11) == 4) { throw new Exception("处理失败", null); } //模拟解析失败 if (random.Next(0, 11) == 8) { throw new MessageException("消息解析失败"); } await Task.Delay(num*1000); //这里简单处理,仅格式化输出消息内容 Console.WriteLine("Time:" + DateTime.Now + " ThreadID:" + Thread.CurrentThread.ManagedThreadId + " Used: " + num + "s MSG:" + msgModel); isSuccess = true; } catch (MessageException msgEx) { Console.WriteLine("Time:" + DateTime.Now + " ThreadID:" + Thread.CurrentThread.ManagedThreadId + " ERROR:" + msgEx.Message + " MSG:" + message); _recvChannel.BasicReject(e.DeliveryTag, false); //不再重新分发 return; } catch (Exception ex) { Console.WriteLine("Time:" + DateTime.Now + " ThreadID:" + Thread.CurrentThread.ManagedThreadId + " ERROR:" + ex.Message + " MSG:" + message); } if (isSuccess) { try { _senderChannel.BasicPublish("", "reportQueue", null, body); //发送消息到内容检查队列 _recvChannel.BasicAck(e.DeliveryTag, false); //确认处理成功 } catch (AlreadyClosedException acEx) { Console.WriteLine("ERROR:连接已关闭"); } } else { _recvChannel.BasicReject(e.DeliveryTag, true); //处理失败,重新分发 } _senderChannel.Close(); } #endregion }
④第四个控制台应用程序
class Program { private static IConnection _recvConn; private static IModel _recvChannel; private static bool isExit; //private static IConnection _receiverConn; //同步处理(RPC)时使用 private static void Main(string[] args) { Setup(); Console.WriteLine("开始消费reportQueue队列里的消息:"); WaitCommand(); } /// <summary> /// 初始化 /// </summary> private static void Setup() { var factory = new ConnectionFactory { HostName = "localhost", UserName = "test", Password = "test", VirtualHost = "test", TopologyRecoveryEnabled = true, AutomaticRecoveryEnabled = true }; try { _recvConn = factory.CreateConnection(); _recvConn.ConnectionShutdown += ConnectionShutdown; _recvChannel = _recvConn.CreateModel(); _recvChannel.QueueDeclare("reportQueue", false, false, false, null); _recvChannel.BasicQos(0, 10, false); var consumer = new EventingBasicConsumer(_recvChannel); consumer.Received += consumer_Received; _recvChannel.BasicConsume("reportQueue", false, consumer); } catch (BrokerUnreachableException ex) { Console.WriteLine("ERROR: RabbitMQ服务器未启动!"); Thread.Sleep(2000); isExit = true; } } private static void ConnectionShutdown(object sender, ShutdownEventArgs e) { Console.WriteLine("连接已关闭."); } /// <summary> /// 等待接收指令 /// </summary> private static void WaitCommand() { while (!isExit) { string line = Console.ReadLine().ToLower().Trim(); switch (line) { case "exit": Close(); isExit = true; break; case "clear": Console.Clear(); break; default: break; } } Console.WriteLine("Goodbye!"); } private static void Close() { if (_recvChannel != null && _recvChannel.IsOpen) { _recvChannel.Close(); } if (_recvConn != null && _recvConn.IsOpen) { _recvConn.Close(); } } #region 异步消息处理,客户端发送完消息后不再等待 /// <summary> /// 消息接收处理事件,多线程处理消息 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private static void consumer_Received(object sender, BasicDeliverEventArgs e) { byte[] body = e.Body; Task.Run(() => HandlingMessage(body, e)); } /// <summary> /// 消息处理 /// </summary> /// <param name="msgModel"></param> /// <param name="e"></param> private static async void HandlingMessage(byte[] body, BasicDeliverEventArgs e) { bool isSuccess = false; string message = Encoding.UTF8.GetString(body); try { var msgModel = JsonConvert.DeserializeObject<MessageModel>(message); if (msgModel == null || !msgModel.IsVlid()) //解析失败或消息格式不正确,拒绝处理 { throw new MessageException("消息解析失败"); } var random = new Random(); int num = random.Next(0, 4); //模拟处理失败 if (random.Next(0, 11) == 4) { throw new Exception("处理失败", null); } //模拟解析失败 if (random.Next(0, 11) == 8) { throw new MessageException("消息解析失败"); } await Task.Delay(num*1000); //这里简单处理,仅格式化输出消息内容 Console.WriteLine("Time:" + DateTime.Now + " ThreadID:" + Thread.CurrentThread.ManagedThreadId + " Used: " + num + "s MSG:" + msgModel); isSuccess = true; } catch (MessageException msgEx) { Console.WriteLine("Time:" + DateTime.Now + " ThreadID:" + Thread.CurrentThread.ManagedThreadId + " ERROR:" + msgEx.Message + " MSG:" + message); _recvChannel.BasicReject(e.DeliveryTag, false); //不再重新分发 return; } catch (Exception ex) { Console.WriteLine("Time:" + DateTime.Now + " ThreadID:" + Thread.CurrentThread.ManagedThreadId + " ERROR:" + ex.Message + " MSG:" + message); } if (isSuccess) { try { _recvChannel.BasicAck(e.DeliveryTag, false); //确认处理成功 } catch (AlreadyClosedException acEx) { Console.WriteLine("ERROR:连接已关闭"); } } else { _recvChannel.BasicReject(e.DeliveryTag, true); //处理失败,重新分发 } } #endregion }
运行结果如图:
相关文章推荐
- RabbitMQ .NET消息队列使用入门(二)【多个队列间消息传输】
- RabbitMQ .NET消息队列使用入门(一)【简单示例】
- RabbitMQ .NET消息队列使用入门(三)【MVC实现RPC例子】
- RabbitMQ .NET消息队列使用入门(一)【简单示例】
- RabbitMQ .NET消息队列使用详解
- RabbitMQ .NET消息队列使用详解
- MQ入门总结(一)消息队列概念和使用场景
- MQ入门总结(一)消息队列概念和使用场景
- 消息队列系列(三):.Rabbitmq Trace的使用
- ActiveMq C#客户端 消息队列的使用(存和取)
- Activemq+spring的第一个程序(入门程序--内嵌Broker--消息队列)
- Posix消息队列使用非阻塞mq_receive的信号通知
- PHP使用php-resque库配合Redis实现MQ消息队列的教程
- 关于消息队列的使用----ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ
- MQ消息队列系列(2)什么时候使用MQ
- Activemq+spring的第一个程序(入门程序--内嵌Broker--消息队列)
- 使用jedis实现Redis消息队列(MQ)的发布(publish)和消息监听(subscribe)
- 用管道(pipe)使消息队列通知(mq_notify)可以在Select和poll中使用
- .Net消息队列的使用
- Net消息队列的使用