ActiveMQ - 持久化消息与持久主题订阅
2017-04-27 00:00
190 查看
#1 前言
在上一篇文章《ActiveMQ - 初体验,探讨JMS通信模型》中引出了两个话题:
在点对点通信模型中,消费者没有启动,消息生产者投递消息到队列queue中,这时MQ服务重启,最后再启动消息消费者,消费者能接收到MQ服务未重启之前由消息生产者发布的消息,这是为什么呢?
在消息发布/订阅通信模型中,消息订阅者由于某种原因挂了,这时消息发布者通过MQ服务向所订阅的topic发布消息,但消息订阅者重启后接收不到前面发布的消息,如果是重要消息,那不是会造成难以估计的后果吗,有没有解决方法?
为了解决这些问题,JMS标准制定了两个概念,一个是持久化消息(Persistent messages),另一个是持久订阅(durable subscribers)。
#2 持久化消息与非持久化消息
消息的持久化,顾名思义,就是MQ服务在接收到消息后,把消息存储在文件或者数据库,而不是存储在计算机物理内存中,这样即时消息的消费者或者订阅者挂了重启后,也有可能获取到之前没有接收到的消息。那么,怎样控制消息发布者的投递方式呢?
控制消息投递方式有两种,一种是针对整个producer的(可以理解为全局的),另一种是针对消息对象的(可以理解为局部的)。在默认情况下,消息的投递方式都是采用持久化投递的。下面看一下两种投递方式是怎么设置的。
对整个producer域设置投递方式
对单个消息message设置投递方式
先看下用于实验的代码。
** 消息生产者 **
** 消息消费者 **
还是和上一篇文章一样,为了更好的探讨持久消息,需要更换启动顺序。
** 启动顺序: **
(1)顺序:启动MQ broker服务 -> 启动消息发布者 -> 启动消息消费者
** 结论:不管是否是持久消息,消费者都能正常接收和处理。 **
(2)顺序:启动MQ broker服务 -> 启动消息发布者 -> 停止MQ broker服务 -> 启动MQ broker服务 -> 启动消息消费者
** 结论:持久消息可以被消费者正常接收。但是,非持久消息消费者没有接收到,这是因为非持久消息存储在内存,在MQ服务停止后,内存中的消息都会被清除掉,自然就不会再被接收到了。 **
(3)顺序:启动MQ broker服务 -> 启动消息消费者 -> 启动消息发布者
** 结论:所有的消息可以被消费者正常接收。**
#3 非持久订阅与持久订阅
前面留了这么一个关于发布/订阅模型的问题,就是在默认情况下消息订阅者是不能获取到离线消息的,不管这个消息是持久消息还是非持久消息,为了解决这个问题,activemq对消息订阅者划分为非持久订阅和持久订阅这两种状态。默认情况下,消息订阅者是非持久订阅的。要注意的是,持久订阅者和非持久订阅者针对的域Domain是Pub/Sub,而不是P2P模型。
非持久订阅的实现代码如下:
要建立持久订阅,必须满足以下条件:
为连接connection设置一个客户 ID,如果ID以前已经被占用了,将会抛出异常。
为订阅的主题指定一个订阅名称,连接ID和订阅名两者组合必须唯一,否则会抛异常
下面通过实验给大家一个更加深刻的理解。在消息发布者端发布两条消息,一条是持久消息,另一条是非持久消息。
消息发布:
消息订阅:
通过实验顺序的不同,不一样的小结论
(1)通过管理后台清空所有的订阅者 -> MQ启动 -> 消息发布 -> 启动持久订阅者
结论:消息仍然没有被订阅者接收到,这是因为订阅者在消息发布之前没有在MQ服务中注册,告诉MQ服务自己是订阅某个主题的消息的,你要在我不在线的时候帮我保存这个消息,直到我取出这个消息为止
(2)MQ启动 ->启动持久订阅者 -> 持久订阅者下线 -> 消息发布 -> 持久订阅者重新上线
结论:不管是非持久消息还是持久消息都可以被持久订阅者接收到
(3)MQ启动 -> 启动持久订阅者 -> 持久订阅者下线 -> 持久消息和非持久同时发布 -> MQ下线后重启 -> 持久订阅者重新上线
结论:不管是非持久消息还是持久消息都可以被持久订阅者接收到
在上一篇文章《ActiveMQ - 初体验,探讨JMS通信模型》中引出了两个话题:
在点对点通信模型中,消费者没有启动,消息生产者投递消息到队列queue中,这时MQ服务重启,最后再启动消息消费者,消费者能接收到MQ服务未重启之前由消息生产者发布的消息,这是为什么呢?
在消息发布/订阅通信模型中,消息订阅者由于某种原因挂了,这时消息发布者通过MQ服务向所订阅的topic发布消息,但消息订阅者重启后接收不到前面发布的消息,如果是重要消息,那不是会造成难以估计的后果吗,有没有解决方法?
为了解决这些问题,JMS标准制定了两个概念,一个是持久化消息(Persistent messages),另一个是持久订阅(durable subscribers)。
#2 持久化消息与非持久化消息
消息的持久化,顾名思义,就是MQ服务在接收到消息后,把消息存储在文件或者数据库,而不是存储在计算机物理内存中,这样即时消息的消费者或者订阅者挂了重启后,也有可能获取到之前没有接收到的消息。那么,怎样控制消息发布者的投递方式呢?
控制消息投递方式有两种,一种是针对整个producer的(可以理解为全局的),另一种是针对消息对象的(可以理解为局部的)。在默认情况下,消息的投递方式都是采用持久化投递的。下面看一下两种投递方式是怎么设置的。
对整个producer域设置投递方式
Queue queue = session.createQueue(QUEUE_NAME); MessageProducer producer = session.createProducer(queue); //发送一个持久信息 producer.setDeliveryMode(DeliveryMode.PERSISTENT); //设置持久投递 producer.send(session.createTextMessage(persistent_message_global));
对单个消息message设置投递方式
Queue queue = session.createQueue(QUEUE_NAME); MessageProducer producer = session.createProducer(queue); //发送一个持久信息 TextMessage message = session.createTextMessage(persistent_message_single); message.setJMSDeliveryMode(DeliveryMode.PERSISTENT); producer.send(message);
2.1 演示和结论
接下来的演示中,消息发送者将通过上面的两种投递方式,发送三条消息,分别是两条持久消息和一条非持久消息,改变一下producer、broker、consumer三者的启动顺序,看一下效果和总结结论。先看下用于实验的代码。
** 消息生产者 **
public class Producer { public static final String QUEUE_NAME = "test-deliverymode-queue"; public static void main(String[] args) { System.out.println("Producer started!"); //构建非持久消息和持久消息的内容 String non_persistent_message_global = "全局方式设置的非持久消息 : " + System.currentTimeMillis(); String persistent_message_global = "全局方式设置的持久消息 : " + System.currentTimeMillis(); String persistent_message_single = "针对单条消息设置的持久消息 : " + System.currentTimeMillis(); try { Connection connection = ActiveMQManager.createConnection(); connection.start(); Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); Queue queue = session.createQueue(QUEUE_NAME); MessageProducer producer = session.createProducer(queue); //发送一个持久信息 producer.setDeliveryMode(DeliveryMode.PERSISTENT); producer.send(session.createTextMessage(persistent_message_global)); //发送一个非持久信息 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); producer.send(session.createTextMessage(non_persistent_message_global)); //发送一个持久信息 TextMessage message = session.createTextMessage(persistent_message_single); message.setJMSDeliveryMode(DeliveryMode.PERSISTENT); producer.send(message); System.out.println("成功发送消息:" + non_persistent_message_global); System.out.println("成功发送消息:" + persistent_message_global); System.out.println("成功发送消息:" + persistent_message_single); producer.close(); session.close(); connection.close(); } catch (Exception e) { e.printStackTrace(); } System.out.println("Producer end!"); } }
** 消息消费者 **
public class Consumer { public static void main(String[] args) throws IOException { System.out.println("Consumer started!"); try { Connection connection = ActiveMQManager.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(Producer.QUEUE_NAME); MessageConsumer consumer = session.createConsumer(queue); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { TextMessage textMessage = (TextMessage) message; String text = textMessage.getText(); System.out.println("Consumer 获取消息 ---->" + text); } catch (Exception e) { e.printStackTrace(); } } }); System.in.read(); consumer.close(); session.close(); connection.stop(); } catch (Exception e) { e.printStackTrace(); } System.out.println("Consumer end!"); } }
还是和上一篇文章一样,为了更好的探讨持久消息,需要更换启动顺序。
** 启动顺序: **
(1)顺序:启动MQ broker服务 -> 启动消息发布者 -> 启动消息消费者
** 结论:不管是否是持久消息,消费者都能正常接收和处理。 **
(2)顺序:启动MQ broker服务 -> 启动消息发布者 -> 停止MQ broker服务 -> 启动MQ broker服务 -> 启动消息消费者
** 结论:持久消息可以被消费者正常接收。但是,非持久消息消费者没有接收到,这是因为非持久消息存储在内存,在MQ服务停止后,内存中的消息都会被清除掉,自然就不会再被接收到了。 **
(3)顺序:启动MQ broker服务 -> 启动消息消费者 -> 启动消息发布者
** 结论:所有的消息可以被消费者正常接收。**
#3 非持久订阅与持久订阅
前面留了这么一个关于发布/订阅模型的问题,就是在默认情况下消息订阅者是不能获取到离线消息的,不管这个消息是持久消息还是非持久消息,为了解决这个问题,activemq对消息订阅者划分为非持久订阅和持久订阅这两种状态。默认情况下,消息订阅者是非持久订阅的。要注意的是,持久订阅者和非持久订阅者针对的域Domain是Pub/Sub,而不是P2P模型。
3.1 非持久订阅
非持久订阅只有当客户端处于激活状态,也就是和 activemq broker 保持连接状态才能收到发送到某个主题的消息,而当客户端处于离线状态,这个时间段发到主题的消息将会丢失,永远不会收到。非持久订阅的实现代码如下:
Connection connection = ActiveMQManager.createConnection(); connection.setClientID(CLIENT_ID); //持久化订阅要设置 connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic(Producer.TOPIC_NAME); MessageConsumer consumer = session.createDurableSubscriber(topic, "DurableTopicSubscriber"); ////持久化订阅要使用这种方式创建consumer //监听消息 Connection connection = ActiveMQManager.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic(Producer.TOPIC_NAME); MessageConsumer consumer = session.createConsumer(topic); //监听消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { TextMessage textMessage = (TextMessage) message; String text = textMessage.getText(); System.out.println("NonDurableTopicSubscriber 获取消息 ---->" + text); } catch (Exception e) { e.printStackTrace(); } } });
3.2 持久订阅
消息订阅者为持久订阅时,客户端向activemq broker注册一个自己身份的ID,当这个客户端处于离线时,broker会为这个ID 保存所有发送到主题的消息,当客户再次连接到broker时,会根据自己的ID得到所有当自己处于离线时发送到主题的消息。要建立持久订阅,必须满足以下条件:
为连接connection设置一个客户 ID,如果ID以前已经被占用了,将会抛出异常。
Connection connection = ActiveMQManager.createConnection(); connection.setClientID(CLIENT_ID); //持久化订阅要设置 connection.start();
为订阅的主题指定一个订阅名称,连接ID和订阅名两者组合必须唯一,否则会抛异常
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic(Producer.TOPIC_NAME); MessageConsumer consumer = session.createDurableSubscriber(topic, SUBSCRIBER_NAME); //持久订阅需要以这种方式创建订阅者
下面通过实验给大家一个更加深刻的理解。在消息发布者端发布两条消息,一条是持久消息,另一条是非持久消息。
消息发布:
public class Producer { public static final String TOPIC_NAME = "test-durable-topic"; public static void main(String[] args) { System.out.println("Producer started!"); //构建非持久消息和持久消息的内容 String message_persistent = "持久的消息 : " + System.currentTimeMillis(); String message_non_persistent = "非持久的消息 : " + System.currentTimeMillis(); try { Connection connection = ActiveMQManager.createConnection(); connection.start(); Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); Topic topic = session.createTopic(TOPIC_NAME); MessageProducer producer = session.createProducer(topic); //发送一个非持久信息 TextMessage non_persistent_message = session.createTextMessage(message_non_persistent); non_persistent_message.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT); producer.send(non_persistent_message); //发送一个持久信息 TextMessage persistent_message = session.createTextMessage(message_persistent); persistent_message.setJMSDeliveryMode(DeliveryMode.PERSISTENT); producer.send(persistent_message); System.out.println("成功发送消息:" + message_non_persistent); System.out.println("成功发送消息:" + message_persistent); producer.close(); session.close(); connection.close(); } catch (Exception e) { e.printStackTrace(); } System.out.println("Producer end!"); } }
消息订阅:
/**
* 持久订阅
*/
public class DurableTopicSubscriber {
public static final String CLIENT_ID = "client_id";
public static final String SUBSCRIBER_NAME = "subscriber_name";
public static void main(String[] args) throws IOException {
System.out.println("DurableTopicSubscriber started!");
try {
Connection connection = ActiveMQManager.createConnection(); connection.setClientID(CLIENT_ID); //持久化订阅要设置 connection.start();Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(Producer.TOPIC_NAME);
//订阅者要使用这种方式创建
MessageConsumer consumer = session.createDurableSubscriber(topic, SUBSCRIBER_NAME);
//监听消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
System.out.println("DurableTopicSubscriber 获取消息 ---->" + text);
} catch (Exception e) {
e.printStackTrace();
}
}
});
// 线程一直等待
System.in.read();
consumer.close();
session.close();
connection.stop();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("DurableTopicSubscriber end!");
}
}
通过实验顺序的不同,不一样的小结论
(1)通过管理后台清空所有的订阅者 -> MQ启动 -> 消息发布 -> 启动持久订阅者
结论:消息仍然没有被订阅者接收到,这是因为订阅者在消息发布之前没有在MQ服务中注册,告诉MQ服务自己是订阅某个主题的消息的,你要在我不在线的时候帮我保存这个消息,直到我取出这个消息为止
(2)MQ启动 ->启动持久订阅者 -> 持久订阅者下线 -> 消息发布 -> 持久订阅者重新上线
结论:不管是非持久消息还是持久消息都可以被持久订阅者接收到
(3)MQ启动 -> 启动持久订阅者 -> 持久订阅者下线 -> 持久消息和非持久同时发布 -> MQ下线后重启 -> 持久订阅者重新上线
结论:不管是非持久消息还是持久消息都可以被持久订阅者接收到
代码
http://git.oschina.net/thinwonton/activemq-showcase相关文章推荐
- activeMQ消息详解(续) 订阅(主题)消息(消息持久化)
- ActiveMQ和spring整合,订阅主题和消息消费
- JMS消息持久化,将ActiveMQ消息持久化到mySql数据库中
- ActiveMQ中的消息的持久化和非持久化 以及 持久订阅者 和 非持久订阅者之间的区别与联系
- JMS学习(五)--ActiveMQ中的消息的持久化和非持久化 以及 持久订阅者 和 非持久订阅者之间的区别与联系
- Apache ActiveMQ教程二 (消息主题订阅)
- JMS消息持久化,将ActiveMQ消息持久化到mySql数据库中
- ActiveMQ发消息、收消息、持久化,查询队列剩余消息数、出队数的实现
- ActiveMQ使用spring JmsTemplate生成和订阅消息(二)
- 【ActiveMQ教程】发布/订阅(Publish/Subscribe)消息教程
- ActiveMQ发消息、收消息、持久化,查询队列剩余消息数、出队数的实现
- 消息中间件(3)-ActiveMQ消息持久化
- ActiveMQ使用笔记(二)ActiveMQ消息持久化一
- JMS服务器ActiveMQ的初体验并持久化消息到MySQL数据库中
- 我的mqtt协议和emqttd开源项目个人理解(2) - 订阅$SYS主题,捕获客户端上下线消息
- ActiveMQ使用spring JmsTemplate生成和订阅消息(二)
- ActiveMQ之消息持久化方式
- ActiveMQ的设置消息时长,事务,确认机制 ,持久化(六)
- ActiveMQ的消息持久化到Mysql数据库
- Redis的高级应用-事务处理、持久化、发布与订阅消息、虚拟内存使用