您的位置:首页 > 其它

RabbitMQ之Falldisc(持久化)

2017-01-07 15:38 162 查看
RabbitMQ的消息队列是驻留内存的, 万一机器Down机的话, 内存里的队列, 也会消失不见. 为解决这个问题, 才有了”持久化” 这个概念.

顾名思义: 将消息队列存到硬盘上(落盘)的文件中, 势必会影响效率

消息实体

package com.yuchen.demo.falldisc;

import java.io.Serializable;

public class Message implements Serializable {

private static final long serialVersionUID = 1L;
String xxx;

public Message(String xxx) {
this.xxx = xxx;
}

public String getXxx() {
return xxx;
}

public void setXxx(String xxx) {
this.xxx = xxx;
}

@Override
public String toString() {
return this.xxx;
}

}


生产者

package com.yuchen.demo.falldisc;

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class Send {

// 队列名称
private final static String QUEUE_NAME = "lsy_falldisc";

public static void main(String[] args) throws Exception {

// 创建连接连接到MabbitMQ
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("10.13.144.24");
factory.setVirtualHost("/");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
// 新建连接
Connection connection = factory.newConnection();
// 建立通道
Channel channel = connection.createChannel();

// 创建队列 (队列名称, 持久化, 仅有1个consumer 与autoDelete配合使用 没有consumer自动删除 , 自动删除, 其他参数)
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

// 发送消息 byte[]
// 用户可以把自己的消息序列化成JSON等格式在转成byte[]发送到队列中取出消息后再反序列化得到消息内容
Message message = new Message("Hello Message !");
// 消息持久化 MessageProperties.PERSISTENT_TEXT_PLAIN
ByteArrayOutputStream baos = new ByteArrayOutputStream(10000);
ObjectOutputStream oos = new ObjectOutputStream(baos);
oos.writeObject(message);
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, baos.toByteArray());
System.out.println("发送成功: '" + message + "'");

// 关闭连接
channel.close();
connection.close();

}
}


消费者

package com.yuchen.demo.falldisc;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class MessageConsumer {

// 队列名称
private final static String QUEUE_NAME = "message_falldisc";

public static void start() throws Exception {

// 创建连接连接到MabbitMQ
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("10.13.144.24");
factory.setVirtualHost("/");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
// 新建连接
Connection connection = factory.newConnection();
// 建立通道
Channel channel = connection.createChannel();

// 声明队列, 可能在发送方之前启动接收方, 确保队列存在。
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
ByteArrayInputStream bais = new ByteArrayInputStream(body);
ObjectInputStream ois = new ObjectInputStream(bais);
Message message = null;
try {
message = (Message) ois.readObject();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
System.out.println("接收到远端消息: '" + message.toString() + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}

}


重点在于

队列要持久化(第二个参数 true)

channel.queueDeclare(QUEUE_NAME, true, false, false, null);


消息也要持久化(第三个参数MessageProperties.PERSISTENT_TEXT_PLAIN)


channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, baos.toByteArray());
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: