您的位置:首页 > 其它

RabbitMQ 消息确认

2016-11-17 00:00 92 查看
摘要: RabbitMQ 消息确认

RabbitMQ的消息确认类型

RabbitMQ给生产者返回消息确认

消费端给RabbitMQ返回消息确认

消费端给RabbitMQ返回消息拒绝

RabbitMQ返回消息确认

生产者端将消息发送出去,消息到达RabbitMQ之后,会返回一个到达确认。

这个确认实际上就是官方常说的ConfirmCallback,我们通过在生产者端使用一个回调类来监听RabbiMQ返回的消息确认。

Spring AMQP中我们通过设置RabbitTemplate的ConfirmCallback属性来实现消息确认回调,通过一个实现了ConfirmCallback的类来实现回调逻辑。

举例说明

@Bean(name = "smsRabbitTemplate")
public RabbitTemplate smsRabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.setRoutingKey(smsRoutingKey);
rabbitTemplate.setQueue(QUEUE_SMS);
rabbitTemplate.setExchange("smsExchange");
rabbitTemplate.setConfirmCallback(smsConfirmCallBack());
rabbitTemplate.setRetryTemplate(retryTemplate());
return rabbitTemplate;
}

/**
* Email队列回调
* @author Xiaoyang.Li
*
*/
public class EmailConfirmCallBack implements ConfirmCallback{

private Logger logger = LoggerFactory.getLogger(EmailConfirmCallBack.class);

@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(ack){
logger.info(correlationData.getId() + "--邮件已发送到RabbitMQ队列中.");
}else{
logger.info(correlationData.getId() + "--邮件发送到RabbitMQ队列失败。{}", cause);
}
}
}


消费端消息确认

默认的rabbitmq消费端是开启了自动确认的。

实际项目中往往我们会对消息进行一系列的处理,然后再给出消费确认,也就是我们需要关闭自动确认,使用手动确认,通过设置
AcknowledgeMode为MANUAL
可以开启手动确认。

Spring AMQP中我们可以通过设置
SimpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL)来进行开启手动确认


举例说明

@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setMessageConverter(new Jackson2JsonMessageConverter());
//如果设置手动消费,那么需要使用Channel.basicAck()进行数据返回,
//所以MessageListener需要实现ChannelAwareMessageListener接口
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);//设置手动确认消息被消费
container.setQueues(mailQueue());//添加监听队列
container.setMessageListener(mailListenerUseChannel());//如果设置AcknowledgeMode为手动,那么需要使用这个

return container;
}


消费端消息拒绝

消费端如果多次消费失败,我们可以将这条消息拒绝,通过死信设置,rabbitmq会将拒绝的消息存放到死信队列中去。

消费端通过使用
basic.reject
来进行拒绝操作。

Spring AMQP中我们通过调用
channel.basicReject(deliveryTag, false)
来进行消费拒绝

举例说明

public class MailListenerUseChannel implements ChannelAwareMessageListener {

private Logger logger = LoggerFactory.getLogger(MailListenerUseChannel.class);

@Autowired
private MimeMailSender mimeMailSender;

private ByteArrayToObject byteArrayToObj;

@PostConstruct
public void init(){
byteArrayToObj = new ByteArrayToObject();
}

/**
* 从mq取出数据然后发送
*/
public void onMessage(Message message, Channel channel) {
boolean isComplete = true;
if(message == null){
logger.error("接收到的消息为空");
isComplete = false;
}
logger.info("消息格式"+message.getMessageProperties().getContentType());
EmailMessage emailMessage = byteArrayToObj.parseMessage(message, EmailMessage.class);
if(emailMessage.getHeader() == null){
logger.error("Email header is null. Please check message.");
isComplete = false;
}

if(isComplete){
logger.info("邮件类型>>>>>"+emailMessage.getHeader().getMsgType());

try{
//发送邮件
mimeMailSender.sendEmail(emailMessage);
//发送成功后返回响应给mq
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch(Exception e){
logger.error("sendEmail error.", e);
e.printStackTrace();
}
}else{
try {
//如果消息不合法,直接丢弃
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
logger.error("Discarded a message!");
} catch (IOException e) {
logger.error("Reject/Nack message error.");
e.printStackTrace();
}
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: