您的位置:首页 > 其它

RabbitMQ之消息确认机制AMQP事务

2018-02-01 18:56 411 查看
此套免费课程视频,本人录播到了腾讯课堂

更多请关注腾讯课堂牧码人或 登录网站http://www.51mmr.net

https://ke.qq.com/course/288116#tuin=5740604a

概述

我们在RabbitMQ中可以通过持久化来解决服务器挂掉而丢失数据问题,但是大家有没有想过,我的消息到达了RabbitMQ服务器了吗??? 我们是不知道的,导致的问题就是 如果消息在到达服务器之前就丢失了,持久化也是不能解决问题的!

那怎么办?

我们有两种方式:

1.通过AMQP协议的事务机制来实现消息的确认

2.confirm模式;

事务机制

RabbitMQ中提供了3个方法:txSelect(), txCommit()以及txRollback(),其类似于jdbc中的事务开启 提交 回滚;

txSelect用于将当前channel设置成transaction模式,

txCommit用于提交事务,

txRollback用于回滚事务,

在通过txSelect开启事务之后,我们便可以发布消息给RabbitMQ服务器了,如果txCommit提交成功了,则消息一定到达了broker了,如果在txCommit执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback回滚事务了。

代码:

channel.txSelect();
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
channel.txCommit();


下面来举个例子,因为消息确认机制是对于生产者 ,我们这里只讨论生产者的代码

生产者

public class SendMQ {
private static final String QUEUE_NAME  = "QUEUE_simple";

@Test
public void sendMsg() throws IOException, TimeoutException {
/* 获取一个连接 */
Connection connection = ConnectionUtils.getConnection();

/* 从连接中创建通道 */
Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

String msg = "Hello  Simple QUEUE !";

try {
channel.txSelect();
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
int result = 1 / 0;
channel.txCommit();
} catch (Exception e) {
channel.txRollback();
System.out.println("----msg rollabck ");
}finally{
System.out.println("---------send msg over:" + msg);
}
channel.close();
connection.close();
}
}


消费者

public class Consumer {
private static final String QUEUE_NAME  = "QUEUE_simple";

public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

DefaultConsumer consumer = new DefaultConsumer(channel) {
//获取到达的消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
//监听队列
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}


通过测试:此种模式还是很耗时的,因为内部走了多次通信,所以采用这种方式 降低了Rabbitmq的消息吞吐量
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: