spring整合activemq发布订阅消息模式
2018-04-23 17:19
711 查看
maven引用
提供者配置文件(含订阅者配置文件)
provider
订阅者代码:
总结:
spring JmsTemplate 整合消息队列,不管是发布订阅或者点对点,其实程序中的生产者、消费者(订阅者)代码并无太大区别,区别的是程序配置。
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.9.0</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.9.0</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.3.3.RELEASE</version> </dependency>
提供者配置文件(含订阅者配置文件)
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.3.xsd"> <description>JMS配置</description> <!-- ActiveMQ 连接工厂 --> <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> <property name="connectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="${jms.broker_url}" /> <property name="userName" value="${jms.userName}" /> <property name="password" value="${jms.password}" /> </bean> </property> <property name="maxConnections" value="100"></property> </bean> <!-- Spring Caching 连接工厂 --> <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <property name="targetConnectionFactory" ref="jmsFactory" /> <property name="sessionCacheSize" value="10" /> </bean> <!-- 发布订阅模式配置 --> <bean id="jmsTemplatePubSub" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="cachingConnectionFactory" /> <!-- 区别它采用的模式为false是p2p为true是订阅 --> <property name="pubSubDomain" value="true" /> <!-- receiveTimeout表示接收消息时的超时时间 --> <property name="receiveTimeout" value="20000" /> </bean> <bean id="updateDecisionFile" class="org.apache.activemq.command.ActiveMQTopic"> <!-- 设置消息队列的名字 --> <constructor-arg index="0" value="update.decision.file" /> </bean> <!-- 消息监听(订阅者)配置 --> <bean id="updateDecisionFileContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="cachingConnectionFactory" /> <!-- 发布订阅模式 --> <property name="pubSubDomain" value="true"/> <property name="destination" ref="updateDecisionFile" /> <property name="messageListener" ref="updateDecisionFileMessageListener" /> <property name="concurrentConsumers" value="1" /> </bean> <bean class="org.springframework.jms.config.DefaultJmsListenerContainerFactory" id="jmsListenerContainerFactory"> <property name="connectionFactory" ref="cachingConnectionFactory" /> </bean> <jms:annotation-driven /> </beans>
provider
package com.cloud.cloudmanage.mq.produce; import java.io.Serializable; import javax.annotation.Resource; import javax.jms.Destination; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.stereotype.Component; @Component("updateDecisionFileProduce") public class UpdateDecisionFileProduce { @Autowired(required = false) @Resource(name = "jmsTemplatePubSub") private JmsTemplate jmsTemplate; public JmsTemplate getJmsTemplate() { return jmsTemplate; } public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } @Autowired(required = false) @Resource(name="updateDecisionFile") private Destination notifyQueue; public void send(final Serializable ob) { sendMessage(ob, notifyQueue); } /** * 使用jmsTemplate最简便的封装convertAndSend()发送Map类型的消息. */ private void sendMessage(Serializable msg, Destination destination) { jmsTemplate.convertAndSend(destination, msg); } }
订阅者代码:
package com.cloud.welkin.engine.mq.consumer; import javax.jms.Message; import javax.jms.MessageListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; @Component("updateDecisionFileMessageListener") public class UpdateDecisionFileMessageListener implements MessageListener { private static Logger logger = LoggerFactory.getLogger(UpdateDecisionFileMessageListener.class); @Override public void onMessage(Message message) { try { //todo 在这里写逻辑 } catch (Exception e) { logger.error("{}",e); } } }
总结:
spring JmsTemplate 整合消息队列,不管是发布订阅或者点对点,其实程序中的生产者、消费者(订阅者)代码并无太大区别,区别的是程序配置。
相关文章推荐
- Spring整合activemq发送消息(发布/订阅)
- spring整合ActiveMQ订阅模式(对象消息发送)
- Spring整合activeMq(二):发布订阅模式
- 消息中间件-activemq实战整合Spring之Topic模式(五)
- ActiveMQ 发布订阅(topic)模式_发送消息和接收消息
- Spring Redis与ActiveMQ发布订阅模式源码分析
- 利用Spring与ActiveMQ整合发送、接收消息实例(Queue与Topic模式)
- 使用ActiveMQ和Spring配置订阅发布模式
- spring boot整合activeMQ,实现ptp和topic两者消息模式
- springboot整合activemq,应答模式,消息重发机制,消息持久化
- Spring Data Redis实现消息队列——发布/订阅模式
- (三)ActiveMQ之发布- 订阅消息模式实现
- 使用Spring配置ActiveMQ的发布订阅模式
- spring整合activemq发送MQ消息[Topic模式]实例,activemqmq
- spring整合activemq发送MQ消息[queue模式]实例
- 分布式消息中间件(二)——ActiveMQ发布-订阅消息模式
- redis 消息队列发布订阅模式spring boot实现
- JMS消息队列ActiveMQ(发布/订阅模式)
- activeMQ 点对点以及发布与订阅 - 以及spring的整合&集群方式
- spring整合activemq发送MQ消息[Topic模式]实例