Integrating ActiveMQ with Spring for Message Queue (MQ) Programming

2017-12-21 21:13
<–start–>

Step 1: Create a new Maven project named activeMQ_spring and add the required JARs as dependencies in pom.xml.

① JARs for Spring development and testing:

spring-context

spring-test

junit

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>4.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>4.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>


② The ActiveMQ JAR

activemq-all

<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.14.0</version>
</dependency>


③ The JAR that integrates Spring with ActiveMQ

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>4.1.7.RELEASE</version>
</dependency>


The complete pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.niwotaxuexiba.maven</groupId>
<artifactId>activeMQ_spring</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>activeMQ_spring</name>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>4.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>4.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.14.0</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>4.1.7.RELEASE</version>
</dependency>
</dependencies>
</project>


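Step 2: Configure the message producer:

① Configure the ActiveMQ connection factory.

<!-- ActiveMQ connection factory -->
<!-- The ConnectionFactory that actually creates Connections, supplied by the JMS provider -->
<!-- For a broker on another machine use tcp://ip:61616; for a local broker use tcp://localhost:61616. Also set the user name and password -->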
<amq:connectionFactory id="amqConnectionFactory"
brokerURL="tcp://localhost:61616" userName="admin" password="admin"  />
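
The brokerURL above is an ordinary scheme://host:port address. As a small sketch using only the JDK (the class and method names BrokerUrlDemo, brokerUrl, hostOf and portOf are made up for this illustration, and 192.168.1.10 is a placeholder address), the local and remote forms differ only in the host part:

```java
import java.net.URI;

// Illustration only: plain JDK string/URI handling, no ActiveMQ classes involved.
class BrokerUrlDemo {

    /** Builds a tcp:// broker URL from a host name (or IP) and a port. */
    static String brokerUrl(String host, int port) {
        return "tcp://" + host + ":" + port;
    }

    /** Extracts the host part of a broker URL. */
    static String hostOf(String url) {
        return URI.create(url).getHost();
    }

    /** Extracts the port part of a broker URL. */
    static int portOf(String url) {
        return URI.create(url).getPort();
    }

    public static void main(String[] args) {
        String local = brokerUrl("localhost", 61616);
        String remote = brokerUrl("192.168.1.10", 61616); // placeholder IP
        System.out.println(local + " -> host " + hostOf(local) + ", port " + portOf(local));
        System.out.println(remote + " -> host " + hostOf(remote) + ", port " + portOf(remote));
    }
}
```

Only the host changes between the two cases; 61616 is the port used throughout this article.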


② Configure the Spring-managed connection factory

<!-- Spring caching connection factory -->
<!-- The ConnectionFactory Spring uses to manage the real ConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<!-- targetConnectionFactory points to the real ConnectionFactory that creates JMS Connections -->
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<!-- Equivalent alternative: pass it as a constructor argument -->
<!-- <constructor-arg ref="amqConnectionFactory" /> -->
<!-- Number of Sessions to cache -->
<property name="sessionCacheSize" value="100" />
</bean>
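
To give a feel for what sessionCacheSize bounds, here is a toy model written with JDK classes only. It is not Spring's implementation: FakeSession and BoundedSessionCacheDemo are hypothetical names, and the model only captures the general idea that idle sessions are kept for reuse up to a limit, while extra sessions are still created on demand and really closed when returned to a full cache.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustration only: a toy bounded idle cache, not Spring's CachingConnectionFactory.
class BoundedSessionCacheDemo {

    /** Stand-in for a JMS Session; it only tracks whether it has been closed. */
    static class FakeSession {
        boolean closed = false;
    }

    private final int cacheSize;
    private final Deque<FakeSession> idle = new ArrayDeque<>();
    int created = 0; // number of sessions physically created so far

    BoundedSessionCacheDemo(int cacheSize) {
        this.cacheSize = cacheSize;
    }

    /** Reuses an idle session if one is cached; otherwise creates a new one on demand. */
    FakeSession acquire() {
        FakeSession session = idle.pollFirst();
        if (session == null) {
            session = new FakeSession();
            created++;
        }
        return session;
    }

    /** Keeps the session for reuse while the cache has room; otherwise closes it. */
    void release(FakeSession session) {
        if (idle.size() < cacheSize) {
            idle.addFirst(session);
        } else {
            session.closed = true;
        }
    }

    int idleCount() {
        return idle.size();
    }

    public static void main(String[] args) {
        BoundedSessionCacheDemo cache = new BoundedSessionCacheDemo(2);
        FakeSession a = cache.acquire();
        FakeSession b = cache.acquire();
        FakeSession c = cache.acquire();
        cache.release(a);
        cache.release(b);
        cache.release(c); // the cache already holds two idle sessions, so c is closed
        System.out.println("created=" + cache.created + ", idle=" + cache.idleCount()
                + ", c.closed=" + c.closed);
        cache.acquire(); // served from the cache, no new session is created
        System.out.println("created after reuse=" + cache.created);
    }
}
```

With a cache size of 100, as configured above, a producer that sends from a handful of threads would stay well inside the limit.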


③ Configure the JmsTemplate beans

<!-- Spring JmsTemplate message producers: start -->

<!-- JmsTemplate for queues -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- This connectionFactory is the Spring-provided ConnectionFactory bean defined above -->
<constructor-arg ref="connectionFactory" />
<!-- Not the pub/sub (publish/subscribe) model, i.e. queue mode -->
<property name="pubSubDomain" value="false" />
</bean>

<!-- JmsTemplate for topics -->
<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- This connectionFactory is the Spring-provided ConnectionFactory bean defined above -->
<constructor-arg ref="connectionFactory" />
<!-- The pub/sub (publish/subscribe) model -->
<property name="pubSubDomain" value="true" />
</bean>

<!-- Spring JmsTemplate message producers: end -->


The complete applicationContext-mq.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:jdbc="http://www.springframework.org/schema/jdbc" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:jpa="http://www.springframework.org/schema/data/jpa" xmlns:task="http://www.springframework.org/schema/task"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-4.1.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.1.xsd http://www.springframework.org/schema/data/jpa http://www.springframework.org/schema/data/jpa/spring-jpa.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd ">

<!-- Component scan -->
<context:component-scan base-package="cn.niwotaxuexiba.activemq" />

<!-- ActiveMQ connection factory -->
<!-- The ConnectionFactory that actually creates Connections, supplied by the JMS provider -->
<!-- For a broker on another machine use tcp://ip:61616; for a local broker use tcp://localhost:61616. Also set the user name and password -->
<amq:connectionFactory id="amqConnectionFactory"
brokerURL="tcp://localhost:61616" userName="admin" password="admin" />

<!-- Spring caching connection factory -->
<!-- The ConnectionFactory Spring uses to manage the real ConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<!-- targetConnectionFactory points to the real ConnectionFactory that creates JMS Connections -->
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<!-- Equivalent alternative: pass it as a constructor argument -->
<!-- <constructor-arg ref="amqConnectionFactory" /> -->
<!-- Number of Sessions to cache -->
<property name="sessionCacheSize" value="100" />
</bean>

<!-- Spring JmsTemplate message producers: start -->

<!-- JmsTemplate for queues -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- This connectionFactory is the Spring-provided ConnectionFactory bean defined above -->
<constructor-arg ref="connectionFactory" />
<!-- Not the pub/sub (publish/subscribe) model, i.e. queue mode -->
<property name="pubSubDomain" value="false" />
</bean>

<!-- JmsTemplate for topics -->
<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- This connectionFactory is the Spring-provided ConnectionFactory bean defined above -->
<constructor-arg ref="connectionFactory" />
<!-- The pub/sub (publish/subscribe) model -->
<property name="pubSubDomain" value="true" />
</bean>
<!-- Spring JmsTemplate message producers: end -->

</beans>


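Create a QueueSender class that sends messages to a queue:

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

@Service
public class QueueSender {
    // Inject the queue-mode JmsTemplate by bean id
    @Autowired
    @Qualifier("jmsQueueTemplate")
    private JmsTemplate jmsTemplate;

    public void send(String queueName, final String message) {
        jmsTemplate.send(queueName, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
            }
        });
    }
}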


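Create a TopicSender class that publishes messages to a topic:

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

@Service
public class TopicSender {
    // Inject the topic-mode JmsTemplate by bean id
    @Autowired
    @Qualifier("jmsTopicTemplate")
    private JmsTemplate jmsTemplate;

    public void send(String topicName, final String message) {
        jmsTemplate.send(topicName, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
            }
        });
    }
}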

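Test code:

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:applicationContext-mq.xml")
public class ProducerTest {
    @Autowired
    private QueueSender queueSender;

    @Autowired
    private TopicSender topicSender;

    @Test
    public void testSendMessage() {
        // One message to the queue and one to the topic
        queueSender.send("spring_queue", "Hello, you, me, and him");
        topicSender.send("spring_topic", "Hello, let's learn");
    }
}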


Write the consumer code:

In applicationContext-mq-consumer.xml, restrict component scanning to the consumer package:

<!-- Component scan -->
<context:component-scan base-package="cn.niwotaxuexiba.activemq.consumer" />


Configure the listeners, also in applicationContext-mq-consumer.xml.

<!-- Message consumers: start -->

<!-- Queue listeners -->
<jms:listener-container destination-type="queue" container-type="default"
connection-factory="connectionFactory" acknowledge="auto">
<!-- By default the bean name is the class name with its first letter in lowercase -->
<jms:listener destination="spring_queue" ref="queueConsumer1"/>
<jms:listener destination="spring_queue" ref="queueConsumer2"/>
</jms:listener-container>

<!-- Topic listeners -->
<jms:listener-container destination-type="topic" container-type="default"
connection-factory="connectionFactory" acknowledge="auto">
<jms:listener destination="spring_topic" ref="topicConsumer1"/>
<jms:listener destination="spring_topic" ref="topicConsumer2"/>
</jms:listener-container>

<!-- Message consumers: end -->
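
The configuration above registers two listeners on spring_queue and two on spring_topic, and the two destination types behave differently. A queue message is consumed by only one of the queue listeners, while a topic message is delivered to every subscriber that is connected at the time. The sketch below simulates that difference in memory with JDK classes only; it does not talk to a broker, RecordingConsumer and the deliver methods are hypothetical names, and the round-robin choice is an assumption made for the illustration (a real broker decides which queue consumer gets a message).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: in-memory simulation, no broker and no JMS API involved.
class DestinationSemanticsDemo {

    /** Hypothetical consumer that records every message it receives. */
    static class RecordingConsumer {
        final String name;
        final List<String> received = new ArrayList<>();

        RecordingConsumer(String name) {
            this.name = name;
        }

        void onMessage(String text) {
            received.add(text);
        }
    }

    /** Queue semantics: each message goes to exactly one consumer (round-robin here). */
    static void deliverToQueue(List<RecordingConsumer> consumers, List<String> messages) {
        for (int i = 0; i < messages.size(); i++) {
            consumers.get(i % consumers.size()).onMessage(messages.get(i));
        }
    }

    /** Topic semantics: each message goes to every subscriber. */
    static void deliverToTopic(List<RecordingConsumer> subscribers, List<String> messages) {
        for (String message : messages) {
            for (RecordingConsumer subscriber : subscribers) {
                subscriber.onMessage(message);
            }
        }
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m1", "m2", "m3");

        List<RecordingConsumer> queueConsumers = Arrays.asList(
                new RecordingConsumer("queueConsumer1"), new RecordingConsumer("queueConsumer2"));
        deliverToQueue(queueConsumers, messages);
        for (RecordingConsumer c : queueConsumers) {
            System.out.println(c.name + " received " + c.received);
        }

        List<RecordingConsumer> topicConsumers = Arrays.asList(
                new RecordingConsumer("topicConsumer1"), new RecordingConsumer("topicConsumer2"));
        deliverToTopic(topicConsumers, messages);
        for (RecordingConsumer c : topicConsumers) {
            System.out.println(c.name + " received " + c.received);
        }
    }
}
```

One practical consequence for this tutorial: the topic listeners here are non-durable, so a topic message published before the consumers start is not delivered to them, whereas a queue message waits on the broker until a consumer picks it up. To see the topic output, start the consumers first and then run the producer test.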


The complete applicationContext-mq-consumer.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:jdbc="http://www.springframework.org/schema/jdbc" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:jpa="http://www.springframework.org/schema/data/jpa" xmlns:task="http://www.springframework.org/schema/task"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-4.1.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.1.xsd http://www.springframework.org/schema/data/jpa http://www.springframework.org/schema/data/jpa/spring-jpa.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd ">

<!-- Component scan -->
<context:component-scan base-package="cn.niwotaxuexiba.activemq.consumer" />

<!-- ActiveMQ connection factory -->
<!-- The ConnectionFactory that actually creates Connections, supplied by the JMS provider -->
<!-- For a broker on another machine use tcp://ip:61616; for a local broker use tcp://localhost:61616. Also set the user name and password -->
<amq:connectionFactory id="amqConnectionFactory"
brokerURL="tcp://localhost:61616" userName="admin" password="admin" />

<!-- Spring caching connection factory -->
<!-- The ConnectionFactory Spring uses to manage the real ConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<!-- targetConnectionFactory points to the real ConnectionFactory that creates JMS Connections -->
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<!-- Equivalent alternative: pass it as a constructor argument -->
<!-- <constructor-arg ref="amqConnectionFactory" /> -->
<!-- Number of Sessions to cache -->
<property name="sessionCacheSize" value="100" />
</bean>

<!-- Message consumers: start -->

<!-- Queue listeners -->
<jms:listener-container destination-type="queue" container-type="default"
connection-factory="connectionFactory" acknowledge="auto">
<!-- By default the bean name is the class name with its first letter in lowercase -->
<jms:listener destination="spring_queue" ref="queueConsumer1"/>
<jms:listener destination="spring_queue" ref="queueConsumer2"/>
</jms:listener-container>

<!-- Topic listeners -->
<jms:listener-container destination-type="topic" container-type="default"
connection-factory="connectionFactory" acknowledge="auto">
<jms:listener destination="spring_topic" ref="topicConsumer1"/>
<jms:listener destination="spring_topic" ref="topicConsumer2"/>
</jms:listener-container>

<!-- Message consumers: end -->

</beans>


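Write the message consumer class QueueConsumer1, which implements the MessageListener interface:

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.stereotype.Service;

@Service
public class QueueConsumer1 implements MessageListener {
    @Override
    public void onMessage(Message message) {
        // The producer in this example only sends TextMessage, so the cast is safe here
        TextMessage textMessage = (TextMessage) message;
        try {
            System.out.println("Consumer QueueConsumer1 received message: " + textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}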


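Create a second message consumer class, QueueConsumer2, which also implements the MessageListener interface.

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.stereotype.Service;

@Service
public class QueueConsumer2 implements MessageListener {
    @Override
    public void onMessage(Message message) {
        // The producer in this example only sends TextMessage, so the cast is safe here
        TextMessage textMessage = (TextMessage) message;
        try {
            System.out.println("Consumer QueueConsumer2 received message: " + textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}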


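Create the TopicConsumer1 class, which implements the MessageListener interface.

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.stereotype.Service;

@Service
public class TopicConsumer1 implements MessageListener {
    @Override
    public void onMessage(Message message) {
        // The producer in this example only sends TextMessage, so the cast is safe here
        TextMessage textMessage = (TextMessage) message;
        try {
            System.out.println("Consumer TopicConsumer1 received message: " + textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}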


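Create the TopicConsumer2 class, which implements the MessageListener interface.

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.stereotype.Service;

@Service
public class TopicConsumer2 implements MessageListener {
    @Override
    public void onMessage(Message message) {
        // The producer in this example only sends TextMessage, so the cast is safe here
        TextMessage textMessage = (TextMessage) message;
        try {
            System.out.println("Consumer TopicConsumer2 received message: " + textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}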


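Test code:

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:applicationContext-mq-consumer.xml")
public class ConsumerTest {

    @Test
    public void testConsumerMessage() throws InterruptedException {
        // Keep JUnit from exiting so the listeners stay alive; stop the test manually.
        // Sleeping inside the loop avoids spinning a CPU core while waiting.
        while (true) {
            Thread.sleep(1000);
        }
    }
}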
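
The test above never returns, so it has to be stopped by hand. If you know how many messages to expect, one possible alternative is to block on a java.util.concurrent.CountDownLatch with a timeout. The sketch below uses only the JDK and stands in plain threads for the listeners; LatchWaitDemo and awaitQuietly are hypothetical names, and wiring the latch into the real MessageListener beans is left out.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Illustration only: plain threads play the role of the JMS listeners.
class LatchWaitDemo {

    /**
     * Blocks until the latch reaches zero or the timeout elapses.
     * Returns true if all expected events arrived in time.
     */
    static boolean awaitQuietly(CountDownLatch latch, long timeoutMillis) {
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }

    public static void main(String[] args) {
        // Expect two messages; each fake listener counts down once when "its" message arrives
        CountDownLatch expected = new CountDownLatch(2);
        Runnable fakeListener = expected::countDown;
        new Thread(fakeListener).start();
        new Thread(fakeListener).start();

        boolean allArrived = awaitQuietly(expected, 2000);
        System.out.println("all expected messages arrived: " + allArrived);
    }
}
```

In a real test, each onMessage call would count the latch down, and the test method would finish as soon as the expected messages have been consumed or the timeout expires.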


<–end–>