RabbitMQ之消息确认机制AMQP事务
2018-02-01 18:56
411 查看
此套免费课程视频,本人录播到了腾讯课堂
更多请关注腾讯课堂牧码人或 登录网站http://www.51mmr.net
https://ke.qq.com/course/288116#tuin=5740604a
那怎么办?
我们有两种方式:
1.通过AMQP协议的事务机制来实现消息的确认
2.confirm模式;
txSelect用于将当前channel设置成transaction模式,
txCommit用于提交事务,
txRollback用于回滚事务,
在通过txSelect开启事务之后,我们便可以发布消息给RabbitMQ服务器了,如果txCommit提交成功了,则消息一定到达了broker了,如果在txCommit执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback回滚事务了。
代码:
下面来举个例子,因为消息确认机制是对于生产者 ,我们这里只讨论生产者的代码
通过测试:此种模式还是很耗时的,因为内部走了多次通信,所以采用这种方式 降低了Rabbitmq的消息吞吐量
更多请关注腾讯课堂牧码人或 登录网站http://www.51mmr.net
https://ke.qq.com/course/288116#tuin=5740604a
概述
我们在RabbitMQ中可以通过持久化来解决服务器挂掉而丢失数据问题,但是大家有没有想过,我的消息到达了RabbitMQ服务器了吗??? 我们是不知道的,导致的问题就是 如果消息在到达服务器之前就丢失了,持久化也是不能解决问题的!那怎么办?
我们有两种方式:
1.通过AMQP协议的事务机制来实现消息的确认
2.confirm模式;
事务机制
RabbitMQ中提供了3个方法:txSelect(), txCommit()以及txRollback(),其类似于jdbc中的事务开启 提交 回滚;txSelect用于将当前channel设置成transaction模式,
txCommit用于提交事务,
txRollback用于回滚事务,
在通过txSelect开启事务之后,我们便可以发布消息给RabbitMQ服务器了,如果txCommit提交成功了,则消息一定到达了broker了,如果在txCommit执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback回滚事务了。
代码:
channel.txSelect(); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); channel.txCommit();
下面来举个例子,因为消息确认机制是对于生产者 ,我们这里只讨论生产者的代码
生产者
public class SendMQ { private static final String QUEUE_NAME = "QUEUE_simple"; @Test public void sendMsg() throws IOException, TimeoutException { /* 获取一个连接 */ Connection connection = ConnectionUtils.getConnection(); /* 从连接中创建通道 */ Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String msg = "Hello Simple QUEUE !"; try { channel.txSelect(); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); int result = 1 / 0; channel.txCommit(); } catch (Exception e) { channel.txRollback(); System.out.println("----msg rollabck "); }finally{ System.out.println("---------send msg over:" + msg); } channel.close(); connection.close(); } }
消费者
public class Consumer { private static final String QUEUE_NAME = "QUEUE_simple"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); DefaultConsumer consumer = new DefaultConsumer(channel) { //获取到达的消息 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; //监听队列 channel.basicConsume(QUEUE_NAME, true, consumer); } }
通过测试:此种模式还是很耗时的,因为内部走了多次通信,所以采用这种方式 降低了Rabbitmq的消息吞吐量
相关文章推荐
- RabbitMQ之消息确认机制(事务+Confirm)
- RabbitMQ之消息确认机制(事务+Confirm)
- 最近发现系统rabbitmq丢消息比较严重,于是想了些方案来查找原因,给将消息发送方式添加确认机制。 我们在本地模拟了wms发送打标消息的场景. 1. 有事务 2. 先发点对点队列, 再发订
- RabbitMQ-消息确认机制(事务+confirm)
- spring rabbitmq 消息确认机制和事务支持
- RabbitMQ之消息确认机制(事务+Confirm)
- RabbitMQ之消息确认机制(事务+Confirm) - 朱小厮的博客 - CSDN博客
- RabbitMQ(三)消息确认机制(事务+Confirm)
- RabbitMQ 消息持久化、事务、Publisher的消息确认机制
- RabbitMQ 消息持久化、事务、Publisher的消息确认机制
- RabbitMQ---9、消息确认机制(事务+Confirm)
- RabbitMQ之消息确认机制(事务+Confirm)
- RabbitMQ集群和消息传递确认机制
- RabbitMQ消息队列(九):Publisher的消息确认机制
- RabbitMQ - Publisher的消息确认机制
- RabbitMQ(九):Publisher的消息确认机制
- RabbitMQ消息队列(九):Publisher的消息确认机制
- 中间件系列十 RabbitMQ之消费者端的消息确认机制
- RabbitMQ消息队列(九):Publisher的消息确认机制
- RabbitMQ 消息轮询和消息确认机制