您的位置:首页 > 编程语言 > Java开发

springboot activemq 2 持久化消息 与 持久化订阅

2017-10-23 01:34 441 查看
接着上一节http://blog.csdn.net/cons_step_by_step/article/details/78300427

改动1.减少springboot重复创建session的问题

jmsTemplate的地方加入了CachingConnectionFactory,这样配置可以

@Bean(name = "myJmsTemplate")
public JmsTemplate getJmsTemplate(ActiveMQConnectionFactory nonXaJmsConnectionFactory, MessageConverter jacksonJmsMessageConverter){
//使用CachingConnectionFactory可以提高部分性能。
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setSessionCacheSize(100);
cachingConnectionFactory.setTargetConnectionFactory(nonXaJmsConnectionFactory);
JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory);
//设置deliveryMode(持久化), priority, timeToLive必须开启
jmsTemplate.setExplicitQosEnabled(true);
// 设置消息是否持久化
jmsTemplate.setDeliveryPersistent(true);
// 设置消息转换器
jmsTemplate.setMessageConverter(jacksonJmsMessageConverter);
// 设置消息是否以事务
jmsTemplate.setSessionTransacted(true);
return jmsTemplate;
}


改动2 : 加入订阅持久化

什么是订阅持久化?简单讲就是MQ记住了给每个订阅者发送到了哪里,如果中途发生故障又恢复后,可以从记忆的位置继续发送给订阅者。默认是非持久化订阅,即:订阅者在宕机重启之后,队列中有消息,订阅者也是收不到任何消息的,即便这些消息已经持久化在日志文件或者数据库中。

将原来默认的消息监听容器替换为如下两个配置Bean。这是两个分别的监听Bean,并且他们开启了持久化订阅参数setSubscriptionDurable( true ) ,并且拥有各自的ID,factory.setClientId(“10001”); factory.setClientId(“10002”);具体为什么要这么设置可以参看这篇博文https://www.tuicool.com/articles/UfimyuR

@Bean(name = "topicContainerFactory1")
public DefaultJmsListenerContainerFactory topicClient1(ConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer configurer){
DefaultJmsListenerContainerFactory factory = defaultJmsListenerContainerFactoryTopic(connectionFactory,configurer);
factory.setClientId("10001");
return factory;
}

@Bean(name = "topicContainerFactory2")
public DefaultJmsListenerContainerFactory topicClient2(ConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer configurer){
DefaultJmsListenerContainerFactory factory = defaultJmsListenerContainerFactoryTopic(connectionFactory,configurer);
factory.setClientId("10002");
return factory;
}

/**
*
* @param connectionFactory
* @param configurer
* @return
*/

public DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactoryTopic(ConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
configurer.configure(factory,connectionFactory);
factory.setPubSubDomain(true);
factory.setSessionTransacted(true);
factory.setAutoStartup(true);
//开启持久化订阅
factory.setSubscriptionDurable(true);
return factory;
}


改动3:对同一个队列的消息多添加了一个订阅者。

MQ可以在订阅者第一次订阅的时候记忆每个订阅者的发送位置,之后进行分别的推送。

@JmsListener(destination = MQ_TOPIC_NAME,containerFactory = "topicContainerFactory1") // 监听指定消息主题
public void receiveMessage1(Message message) throws Exception {
log.debug("[接收队列1] [收到消息] {}",message);
}

@JmsListener(destination = MQ_TOPIC_NAME,containerFactory = "topicContainerFactory2") // 监听指定消息主题
public void receiveMessage1(Message message) throws Exception {
log.debug("[接收队列2] [收到消息] {}",message);
}


测试结果:

1>同时开启发布者和两个订阅者,发布者向MQ推送消息。订阅者接收。

2017-10-23 01:09:33.258 DEBUG 14436 --- [enerContainer-1] c.thinvent.service.MessageHandleService  : [接收队列1] [收到消息] Message
2017-10-23 01:09:33.258 DEBUG 14436 --- [enerContainer-1] c.t.service.MessageHandleService2        : [接收队列2] [收到消息] Message
2017-10-23 01:09:33.354 DEBUG 14436 --- [enerContainer-1] c.t.service.MessageHandleService2        : [接收队列2] [收到消息] Message
2017-10-23 01:09:33.354 DEBUG 14436 --- [enerContainer-1] c.thinvent.service.MessageHandleService  : [接收队列1] [收到消息] Message
2017-10-23 01:09:33.407 DEBUG 14436 --- [enerContainer-1] c.t.service.MessageHandleService2        : [接收队列2] [收到消息] Message
2017-10-23 01:09:33.461 DEBUG 14436 --- [enerContainer-1] c.thinvent.service.MessageHandleService  : [接收队列1] [收到消息] Message


2>之后关闭消费者。发布者再发布3条消息。

3>此时可以将MQ服务关闭,模拟宕机。然后启动MQ服务,启动一个订阅者,紧接着会受到后续未发送的3条消息,然后再启动另外的订阅者,紧接着也会受到后续的三条消息。

MQ后续问题: MQ的集群,连接管理。后续用到的时候可以继续学习。集群基础学习地址。http://www.imooc.com/video/15223
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  activemq springboot