RabbitMQ 消费者
2022-05-05 16:13
169 查看
原文连接:https://www.cnblogs.com/ysmc/p/16225142.html
项目需要引用 RabbitMQ.Client Nuget包
创建异步工厂
IAsyncConnectionFactory connectionFactory = new ConnectionFactory { HostName = _rabbitMqOptions.HostName, Port = _rabbitMqOptions.Port, UserName = _rabbitMqOptions.UserName, Password = _rabbitMqOptions.Password, VirtualHost = _rabbitMqOptions.VirtualHost, DispatchConsumersAsync = true };
订阅
//创建连接 var connection = connectionFactory.CreateConnection(); //创建通道 var channel = connection.CreateModel(); channel.BasicQos(prefetchSize, prefetchCount, false); //事件基本消费者 var consumer = new AsyncEventingBasicConsumer(channel); //接收到消息事件 consumer.Received += async (ch, ea) => { try { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); var requeue = await executeAsync(message); if (requeue) { //确认该消息已被消费 channel.BasicAck(ea.DeliveryTag, false); } else { channel.BasicNack(ea.DeliveryTag, false, true); } } catch (Exception) { channel.BasicAck(ea.DeliveryTag, false); } }; //启动消费者 设置为手动应答消息 channel.BasicConsume(queueName, false, consumer);
单次或轮询
private async Task SingleOrPolling(Func<string, Task<bool>> executeAsync, bool isSingle, string? queueName = null) { var connectionFactory = _rabbitMQBaseService.GetAsyncConnectionFactory(); using var connection = connectionFactory.CreateConnection(); using var channel = connection.CreateModel(); queueName = GetQueueName(queueName); while (true) { var response = channel.BasicGet(queueName, false); try { if (null != response) { var message = Encoding.UTF8.GetString(response.Body.ToArray()); var requeue = await executeAsync(message); if (requeue) { //确认该消息已被消费 channel.BasicAck(response.DeliveryTag, false); } else { channel.BasicNack(response.DeliveryTag, false, true); } } } catch (Exception) { channel.BasicAck(response.DeliveryTag, false); } if (isSingle) { break; } } }
相关文章推荐
- RabbitMQ(四):分发到多消费者
- rabbitMQ操作参数(一):生产者和消费者参数
- RabbitMQ简单的封装,通过配置文件生成生产消费者对象
- RabbitMQ入门之一 生产者、消费者、虚拟主机、信道、交换机、队列、绑定
- RabbitMq六种使用模式(2)_多个消费者
- rabbitMQ 生产者消费者
- RabbitMQ消息队列生产者和消费者
- 中间件系列十 RabbitMQ之消费者端的消息确认机制
- RabbitMQ 消息确认与公平调度消费者
- RabbitMQ下的生产消费者模式与订阅发布模式
- rabbitMq集成Spring后,消费者设置手动ack,并且在业务上控制是否ack
- RabbitMQ消费者无故中断问题{writer,send_failed,{error,timeout}}
- 用rabbitMQ实现生产者消费者
- RabbitMQ消息队列之二:消费者和生产者 Demo
- springboot2.x整合rabbitMQ:简单的生产者和消费者
- RabbitMQ(part2轮流发消息到不同消费者)----Work Queues
- (七)RabbitMQ之自定义消费者
- RabbitMQ的生产者与消费者
- 【RabbitMQ】生产者,消费者,信道,队列,交换器和绑定
- RabbitMQ发送端接收端生产者消费者实例