您的位置:首页 > 编程语言 > C#

在C#中使用消息队列RabbitMQ

2014-10-27 14:41 741 查看
1、什么是RabbitMQ。详见 http://www.rabbitmq.com/

作用就是提高系统的并发性,将一些不需要及时响应客户端且占用较多资源的操作,放入队列,再由另外一个线程,去异步处理这些队列,可极大的提高系统的并发能力。

2、安装

RabbitMQ服务:http://www.rabbitmq.com/download.html
(安装完RabbitMQ服务后,会在Windows服务中看到。如果没有Erlang运行环境,在安装过程中会提醒先安装Erlang环境。)

.net客户端类库:http://www.rabbitmq.com/dotnet.html

3、插件

RabbitMQ提供了很多好用的插件,最常用的就是web管理工具,启动此插件。

CMD中运行命令:rabbitmq-plugins enable rabbitmq_management

注:rabbitmq-plugins 所在路径为:D:\Program Files\RabbitMQ Server\rabbitmq_server-3.4.0\sbin

web管理工具的地址是:http://localhost:15672,初始用户名:guest 初始密码:guest

4、配置

配置文件地址为:C:\Documents and Settings\Administrator\Application Data\RabbitMQ\rabbitmq.config,默认没有rabbit.config文件,需要手工新建(默认会有rabbitmq.config.example 作为参考)。基于安全,做了两个配置,如下:

[
{rabbit,
[
{loopback_users, [<<"guest">>]},
{tcp_listeners, [{"127.0.0.1", 1234},
{"10.121.1.48", 8009}]}

]}
].


loopback_users:设置只能在与RabbitMq服务同一台机器上访问服务的用户。

tcp_listeners:设置RabbitMQ监听的IP地址与端口。只监听局域网内网iP、修改默认端口,防止被入侵攻击。

设置完后,别忘记了以下操作,否则配置不起作用。

停止RabbitMQ服务;

重新安装服务使配置生效:rabbitmq-service.bat install

此命令要切换到路径:D:\Program Files\RabbitMQ Server\rabbitmq_server-3.4.0\sbin

启动RabbitMQ服务;

5、Demo练习。

消息生产者:

class Program
{
static void Main(string[] args)
{
try
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = Constants.MqHost;
factory.Port = Constants.MqPort;
factory.UserName = Constants.MqUserName;
factory.Password = Constants.MqPwd;
using (IConnection conn = factory.CreateConnection())
{
using (IModel channel = conn.CreateModel())
{
//在MQ上定义一个持久化队列,如果名称相同不会重复创建
channel.QueueDeclare("MyFirstQueue", true, false, false, null);
while (true)
{
string customStr = Console.ReadLine();
RequestMsg requestMsg = new RequestMsg();
requestMsg.Name = string.Format("Name_{0}", customStr);
requestMsg.Code = string.Format("Code_{0}", customStr);
string jsonStr = JsonConvert.SerializeObject(requestMsg);
byte[] bytes = Encoding.UTF8.GetBytes(jsonStr);

//设置消息持久化
IBasicProperties properties = channel.CreateBasicProperties();
properties.DeliveryMode = 2;
channel.BasicPublish("", "MyFirstQueue", properties, bytes);

//channel.BasicPublish("", "MyFirstQueue", null, bytes);

Console.WriteLine("消息已发送:" + requestMsg.ToString());
}
}
}
}
catch (Exception e1)
{
Console.WriteLine(e1.ToString());
}
Console.ReadLine();
}
}


class Program
{
static void Main(string[] args)
{
try
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = Constants.MqHost;
factory.Port = Constants.MqPort;
factory.UserName = Constants.MqUserName;
factory.Password = Constants.MqPwd;
using (IConnection conn = factory.CreateConnection())
{
using (IModel channel = conn.CreateModel())
{
//在MQ上定义一个持久化队列,如果名称相同不会重复创建
channel.QueueDeclare("MyFirstQueue", true, false, false, null);

//输入1,那如果接收一个消息,但是没有应答,则客户端不会收到下一个消息
channel.BasicQos(0, 1, false);

Console.WriteLine("Listening...");

//在队列上定义一个消费者
QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
//消费队列,并设置应答模式为程序主动应答
channel.BasicConsume("MyFirstQueue", false, consumer);

while (true)
{
//阻塞函数,获取队列中的消息
BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
byte[] bytes = ea.Body;
string str = Encoding.UTF8.GetString(bytes);
RequestMsg msg = JsonConvert.DeserializeObject<RequestMsg>(str);
Console.WriteLine("HandleMsg:" + msg.ToString());
//回复确认
channel.BasicAck(ea.DeliveryTag, false);
}
}
}
}
catch (Exception e1)
{
Console.WriteLine(e1.ToString());
}
Console.ReadLine();
}
}




内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: