Spring整合ActiveMQ完成消息队列MQ编程
2017-12-21 21:13
489 查看
<–start–>
第一步:新建一个maven,将工程命名为activeMQ_spring。在pom.xml文件中导入相关jar包。
①spring开发和测试相关的jar包:
spring-context包
spring-test包
juint包
②ActiveMQ的jar包
activemq-all包
③spring整合activeMQ的jar包
完整的pom.xml文件代码如下:
第二步:编写配置生产者:
①配置activeMQ连接工厂。
②配置spring mq 管理工厂
③配置jmsTemplate模板
完整的applicationContext-mq.xml文件代码:
创建一个QueueSender类,生产queue队列信息:
生成topic话题信息:
1098e
测试代码:
编写消费者代码:
在applicationContext-mq-customer.xml中配置只扫描customer包:
配置Listener监听器,在applicationContext-mq-customer.xml中进行配置。
完整的applicationContext-mq-customer.xml代码:
编写消息消费者QueueCustomer1类,实现MessageListener接口:
创建另一个消息消费者QueueCustomer2类,实现MessageListener接口。
创建TopicCustomer1类,实现MessageListener接口。
创建TopicCustomer2类,实现MessageListener接口。
代码测试:
<–end–>
第一步:新建一个maven,将工程命名为activeMQ_spring。在pom.xml文件中导入相关jar包。
①spring开发和测试相关的jar包:
spring-context包
spring-test包
juint包
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>4.1.7.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>4.1.7.RELEASE</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency>
②ActiveMQ的jar包
activemq-all包
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.14.0</version> </dependency>
③spring整合activeMQ的jar包
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.1.7.RELEASE</version> </dependency>
完整的pom.xml文件代码如下:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion>
<groupId>cn.niwotaxuexiba.maven</groupId>
<artifactId>activeMQ_spring</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>activeMQ_spring</name>
<dependencies>
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>4.1.7.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>4.1.7.RELEASE</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency>
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.14.0</version> </dependency>
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.1.7.RELEASE</version> </dependency>
</dependencies>
</project>
第二步:编写配置生产者:
①配置activeMQ连接工厂。
<!-- ActiveMQ 连接工厂 --> <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供--> <!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码--> <amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://localhost:61616" userName="admin" password="admin" />
②配置spring mq 管理工厂
<!-- Spring Caching连接工厂 --> <!-- Spring用于管理真正的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="amqConnectionFactory"></property> <!-- 同上,同理 --> <!-- <constructor-arg ref="amqConnectionFactory" /> --> <!-- Session缓存数量 --> <property name="sessionCacheSize" value="100" /> </bean>
③配置jmsTemplate模板
<!-- Spring JmsTemplate 的消息生产者 start--> <!-- 定义JmsTemplate的Queue类型 --> <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 --> <constructor-arg ref="connectionFactory" /> <!-- 非pub/sub模型(发布/订阅),即队列模式 --> <property name="pubSubDomain" value="false" /> </bean> <!-- 定义JmsTemplate的Topic类型 --> <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 --> <constructor-arg ref="connectionFactory" /> <!-- pub/sub模型(发布/订阅) --> <property name="pubSubDomain" value="true" /> </bean> <!--Spring JmsTemplate 的消息生产者 end-->
完整的applicationContext-mq.xml文件代码:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:jdbc="http://www.springframework.org/schema/jdbc" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:jpa="http://www.springframework.org/schema/data/jpa" xmlns:task="http://www.springframework.org/schema/task"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-4.1.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.1.xsd http://www.springframework.org/schema/data/jpa http://www.springframework.org/schema/data/jpa/spring-jpa.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd ">
<!-- 扫描包 -->
<context:component-scan base-package="cn.niwotaxuexiba.activemq" />
<!-- ActiveMQ 连接工厂 --> <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供--> <!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码--> <amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://localhost:61616" userName="admin" password="admin" />
<!-- Spring Caching连接工厂 --> <!-- Spring用于管理真正的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="amqConnectionFactory"></property> <!-- 同上,同理 --> <!-- <constructor-arg ref="amqConnectionFactory" /> --> <!-- Session缓存数量 --> <property name="sessionCacheSize" value="100" /> </bean>
<!-- Spring JmsTemplate 的消息生产者 start-->
<!-- 定义JmsTemplate的Queue类型 -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<constructor-arg ref="connectionFactory" />
<!-- 非pub/sub模型(发布/订阅),即队列模式 -->
<property name="pubSubDomain" value="false" />
</bean>
<!-- 定义JmsTemplate的Topic类型 -->
<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<constructor-arg ref="connectionFactory" />
<!-- pub/sub模型(发布/订阅) -->
<property name="pubSubDomain" value="true" />
</bean>
<!--Spring JmsTemplate 的消息生产者 end-->
</beans>
创建一个QueueSender类,生产queue队列信息:
@Service public class QueueSender { // 注入jmsTemplate @Autowired @Qualifier("jmsQueueTemplate") private JmsTemplate jmsTemplate; public void send(String queueName, final String message) { jmsTemplate.send(queueName, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message); } }); } }
生成topic话题信息:
@Service public class TopicSender { // 注入jmsTemplate @Autowired @Qualifier("jmsTopicTemplate") private JmsTemplate jmsTemplate; public void send(String topicName, final String message) { jmsTemplate.send(topicName, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message); } }); } }
1098e
测试代码:
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:applicationContext-mq.xml") public class ProducerTest { @Autowired private QueueSender queueSender; @Autowired private TopicSender topicSender; @Test public void testSendMessage() { queueSender.send("spring_queue", "你好,你我他"); topicSender.send("spring_topic", "你好,学习吧"); } }
编写消费者代码:
在applicationContext-mq-customer.xml中配置只扫描customer包:
<!-- 扫描包 --> <context:component-scan base-package="cn.niwotaxuexiba.activemq.consumer" />
配置Listener监听器,在applicationContext-mq-customer.xml中进行配置。
<!-- 消息消费者 start--> <!-- 定义Queue监听器 --> <jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto"> <!-- 默认注册bean名称,应该是类名首字母小写 --> <jms:listener destination="spring_queue" ref="queueConsumer1"/> <jms:listener destination="spring_queue" ref="queueConsumer2"/> </jms:listener-container> <!-- 定义Topic监听器 --> <jms:listener-container destination-type="topic" container-type="default" connection-factory="connectionFactory" acknowledge="auto"> <jms:listener destination="spring_topic" ref="topicConsumer1"/> <jms:listener destination="spring_topic" ref="topicConsumer2"/> </jms:listener-container> <!-- 消息消费者 end -->
完整的applicationContext-mq-customer.xml代码:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:jdbc="http://www.springframework.org/schema/jdbc" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:jpa="http://www.springframework.org/schema/data/jpa" xmlns:task="http://www.springframework.org/schema/task"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-4.1.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.1.xsd http://www.springframework.org/schema/data/jpa http://www.springframework.org/schema/data/jpa/spring-jpa.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd ">
<!-- 扫描包 --> <context:component-scan base-package="cn.niwotaxuexiba.activemq.consumer" />
<!-- ActiveMQ 连接工厂 --> <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供--> <!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码--> <amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://localhost:61616" userName="admin" password="admin" />
<!-- Spring Caching连接工厂 -->
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<!-- 同上,同理 -->
<!-- <constructor-arg ref="amqConnectionFactory" /> -->
<!-- Session缓存数量 -->
<property name="sessionCacheSize" value="100" />
</bean>
<!-- 消息消费者 start--> <!-- 定义Queue监听器 --> <jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto"> <!-- 默认注册bean名称,应该是类名首字母小写 --> <jms:listener destination="spring_queue" ref="queueConsumer1"/> <jms:listener destination="spring_queue" ref="queueConsumer2"/> </jms:listener-container> <!-- 定义Topic监听器 --> <jms:listener-container destination-type="topic" container-type="default" connection-factory="connectionFactory" acknowledge="auto"> <jms:listener destination="spring_topic" ref="topicConsumer1"/> <jms:listener destination="spring_topic" ref="topicConsumer2"/> </jms:listener-container> <!-- 消息消费者 end -->
</beans>
编写消息消费者QueueCustomer1类,实现MessageListener接口:
@Service public class QueueConsumer1 implements MessageListener { public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("消费者QueueConsumer1获取消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
创建另一个消息消费者QueueCustomer2类,实现MessageListener接口。
@Service public class QueueConsumer2 implements MessageListener { public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("消费者QueueConsumer2获取消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
创建TopicCustomer1类,实现MessageListener接口。
@Service public class TopicConsumer1 implements MessageListener { public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("消费者TopicConsumer1获取消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
创建TopicCustomer2类,实现MessageListener接口。
@Service public class TopicConsumer2 implements MessageListener { public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out .println("消费者TopicConsumer2获取消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
代码测试:
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:applicationContext-mq-consumer.xml") public class ConsumerTest { @Test public void testConsumerMessage() { while (true) { // junit退出,防止进程死掉 } } }
<–end–>
相关文章推荐
- 消息队列activemq整合spring发送端和接收端配置
- spring整合activemq消息队列之点对点模式
- Java消息队列-Spring整合ActiveMq
- Java消息队列-Spring整合ActiveMq
- Java消息队列-Spring整合ActiveMq
- ActiveMQ消息队列和spring进行整合实例
- Java消息队列-Spring整合ActiveMq
- Spring整合ActiveMQ实现简单的消息队列
- Spring整合ActiveMQ实现简单的消息队列
- Java消息队列-Spring整合ActiveMq
- Java消息队列-Spring整合ActiveMq
- Java消息队列-Spring整合ActiveMq
- 详解Java消息队列-Spring整合ActiveMq
- springboot整合activemq,应答模式,消息重发机制,消息持久化
- Spring2.5整合ActiveMQ 5.2(P2P文本消息)
- lmax.disruptor高效内存消息队列spring整合
- 淘淘商城系列——Spring与ActiveMQ的整合及用JmsTemplate发送消息
- 消息队列 RabbitMQ 与 Spring 整合使用
- linux下 消息中间件ActiveMQ整合spring笔记二 接收消息
- Spring与ActiveMQ整合(多线程并发发送与接收消息)