RabbitMQ之Falldisc(持久化)
2017-01-07 15:38
162 查看
RabbitMQ的消息队列是驻留内存的, 万一机器Down机的话, 内存里的队列, 也会消失不见. 为解决这个问题, 才有了”持久化” 这个概念.
顾名思义: 将消息队列存到硬盘上(落盘)的文件中, 势必会影响效率
消息实体
生产者
消费者
重点在于
队列要持久化(第二个参数 true)
顾名思义: 将消息队列存到硬盘上(落盘)的文件中, 势必会影响效率
消息实体
package com.yuchen.demo.falldisc; import java.io.Serializable; public class Message implements Serializable { private static final long serialVersionUID = 1L; String xxx; public Message(String xxx) { this.xxx = xxx; } public String getXxx() { return xxx; } public void setXxx(String xxx) { this.xxx = xxx; } @Override public String toString() { return this.xxx; } }
生产者
package com.yuchen.demo.falldisc; import java.io.ByteArrayOutputStream; import java.io.ObjectOutputStream; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; public class Send { // 队列名称 private final static String QUEUE_NAME = "lsy_falldisc"; public static void main(String[] args) throws Exception { // 创建连接连接到MabbitMQ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("10.13.144.24"); factory.setVirtualHost("/"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); // 新建连接 Connection connection = factory.newConnection(); // 建立通道 Channel channel = connection.createChannel(); // 创建队列 (队列名称, 持久化, 仅有1个consumer 与autoDelete配合使用 没有consumer自动删除 , 自动删除, 其他参数) channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 发送消息 byte[] // 用户可以把自己的消息序列化成JSON等格式在转成byte[]发送到队列中取出消息后再反序列化得到消息内容 Message message = new Message("Hello Message !"); // 消息持久化 MessageProperties.PERSISTENT_TEXT_PLAIN ByteArrayOutputStream baos = new ByteArrayOutputStream(10000); ObjectOutputStream oos = new ObjectOutputStream(baos); oos.writeObject(message); channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, baos.toByteArray()); System.out.println("发送成功: '" + message + "'"); // 关闭连接 channel.close(); connection.close(); } }
消费者
package com.yuchen.demo.falldisc; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.ObjectInputStream; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class MessageConsumer { // 队列名称 private final static String QUEUE_NAME = "message_falldisc"; public static void start() throws Exception { // 创建连接连接到MabbitMQ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("10.13.144.24"); factory.setVirtualHost("/"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); // 新建连接 Connection connection = factory.newConnection(); // 建立通道 Channel channel = connection.createChannel(); // 声明队列, 可能在发送方之前启动接收方, 确保队列存在。 channel.queueDeclare(QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { ByteArrayInputStream bais = new ByteArrayInputStream(body); ObjectInputStream ois = new ObjectInputStream(bais); Message message = null; try { message = (Message) ois.readObject(); } catch (ClassNotFoundException e) { e.printStackTrace(); } System.out.println("接收到远端消息: '" + message.toString() + "'"); } }; channel.basicConsume(QUEUE_NAME, true, consumer); } }
重点在于
队列要持久化(第二个参数 true)
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
消息也要持久化(第三个参数MessageProperties.PERSISTENT_TEXT_PLAIN)
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, baos.toByteArray());
相关文章推荐
- rabbitmq 消息持久化之receive and send
- 删除rabbitmq中持久化的队列和数据
- 轻松搞定RabbitMQ(三)——消息应答(客户端挂)与消息持久化(服务器端挂)
- RabbitMQ持久化机制
- 轻松搞定RabbitMQ(三)——消息应答与消息持久化
- RabbitMQ之消息持久化(队列持久化、消息持久化)
- rabbitmq消息持久化,避免异常情况下,消息会丢失
- 轻松搞定RabbitMQ(三)——消息应答与消息持久化
- RabbitMQ系列三 (深入消息队列) 持久化
- RabbitMQ(二)队列与消息的持久化
- RabbitMQ原理与相关操作(三)消息持久化
- RabbitMQ-理解消息通信-持久化策略
- RabbitMQ持久化机制
- RabbitMQ 持久化
- RabbitMQ(二)队列与消息的持久化
- 轻松搞定RabbitMQ(三)——消息应答与消息持久化
- rabbitmq (二) 持久化
- RabbitMQ与SpringMVC集成并实现发送消息和接收消息(持久化)方案二
- RabbitMQ之消息持久化(队列持久化、消息持久化)
- RabbitMQ支持消息的持久化