您的位置:首页 > 编程语言 > C#

C# RabbitMq (基于消费者事件实现)

2017-12-08 13:30 337 查看
源码源码源码源码

一:服务方法

public class RabbitEventService: BaseService
{
#region 构造函数
/// <summary>
/// 构造函数
/// </summary>
/// <param name="config"></param>
public RabbitEventService(RabbitMqConfigModel config)
: base(config)
{
}
#endregion

#region 方法
#region 接收消息
/// <summary>
/// 接收消息
/// </summary>
/// <param name="method"></param>
public void Receive(Func<string, bool> method)
{
try
{
using (var channel = _connection.CreateModel())
{
//申明队列
channel.QueueDeclare(RabbitConfig.QueueName, RabbitConfig.DurableQueue, false, false, null);

if (!string.IsNullOrEmpty(RabbitConfig.ExchangeName)) //若交换机不为空
{
//申明路由
channel.ExchangeDeclare(RabbitConfig.ExchangeName, RabbitConfig.ExchangeType.ToString(), RabbitConfig.DurableQueue);
//队列和交换机绑定
channel.QueueBind(RabbitConfig.QueueName, RabbitConfig.ExchangeName, RabbitConfig.RoutingKey);
}

//channel.BasicQos(0, 1, false);

var consumer = new EventingBasicConsumer(channel);

//注册接收事件,一旦创建连接就去拉取消息
consumer.Received += (model, ea) =>
{
//阻塞函数,获取队列中的消息
ProcessingResultsEnum processingResult = ProcessingResultsEnum.Retry;
try
{
var body = ea.Body;

string message = Encoding.UTF8.GetString(body);
method(message);
processingResult = ProcessingResultsEnum.Accept;
}
catch(Exception)
{
processingResult = ProcessingResultsEnum.Reject; //系统无法处理的错误
}
finally
{
switch (processingResult)
{
case ProcessingResultsEnum.Accept:
//回复确认处理成功
channel.BasicAck(ea.DeliveryTag,
false);//处理单挑信息
break;
case ProcessingResultsEnum.Retry:

b3c9
//发生错误了,但是还可以重新提交给队列重新分配
channel.BasicNack(ea.DeliveryTag, false, true);
break;
case ProcessingResultsEnum.Reject:
//发生严重错误,无法继续进行,这种情况应该写日志或者是发送消息通知管理员
channel.BasicNack(ea.DeliveryTag, false, false);
//写日志
break;
}
}
};
channel.BasicConsume(RabbitConfig.QueueName,
false,//和tcp协议的ack一样,为false则服务端必须在收到客户端的回执(ack)后才能删除本条消息
consumer);
System.Threading.Thread.Sleep(-1);

}
}
catch (Exception)
{
}
}
#endregion

#endregion
}


二:配置

public class ConfigHelper
{
public static RabbitEventService CreateDefaultInstance()
{
return new RabbitEventService(new RabbitMqConfigModel()
{
IP = "127.0.0.1",
UserName = "guest",
Password = "guest",
Port = 15672,
VirtualHost = "kevinHost",
DurableQueue = true,
QueueName = "hello",
ExchangeName = "hello_ex1",
ExchangeType = ExchangeTypeEnum.direct,
DurableMessage = false,
RoutingKey = "bug"
});
}
}


三:消息推送

static void Main(string[] args)
{
RabbitEventService mq = RabbitMq.Consumer.Config.ConfigHelper.CreateDefaultInstance();

string message = "Hello World!";//待发送的消息

string errMsg = "";
for (int i = 0; i < 10000000; i++)
{
string body = string.Format(" {0} Sent: {1}", i, message);
Thread.Sleep(10);

mq.Send<string>(body, ref errMsg);
Console.WriteLine(body);
}

Console.ReadLine();
}


四:消费端

static void Main(string[] args)
{
RabbitEventService mq = RabbitMq.Consumer.Config.ConfigHelper.CreateDefaultInstance();
mq.Receive(Write);
}
public static bool Write(string message)
{
Console.WriteLine("Received {0}", message);
return new Random().Next(0, 1) == 0 ? false : true;
}


源码源码源码源码
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐