RabbitMQ 消息确认
2016-11-17 00:00
92 查看
摘要: RabbitMQ 消息确认
消费端给RabbitMQ返回消息确认
消费端给RabbitMQ返回消息拒绝
这个确认实际上就是官方常说的ConfirmCallback,我们通过在生产者端使用一个回调类来监听RabbiMQ返回的消息确认。
Spring AMQP中我们通过设置RabbitTemplate的ConfirmCallback属性来实现消息确认回调,通过一个实现了ConfirmCallback的类来实现回调逻辑。
实际项目中往往我们会对消息进行一系列的处理,然后再给出消费确认,也就是我们需要关闭自动确认,使用手动确认,通过设置
Spring AMQP中我们可以通过设置
消费端通过使用
Spring AMQP中我们通过调用
RabbitMQ的消息确认类型
RabbitMQ给生产者返回消息确认消费端给RabbitMQ返回消息确认
消费端给RabbitMQ返回消息拒绝
RabbitMQ返回消息确认
生产者端将消息发送出去,消息到达RabbitMQ之后,会返回一个到达确认。这个确认实际上就是官方常说的ConfirmCallback,我们通过在生产者端使用一个回调类来监听RabbiMQ返回的消息确认。
Spring AMQP中我们通过设置RabbitTemplate的ConfirmCallback属性来实现消息确认回调,通过一个实现了ConfirmCallback的类来实现回调逻辑。
举例说明
@Bean(name = "smsRabbitTemplate") public RabbitTemplate smsRabbitTemplate() { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory()); rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); rabbitTemplate.setRoutingKey(smsRoutingKey); rabbitTemplate.setQueue(QUEUE_SMS); rabbitTemplate.setExchange("smsExchange"); rabbitTemplate.setConfirmCallback(smsConfirmCallBack()); rabbitTemplate.setRetryTemplate(retryTemplate()); return rabbitTemplate; }
/** * Email队列回调 * @author Xiaoyang.Li * */ public class EmailConfirmCallBack implements ConfirmCallback{ private Logger logger = LoggerFactory.getLogger(EmailConfirmCallBack.class); @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if(ack){ logger.info(correlationData.getId() + "--邮件已发送到RabbitMQ队列中."); }else{ logger.info(correlationData.getId() + "--邮件发送到RabbitMQ队列失败。{}", cause); } } }
消费端消息确认
默认的rabbitmq消费端是开启了自动确认的。实际项目中往往我们会对消息进行一系列的处理,然后再给出消费确认,也就是我们需要关闭自动确认,使用手动确认,通过设置
AcknowledgeMode为MANUAL可以开启手动确认。
Spring AMQP中我们可以通过设置
SimpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL)来进行开启手动确认。
举例说明
@Bean public SimpleMessageListenerContainer messageListenerContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory()); container.setMessageConverter(new Jackson2JsonMessageConverter()); //如果设置手动消费,那么需要使用Channel.basicAck()进行数据返回, //所以MessageListener需要实现ChannelAwareMessageListener接口 container.setAcknowledgeMode(AcknowledgeMode.MANUAL);//设置手动确认消息被消费 container.setQueues(mailQueue());//添加监听队列 container.setMessageListener(mailListenerUseChannel());//如果设置AcknowledgeMode为手动,那么需要使用这个 return container; }
消费端消息拒绝
消费端如果多次消费失败,我们可以将这条消息拒绝,通过死信设置,rabbitmq会将拒绝的消息存放到死信队列中去。消费端通过使用
basic.reject来进行拒绝操作。
Spring AMQP中我们通过调用
channel.basicReject(deliveryTag, false)来进行消费拒绝
举例说明
public class MailListenerUseChannel implements ChannelAwareMessageListener { private Logger logger = LoggerFactory.getLogger(MailListenerUseChannel.class); @Autowired private MimeMailSender mimeMailSender; private ByteArrayToObject byteArrayToObj; @PostConstruct public void init(){ byteArrayToObj = new ByteArrayToObject(); } /** * 从mq取出数据然后发送 */ public void onMessage(Message message, Channel channel) { boolean isComplete = true; if(message == null){ logger.error("接收到的消息为空"); isComplete = false; } logger.info("消息格式"+message.getMessageProperties().getContentType()); EmailMessage emailMessage = byteArrayToObj.parseMessage(message, EmailMessage.class); if(emailMessage.getHeader() == null){ logger.error("Email header is null. Please check message."); isComplete = false; } if(isComplete){ logger.info("邮件类型>>>>>"+emailMessage.getHeader().getMsgType()); try{ //发送邮件 mimeMailSender.sendEmail(emailMessage); //发送成功后返回响应给mq channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }catch(Exception e){ logger.error("sendEmail error.", e); e.printStackTrace(); } }else{ try { //如果消息不合法,直接丢弃 channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); logger.error("Discarded a message!"); } catch (IOException e) { logger.error("Reject/Nack message error."); e.printStackTrace(); } } } }
相关文章推荐
- RabbitMQ 发送消息 确认消息 是否 发送成功
- RabbitMQ---9、消息确认机制(事务+Confirm)
- [置顶] 中间件系列二 RabbitMQ之消息持久性、确认机制、拒绝、预取数量、分配策略
- 中间件系列十 RabbitMQ之消费者端的消息确认机制
- (六)RabbitMQ消息队列-消息任务分发与消息ACK确认机制(PHP版)
- RabbitMQ消息队列(九):Publisher的消息确认机制
- spring rabbitmq 消息确认机制和事务支持
- RabbitMQ 消息轮询和消息确认机制
- 关于RabbitMQ的消息确认
- 【rabbitmq】rabbitmq概念解析--消息确认--示例程序
- 最近发现系统rabbitmq丢消息比较严重,于是想了些方案来查找原因,给将消息发送方式添加确认机制。 我们在本地模拟了wms发送打标消息的场景. 1. 有事务 2. 先发点对点队列, 再发订
- RabbitMQ消息队列(九):Publisher的消息确认机制
- 【RabbitMQ】6、rabbitmq生产者的消息确认
- RabbitMQ消息队列(九):Publisher的消息确认机制
- rabbitmq消息重新入队和消息确认
- RabbitMq 使用 | 第二篇:消息队列和确认
- RabbitMQ - Publisher的消息确认机制
- rabbitmq 消息确认机制
- RabbitMQ消息队列(九):Publisher的消息确认机制
- 使用spring-rabbit测试RabbitMQ消息确认(发送确认,接收确认)