您的位置:首页 > 其它

RabbitMQ 消费者

2022-05-05 16:13 169 查看

原文连接:https://www.cnblogs.com/ysmc/p/16225142.html

项目需要引用 RabbitMQ.Client Nuget包

创建异步工厂

IAsyncConnectionFactory connectionFactory = new ConnectionFactory
{
HostName = _rabbitMqOptions.HostName,
Port = _rabbitMqOptions.Port,
UserName = _rabbitMqOptions.UserName,
Password = _rabbitMqOptions.Password,
VirtualHost = _rabbitMqOptions.VirtualHost,
DispatchConsumersAsync = true
};

订阅

//创建连接
var connection = connectionFactory.CreateConnection();
//创建通道
var channel = connection.CreateModel();
channel.BasicQos(prefetchSize, prefetchCount, false);
//事件基本消费者
var consumer = new AsyncEventingBasicConsumer(channel);
//接收到消息事件
consumer.Received += async (ch, ea) =>
{
try
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
var requeue = await executeAsync(message);
if (requeue)
{
//确认该消息已被消费
channel.BasicAck(ea.DeliveryTag, false);
}
else
{
channel.BasicNack(ea.DeliveryTag, false, true);
}
}
catch (Exception)
{
channel.BasicAck(ea.DeliveryTag, false);
}
};
//启动消费者 设置为手动应答消息
channel.BasicConsume(queueName, false, consumer);

单次或轮询

private async Task SingleOrPolling(Func<string, Task<bool>> executeAsync, bool isSingle, string? queueName = null)
{
var connectionFactory = _rabbitMQBaseService.GetAsyncConnectionFactory();
using var connection = connectionFactory.CreateConnection();
using var channel = connection.CreateModel();
queueName = GetQueueName(queueName);
while (true)
{
var response = channel.BasicGet(queueName, false);
try
{
if (null != response)
{
var message = Encoding.UTF8.GetString(response.Body.ToArray());
var requeue = await executeAsync(message);
if (requeue)
{
//确认该消息已被消费
channel.BasicAck(response.DeliveryTag, false);
}
else
{
channel.BasicNack(response.DeliveryTag, false, true);
}
}
}
catch (Exception)
{
channel.BasicAck(response.DeliveryTag, false);
}
if (isSingle)
{
break;
}
}
}

 

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: