您的位置:首页 > 编程语言 > Java开发

ActiveMQ(06):ActiveMQ结合Spring开发

2017-04-12 12:42 316 查看
一、pom.xml与mq.properties
Spring提供了对JMS的支持,需要添加Spring支持jms的包,如下:
<!-- Spring's JMS support module -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>4.1.7.RELEASE</version>
</dependency>
添加ActiveMQ的pool包,如下:
<!-- ActiveMQ pool package -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.11.1</version>
</dependency>
添加xbean的标签配置,如下:
<!-- XBean Spring integration; the article adds it for the XML tag-based configuration -->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>3.16</version>
</dependency>
pom.xml完整配置如下:
<!-- Central version properties referenced by the dependency declarations. -->
<properties>
<!-- The junit dependency is declared with ${junit.version}; without this property the POM cannot resolve it. -->
<junit.version>4.12</junit.version>
<activemq.version>5.9.0</activemq.version>
<activemq-pool.version>5.11.1</activemq-pool.version>
<spring.version>4.1.7.RELEASE</spring.version>
<xbean.version>3.16</xbean.version>
<commons-lang3.version>3.3.2</commons-lang3.version>
<commons-io.version>2.4</commons-io.version>
<commons-fileupload.version>1.3.1</commons-fileupload.version>
<fasterxml.jackson.version>2.8.4</fasterxml.jackson.version>
<codehaus.jackson.version>1.9.13</codehaus.jackson.version>
</properties>
<dependencies>
<!-- Unit testing. NOTE(review): junit.version must be defined in <properties> or inherited from a parent POM. -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
</dependency>
<!-- Apache Commons utility components -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3.version}</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>${commons-io.version}</version>
</dependency>
<dependency>
<groupId>commons-fileupload</groupId>
<artifactId>commons-fileupload</artifactId>
<version>${commons-fileupload.version}</version>
</dependency>
<!-- Jackson (both the com.fasterxml 2.x and the org.codehaus 1.x artifacts) -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${fasterxml.jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${fasterxml.jackson.version}</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>${codehaus.jackson.version}</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>${codehaus.jackson.version}</version>
</dependency>
<!-- ActiveMQ, Spring JMS and XBean. -->
<!-- NOTE(review): activemq-all and activemq-pool use separate version properties; confirm that mixing ActiveMQ versions is intended. -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>${activemq.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>${activemq-pool.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>${xbean.version}</version>
</dependency>
<!-- Spring modules, all pinned to spring.version -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aspects</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring.version}</version>
</dependency>
</dependencies>
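二、mq.xml配置
说明:下面配置中使用的 ${activemq.brokerURL}、${activemq.userName}、${activemq.password} 三个占位符需要在 mq.properties 中定义(例如 activemq.brokerURL=tcp://localhost:61616),并由 Spring 的属性占位符机制加载后才能生效。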
<?xml version="1.0" encoding="UTF-8"?>
<!-- Spring JMS wiring for ActiveMQ: connection factories, queue/topic JmsTemplates and listener containers. -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd"> 
<!-- ActiveMQ connection factory -->
<!-- The ConnectionFactory that actually produces Connections; it is supplied by the JMS provider. -->
<!-- Broker URL: tcp://ip:61616 for a broker reached over the network, tcp://localhost:61616 for a local one; plus user name and password. -->
<!-- The three placeholders below must be resolved from external properties. -->
<amq:connectionFactory id="amqConnectionFactory" brokerURL="${activemq.brokerURL}" userName="${activemq.userName}" password="${activemq.password}" />

<!-- Spring caching connection factory -->
<!-- The Spring ConnectionFactory that manages the real ConnectionFactory. -->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<!-- The target ConnectionFactory is the real one that creates JMS Connections. -->
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<!-- Equivalent alternative to the property above: pass the target as a constructor argument. -->
<!-- <constructor-arg ref="amqConnectionFactory" /> -->
<!-- Number of cached Sessions -->
<property name="sessionCacheSize" value="100" />
<!-- Client ID, used for durable subscriptions of topic subscribers; the durable listener container below uses the same value. -->
<property name="clientId" value="client-C" />
</bean>

<!-- ======= Spring JmsTemplate message producers [BEGIN] ======== -->
<!-- JmsTemplate for queues -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- This connectionFactory is the Spring-provided ConnectionFactory bean defined above. -->
<constructor-arg ref="connectionFactory" />
<!-- Not the pub/sub (publish/subscribe) model, i.e. queue mode -->
<property name="pubSubDomain" value="false" />
</bean>

<!-- JmsTemplate for topics -->
<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- This connectionFactory is the Spring-provided ConnectionFactory bean defined above. -->
<constructor-arg ref="connectionFactory" />
<!-- pub/sub (publish/subscribe) model -->
<property name="pubSubDomain" value="true" />
</bean>
<!-- ======= Spring JmsTemplate message producers [END] ======== -->

<!-- ======= Message consumers ======= [BEGIN] -->
<!-- Queue listeners: two listener beans registered on the same queue -->
<jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
<jms:listener destination="test.queue" ref="queueReceiver1"/>
<jms:listener destination="test.queue" ref="queueReceiver2"/>
</jms:listener-container>

<!-- Topic listeners -->
<!-- Non-durable subscription -->
<jms:listener-container destination-type="topic" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
<jms:listener destination="test.topic" ref="topicReceiver1"/>
</jms:listener-container>
<!-- Durable subscription -->
<jms:listener-container destination-type="durableTopic" container-type="default" connection-factory="connectionFactory" acknowledge="auto" client-id="client-C">
<jms:listener destination="test.topic2" subscription="topicReceiver2" ref="topicReceiver2"/>
</jms:listener-container>
<!-- ======= Message consumers ======= [END] -->
</beans>
三、java类

3.1 消费者监听器
3.1.1 队列消息监听器
package com.liuy.mq.consumer.queue;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.stereotype.Component;

/**
 * Queue message listener 1.
 *
 * <p>Prints the text body of every {@link TextMessage} it receives to standard output.
 * Messages of any other type are reported on standard error and skipped.
 *
 * <p>Created: 2017-04-12 10:15:19
 *
 * @author liuy
 * @version V1.00
 */
@Component
public class QueueReceiver1 implements MessageListener {

    /**
     * Handles one delivered message.
     *
     * @param message the delivered JMS message; only {@link TextMessage} bodies are printed
     */
    @Override
    public void onMessage(Message message) {
        // Guard the cast: a non-text message would otherwise make the cast below
        // throw ClassCastException out of the listener.
        if (!(message instanceof TextMessage)) {
            System.err.println("QueueReceiver1忽略非TextMessage消息:" + message);
            return;
        }
        try {
            System.out.println("QueueReceiver1接收到消息:" + ((TextMessage) message).getText());
        } catch (JMSException e) {
            // Reading the body failed; report it and carry on, as the original did.
            e.printStackTrace();
        }
    }
}
package com.liuy.mq.consumer.queue;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.stereotype.Component;

/**
 * Queue message listener 2.
 *
 * <p>Prints the text body of every {@link TextMessage} it receives to standard output.
 * Messages of any other type are reported on standard error and skipped.
 *
 * <p>Created: 2017-04-12 10:15:19
 *
 * @author liuy
 * @version V1.00
 */
@Component
public class QueueReceiver2 implements MessageListener {

    /**
     * Handles one delivered message.
     *
     * @param message the delivered JMS message; only {@link TextMessage} bodies are printed
     */
    @Override
    public void onMessage(Message message) {
        // Guard the cast: a non-text message would otherwise make the cast below
        // throw ClassCastException out of the listener.
        if (!(message instanceof TextMessage)) {
            System.err.println("QueueReceiver2忽略非TextMessage消息:" + message);
            return;
        }
        try {
            System.out.println("QueueReceiver2接收到消息:" + ((TextMessage) message).getText());
        } catch (JMSException e) {
            // Reading the body failed; report it and carry on, as the original did.
            e.printStackTrace();
        }
    }
}
3.1.2 Topic消息监听器
package com.liuy.mq.consumer.topic;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.stereotype.Component;

/**
 * Topic message listener 1.
 *
 * <p>Prints the text body of every {@link TextMessage} it receives to standard output.
 * Messages of any other type are reported on standard error and skipped.
 *
 * <p>Created: 2017-04-12 10:17:11
 *
 * @author liuy
 * @version V1.00
 */
@Component
public class TopicReceiver1 implements MessageListener {

    /**
     * Handles one delivered message.
     *
     * @param message the delivered JMS message; only {@link TextMessage} bodies are printed
     */
    @Override
    public void onMessage(Message message) {
        // Guard the cast: a non-text message would otherwise make the cast below
        // throw ClassCastException out of the listener.
        if (!(message instanceof TextMessage)) {
            System.err.println("TopicReceiver1忽略非TextMessage消息:" + message);
            return;
        }
        try {
            System.out.println("TopicReceiver1接收到消息:" + ((TextMessage) message).getText());
        } catch (JMSException e) {
            // Reading the body failed; report it and carry on, as the original did.
            e.printStackTrace();
        }
    }

}
package com.liuy.mq.consumer.topic;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.stereotype.Component;

/**
 * Topic message listener 2.
 *
 * <p>Prints the text body of every {@link TextMessage} it receives to standard output.
 * Messages of any other type are reported on standard error and skipped.
 *
 * <p>Created: 2017-04-12 10:17:11
 *
 * @author liuy
 * @version V1.00
 */
@Component
public class TopicReceiver2 implements MessageListener {

    /**
     * Handles one delivered message.
     *
     * @param message the delivered JMS message; only {@link TextMessage} bodies are printed
     */
    @Override
    public void onMessage(Message message) {
        // Guard the cast: a non-text message would otherwise make the cast below
        // throw ClassCastException out of the listener.
        if (!(message instanceof TextMessage)) {
            System.err.println("TopicReceiver2忽略非TextMessage消息:" + message);
            return;
        }
        try {
            System.out.println("TopicReceiver2接收到消息:" + ((TextMessage) message).getText());
        } catch (JMSException e) {
            // Reading the body failed; report it and carry on, as the original did.
            e.printStackTrace();
        }
    }

}
3.2 消息生产者
package com.liuy.mq.producer.queue;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

/**
 * Queue message producer: sends text messages to a named queue.
 *
 * <p>Registered as the Spring bean {@code queueSender}.
 *
 * <p>Created: 2017-04-12 10:20:46
 *
 * @author liuy
 * @version V1.00
 */
@Component("queueSender")
public class QueueSender {

    /** Template selected by bean name through {@code @Qualifier("jmsQueueTemplate")}. */
    @Qualifier("jmsQueueTemplate")
    @Autowired
    private JmsTemplate jmsTemplate;

    /**
     * Sends one text message to the given queue (destination).
     *
     * @param queueName name of the destination queue
     * @param message   text to send as the message body
     */
    public void send(String queueName, final String message) {
        // Builds the TextMessage on whatever Session the template hands over.
        final MessageCreator textMessageCreator = new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
            }
        };
        jmsTemplate.send(queueName, textMessageCreator);
    }

}
package com.liuy.mq.producer.topic;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

/**
 * Topic message producer: sends text messages to a named topic.
 *
 * <p>Registered as the Spring bean {@code topicSender}.
 *
 * <p>Created: 2017-04-12 10:20:46
 *
 * @author liuy
 * @version V1.00
 */
@Component("topicSender")
public class TopicSender {

    /** Template selected by bean name through {@code @Qualifier("jmsTopicTemplate")}. */
    @Qualifier("jmsTopicTemplate")
    @Autowired
    private JmsTemplate jmsTemplate;

    /**
     * Sends one text message to the given topic (destination).
     *
     * @param topicName name of the destination topic
     * @param message   text to send as the message body
     */
    public void send(String topicName, final String message) {
        // Builds the TextMessage on whatever Session the template hands over.
        final MessageCreator textMessageCreator = new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
            }
        };
        jmsTemplate.send(topicName, textMessageCreator);
    }

}
四、测试
package com.liuy.test.common;

import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**
 * Common base class for Spring-backed JUnit tests.
 *
 * <p>Runs subclasses with {@link SpringJUnit4ClassRunner} and loads the application
 * context from {@code classpath:application-context.xml}.
 *
 * <p>Declared {@code abstract} because it contains no test methods of its own: a concrete
 * class with this name and {@code @RunWith} could be picked up by name-based test scanning
 * and fail with "No runnable methods". Subclasses are unaffected.
 *
 * <p>Created: 2016-04-24 17:20:54
 *
 * @author liuy
 * @version V1.00
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:application-context.xml")
public abstract class SpringJunitTest {

}
package com.liuy.test.core;

import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;

import com.liuy.mq.producer.queue.QueueSender;
import com.liuy.mq.producer.topic.TopicSender;
import com.liuy.test.common.SpringJunitTest;

/**
 * Sends one message to a queue and one to a topic through the Spring-managed senders.
 *
 * <p>Created: 2017-04-11 21:00:18
 *
 * @author liuy
 * @version 1.0
 */
public class SpringQueueTest extends SpringJunitTest {

    /** Destination used by the queue test. */
    private static final String QUEUE_NAME = "test.queue";

    /** Destination used by the topic test. */
    private static final String TOPIC_NAME = "test.topic";

    @Autowired
    private QueueSender queueSender;

    @Autowired
    private TopicSender topicSender;

    /**
     * Sends a message to the queue.
     *
     * <p>Queue semantics: only one subscriber receives each message, and once the
     * message has been processed it no longer remains in the queue.
     *
     * @throws Exception if sending fails
     */
    @Test
    public void testQueueSend() throws Exception {
        final String payload = "测试";
        queueSender.send(QUEUE_NAME, payload);
    }

    /**
     * Sends a message to the topic.
     *
     * <p>Topic semantics: a published message is received by all subscribers, so a
     * topic destination is one-to-many.
     *
     * @throws Exception if sending fails
     */
    @Test
    public void testTopicSend() throws Exception {
        final String payload = "测试222";
        topicSender.send(TOPIC_NAME, payload);
    }
}


效果:
队列:



主题:

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Spring 结合 ActiveMQ