您的位置:首页 > 其它

Rabbitmq源码示例(生产者/消费者,非持久化/持久化)

2015-05-26 11:39 357 查看
//////////////// 非持久化生产者

static void TestThreadFun()

{

string queueName = "hello";

string message = "abcdefghijklmnopqrstuvwxyz一二三四五六七八九十0123456789";

var body = Encoding.UTF8.GetBytes(message);

ConnectionFactory factory = new ConnectionFactory();

factory.HostName = "10.5.8.108";
// Rabbitmq服务器地址

factory.UserName = "vcyber"; // Rabbitmq服务中预先分配的账号(账号的权限决定后面的操作是否合法)

factory.Password = "123456"; // Rabbitmq服务账号的密码

bool durable = false; // 是否持久化

using (var connection = factory.CreateConnection())

{

using (var channel = connection.CreateModel())

{

// 声明一个队列,并且指定队列名称和属性

// queueName:队列名称

// durable:非 持久化

// false1:非 所有生产者断开连接之后,自动销毁消息队列

// false2:非 所有消费者断开连接之后,自动销毁消息队列

// null:

// 备注:如果同名的消息队列已经存在,且新指定的属性与已存在的不同,则抛出异常

channel.QueueDeclare(queueName, durable, false, false, null);

for (int i = 0; i < nRequestCount; ++i)

{

// 发送一个消息

// "":交换机名称

// queueName:路由键

// null:

// body:发送数据

channel.BasicPublish("", queueName, null, body); // 消息为非持久化

}

}

}

}

//////////////// 持久化生产者

static void TestThreadFun()

{

string queueName = "hello";

string message = "abcdefghijklmnopqrstuvwxyz一二三四五六七八九十0123456789";

var body = Encoding.UTF8.GetBytes(message);

ConnectionFactory factory = new ConnectionFactory();

factory.HostName = "10.5.8.108";
// Rabbitmq服务器地址

factory.UserName = "vcyber"; // Rabbitmq服务中预先分配的账号(账号的权限决定后面的操作是否合法)

factory.Password = "123456"; // Rabbitmq服务账号的密码

bool durable = true; // 是否持久化

using (var connection = factory.CreateConnection())

{

using (var channel = connection.CreateModel())

{

// 声明一个队列,并且指定队列名称和属性

// queueName:队列名称

// durable: 持久化

// false1: 非 所有生产者断开连接之后,自动销毁消息队列

// false2: 非 所有消费者断开连接之后,自动销毁消息队列

// null:

// 备注:如果同名的消息队列已经存在,且新指定的属性与已存在的不同,则抛出异常

channel.QueueDeclare(queueName, durable, false, false, null);

// 创建一个属性对象,设置持久化为true

IBasicProperties properties = channel.CreateBasicProperties();

properties.SetPersistent(true);

for (int i = 0; i < nRequestCount; ++i)

{

// 发送一个消息

// "":交换机名称

// queueName:路由键

// properties:持久化为true的属性对象

// body:发送数据

channel.BasicPublish("", queueName, properties, body); // 消息为非持久化

}

}

}

}

//////////////// 非持久化消费者

static void Main(string[] args)

{

var factory = new ConnectionFactory();

factory.HostName = "10.5.8.108";
// Rabbitmq服务器地址

factory.UserName = "vcyber"; // Rabbitmq服务中预先分配的账号(账号的权限决定后面的操作是否合法)

factory.Password = "123456"; // Rabbitmq服务账号的密码

string queueName = "hello"; // 消息队列名字

bool durable = false; // 是否持久化

using (var connection = factory.CreateConnection())

{

using (var channel = connection.CreateModel())

{

// 声明一个队列,并且指定队列名称和属性

// queueName:队列名称

// durable:非 持久化

// false1:非 所有生产者断开连接之后,自动销毁消息队列

// false2:非 所有消费者断开连接之后,自动销毁消息队列

// null:

// 备注:如果同名的消息队列已经存在,且新指定的属性与已存在的不同,则抛出异常

channel.QueueDeclare(queueName, durable, false, false, null);

// 创建一个消费者

var consumer = new QueueingBasicConsumer(channel);

// 启动一个消费者

// queueName:队列名称

// true:不需要回应。server将msg返回给消费者后,自动将本地消息删除。

该参数的使用与队列的持久化属性配合使用。非持久化队列,设置true;持久化队列,设置为false。

// consumer:消费者对象

channel.BasicConsume(queueName, true, consumer);

while (true)

{

BasicDeliverEventArgs ea = null;

// 尝试获取消息

// 1000:超时时间,ms

// ea:返回参数,函数成功时携带返回的消息

if (!consumer.Queue.Dequeue(1000, out ea))

{

break;

}

}

}

}

}

//////////////// 持久化消费者

static void Main(string[] args)

{

var factory = new ConnectionFactory();

factory.HostName = "10.5.8.108";
// Rabbitmq服务器地址

factory.UserName = "vcyber"; // Rabbitmq服务中预先分配的账号(账号的权限决定后面的操作是否合法)

factory.Password = "123456"; // Rabbitmq服务账号的密码

string queueName = "hello"; // 消息队列名字

bool durable = true; // 持久化

using (var connection = factory.CreateConnection())

{

using (var channel = connection.CreateModel())

{

// 声明一个队列,并且指定队列名称和属性

// queueName:队列名称

// durable:持久化

// false1:非 所有生产者断开连接之后,自动销毁消息队列

// false2:非 所有消费者断开连接之后,自动销毁消息队列

// null:

// 备注:如果同名的消息队列已经存在,且新指定的属性与已存在的不同,则抛出异常

channel.QueueDeclare(queueName, durable, false, false, null);

// 创建一个消费者

var consumer = new QueueingBasicConsumer(channel);

// 启动一个消费者

// queueName:队列名称

// true:需要回应。server将msg返回给消费者后,需要等到消费者返回确认之后,才会将本地消息删除。

该参数的使用与队列的持久化属性配合使用。非持久化队列,设置true;持久化队列,设置为false。

// consumer:消费者对象

channel.BasicConsume(queueName, false, consumer);

while (true)

{

BasicDeliverEventArgs ea = null;

// 尝试获取消息

// 1000:超时时间,ms

// ea:返回参数,函数成功时携带返回的消息

if (!consumer.Queue.Dequeue(1000, out ea))

{

break;

}

// 向server发送确认

channel.BasicAck(ea.DeliveryTag, false);

}

}

}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: