您的位置:首页 > 编程语言 > Java开发

JMS学习十(Spring+ActiveMQ同步、异步)

2017-03-13 23:32 477 查看
通过前面的九篇我们了解了JMS(ActiveMQ),接下来这篇文章我们来看看spring中使用ActiveMQ,但通过资料,demo的查找以及整合实例的使用个人感觉spring和activemq整合使用确实不是很好,很方便!

spring中使用activemq我们从如下几个步骤来学习:

1、spring+activemq的同步消息接收(queue+topic)

2、spring+activemq的异步消息接收(queue+topic)

3、spring+activemq消息的持久化订阅(topic)

4、spring+activemq消息的持久化(queue+topic)

5、spring+activemq集群配置(queue+topic)

这篇文章中先学习1、2(为了简单期间,只使用了spring没有springmvc部分,项目是maven格式)

一、spring+activemq的同步消息接收

1、项目结构



项目使用maven的,还有控制层是没用的,直接现在service层main启动,还有其中的ApplicationContext1、2、3同步接收、异步接收和持久化的配置。

2、maven中添加的依赖包即pom.xml中添加依赖包:

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>4.0.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>4.0.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
<version>4.0.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>4.0.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
<version>1.8.6</version>
</dependency>
<!--ActiveMQ所需要的jar包 -->
<!-- 添加ActiveMQ的pool包 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.9.0</version>
</dependency>
<!--依赖包 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>4.0.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
<version>4.0.0.RELEASE</version>
</dependency>


3、ActiveMQ在spring中的配置即ApplicationContext.xml中配置Activemq

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.2.xsd"> 
<!--ActiveMQ相关配置 -->
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<!--消息服务连接信息 -->
<property name="brokerURL">
<value>tcp://127.0.0.1:61616</value>
</property>
<property name="userName">
<value>admin</value>
</property>
<property name="password">
<value>admin</value>
</property>
</bean>
</property>
<!--最大连接数,因为上面是使用了pool -->
<property name="maxConnections" value="100"></property>
</bean>
<!-- queue目的地配置 -->
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg index="0" value="spring-queue" />
</bean>
<!-- topic目的地配置,其实不管是topic还是queue则他们的底层实现不同但是通过封装api就差不多了,而在spring中更是简单 -->
<bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg index="0" value="spring-topic" />
</bean>

<!-- spring 使用jmsTemplate来实现消息的发送和接受 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jmsFactory"></property>
<!--目的地的设置,使用spring来使用activemq时,使用queue还是topic很方便,在这里引用不同地址就ok了 -->
<property name="defaultDestination" ref="destination"></property>
<!--转换器,我们自己可以继承重写这个类的方法 ,这里使用spring提供的默认方法 -->
<property name="messageConverter">
<bean
class="org.springframework.jms.support.converter.SimpleMessageConverter" />
</property>
</bean>
</beans>


4、消息生产者:

package springs.activemq.Service;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class QueueProducer {
// 负责消息的发送和接收可以理解为MessageProducer 和MessageConsummer的组合。
private static JmsTemplate jt = null;

public static void main(String[] args) {
ApplicationContext ctx = new ClassPathXmlApplicationContext(
"config/ApplicationContext.xml");
// 获取JmsTemplate对象
jt = (JmsTemplate) ctx.getBean("jmsTemplate");
// 调用方法,发送消息
jt.send(new MessageCreator() {
// 消息的产生,返回消息发送消息
public Message createMessage(Session s) throws JMSException {
TextMessage msg = s
.createTextMessage("Spring send msg ----> Hello activeMQ5");
return msg;
}
});
System.out.println("end!");
}
}


5、消息消费者:

package springs.activemq.Service;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;

public class QueueConsumer {
private static JmsTemplate jt = null;

public static void main(String[] args) {
ApplicationContext ctx = new ClassPathXmlApplicationContext(
"config/ApplicationContext.xml");
//获取JmsTemplate对象
jt = (JmsTemplate) ctx.getBean("jmsTemplate");
//接收消息
String msg = (String) jt.receiveAndConvert();
System.out.println("receive msg = " + msg);
}
}


6、执行结果:



ok这样一个简单的spring+actvemq整合demo就完成了!

总结流程:

1、在pom.xml中添加依赖的jar包

2、将activemq相关的配置配置到配置文件中,为了更简单明了的说明白,这里只使用了spring,所以你也看到了activemq相关的配置,以后不管用到什么项目中则直接把这些拷过去即ok了!

3、消息生产者即定义消息,发送消息!

4、消息消费者,接收消息,处理消息、回复确认!

就这样ok了……

上面的这个列子是我们使用了queue那我们要使用topic怎么办,看上面的配置,主需要将jmsTemplate中目的地的引用改为定义的topic的引用就ok了配置如下:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.2.xsd"> 
<!--ActiveMQ相关配置 -->
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<!--消息服务连接信息 -->
<property name="brokerURL">
<value>tcp://127.0.0.1:61616</value>
</property>
<property name="userName">
<value>admin</value>
</property>
<property name="password">
<value>admin</value>
</property>
</bean>
</property>
<!--最大连接数,因为上面是使用了pool -->
<property name="maxConnections" value="100"></property>
</bean>
<!-- queue目的地配置 -->
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg index="0" value="spring-queue" />
</bean>
<!-- topic目的地配置,其实不管是topic还是queue则他们的底层实现不同但是通过封装api就差不多了,而在spring中更是简单 -->
<bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg index="0" value="spring-topic" />
</bean>

<!-- spring 使用jmsTemplate来实现消息的发送和接受 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jmsFactory"></property>
<!--目的地的设置,使用spring来使用activemq时,使用queue还是topic很方便,在这里引用不同地址就ok了 -->
<property name="defaultDestination" ref="destinationTopic"></property>
<!--转换器,我们自己可以继承重写这个类的方法 ,这里使用spring提供的默认方法 -->
<property name="messageConverter">
<bean
class="org.springframework.jms.support.converter.SimpleMessageConverter" />
</property>
</bean>
</beans>
发现没其实只修改了jmsTemplate中的引用其他的什么都不用改。

这样我们spring+activemq的queue 和topic的同步接收都好了,下面来看看异步接收:

二、spring+activemq的异步消息接收

上面的那种整合方式和直接使用ActiveMQ编写的时候使用receive()方法接收消息一样,是同步接收,但是我们应该更喜欢异步的,下面来看看其实也很简单!

需要的jar包都一样的没有什么不同,项目结构也是一样的!

1、spring+activemq配置文件:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.2.xsd"> 
<!--ActiveMQ相关配置 -->
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<!--消息服务连接信息 -->
<property name="brokerURL">
<value>tcp://127.0.0.1:61616</value>
</property>
<property name="userName">
<value>admin</value>
</property>
<property name="password">
<value>admin</value>
</property>
</bean>
</property>
<!--最大连接数,因为上面是使用了pool -->
<property name="maxConnections" value="100"></property>
</bean>
<!-- queue目的地配置 -->
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg index="0" value="spring-queue" />
</bean>
<!-- topic目的地配置,其实不管是topic还是queue则他们的底层实现不同但是通过封装api就差不多了,而在spring中更是简单 -->
<bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg index="0" value="spring-topic" />
</bean>

<!-- spring 使用jmsTemplate来实现消息的发送和接受 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jmsFactory"></property>
<!--目的地的设置,使用spring来使用activemq时,使用queue还是topic很方便,在这里引用不同地址就ok了 -->
<property name="defaultDestination" ref="destination"></property>
<!--转换器,我们自己可以继承重写这个类的方法 ,这里使用spring提供的默认方法 -->
<property name="messageConverter">
<bean
class="org.springframework.jms.support.converter.SimpleMessageConverter" />
</property>
</bean>
<!--异步监听 -->
<bean id="myMessageListener" class="springs.activemq.Service.MyMessageListener">
</bean>
<bean id="jmsContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="jmsFactory" />
<property name="destination" ref="destination" />
<property name="messageListener" ref="myMessageListener" />
</bean>
</beans>


发现没,这个配置文件和上面的不一样,就是添加了异步监听即一个监听类和一个jmsContainer类!

2、消息生产者:

package springs.activemq.Service;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class QueueProducer {
// 负责消息的发送和接收可以理解为MessageProducer 和MessageConsummer的组合。
private static JmsTemplate jt = null;

public static void main(String[] args) {
ApplicationContext ctx = new ClassPathXmlApplicationContext(
"config/ApplicationContext.xml");
// 获取JmsTemplate对象
jt = (JmsTemplate) ctx.getBean("jmsTemplate");
// 调用方法,发送消息
jt.send(new MessageCreator() {
// 消息的产生,返回消息发送消息
public Message createMessage(Session s) throws JMSException {
TextMessage msg = s
.createTextMessage("Spring send msg ----> Hello activeMQ4");
return msg;
}
});
System.out.println("end!");
}
}


3、消息消费者:

这里的消息消费者和之前同步方式不一样,这里的消息是在MessageListener类onMessage方法中处理,那消息是怎么监听的那,这个我们已经在配置文件中配置了。

package springs.activemq.Service;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class MyMessageListener implements MessageListener {
public void onMessage(Message arg0) {
// TODO Auto-generated method stub
try {
String message = ((TextMessage) arg0).getText();
System.out.println("textmessage:" + message);
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

}


发现没,这个和我们之前的例子的异步监听的监听类是一样的,在这里spring已经给我们留了入口,我们只需要写这个监听类,并把它给消息接收类,当有消息的时候我们就能得知,不用向之前的一样自己写消息接收类了!

上面的配置是对queue队列的,其实topic的也很简单,就是修改目的地就ok了,但是要注意这里要修改两处:

Topic的异步监听配置文件:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.2.xsd"> 
<!--ActiveMQ相关配置 -->
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<!--消息服务连接信息 -->
<property name="brokerURL">
<value>tcp://127.0.0.1:61616</value>
</property>
<property name="userName">
<value>admin</value>
</property>
<property name="password">
<value>admin</value>
</property>
</bean>
</property>
<!--最大连接数,因为上面是使用了pool -->
<property name="maxConnections" value="100"></property>
</bean>
<!-- queue目的地配置 -->
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg index="0" value="spring-queue" />
</bean>
<!-- topic目的地配置,其实不管是topic还是queue则他们的底层实现不同但是通过封装api就差不多了,而在spring中更是简单 -->
<bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg index="0" value="spring-topic" />
</bean>

<!-- spring 使用jmsTemplate来实现消息的发送和接受 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jmsFactory"></property>
<!--目的地的设置,使用spring来使用activemq时,使用queue还是topic很方便,在这里引用不同地址就ok了 -->
<property name="defaultDestination" ref="destinationTopic"></property>
<!--转换器,我们自己可以继承重写这个类的方法 ,这里使用spring提供的默认方法 -->
<property name="messageConverter">
<bean
class="org.springframework.jms.support.converter.SimpleMessageConverter" />
</property>
</bean>
<!--异步监听 -->
<bean id="myMessageListener" class="springs.activemq.Service.MyMessageListener">
</bean>
<bean id="jmsContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="jmsFactory" />
<property name="destination" ref="destinationTopic" />
<property name="messageListener" ref="myMessageListener" />
</bean>
</beans>


消息生产者和消息消费者的配置都是一样的没什么变化!

除了上面的配置,下面还有一种配置方式,这种配置方式相比上面的分开了一步步的配置,比较清晰,配置如下:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.2.xsd"> 
<!--第三方工厂,也是具体实现连接的 -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://127.0.0.1:61616" />
<property name="userName" value="admin"></property>
<property name="password" value="admin"></property>
</bean>
<!-- ActiveMQ为我们提供了一个PooledConnectionFactory,通过往里面注入一个ActiveMQConnectionFactory
可以用来将Connection、Session和MessageProducer池化,这样可以大大的减少我们的资源消耗,要依赖于 activemq-pool包 -->
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
<property name="connectionFactory" ref="targetConnectionFactory" />
<property name="maxConnections" value="100" />
</bean>

<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="pooledConnectionFactory" />
</bean>

<!-- queue目的地配置 -->
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg index="0" value="spring-queue" />
</bean>
<!-- topic目的地配置,其实不管是topic还是queue则他们的底层实现不同但是通过封装api就差不多了,而在spring中更是简单 -->
<bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg index="0" value="spring-topic" />
</bean>

<!-- spring 使用jmsTemplate来实现消息的发送和接受 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"></property>
<property name="defaultDestination" ref="destination"></property>
<property name="messageConverter">
<bean
class="org.springframework.jms.support.converter.SimpleMessageConverter" />
</property>
</bean>
<!--异步监听 -->
<bean id="myMessageListener" class="springs.activemq.Service.MyMessageListener">
</bean>
<bean id="jmsContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="destination" />
<property name="messageListener" ref="myMessageListener" />
</bean>
</beans>


ok 这样queue的同步,异步接收都ok了,下一篇我们看看topic的持久化订阅!

项目源码下载:点击打开链接
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: