您的位置:首页 > 其它

RabbitMQ .NET消息队列使用入门(二)【多个队列间消息传输】

2017-03-14 15:28 696 查看

孤独将会是人生中遇见的最大困难。

实体类:

DocumentType.cs

/// <summary>
/// Kind of document carried by a <c>MessageModel</c>.
/// Members have explicit numeric values starting at 1.
/// </summary>
public enum DocumentType
{
// Journal
Journal = 1,
// Thesis / paper
Thesis = 2,
// Meeting document
Meeting = 3
}


MessageModel.cs

/// <summary>
/// Message payload passed between the queues: a document's title, author and type.
/// </summary>
public class MessageModel
{
    /// <summary>Document title. Must be non-blank for the message to be valid.</summary>
    public string Title { get; set; }

    /// <summary>Document author. Must be non-blank for the message to be valid.</summary>
    public string Author { get; set; }

    /// <summary>Kind of document.</summary>
    public DocumentType DocType { get; set; }

    /// <summary>
    /// Returns the title, or an empty string when no title has been set,
    /// so that callers never receive null from <c>ToString()</c>.
    /// </summary>
    public override string ToString()
    {
        return Title ?? string.Empty;
    }

    /// <summary>
    /// Validates the message: both <see cref="Title"/> and <see cref="Author"/>
    /// must contain at least one non-whitespace character.
    /// </summary>
    /// <returns><c>true</c> when the message is valid; otherwise <c>false</c>.</returns>
    public bool IsValid()
    {
        return !string.IsNullOrWhiteSpace(Title) && !string.IsNullOrWhiteSpace(Author);
    }

    /// <summary>
    /// Misspelled original name of <see cref="IsValid"/>; kept so existing callers keep compiling.
    /// </summary>
    /// <returns>The result of <see cref="IsValid"/>.</returns>
    public bool IsVlid()
    {
        return IsValid();
    }
}


异常代码类MessageException.cs:

/// <summary>
/// Thrown when a message cannot be parsed or fails validation.
/// </summary>
public class MessageException : Exception
{
    /// <summary>Creates the exception with the default message.</summary>
    public MessageException()
    {
    }

    /// <summary>Creates the exception with the given message.</summary>
    /// <param name="message">Description of the failure.</param>
    public MessageException(string message) : base(message)
    {
    }

    /// <summary>Creates the exception with the given message and underlying cause.</summary>
    /// <param name="message">Description of the failure.</param>
    /// <param name="innerException">The exception that caused this one.</param>
    public MessageException(string message, Exception innerException) : base(message, innerException)
    {
    }
}

/// <summary>
/// Exception type declared for the RPC (synchronous) consumption scenario.
/// </summary>
public class NoRPCConsumeException : Exception
{
    /// <summary>Creates the exception with the default message.</summary>
    public NoRPCConsumeException()
    {
    }

    /// <summary>Creates the exception with the given message.</summary>
    /// <param name="message">Description of the failure.</param>
    public NoRPCConsumeException(string message) : base(message)
    {
    }

    /// <summary>Creates the exception with the given message and underlying cause.</summary>
    /// <param name="message">Description of the failure.</param>
    /// <param name="innerException">The exception that caused this one.</param>
    public NoRPCConsumeException(string message, Exception innerException) : base(message, innerException)
    {
    }
}


①第一个控制台应用程序

namespace PCApp
{
    /// <summary>
    /// First console application (producer): publishes generated test documents to the
    /// "extractQueue" queue and then accepts commands from standard input
    /// ("go [count]", "interval &lt;seconds&gt;", "clear", "exit").
    /// </summary>
    class Program
    {
        // Queue this producer declares and publishes to.
        private const string QueueName = "extractQueue";

        // Composite format of the JSON payload; doubled braces are literal braces for string.Format.
        private const string JsonFormat = "{{\"Title\":\"{0}\",\"Author\":\"{1}\",\"DocType\":{2}}}";

        // Largest interval (in seconds) that can be converted to milliseconds without int overflow.
        private const int MaxIntervalSeconds = int.MaxValue / 1000;

        private static IConnection _senderConnection;
        private static IModel _channel;
        private static int _interval = 1; // Seconds to wait between two published messages.
        private static bool isExit;

        private static void Main(string[] args)
        {
            Setup();

            // Only publish when the broker connection was established; otherwise
            // _channel is null and Send() has nothing to publish on.
            if (!isExit)
            {
                Console.WriteLine("准备发送消息到extractQueue队列:");

                Send();
            }

            WaitCommand();
        }

        /// <summary>
        /// Opens the connection and channel and declares the target queue.
        /// Sets <c>isExit</c> when the broker cannot be reached.
        /// </summary>
        private static void Setup()
        {
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "guest",
                Password = "guest",
                // A virtual host only acts as a namespace, so several users can share one.
                // The default virtual host is '/': a connection created without an explicit
                // VirtualHost lives in '/'. Resources (exchanges, queues, bindings, ...) of
                // different virtual hosts cannot see each other.
                //VirtualHost = "test",
                AutomaticRecoveryEnabled = true,
                TopologyRecoveryEnabled = true
            };

            try
            {
                _senderConnection = factory.CreateConnection();
                _senderConnection.ConnectionShutdown += _senderConnection_ConnectionShutdown;

                _channel = _senderConnection.CreateModel();
                _channel.QueueDeclare(QueueName, false, false, false, null);
            }
            catch (BrokerUnreachableException)
            {
                Console.WriteLine("ERROR: RabbitMQ服务器未启动!");
                Thread.Sleep(2000); // Leave the message on screen briefly before exiting.
                isExit = true;
            }
        }

        /// <summary>
        /// Reports that the connection was shut down, including the broker's reply text.
        /// </summary>
        private static void _senderConnection_ConnectionShutdown(object sender, ShutdownEventArgs e)
        {
            Console.WriteLine("连接已关闭. " + e.ReplyText);
        }

        /// <summary>
        /// Reads commands from standard input until "exit" is entered or input ends.
        /// </summary>
        private static void WaitCommand()
        {
            while (!isExit)
            {
                string line = Console.ReadLine();
                if (line == null)
                {
                    // End of input (closed or redirected stdin): behave like "exit".
                    Close();
                    isExit = true;
                    break;
                }

                // Invariant lower-casing keeps command matching independent of the current
                // culture; RemoveEmptyEntries tolerates repeated spaces between tokens.
                string[] arr = line.Trim().ToLowerInvariant()
                    .Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
                if (arr.Length == 0)
                {
                    continue;
                }

                switch (arr[0])
                {
                    case "exit":
                    {
                        Close();
                        isExit = true;
                        break;
                    }
                    case "go":
                    {
                        // Without an argument the default of 10 messages is used; an
                        // unparsable argument is reported instead of silently becoming 0.
                        int count = 10;
                        if (arr.Length > 1 && !int.TryParse(arr[1], out count))
                        {
                            Console.WriteLine("ERROR: 无效的消息数量: " + arr[1]);
                            break;
                        }

                        Send(count);
                        break;
                    }
                    case "interval":
                    {
                        // Parse into a temporary so a missing or invalid argument leaves
                        // the current interval untouched.
                        int seconds;
                        if (arr.Length > 1 && int.TryParse(arr[1], out seconds))
                        {
                            _interval = seconds;
                        }
                        else
                        {
                            Console.WriteLine("ERROR: 用法: interval <秒数>");
                        }

                        break;
                    }
                    case "clear":
                    {
                        Console.Clear();
                        break;
                    }
                    default:
                        break;
                }
            }

            Console.WriteLine("Goodbye!");
        }

        /// <summary>
        /// Publishes <paramref name="msgCount"/> generated test documents to the queue,
        /// waiting <c>_interval</c> seconds after each one.
        /// </summary>
        /// <param name="msgCount">Number of messages to publish; values below 1 publish nothing.</param>
        public static void Send(int msgCount = 10)
        {
            if (_channel == null)
            {
                // Setup() did not complete, so there is no channel to publish on.
                Console.WriteLine("ERROR: 信道未初始化,无法发送消息");
                return;
            }

            Console.WriteLine("---------- 开始发送------------");

            for (int i = 1; i <= msgCount; i++)
            {
                string title = "测试文档" + i;
                string author = "lexworld" + i;
                int docType = i % 2 + 1; // Alternates between 2 and 1.
                string message = string.Format(JsonFormat, title, author, docType);
                byte[] body = Encoding.UTF8.GetBytes(message);
                try
                {
                    // Default exchange ("") routes by queue name.
                    _channel.BasicPublish("", QueueName, null, body);
                }
                catch (AlreadyClosedException ex)
                {
                    Console.WriteLine("ERROR: " + ex.Message);
                    break;
                }

                Console.WriteLine("Time:" + DateTime.Now + " MSG:" + title);

                if (_interval > 0)
                {
                    // Clamp before converting to milliseconds so a huge interval cannot
                    // overflow into a negative sleep time.
                    Thread.Sleep(Math.Min(_interval, MaxIntervalSeconds) * 1000);
                }
            }

            Console.WriteLine("---------- 结束 ------------");
        }

        /// <summary>
        /// Builds a JSON message from the command-line arguments, which are expected in the
        /// form <c>title,author,docType</c>.
        /// </summary>
        /// <returns>The JSON payload.</returns>
        /// <exception cref="ArgumentException">Fewer than three comma-separated values were supplied.</exception>
        private static string GetMessage()
        {
            // Element 0 is the executable path; skip it by index rather than by searching for
            // the first space, which breaks when the path itself contains spaces.
            string[] commandLine = Environment.GetCommandLineArgs();
            string args = commandLine.Length > 1
                ? string.Join(" ", commandLine, 1, commandLine.Length - 1)
                : string.Empty;
            Console.WriteLine("args:" + args);

            string[] arr = args.Split(new[] { ',' });
            if (arr.Length < 3)
            {
                throw new ArgumentException(
                    "Expected command-line arguments in the form \"title,author,docType\".");
            }

            return string.Format(JsonFormat, arr[0], arr[1], arr[2]);
        }

        /// <summary>
        /// Closes the channel and then the connection, if they are open.
        /// </summary>
        private static void Close()
        {
            if (_channel != null && _channel.IsOpen)
            {
                _channel.Close();
            }

            if (_senderConnection != null && _senderConnection.IsOpen)
            {
                _senderConnection.Close();
            }
        }
    }
}


②第二个控制台应用程序

/// <summary>
/// Second console application: consumes messages from "extractQueue", simulates processing
/// on thread-pool threads and forwards successfully processed messages to "checkQueue".
/// </summary>
class Program
{
    // Queue consumed by this application.
    private const string SourceQueue = "extractQueue";

    // Queue that successfully processed messages are forwarded to.
    private const string TargetQueue = "checkQueue";

    // Serializes ack/reject calls: _recvChannel is shared by all handler threads,
    // and a channel must not be used by several threads at the same time.
    private static readonly object _recvChannelLock = new object();

    // One shared generator; creating a new Random per message can yield identical
    // sequences for handlers that start at the same moment.
    private static readonly Random _random = new Random();

    private static IConnection _senderConn;
    private static IConnection _recvConn;
    // No shared sender channel: every handler creates its own channel for publishing.
    private static IModel _recvChannel;
    private static bool isExit;

    private static void Main(string[] args)
    {
        Setup();

        // Do not announce consumption when the broker could not be reached.
        if (!isExit)
        {
            Console.WriteLine("开始消费extractQueue队列里的消息(同时推送到checkQueue检查队列):");
        }

        WaitCommand();
    }

    /// <summary>
    /// Opens both connections, declares both queues and starts consuming.
    /// Sets <c>isExit</c> when the broker cannot be reached.
    /// </summary>
    private static void Setup()
    {
        var factory = new ConnectionFactory
        {
            HostName = "localhost",
            UserName = "guest",
            Password = "guest",
            //VirtualHost = "test",
            // TopologyRecoveryEnabled defaults to true; when false, entities such as
            // exchanges, queues and bindings are not re-created after a reconnect.
            //TopologyRecoveryEnabled = true,
            AutomaticRecoveryEnabled = true // Reconnect automatically.
        };

        try
        {
            // The sender side is prepared BEFORE consuming starts: the message handler
            // uses _senderConn, and deliveries can arrive as soon as BasicConsume returns.
            _senderConn = factory.CreateConnection();
            IModel declareChannel = _senderConn.CreateModel();
            declareChannel.QueueDeclare(TargetQueue, false, false, false, null);
            // declareChannel is intentionally left open. Original author's note: closing it
            // prevents automatic recovery from restoring the queue, because recovery reuses
            // the channel that declared it.

            _recvConn = factory.CreateConnection();
            _recvConn.ConnectionShutdown += ConnectionShutdown;
            _recvChannel = _recvConn.CreateModel();
            _recvChannel.QueueDeclare(SourceQueue, false, false, false, null);
            _recvChannel.BasicQos(0, 10, false); // At most 10 unacknowledged deliveries.
            var consumer = new EventingBasicConsumer(_recvChannel);
            consumer.Received += consumer_Received;
            _recvChannel.BasicConsume(SourceQueue, false, consumer); // Manual acknowledgements.
        }
        catch (BrokerUnreachableException)
        {
            Console.WriteLine("ERROR: RabbitMQ服务器未启动!");
            Close(); // Release whatever was opened before the failure.
            Thread.Sleep(2000); // Leave the message on screen briefly before exiting.
            isExit = true;
        }
    }

    /// <summary>
    /// Reports that the receiving connection was shut down.
    /// </summary>
    private static void ConnectionShutdown(object sender, ShutdownEventArgs e)
    {
        Console.WriteLine("Connection has already closed.");
    }

    /// <summary>
    /// Reads commands ("exit", "clear") from standard input until "exit" is entered
    /// or input ends.
    /// </summary>
    private static void WaitCommand()
    {
        while (!isExit)
        {
            string line = Console.ReadLine();
            if (line == null)
            {
                // End of input (closed or redirected stdin): behave like "exit".
                Close();
                isExit = true;
                break;
            }

            // Invariant lower-casing keeps command matching independent of the current culture.
            switch (line.Trim().ToLowerInvariant())
            {
                case "exit":
                    Close();
                    isExit = true;
                    break;
                case "clear":
                    Console.Clear();
                    break;
                default:
                    break;
            }
        }

        Console.WriteLine("Goodbye!");
    }

    /// <summary>
    /// Closes the receiving channel and both connections, if they are open.
    /// </summary>
    private static void Close()
    {
        if (_recvChannel != null && _recvChannel.IsOpen)
        {
            _recvChannel.Close();
        }

        if (_recvConn != null && _recvConn.IsOpen)
        {
            _recvConn.Close();
        }

        if (_senderConn != null && _senderConn.IsOpen)
        {
            _senderConn.Close();
        }
    }

    #region Asynchronous message handling (the sender does not wait for a result)

    /// <summary>
    /// Delivery callback: hands each message to the thread pool so that several
    /// messages are processed concurrently.
    /// </summary>
    /// <param name="sender">The consumer that raised the event.</param>
    /// <param name="e">Delivery data (body and delivery tag).</param>
    private static void consumer_Received(object sender, BasicDeliverEventArgs e)
    {
        byte[] body = e.Body;

        Task.Run(() => HandlingMessage(body, e));
    }

    /// <summary>
    /// Processes one delivery: parses and validates it, simulates work, then either
    /// forwards it to the target queue and acknowledges it, rejects it for good
    /// (unparsable/invalid message) or rejects it with requeue (processing failure).
    /// </summary>
    /// <param name="body">Raw UTF-8 encoded JSON message body.</param>
    /// <param name="e">Delivery data; its delivery tag is used to ack or reject.</param>
    /// <remarks>
    /// The method stays <c>async void</c> to keep its signature, so every exception is
    /// caught here: an exception escaping an <c>async void</c> method cannot be observed
    /// by the caller.
    /// </remarks>
    private static async void HandlingMessage(byte[] body, BasicDeliverEventArgs e)
    {
        string message = Encoding.UTF8.GetString(body);

        try
        {
            bool isSuccess = false;

            try
            {
                MessageModel msgModel = ParseMessage(message);

                int num = NextRandom(0, 4);

                // Simulated processing failure.
                if (NextRandom(0, 11) == 4)
                {
                    throw new Exception("处理失败", null);
                }

                // Simulated parse failure.
                if (NextRandom(0, 11) == 8)
                {
                    throw new MessageException("消息解析失败");
                }

                await Task.Delay(num * 1000);

                // "Processing" is just printing the message.
                Console.WriteLine("Time:" + DateTime.Now + " ThreadID:" + Thread.CurrentThread.ManagedThreadId + " Used: " + num + "s MSG:" + msgModel);

                isSuccess = true;
            }
            catch (MessageException msgEx)
            {
                LogError(msgEx.Message, message);
                Reject(e.DeliveryTag, false); // Invalid message: do not redeliver.
                return;
            }
            catch (Exception ex)
            {
                LogError(ex.Message, message);
            }

            if (!isSuccess)
            {
                Reject(e.DeliveryTag, true); // Processing failed: redeliver.
                return;
            }

            Forward(body);
            Ack(e.DeliveryTag);
        }
        catch (AlreadyClosedException)
        {
            Console.WriteLine("ERROR:连接已关闭");
        }
        catch (Exception ex)
        {
            Console.WriteLine("ERROR:" + ex.Message);
        }
    }

    /// <summary>
    /// Deserializes and validates a message.
    /// </summary>
    /// <param name="message">JSON text of the message.</param>
    /// <returns>The valid, deserialized message.</returns>
    /// <exception cref="MessageException">The text is not valid JSON or fails validation.</exception>
    private static MessageModel ParseMessage(string message)
    {
        MessageModel msgModel;
        try
        {
            msgModel = JsonConvert.DeserializeObject<MessageModel>(message);
        }
        catch (JsonException jsonEx)
        {
            // Malformed JSON is a permanent failure; surfacing it as MessageException makes
            // the caller reject it without requeue instead of redelivering it forever.
            throw new MessageException("消息解析失败", jsonEx);
        }

        if (msgModel == null || !msgModel.IsVlid())
        {
            throw new MessageException("消息解析失败");
        }

        return msgModel;
    }

    /// <summary>
    /// Publishes the body to the target queue on a channel created for this call only.
    /// </summary>
    /// <param name="body">Message body to forward unchanged.</param>
    private static void Forward(byte[] body)
    {
        IModel senderChannel = _senderConn.CreateModel();
        try
        {
            senderChannel.BasicPublish("", TargetQueue, null, body);
        }
        finally
        {
            // Always release the per-message channel, also when publishing fails.
            if (senderChannel.IsOpen)
            {
                senderChannel.Close();
            }
        }
    }

    /// <summary>Acknowledges a single delivery on the shared receiving channel.</summary>
    private static void Ack(ulong deliveryTag)
    {
        lock (_recvChannelLock)
        {
            _recvChannel.BasicAck(deliveryTag, false);
        }
    }

    /// <summary>Rejects a single delivery on the shared receiving channel.</summary>
    /// <param name="deliveryTag">Tag of the delivery to reject.</param>
    /// <param name="requeue">Whether the broker should redeliver the message.</param>
    private static void Reject(ulong deliveryTag, bool requeue)
    {
        lock (_recvChannelLock)
        {
            _recvChannel.BasicReject(deliveryTag, requeue);
        }
    }

    /// <summary>Returns a random integer in [minValue, maxValue) from the shared generator.</summary>
    private static int NextRandom(int minValue, int maxValue)
    {
        lock (_random)
        {
            return _random.Next(minValue, maxValue);
        }
    }

    /// <summary>Writes an error line containing time, thread id, error text and message.</summary>
    private static void LogError(string error, string message)
    {
        Console.WriteLine("Time:" + DateTime.Now + " ThreadID:" + Thread.CurrentThread.ManagedThreadId + " ERROR:" + error + " MSG:" + message);
    }

    #endregion

    #region Synchronous message handling (RPC)

    #endregion
}


③第三个控制台应用程序

/// <summary>
/// Third console application: consumes messages from "checkQueue", simulates processing
/// on thread-pool threads and forwards successfully processed messages to "reportQueue".
/// </summary>
class Program
{
    // Queue consumed by this application.
    private const string SourceQueue = "checkQueue";

    // Queue that successfully processed messages are forwarded to.
    private const string TargetQueue = "reportQueue";

    // Serializes ack/reject calls: _recvChannel is shared by all handler threads,
    // and a channel must not be used by several threads at the same time.
    private static readonly object _recvChannelLock = new object();

    // One shared generator; creating a new Random per message can yield identical
    // sequences for handlers that start at the same moment.
    private static readonly Random _random = new Random();

    private static IConnection _recvConn;
    private static IConnection _senderConn;
    private static IModel _recvChannel;
    private static bool isExit;

    private static void Main(string[] args)
    {
        Setup();

        // Do not announce consumption when the broker could not be reached.
        if (!isExit)
        {
            Console.WriteLine("开始使用checkQueue队列里的消息(同时推送到reportQueue报告队列):");
        }

        WaitCommand();
    }

    /// <summary>
    /// Opens both connections, declares both queues and starts consuming.
    /// Sets <c>isExit</c> when the broker cannot be reached.
    /// </summary>
    private static void Setup()
    {
        var factory = new ConnectionFactory
        {
            HostName = "localhost",
            UserName = "guest",
            Password = "guest",
            //VirtualHost = "test",
            TopologyRecoveryEnabled = true,
            AutomaticRecoveryEnabled = true
        };

        try
        {
            // The sender side is prepared BEFORE consuming starts: the message handler
            // uses _senderConn, and deliveries can arrive as soon as BasicConsume returns.
            _senderConn = factory.CreateConnection();
            IModel declareChannel = _senderConn.CreateModel();
            declareChannel.QueueDeclare(TargetQueue, false, false, false, null);
            // declareChannel is intentionally left open. Original author's note: closing it
            // prevents automatic recovery from restoring the queue, because recovery reuses
            // the channel that declared it.

            _recvConn = factory.CreateConnection();
            _recvConn.ConnectionShutdown += ConnectionShutdown;
            _recvChannel = _recvConn.CreateModel();
            _recvChannel.QueueDeclare(SourceQueue, false, false, false, null);
            _recvChannel.BasicQos(0, 10, false); // At most 10 unacknowledged deliveries.
            var consumer = new EventingBasicConsumer(_recvChannel);
            consumer.Received += consumer_Received;
            _recvChannel.BasicConsume(SourceQueue, false, consumer); // Manual acknowledgements.
        }
        catch (BrokerUnreachableException)
        {
            Console.WriteLine("ERROR: RabbitMQ服务器未启动!");
            Close(); // Release whatever was opened before the failure.
            Thread.Sleep(2000); // Leave the message on screen briefly before exiting.
            isExit = true;
        }
    }

    /// <summary>
    /// Reports that the receiving connection was shut down.
    /// </summary>
    private static void ConnectionShutdown(object sender, ShutdownEventArgs e)
    {
        Console.WriteLine("连接已关闭.");
    }

    /// <summary>
    /// Reads commands ("exit", "clear") from standard input until "exit" is entered
    /// or input ends.
    /// </summary>
    private static void WaitCommand()
    {
        while (!isExit)
        {
            string line = Console.ReadLine();
            if (line == null)
            {
                // End of input (closed or redirected stdin): behave like "exit".
                Close();
                isExit = true;
                break;
            }

            // Invariant lower-casing keeps command matching independent of the current culture.
            switch (line.Trim().ToLowerInvariant())
            {
                case "exit":
                    Close();
                    isExit = true;
                    break;
                case "clear":
                    Console.Clear();
                    break;
                default:
                    break;
            }
        }

        Console.WriteLine("Goodbye!");
    }

    /// <summary>
    /// Closes the receiving channel and both connections, if they are open.
    /// </summary>
    private static void Close()
    {
        if (_recvChannel != null && _recvChannel.IsOpen)
        {
            _recvChannel.Close();
        }

        if (_recvConn != null && _recvConn.IsOpen)
        {
            _recvConn.Close();
        }

        if (_senderConn != null && _senderConn.IsOpen)
        {
            _senderConn.Close();
        }
    }

    #region Asynchronous message handling (the sender does not wait for a result)

    /// <summary>
    /// Delivery callback: hands each message to the thread pool so that several
    /// messages are processed concurrently.
    /// </summary>
    /// <param name="sender">The consumer that raised the event.</param>
    /// <param name="e">Delivery data (body and delivery tag).</param>
    private static void consumer_Received(object sender, BasicDeliverEventArgs e)
    {
        byte[] body = e.Body;

        Task.Run(() => HandlingMessage(body, e));
    }

    /// <summary>
    /// Processes one delivery: parses and validates it, simulates work, then either
    /// forwards it to the target queue and acknowledges it, rejects it for good
    /// (unparsable/invalid message) or rejects it with requeue (processing failure).
    /// </summary>
    /// <param name="body">Raw UTF-8 encoded JSON message body.</param>
    /// <param name="e">Delivery data; its delivery tag is used to ack or reject.</param>
    /// <remarks>
    /// The method stays <c>async void</c> to keep its signature, so every exception is
    /// caught here: an exception escaping an <c>async void</c> method cannot be observed
    /// by the caller.
    /// </remarks>
    private static async void HandlingMessage(byte[] body, BasicDeliverEventArgs e)
    {
        string message = Encoding.UTF8.GetString(body);

        try
        {
            bool isSuccess = false;

            try
            {
                MessageModel msgModel = ParseMessage(message);

                int num = NextRandom(0, 4);

                // Simulated processing failure.
                if (NextRandom(0, 11) == 4)
                {
                    throw new Exception("处理失败", null);
                }

                // Simulated parse failure.
                if (NextRandom(0, 11) == 8)
                {
                    throw new MessageException("消息解析失败");
                }

                await Task.Delay(num * 1000);

                // "Processing" is just printing the message.
                Console.WriteLine("Time:" + DateTime.Now + " ThreadID:" + Thread.CurrentThread.ManagedThreadId +
                                  " Used: " + num + "s MSG:" + msgModel);

                isSuccess = true;
            }
            catch (MessageException msgEx)
            {
                LogError(msgEx.Message, message);
                Reject(e.DeliveryTag, false); // Invalid message: do not redeliver.
                return;
            }
            catch (Exception ex)
            {
                LogError(ex.Message, message);
            }

            if (!isSuccess)
            {
                Reject(e.DeliveryTag, true); // Processing failed: redeliver.
                return;
            }

            Forward(body); // Send the message on to the report queue.
            Ack(e.DeliveryTag);
        }
        catch (AlreadyClosedException)
        {
            Console.WriteLine("ERROR:连接已关闭");
        }
        catch (Exception ex)
        {
            Console.WriteLine("ERROR:" + ex.Message);
        }
    }

    /// <summary>
    /// Deserializes and validates a message.
    /// </summary>
    /// <param name="message">JSON text of the message.</param>
    /// <returns>The valid, deserialized message.</returns>
    /// <exception cref="MessageException">The text is not valid JSON or fails validation.</exception>
    private static MessageModel ParseMessage(string message)
    {
        MessageModel msgModel;
        try
        {
            msgModel = JsonConvert.DeserializeObject<MessageModel>(message);
        }
        catch (JsonException jsonEx)
        {
            // Malformed JSON is a permanent failure; surfacing it as MessageException makes
            // the caller reject it without requeue instead of redelivering it forever.
            throw new MessageException("消息解析失败", jsonEx);
        }

        if (msgModel == null || !msgModel.IsVlid())
        {
            throw new MessageException("消息解析失败");
        }

        return msgModel;
    }

    /// <summary>
    /// Publishes the body to the target queue on a channel created for this call only.
    /// </summary>
    /// <param name="body">Message body to forward unchanged.</param>
    private static void Forward(byte[] body)
    {
        IModel senderChannel = _senderConn.CreateModel();
        try
        {
            senderChannel.BasicPublish("", TargetQueue, null, body);
        }
        finally
        {
            // Always release the per-message channel, also when publishing fails.
            if (senderChannel.IsOpen)
            {
                senderChannel.Close();
            }
        }
    }

    /// <summary>Acknowledges a single delivery on the shared receiving channel.</summary>
    private static void Ack(ulong deliveryTag)
    {
        lock (_recvChannelLock)
        {
            _recvChannel.BasicAck(deliveryTag, false);
        }
    }

    /// <summary>Rejects a single delivery on the shared receiving channel.</summary>
    /// <param name="deliveryTag">Tag of the delivery to reject.</param>
    /// <param name="requeue">Whether the broker should redeliver the message.</param>
    private static void Reject(ulong deliveryTag, bool requeue)
    {
        lock (_recvChannelLock)
        {
            _recvChannel.BasicReject(deliveryTag, requeue);
        }
    }

    /// <summary>Returns a random integer in [minValue, maxValue) from the shared generator.</summary>
    private static int NextRandom(int minValue, int maxValue)
    {
        lock (_random)
        {
            return _random.Next(minValue, maxValue);
        }
    }

    /// <summary>Writes an error line containing time, thread id, error text and message.</summary>
    private static void LogError(string error, string message)
    {
        Console.WriteLine("Time:" + DateTime.Now + " ThreadID:" + Thread.CurrentThread.ManagedThreadId + " ERROR:" + error + " MSG:" + message);
    }

    #endregion
}


④第四个控制台应用程序

/// <summary>
/// Fourth console application: final consumer of "reportQueue". Simulates processing on
/// thread-pool threads and acknowledges or rejects each message.
/// </summary>
class Program
{
    // Queue consumed by this application.
    private const string SourceQueue = "reportQueue";

    // Serializes ack/reject calls: _recvChannel is shared by all handler threads,
    // and a channel must not be used by several threads at the same time.
    private static readonly object _recvChannelLock = new object();

    // One shared generator; creating a new Random per message can yield identical
    // sequences for handlers that start at the same moment.
    private static readonly Random _random = new Random();

    private static IConnection _recvConn;
    private static IModel _recvChannel;
    private static bool isExit;
    //private static IConnection _receiverConn; // Used for synchronous (RPC) handling.

    private static void Main(string[] args)
    {
        Setup();

        // Do not announce consumption when the broker could not be reached.
        if (!isExit)
        {
            Console.WriteLine("开始消费reportQueue队列里的消息:");
        }

        WaitCommand();
    }

    /// <summary>
    /// Opens the connection, declares the queue and starts consuming.
    /// Sets <c>isExit</c> when the broker cannot be reached.
    /// </summary>
    private static void Setup()
    {
        var factory = new ConnectionFactory
        {
            HostName = "localhost",
            // Same account and (default) virtual host as the application that publishes
            // to reportQueue: queues in a different virtual host are separate resources,
            // so connecting to another virtual host would never see those messages.
            UserName = "guest",
            Password = "guest",
            //VirtualHost = "test",
            TopologyRecoveryEnabled = true,
            AutomaticRecoveryEnabled = true
        };

        try
        {
            _recvConn = factory.CreateConnection();
            _recvConn.ConnectionShutdown += ConnectionShutdown;
            _recvChannel = _recvConn.CreateModel();
            _recvChannel.QueueDeclare(SourceQueue, false, false, false, null);
            _recvChannel.BasicQos(0, 10, false); // At most 10 unacknowledged deliveries.
            var consumer = new EventingBasicConsumer(_recvChannel);
            consumer.Received += consumer_Received;
            _recvChannel.BasicConsume(SourceQueue, false, consumer); // Manual acknowledgements.
        }
        catch (BrokerUnreachableException)
        {
            Console.WriteLine("ERROR: RabbitMQ服务器未启动!");
            Thread.Sleep(2000); // Leave the message on screen briefly before exiting.
            isExit = true;
        }
    }

    /// <summary>
    /// Reports that the connection was shut down.
    /// </summary>
    private static void ConnectionShutdown(object sender, ShutdownEventArgs e)
    {
        Console.WriteLine("连接已关闭.");
    }

    /// <summary>
    /// Reads commands ("exit", "clear") from standard input until "exit" is entered
    /// or input ends.
    /// </summary>
    private static void WaitCommand()
    {
        while (!isExit)
        {
            string line = Console.ReadLine();
            if (line == null)
            {
                // End of input (closed or redirected stdin): behave like "exit".
                Close();
                isExit = true;
                break;
            }

            // Invariant lower-casing keeps command matching independent of the current culture.
            switch (line.Trim().ToLowerInvariant())
            {
                case "exit":
                    Close();
                    isExit = true;
                    break;
                case "clear":
                    Console.Clear();
                    break;
                default:
                    break;
            }
        }

        Console.WriteLine("Goodbye!");
    }

    /// <summary>
    /// Closes the channel and then the connection, if they are open.
    /// </summary>
    private static void Close()
    {
        if (_recvChannel != null && _recvChannel.IsOpen)
        {
            _recvChannel.Close();
        }

        if (_recvConn != null && _recvConn.IsOpen)
        {
            _recvConn.Close();
        }
    }

    #region Asynchronous message handling (the sender does not wait for a result)

    /// <summary>
    /// Delivery callback: hands each message to the thread pool so that several
    /// messages are processed concurrently.
    /// </summary>
    /// <param name="sender">The consumer that raised the event.</param>
    /// <param name="e">Delivery data (body and delivery tag).</param>
    private static void consumer_Received(object sender, BasicDeliverEventArgs e)
    {
        byte[] body = e.Body;

        Task.Run(() => HandlingMessage(body, e));
    }

    /// <summary>
    /// Processes one delivery: parses and validates it, simulates work, then either
    /// acknowledges it, rejects it for good (unparsable/invalid message) or rejects it
    /// with requeue (processing failure).
    /// </summary>
    /// <param name="body">Raw UTF-8 encoded JSON message body.</param>
    /// <param name="e">Delivery data; its delivery tag is used to ack or reject.</param>
    /// <remarks>
    /// The method stays <c>async void</c> to keep its signature, so every exception is
    /// caught here: an exception escaping an <c>async void</c> method cannot be observed
    /// by the caller.
    /// </remarks>
    private static async void HandlingMessage(byte[] body, BasicDeliverEventArgs e)
    {
        string message = Encoding.UTF8.GetString(body);

        try
        {
            bool isSuccess = false;

            try
            {
                MessageModel msgModel = ParseMessage(message);

                int num = NextRandom(0, 4);

                // Simulated processing failure.
                if (NextRandom(0, 11) == 4)
                {
                    throw new Exception("处理失败", null);
                }

                // Simulated parse failure.
                if (NextRandom(0, 11) == 8)
                {
                    throw new MessageException("消息解析失败");
                }

                await Task.Delay(num * 1000);

                // "Processing" is just printing the message.
                Console.WriteLine("Time:" + DateTime.Now + " ThreadID:" + Thread.CurrentThread.ManagedThreadId +
                                  " Used: " + num + "s MSG:" + msgModel);

                isSuccess = true;
            }
            catch (MessageException msgEx)
            {
                LogError(msgEx.Message, message);
                Reject(e.DeliveryTag, false); // Invalid message: do not redeliver.
                return;
            }
            catch (Exception ex)
            {
                LogError(ex.Message, message);
            }

            if (isSuccess)
            {
                Ack(e.DeliveryTag); // Processed successfully.
            }
            else
            {
                Reject(e.DeliveryTag, true); // Processing failed: redeliver.
            }
        }
        catch (AlreadyClosedException)
        {
            Console.WriteLine("ERROR:连接已关闭");
        }
        catch (Exception ex)
        {
            Console.WriteLine("ERROR:" + ex.Message);
        }
    }

    /// <summary>
    /// Deserializes and validates a message.
    /// </summary>
    /// <param name="message">JSON text of the message.</param>
    /// <returns>The valid, deserialized message.</returns>
    /// <exception cref="MessageException">The text is not valid JSON or fails validation.</exception>
    private static MessageModel ParseMessage(string message)
    {
        MessageModel msgModel;
        try
        {
            msgModel = JsonConvert.DeserializeObject<MessageModel>(message);
        }
        catch (JsonException jsonEx)
        {
            // Malformed JSON is a permanent failure; surfacing it as MessageException makes
            // the caller reject it without requeue instead of redelivering it forever.
            throw new MessageException("消息解析失败", jsonEx);
        }

        if (msgModel == null || !msgModel.IsVlid())
        {
            throw new MessageException("消息解析失败");
        }

        return msgModel;
    }

    /// <summary>Acknowledges a single delivery on the shared receiving channel.</summary>
    private static void Ack(ulong deliveryTag)
    {
        lock (_recvChannelLock)
        {
            _recvChannel.BasicAck(deliveryTag, false);
        }
    }

    /// <summary>Rejects a single delivery on the shared receiving channel.</summary>
    /// <param name="deliveryTag">Tag of the delivery to reject.</param>
    /// <param name="requeue">Whether the broker should redeliver the message.</param>
    private static void Reject(ulong deliveryTag, bool requeue)
    {
        lock (_recvChannelLock)
        {
            _recvChannel.BasicReject(deliveryTag, requeue);
        }
    }

    /// <summary>Returns a random integer in [minValue, maxValue) from the shared generator.</summary>
    private static int NextRandom(int minValue, int maxValue)
    {
        lock (_random)
        {
            return _random.Next(minValue, maxValue);
        }
    }

    /// <summary>Writes an error line containing time, thread id, error text and message.</summary>
    private static void LogError(string error, string message)
    {
        Console.WriteLine("Time:" + DateTime.Now + " ThreadID:" + Thread.CurrentThread.ManagedThreadId +
                          " ERROR:" + error + " MSG:" + message);
    }

    #endregion
}


运行结果如图:







内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: