您的位置:首页 > 其它

关于rabbitmq的一些坑以及疑问

2017-05-26 15:27 274 查看
在springboot中使用rabbitmq的时候遇到了一些状况,发现很多是由于对概念不清造成的,特此记录。(以下内容纯属个人见解,欢迎指正。)

1.确认机制

  publisherConfirm


  个人理解的确认机制其实是由两部分组成的,分别是rabbitmq-server对于生产者的确认,即publisherConfirm。该确认信息会在server收到生产者发送的消息以后就立即返回给server,与消息是否被消费者处理无关,只是server单纯的告诉生产者,我已经收到你的消息了。

  ack

  还有一个消息确认就是ack了,ack指的是消费者从server中取出消息,然后告诉server我把消息取出来了。严格来讲其实与消息是否处理成功也是没有关系的。ack有三种,分别是无ack,自动ack,手动ack。无ack不需要讲解,手动ack就是消费者取到消息后自动给server返回ack,不管消费者对消息的处理是否会出现异常,都会返回ack,个人猜测是在业务代码执行之后,server拿到ack以后,如果正常情况下就会把消息从server中删除。但是如果遇到了异常了就会无限重试该消息,不会被删除。这里我就有个疑问一,希望大神解答一下,问题是这样的,我设置自动ack,在代码中写了一个空指针异常用来测试异常,这个时候接到消息以后就会出现异常,然后消息被无限重试,在重试的过程中消息的状态一直是unacked,然后我手动停掉了消费者,这时候消息的状态变成了ready,这个ready是怎么变得呢?
难道是消费者还发了一个别的标志告诉server我收到了消息但是处理出现异常了?  如果发送的是ack不是应该直接就从队列中把消息删除了吗?不应该存在ready吧。

关于手动ack,没有什么难以理解的,就是自己手动控制发送ack的时机,手动ack如果异常是不会无限重试的。

以上观点是我自己的理解,起初由于理解不当,想以ack去做分布式事务的管理,理清之后豁然开朗。

2.rpc消息类型,分布式事务管理

    关于分布式事务以前没有太多的考虑,最近整合mq的时候才想到了用mq去解决一下,通过上网发现确认也是一种方案。

其实简单理解就是发送的mq消息有去有回,不过去与会的消息不是一个队列。当我们接收到消息的时候再指定一个返回的队列就可以了,在生产者中监听该队列。上一段代码吧@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("26Test");
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setMessageListener(new ChannelAwareMessageListener() {
public void onMessage(Message message, Channel channel) throws Exception {
byte[] body = message.getBody();
message.getMessageProperties().getCorrelationIdString();
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.correlationId(UUID.randomUUID().toString())
.replyTo("return")
.build();
System.out.println(" getReplyRequest----"+props.getCorrelationId()+"==============" +new Date() );
channel.basicPublish("", "return", props, body);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
});
return container;
}
这段代码实现的功能就是监听26Test这个队列,处理完毕该队列的消息以后想return队列发送消息,实现消息的有去有回。不过这种事务的处理方式也是有

弊端的,很明显的异步处理
3.无限重试怎么处理?

  自动ack的时候可以设置不无限重试吗?
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: