Rabbitmq源码示例(生产者/消费者,非持久化/持久化)
2015-05-26 11:39
357 查看
//////////////// 非持久化生产者
static void TestThreadFun()
{
string queueName = "hello";
string message = "abcdefghijklmnopqrstuvwxyz一二三四五六七八九十0123456789";
var body = Encoding.UTF8.GetBytes(message);
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "10.5.8.108";
// Rabbitmq服务器地址
factory.UserName = "vcyber"; // Rabbitmq服务中预先分配的账号(账号的权限决定后面的操作是否合法)
factory.Password = "123456"; // Rabbitmq服务账号的密码
bool durable = false; // 是否持久化
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
// 声明一个队列,并且指定队列名称和属性
// queueName:队列名称
// durable:非 持久化
// false1:非 所有生产者断开连接之后,自动销毁消息队列
// false2:非 所有消费者断开连接之后,自动销毁消息队列
// null:
// 备注:如果同名的消息队列已经存在,且新指定的属性与已存在的不同,则抛出异常
channel.QueueDeclare(queueName, durable, false, false, null);
for (int i = 0; i < nRequestCount; ++i)
{
// 发送一个消息
// "":交换机名称
// queueName:路由键
// null:
// body:发送数据
channel.BasicPublish("", queueName, null, body); // 消息为非持久化
}
}
}
}
//////////////// 持久化生产者
static void TestThreadFun()
{
string queueName = "hello";
string message = "abcdefghijklmnopqrstuvwxyz一二三四五六七八九十0123456789";
var body = Encoding.UTF8.GetBytes(message);
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "10.5.8.108";
// Rabbitmq服务器地址
factory.UserName = "vcyber"; // Rabbitmq服务中预先分配的账号(账号的权限决定后面的操作是否合法)
factory.Password = "123456"; // Rabbitmq服务账号的密码
bool durable = true; // 是否持久化
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
// 声明一个队列,并且指定队列名称和属性
// queueName:队列名称
// durable: 持久化
// false1: 非 所有生产者断开连接之后,自动销毁消息队列
// false2: 非 所有消费者断开连接之后,自动销毁消息队列
// null:
// 备注:如果同名的消息队列已经存在,且新指定的属性与已存在的不同,则抛出异常
channel.QueueDeclare(queueName, durable, false, false, null);
// 创建一个属性对象,设置持久化为true
IBasicProperties properties = channel.CreateBasicProperties();
properties.SetPersistent(true);
for (int i = 0; i < nRequestCount; ++i)
{
// 发送一个消息
// "":交换机名称
// queueName:路由键
// properties:持久化为true的属性对象
// body:发送数据
channel.BasicPublish("", queueName, properties, body); // 消息为非持久化
}
}
}
}
//////////////// 非持久化消费者
static void Main(string[] args)
{
var factory = new ConnectionFactory();
factory.HostName = "10.5.8.108";
// Rabbitmq服务器地址
factory.UserName = "vcyber"; // Rabbitmq服务中预先分配的账号(账号的权限决定后面的操作是否合法)
factory.Password = "123456"; // Rabbitmq服务账号的密码
string queueName = "hello"; // 消息队列名字
bool durable = false; // 是否持久化
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
// 声明一个队列,并且指定队列名称和属性
// queueName:队列名称
// durable:非 持久化
// false1:非 所有生产者断开连接之后,自动销毁消息队列
// false2:非 所有消费者断开连接之后,自动销毁消息队列
// null:
// 备注:如果同名的消息队列已经存在,且新指定的属性与已存在的不同,则抛出异常
channel.QueueDeclare(queueName, durable, false, false, null);
// 创建一个消费者
var consumer = new QueueingBasicConsumer(channel);
// 启动一个消费者
// queueName:队列名称
// true:不需要回应。server将msg返回给消费者后,自动将本地消息删除。
该参数的使用与队列的持久化属性配合使用。非持久化队列,设置true;持久化队列,设置为false。
// consumer:消费者对象
channel.BasicConsume(queueName, true, consumer);
while (true)
{
BasicDeliverEventArgs ea = null;
// 尝试获取消息
// 1000:超时时间,ms
// ea:返回参数,函数成功时携带返回的消息
if (!consumer.Queue.Dequeue(1000, out ea))
{
break;
}
}
}
}
}
//////////////// 持久化消费者
static void Main(string[] args)
{
var factory = new ConnectionFactory();
factory.HostName = "10.5.8.108";
// Rabbitmq服务器地址
factory.UserName = "vcyber"; // Rabbitmq服务中预先分配的账号(账号的权限决定后面的操作是否合法)
factory.Password = "123456"; // Rabbitmq服务账号的密码
string queueName = "hello"; // 消息队列名字
bool durable = true; // 持久化
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
// 声明一个队列,并且指定队列名称和属性
// queueName:队列名称
// durable:持久化
// false1:非 所有生产者断开连接之后,自动销毁消息队列
// false2:非 所有消费者断开连接之后,自动销毁消息队列
// null:
// 备注:如果同名的消息队列已经存在,且新指定的属性与已存在的不同,则抛出异常
channel.QueueDeclare(queueName, durable, false, false, null);
// 创建一个消费者
var consumer = new QueueingBasicConsumer(channel);
// 启动一个消费者
// queueName:队列名称
// true:需要回应。server将msg返回给消费者后,需要等到消费者返回确认之后,才会将本地消息删除。
该参数的使用与队列的持久化属性配合使用。非持久化队列,设置true;持久化队列,设置为false。
// consumer:消费者对象
channel.BasicConsume(queueName, false, consumer);
while (true)
{
BasicDeliverEventArgs ea = null;
// 尝试获取消息
// 1000:超时时间,ms
// ea:返回参数,函数成功时携带返回的消息
if (!consumer.Queue.Dequeue(1000, out ea))
{
break;
}
// 向server发送确认
channel.BasicAck(ea.DeliveryTag, false);
}
}
}
}
static void TestThreadFun()
{
string queueName = "hello";
string message = "abcdefghijklmnopqrstuvwxyz一二三四五六七八九十0123456789";
var body = Encoding.UTF8.GetBytes(message);
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "10.5.8.108";
// Rabbitmq服务器地址
factory.UserName = "vcyber"; // Rabbitmq服务中预先分配的账号(账号的权限决定后面的操作是否合法)
factory.Password = "123456"; // Rabbitmq服务账号的密码
bool durable = false; // 是否持久化
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
// 声明一个队列,并且指定队列名称和属性
// queueName:队列名称
// durable:非 持久化
// false1:非 所有生产者断开连接之后,自动销毁消息队列
// false2:非 所有消费者断开连接之后,自动销毁消息队列
// null:
// 备注:如果同名的消息队列已经存在,且新指定的属性与已存在的不同,则抛出异常
channel.QueueDeclare(queueName, durable, false, false, null);
for (int i = 0; i < nRequestCount; ++i)
{
// 发送一个消息
// "":交换机名称
// queueName:路由键
// null:
// body:发送数据
channel.BasicPublish("", queueName, null, body); // 消息为非持久化
}
}
}
}
//////////////// 持久化生产者
static void TestThreadFun()
{
string queueName = "hello";
string message = "abcdefghijklmnopqrstuvwxyz一二三四五六七八九十0123456789";
var body = Encoding.UTF8.GetBytes(message);
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "10.5.8.108";
// Rabbitmq服务器地址
factory.UserName = "vcyber"; // Rabbitmq服务中预先分配的账号(账号的权限决定后面的操作是否合法)
factory.Password = "123456"; // Rabbitmq服务账号的密码
bool durable = true; // 是否持久化
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
// 声明一个队列,并且指定队列名称和属性
// queueName:队列名称
// durable: 持久化
// false1: 非 所有生产者断开连接之后,自动销毁消息队列
// false2: 非 所有消费者断开连接之后,自动销毁消息队列
// null:
// 备注:如果同名的消息队列已经存在,且新指定的属性与已存在的不同,则抛出异常
channel.QueueDeclare(queueName, durable, false, false, null);
// 创建一个属性对象,设置持久化为true
IBasicProperties properties = channel.CreateBasicProperties();
properties.SetPersistent(true);
for (int i = 0; i < nRequestCount; ++i)
{
// 发送一个消息
// "":交换机名称
// queueName:路由键
// properties:持久化为true的属性对象
// body:发送数据
channel.BasicPublish("", queueName, properties, body); // 消息为非持久化
}
}
}
}
//////////////// 非持久化消费者
static void Main(string[] args)
{
var factory = new ConnectionFactory();
factory.HostName = "10.5.8.108";
// Rabbitmq服务器地址
factory.UserName = "vcyber"; // Rabbitmq服务中预先分配的账号(账号的权限决定后面的操作是否合法)
factory.Password = "123456"; // Rabbitmq服务账号的密码
string queueName = "hello"; // 消息队列名字
bool durable = false; // 是否持久化
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
// 声明一个队列,并且指定队列名称和属性
// queueName:队列名称
// durable:非 持久化
// false1:非 所有生产者断开连接之后,自动销毁消息队列
// false2:非 所有消费者断开连接之后,自动销毁消息队列
// null:
// 备注:如果同名的消息队列已经存在,且新指定的属性与已存在的不同,则抛出异常
channel.QueueDeclare(queueName, durable, false, false, null);
// 创建一个消费者
var consumer = new QueueingBasicConsumer(channel);
// 启动一个消费者
// queueName:队列名称
// true:不需要回应。server将msg返回给消费者后,自动将本地消息删除。
该参数的使用与队列的持久化属性配合使用。非持久化队列,设置true;持久化队列,设置为false。
// consumer:消费者对象
channel.BasicConsume(queueName, true, consumer);
while (true)
{
BasicDeliverEventArgs ea = null;
// 尝试获取消息
// 1000:超时时间,ms
// ea:返回参数,函数成功时携带返回的消息
if (!consumer.Queue.Dequeue(1000, out ea))
{
break;
}
}
}
}
}
//////////////// 持久化消费者
static void Main(string[] args)
{
var factory = new ConnectionFactory();
factory.HostName = "10.5.8.108";
// Rabbitmq服务器地址
factory.UserName = "vcyber"; // Rabbitmq服务中预先分配的账号(账号的权限决定后面的操作是否合法)
factory.Password = "123456"; // Rabbitmq服务账号的密码
string queueName = "hello"; // 消息队列名字
bool durable = true; // 持久化
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
// 声明一个队列,并且指定队列名称和属性
// queueName:队列名称
// durable:持久化
// false1:非 所有生产者断开连接之后,自动销毁消息队列
// false2:非 所有消费者断开连接之后,自动销毁消息队列
// null:
// 备注:如果同名的消息队列已经存在,且新指定的属性与已存在的不同,则抛出异常
channel.QueueDeclare(queueName, durable, false, false, null);
// 创建一个消费者
var consumer = new QueueingBasicConsumer(channel);
// 启动一个消费者
// queueName:队列名称
// true:需要回应。server将msg返回给消费者后,需要等到消费者返回确认之后,才会将本地消息删除。
该参数的使用与队列的持久化属性配合使用。非持久化队列,设置true;持久化队列,设置为false。
// consumer:消费者对象
channel.BasicConsume(queueName, false, consumer);
while (true)
{
BasicDeliverEventArgs ea = null;
// 尝试获取消息
// 1000:超时时间,ms
// ea:返回参数,函数成功时携带返回的消息
if (!consumer.Queue.Dequeue(1000, out ea))
{
break;
}
// 向server发送确认
channel.BasicAck(ea.DeliveryTag, false);
}
}
}
}
相关文章推荐
- RabbitMQ示例生产者消费者(二)——Exchange fanout方式
- PHP基于rabbitmq操作类的生产者和消费者功能示例
- posix 条件变量与互斥锁 示例生产者--消费者问题 .
- 生产者消费者示例二
- php rabbitmq操作类及生产者和消费者实例代码
- linux网络编程之posix 线程(三):posix 匿名信号量与互斥锁 示例生产者--消费者问题
- RabbitMQ入门之一 生产者、消费者、虚拟主机、信道、交换机、队列、绑定
- RabbitMQ 实战(四)消费者 ack 以及 生产者 confirms
- 多线程-线程间通信-多生产者多消费者示例
- 生产者-消费者模式案例以及数据共享队列【BlockingQueue】源码分析
- 生产者消费者示例三
- python生产者/消费者示例
- C++ 单生产者多消费者多线程示例
- rabbitmq的安装和命令介绍及python程序模拟生产者和消费者
- java 线程 生产者-消费者与队列,任务间使用管道进行输入、输出 讲解示例 --thinking java4
- Linux组件封装(五)一个生产者消费者问题示例
- Linux组件封装(五)一个生产者消费者问题示例
- muduo源码分析:无界队列和有界队列(消费者-生产者)
- kafka生产者、消费者java示例
- php rabbitmq操作类及生产者和消费者实例代码