RabbitMQ之消息持久化
消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保证消息可靠性的呢——消息持久化。
为了保证RabbitMQ在退出或者crash等异常情况下数据没有丢失,需要将queue,exchange和Message都持久化。
欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。
queue的持久化
queue的持久化是通过durable=true来实现的。
一般程序中这么使用:
Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare("queue.persistent.name", true, false, false, null);
- 1
- 2
- 3
关键的是第二个参数设置为true,即durable=true.
Channel类中queueDeclare的完整定义如下:
/** * Declare a queue * @see com.rabbitmq.client.AMQP.Queue.Declare * @see com.rabbitmq.client.AMQP.Queue.DeclareOk * @param queue the name of the queue * @param durable true if we are declaring a durable queue (the queue will survive a server restart) * @param exclusive true if we are declaring an exclusive queue (restricted to this connection) * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use) * @param arguments other properties (construction arguments) for the queue * @return a declaration-confirm method to indicate the queue was successfully declared * @throws java.io.IOException if an error is encountered */ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
参数说明:queue:queue的名称
exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次申明它的连接可见,并在连接断开时自动删除。这里需要注意三点:1. 排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一连接创建的排他队列;2.“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同;3.即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的,这种队列适用于一个客户端发送读取消息的应用场景。
autoDelete:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
queueDeclare相关的有4种方法,分别是:
Queue.DeclareOk queueDeclare() throws IOException;Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;
- 1
- 2
- 3
- 4
- 5
- 6
消息的持久化
如过将queue的持久化标识durable设置为true,则代表是一个持久的队列,那么在服务重启之后,也会存在,因为服务会把持久化的queue存放在硬盘上,当服务重启的时候,会重新什么之前被持久化的queue。队列是可以被持久化,但是里面的消息是否为持久化那还要看消息的持久化设置。也就是说,重启之前那个queue里面还没有发出去的消息的话,重启之后那队列里面是不是还存在原来的消息,这个就要取决于发生着在发送消息时对消息的设置了。
如果要在重启后保持消息的持久化必须设置消息是持久化的标识。
设置消息的持久化:
channel.basicPublish("exchange.persistent", "persistent", MessageProperties.PERSISTENT_TEXT_PLAIN, "persistent_test_message".getBytes());
- 1
首先看一下basicPublish的方法:
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException;void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
- 1
- 2
- 3
- 4
- 5
routingKey表示routingKey的名称
body代表发送的消息体
有关mandatory和immediate的详细解释可以参考:RabbitMQ之mandatory和immediate.
这里关键的是BasicProperties props这个参数了,这里看下BasicProperties的定义:
public BasicProperties( String contentType,//消息类型如:text/plain String contentEncoding,//编码 Map<String,Object> headers, Integer deliveryMode,//1:nonpersistent 2:persistent Integer priority,//优先级 String correlationId, String replyTo,//反馈队列 String expiration,//expiration到期时间 String messageId, Date timestamp, String type, String userId, String appId, String clusterId)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
上面的实现代码使用的是MessageProperties.PERSISTENT_TEXT_PLAIN,那么这个又是什么呢?
public static final BasicProperties PERSISTENT_TEXT_PLAIN = new BasicProperties("text/plain", null, null, 2, 0, null, null, null, null, null, null, null, null, null);
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
换一种实现方式:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();builder.deliveryMode(2);AMQP.BasicProperties properties = builder.build();channel.basicPublish("exchange.persistent", "persistent",properties, "persistent_test_message".getBytes());
- 1
- 2
- 3
- 4
exchange的持久化
上面阐述了队列的持久化和消息的持久化,如果不设置exchange的持久化对消息的可靠性来说没有什么影响,但是同样如果exchange不设置持久化,那么当broker服务重启之后,exchange将不复存在,那么既而发送方rabbitmq producer就无法正常发送消息。这里博主建议,同样设置exchange的持久化。exchange的持久化设置也特别简单,方法如下:
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments) throws IOException;Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException;Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException;void exchangeDeclareNoWait(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException;Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
进一步讨论
1.将queue,exchange, message等都设置了持久化之后就能保证100%保证数据不丢失了嚒?
答案是否定的。
首先,从consumer端来说,如果这时autoAck=true,那么当consumer接收到相关消息之后,还没来得及处理就crash掉了,那么这样也算数据丢失,这种情况也好处理,只需将autoAck设置为false(方法定义如下),然后在正确处理完消息之后进行手动ack(channel.basicAck).
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
- 1
2.消息什么时候刷到磁盘?
写入文件前会有一个Buffer,大小为1M,数据在写入文件时,首先会写入到这个Buffer,如果Buffer已满,则会将Buffer写入到文件(未必刷到磁盘)。
有个固定的刷盘时间:25ms,也就是不管Buffer满不满,每个25ms,Buffer里的数据及未刷新到磁盘的文件内容必定会刷到磁盘。
每次消息写入后,如果没有后续写入请求,则会直接将已写入的消息刷到磁盘:使用Erlang的receive x after 0实现,只要进程的信箱里没有消息,则产生一个timeout消息,而timeout会触发刷盘操作。
欲了解更多消息中间件的内容,可以关注:消息中间件收录集
参考资料
欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。
- RabbitMQ之消息发布订阅与信息持久化技术
- 轻松搞定RabbitMQ(三)——消息应答与消息持久化
- RabbitMQ之消息发布订阅与信息持久化技术
- RabbitMQ原理与相关操作(三)消息持久化
- 3、RabbitMQ之消息发布订阅与信息持久化技术
- 【转】RabbitMQ入门_13_消息持久化
- RabbitMQ(三)—消息应答与消息持久化
- 轻松搞定RabbitMQ(三)——消息应答(客户端挂)与消息持久化(服务器端挂)
- RabbitMQ之队列与消息持久化
- rabbitMQ消息应答和持久化
- RabbitMQ支持消息的持久化
- RabbitMQ(二)队列与消息的持久化
- RabbitMQ原理三--消息持久化
- RabbitMQ(三):消息持久化策略
- 轻松搞定RabbitMQ(三)——消息应答与消息持久化
- RabbitMQ之队列与消息持久化
- 6_rabbitmq消息应答与消息持久化
- 轻松搞定RabbitMQ(三)——消息应答与消息持久化
- RabbitMQ与SpringMVC集成并实现发送消息和接收消息(持久化)方案二
- RabbitMQ 消息持久化、事务、Publisher的消息确认机制