关于rabbitmq的一些坑以及疑问
2017-05-26 15:27
274 查看
在springboot中使用rabbitmq的时候遇到了一些状况,发现很多是由于对概念不清造成的,特此记录。(以下内容纯属个人见解,欢迎指正。)
1.确认机制
publisherConfirm
个人理解的确认机制其实是由两部分组成的,分别是rabbitmq-server对于生产者的确认,即publisherConfirm。该确认信息会在server收到生产者发送的消息以后就立即返回给server,与消息是否被消费者处理无关,只是server单纯的告诉生产者,我已经收到你的消息了。
ack
还有一个消息确认就是ack了,ack指的是消费者从server中取出消息,然后告诉server我把消息取出来了。严格来讲其实与消息是否处理成功也是没有关系的。ack有三种,分别是无ack,自动ack,手动ack。无ack不需要讲解,手动ack就是消费者取到消息后自动给server返回ack,不管消费者对消息的处理是否会出现异常,都会返回ack,个人猜测是在业务代码执行之后,server拿到ack以后,如果正常情况下就会把消息从server中删除。但是如果遇到了异常了就会无限重试该消息,不会被删除。这里我就有个疑问一,希望大神解答一下,问题是这样的,我设置自动ack,在代码中写了一个空指针异常用来测试异常,这个时候接到消息以后就会出现异常,然后消息被无限重试,在重试的过程中消息的状态一直是unacked,然后我手动停掉了消费者,这时候消息的状态变成了ready,这个ready是怎么变得呢?
难道是消费者还发了一个别的标志告诉server我收到了消息但是处理出现异常了? 如果发送的是ack不是应该直接就从队列中把消息删除了吗?不应该存在ready吧。
关于手动ack,没有什么难以理解的,就是自己手动控制发送ack的时机,手动ack如果异常是不会无限重试的。
以上观点是我自己的理解,起初由于理解不当,想以ack去做分布式事务的管理,理清之后豁然开朗。
2.rpc消息类型,分布式事务管理
关于分布式事务以前没有太多的考虑,最近整合mq的时候才想到了用mq去解决一下,通过上网发现确认也是一种方案。
其实简单理解就是发送的mq消息有去有回,不过去与会的消息不是一个队列。当我们接收到消息的时候再指定一个返回的队列就可以了,在生产者中监听该队列。上一段代码吧@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("26Test");
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setMessageListener(new ChannelAwareMessageListener() {
public void onMessage(Message message, Channel channel) throws Exception {
byte[] body = message.getBody();
message.getMessageProperties().getCorrelationIdString();
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.correlationId(UUID.randomUUID().toString())
.replyTo("return")
.build();
System.out.println(" getReplyRequest----"+props.getCorrelationId()+"==============" +new Date() );
channel.basicPublish("", "return", props, body);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
});
return container;
}
这段代码实现的功能就是监听26Test这个队列,处理完毕该队列的消息以后想return队列发送消息,实现消息的有去有回。不过这种事务的处理方式也是有
弊端的,很明显的异步处理
3.无限重试怎么处理?
自动ack的时候可以设置不无限重试吗?
1.确认机制
publisherConfirm
个人理解的确认机制其实是由两部分组成的,分别是rabbitmq-server对于生产者的确认,即publisherConfirm。该确认信息会在server收到生产者发送的消息以后就立即返回给server,与消息是否被消费者处理无关,只是server单纯的告诉生产者,我已经收到你的消息了。
ack
还有一个消息确认就是ack了,ack指的是消费者从server中取出消息,然后告诉server我把消息取出来了。严格来讲其实与消息是否处理成功也是没有关系的。ack有三种,分别是无ack,自动ack,手动ack。无ack不需要讲解,手动ack就是消费者取到消息后自动给server返回ack,不管消费者对消息的处理是否会出现异常,都会返回ack,个人猜测是在业务代码执行之后,server拿到ack以后,如果正常情况下就会把消息从server中删除。但是如果遇到了异常了就会无限重试该消息,不会被删除。这里我就有个疑问一,希望大神解答一下,问题是这样的,我设置自动ack,在代码中写了一个空指针异常用来测试异常,这个时候接到消息以后就会出现异常,然后消息被无限重试,在重试的过程中消息的状态一直是unacked,然后我手动停掉了消费者,这时候消息的状态变成了ready,这个ready是怎么变得呢?
难道是消费者还发了一个别的标志告诉server我收到了消息但是处理出现异常了? 如果发送的是ack不是应该直接就从队列中把消息删除了吗?不应该存在ready吧。
关于手动ack,没有什么难以理解的,就是自己手动控制发送ack的时机,手动ack如果异常是不会无限重试的。
以上观点是我自己的理解,起初由于理解不当,想以ack去做分布式事务的管理,理清之后豁然开朗。
2.rpc消息类型,分布式事务管理
关于分布式事务以前没有太多的考虑,最近整合mq的时候才想到了用mq去解决一下,通过上网发现确认也是一种方案。
其实简单理解就是发送的mq消息有去有回,不过去与会的消息不是一个队列。当我们接收到消息的时候再指定一个返回的队列就可以了,在生产者中监听该队列。上一段代码吧@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("26Test");
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setMessageListener(new ChannelAwareMessageListener() {
public void onMessage(Message message, Channel channel) throws Exception {
byte[] body = message.getBody();
message.getMessageProperties().getCorrelationIdString();
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.correlationId(UUID.randomUUID().toString())
.replyTo("return")
.build();
System.out.println(" getReplyRequest----"+props.getCorrelationId()+"==============" +new Date() );
channel.basicPublish("", "return", props, body);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
});
return container;
}
这段代码实现的功能就是监听26Test这个队列,处理完毕该队列的消息以后想return队列发送消息,实现消息的有去有回。不过这种事务的处理方式也是有
弊端的,很明显的异步处理
3.无限重试怎么处理?
自动ack的时候可以设置不无限重试吗?
相关文章推荐
- Echarts的使用、封装以及关于Echarts2.x与Echarts3.x性能方面的一些疑问
- 关于“天轰穿”个人2007年的一些安排以及系列教程未来答疑
- 关于无线数据传输中产生的流量的一些计算方法以及问题!
- 关于32位Vista不支持4G内存,以及一些误传的解决方案。
- 关于人性的一些思考:如何提高员工工作热情与成就感,以及因材施教的心灵培训
- 【主题】关于做代码以及文档review的一些问题
- 关于使用存储过程的一些好处以及注意事项
- 关于windows环境下配置nginx的一些疑问
- 近期找的一些实用资料,关于Learning以及Kernel的
- C++Builder中关于INDY库的使用 以及一些问题的解决(E2238 Multiple declaration for 'fd_set')
- Linux 关于动态链接库以及静态链接库的一些概念
- 关于lcc以及lcc-win32的一些链接
- Linux 关于动态链接库以及静态链接库的一些概念
- [转] 关于硬盘修复以及低级格式化的一些文章
- 有必要澄清两个基本概念--算法和过程的关系以及关于程序设计方法论的一些看法
- 关于flex 3.0 制作mp3播放器的一些疑问
- 关于Access发布使用打包软件的一些疑问解答
- 第四章 控制执行流程的一些笔记以及关于char的一点东西
- 关于使用存储过程的一些好处以及注意事项[转]
- 收集一些关于视频文件格式以及编码计算的一些知识