您的位置:首页 > 其它

ActiveMQ - 持久化消息与持久主题订阅

2017-04-27 00:00 190 查看
#1 前言
在上一篇文章《ActiveMQ - 初体验,探讨JMS通信模型》中引出了两个话题:

在点对点通信模型中,消费者没有启动,消息生产者投递消息到队列queue中,这时MQ服务重启,最后再启动消息消费者,消费者能接收到MQ服务未重启之前由消息生产者发布的消息,这是为什么呢?

在消息发布/订阅通信模型中,消息订阅者由于某种原因挂了,这时消息发布者通过MQ服务向所订阅的topic发布消息,但消息订阅者重启后接收不到前面发布的消息,如果是重要消息,那不是会造成难以估计的后果吗,有没有解决方法?

为了解决这些问题,JMS标准制定了两个概念,一个是持久化消息(Persistent messages),另一个是持久订阅(durable subscribers)。

#2 持久化消息与非持久化消息

消息的持久化,顾名思义,就是MQ服务在接收到消息后,把消息存储在文件或者数据库,而不是存储在计算机物理内存中,这样即时消息的消费者或者订阅者挂了重启后,也有可能获取到之前没有接收到的消息。那么,怎样控制消息发布者的投递方式呢?

控制消息投递方式有两种,一种是针对整个producer的(可以理解为全局的),另一种是针对消息对象的(可以理解为局部的)。在默认情况下,消息的投递方式都是采用持久化投递的。下面看一下两种投递方式是怎么设置的。

对整个producer域设置投递方式

Queue queue = session.createQueue(QUEUE_NAME);
MessageProducer producer = session.createProducer(queue);
//发送一个持久信息
producer.setDeliveryMode(DeliveryMode.PERSISTENT); //设置持久投递
producer.send(session.createTextMessage(persistent_message_global));


对单个消息message设置投递方式

Queue queue = session.createQueue(QUEUE_NAME);
MessageProducer producer = session.createProducer(queue);
//发送一个持久信息
TextMessage message = session.createTextMessage(persistent_message_single);
message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
producer.send(message);

2.1 演示和结论

接下来的演示中,消息发送者将通过上面的两种投递方式,发送三条消息,分别是两条持久消息和一条非持久消息,改变一下producer、broker、consumer三者的启动顺序,看一下效果和总结结论。

先看下用于实验的代码。

** 消息生产者 **

public class Producer {
public static final String QUEUE_NAME = "test-deliverymode-queue";

public static void main(String[] args) {
System.out.println("Producer started!");

//构建非持久消息和持久消息的内容
String non_persistent_message_global = "全局方式设置的非持久消息 : " + System.currentTimeMillis();
String persistent_message_global = "全局方式设置的持久消息 : " + System.currentTimeMillis();
String persistent_message_single = "针对单条消息设置的持久消息 : " + System.currentTimeMillis();

try {
Connection connection = ActiveMQManager.createConnection();
connection.start();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

Queue queue = session.createQueue(QUEUE_NAME);
MessageProducer producer = session.createProducer(queue);

//发送一个持久信息
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
producer.send(session.createTextMessage(persistent_message_global));

//发送一个非持久信息
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
producer.send(session.createTextMessage(non_persistent_message_global));

//发送一个持久信息
TextMessage message = session.createTextMessage(persistent_message_single);
message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
producer.send(message);

System.out.println("成功发送消息:" + non_persistent_message_global);
System.out.println("成功发送消息:" + persistent_message_global);
System.out.println("成功发送消息:" + persistent_message_single);

producer.close();
session.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}

System.out.println("Producer end!");
}
}

** 消息消费者 **

public class Consumer {
public static void main(String[] args) throws IOException {
System.out.println("Consumer started!");

try {
Connection connection = ActiveMQManager.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(Producer.QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
System.out.println("Consumer 获取消息 ---->" + text);
} catch (Exception e) {
e.printStackTrace();
}
}
});

System.in.read();

consumer.close();
session.close();
connection.stop();

} catch (Exception e) {
e.printStackTrace();
}

System.out.println("Consumer end!");
}
}

还是和上一篇文章一样,为了更好的探讨持久消息,需要更换启动顺序。

** 启动顺序: **

(1)顺序:启动MQ broker服务 -> 启动消息发布者 -> 启动消息消费者

** 结论:不管是否是持久消息,消费者都能正常接收和处理。 **

(2)顺序:启动MQ broker服务 -> 启动消息发布者 -> 停止MQ broker服务 -> 启动MQ broker服务 -> 启动消息消费者

** 结论:持久消息可以被消费者正常接收。但是,非持久消息消费者没有接收到,这是因为非持久消息存储在内存,在MQ服务停止后,内存中的消息都会被清除掉,自然就不会再被接收到了。 **

(3)顺序:启动MQ broker服务 -> 启动消息消费者 -> 启动消息发布者

** 结论:所有的消息可以被消费者正常接收。**

#3 非持久订阅与持久订阅
前面留了这么一个关于发布/订阅模型的问题,就是在默认情况下消息订阅者是不能获取到离线消息的,不管这个消息是持久消息还是非持久消息,为了解决这个问题,activemq对消息订阅者划分为非持久订阅和持久订阅这两种状态。默认情况下,消息订阅者是非持久订阅的。要注意的是,持久订阅者和非持久订阅者针对的域Domain是Pub/Sub,而不是P2P模型。

3.1 非持久订阅

非持久订阅只有当客户端处于激活状态,也就是和 activemq broker 保持连接状态才能收到发送到某个主题的消息,而当客户端处于离线状态,这个时间段发到主题的消息将会丢失,永远不会收到。

非持久订阅的实现代码如下:

Connection connection = ActiveMQManager.createConnection();
connection.setClientID(CLIENT_ID); //持久化订阅要设置
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(Producer.TOPIC_NAME);
MessageConsumer consumer = session.createDurableSubscriber(topic, "DurableTopicSubscriber"); ////持久化订阅要使用这种方式创建consumer

//监听消息
Connection connection = ActiveMQManager.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(Producer.TOPIC_NAME);
MessageConsumer consumer = session.createConsumer(topic);

//监听消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
System.out.println("NonDurableTopicSubscriber 获取消息 ---->" + text);
} catch (Exception e) {
e.printStackTrace();
}

}
});

3.2 持久订阅

消息订阅者为持久订阅时,客户端向activemq broker注册一个自己身份的ID,当这个客户端处于离线时,broker会为这个ID 保存所有发送到主题的消息,当客户再次连接到broker时,会根据自己的ID得到所有当自己处于离线时发送到主题的消息。

要建立持久订阅,必须满足以下条件:

为连接connection设置一个客户 ID,如果ID以前已经被占用了,将会抛出异常。

Connection connection = ActiveMQManager.createConnection();
connection.setClientID(CLIENT_ID); //持久化订阅要设置
connection.start();


为订阅的主题指定一个订阅名称,连接ID和订阅名两者组合必须唯一,否则会抛异常

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(Producer.TOPIC_NAME);
MessageConsumer consumer = session.createDurableSubscriber(topic, SUBSCRIBER_NAME); //持久订阅需要以这种方式创建订阅者

下面通过实验给大家一个更加深刻的理解。在消息发布者端发布两条消息,一条是持久消息,另一条是非持久消息。

消息发布:

public class Producer {
public static final String TOPIC_NAME = "test-durable-topic";

public static void main(String[] args) {
System.out.println("Producer started!");

//构建非持久消息和持久消息的内容
String message_persistent = "持久的消息 : " + System.currentTimeMillis();
String message_non_persistent = "非持久的消息 : " + System.currentTimeMillis();

try {
Connection connection = ActiveMQManager.createConnection();
connection.start();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageProducer producer = session.createProducer(topic);

//发送一个非持久信息
TextMessage non_persistent_message = session.createTextMessage(message_non_persistent);
non_persistent_message.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
producer.send(non_persistent_message);

//发送一个持久信息
TextMessage persistent_message = session.createTextMessage(message_persistent);
persistent_message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
producer.send(persistent_message);

System.out.println("成功发送消息:" + message_non_persistent);
System.out.println("成功发送消息:" + message_persistent);

producer.close();
session.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}

System.out.println("Producer end!");
}
}

消息订阅:

/**
* 持久订阅
*/
public class DurableTopicSubscriber {

public static final String CLIENT_ID = "client_id";
public static final String SUBSCRIBER_NAME = "subscriber_name";

public static void main(String[] args) throws IOException {
System.out.println("DurableTopicSubscriber started!");

try {
Connection connection = ActiveMQManager.createConnection(); connection.setClientID(CLIENT_ID); //持久化订阅要设置 connection.start();Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(Producer.TOPIC_NAME);
//订阅者要使用这种方式创建
MessageConsumer consumer = session.createDurableSubscriber(topic, SUBSCRIBER_NAME);

//监听消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
System.out.println("DurableTopicSubscriber 获取消息 ---->" + text);
} catch (Exception e) {
e.printStackTrace();
}

}
});

// 线程一直等待
System.in.read();

consumer.close();
session.close();
connection.stop();

} catch (Exception e) {
e.printStackTrace();
}

System.out.println("DurableTopicSubscriber end!");
}
}

通过实验顺序的不同,不一样的小结论

(1)通过管理后台清空所有的订阅者 -> MQ启动 -> 消息发布 -> 启动持久订阅者

结论:消息仍然没有被订阅者接收到,这是因为订阅者在消息发布之前没有在MQ服务中注册,告诉MQ服务自己是订阅某个主题的消息的,你要在我不在线的时候帮我保存这个消息,直到我取出这个消息为止

(2)MQ启动 ->启动持久订阅者 -> 持久订阅者下线 -> 消息发布 -> 持久订阅者重新上线

结论:不管是非持久消息还是持久消息都可以被持久订阅者接收到

(3)MQ启动 -> 启动持久订阅者 -> 持久订阅者下线 -> 持久消息和非持久同时发布 -> MQ下线后重启 -> 持久订阅者重新上线

结论:不管是非持久消息还是持久消息都可以被持久订阅者接收到

代码

http://git.oschina.net/thinwonton/activemq-showcase
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息