ActiveMQ消息队列和spring进行整合实例
2014-01-02 10:22
776 查看
1所需的jar包
2
主配置文件
applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:oxm="http://www.springframework.org/schema/oxm"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/oxm http://www.springframework.org/schema/oxm/spring-oxm-3.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd" >
<context:annotation-config/>
<context:component-scan base-package="com.merit"/>
<import resource="spring-mq.xml"/>
</beans>
引用的spring-mq.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:oxm="http://www.springframework.org/schema/oxm"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/oxm http://www.springframework.org/schema/oxm/spring-oxm-3.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd" >
<!-- 配置connectionFactory -->
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL">
<value>tcp://127.0.0.1:61616</value>
</property>
</bean>
</property>
<property name="maxConnections" value="100"></property>
</bean>
<!-- Spring JMS Template -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory">
<ref local="jmsFactory" />
</property>
<property name="defaultDestinationName" value="subject" />
<!-- 区别它采用的模式为false是p2p为true是订阅 -->
<property name="pubSubDomain" value="true" />
</bean>
<!-- 发送消息的目的地(一个队列) -->
<bean id="destination" class="org.apache.activemq.command.ActiveMQTopic">
<!-- 设置消息队列的名字 -->
<constructor-arg index="0" value="subject" />
</bean>
<!-- 消息监听 -->
<bean id="listenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="jmsFactory" />
<property name="destinationName" value="subject" />
<!-- <property name="concurrentConsumers" value="10" />
<property name="messageListener" ref="messageReceiver" />
<property name="pubSubNoLocal" value="false"></property> -->
</bean>
<bean id="messageReceiver"
class="com.merit.mq.test.ProxyJMSConsumer">
<property name="jmsTemplate" ref="jmsTemplate"></property>
</bean>
</beans>
3 下面写生产者和消费者,使用的时候要先启动消费者,消费者就是接收者,生产者就是发送者,这个程序写的不灵活,所以要先执行消费者。
生产者代码:
package com.merit.mq.test;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.Message;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
public class HelloSender {
/**
* @param args
*/
public static void main(String[] args) {
ApplicationContext applicationContext = new ClassPathXmlApplicationContext(
new String[] { "com/merit/resource/applicationContext.xml" });
JmsTemplate template = (JmsTemplate) applicationContext
.getBean("jmsTemplate");
Destination destination = (Destination) applicationContext
.getBean("destination");
template.send(destination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
System.out.println("test");
return session
.createTextMessage("发送消息:Hello ActiveMQ Text Message!");
}
});
System.out.println("成功发送了一条JMS消息");
}
}
消费者代码: (这个main方法要先启动)
package com.merit.mq.test;
import javax.jms.Destination;
import javax.jms.TextMessage;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
public class ProxyJMSConsumer {
public ProxyJMSConsumer() {
}
public static void main(String[] args) {
ApplicationContext applicationContext = new ClassPathXmlApplicationContext(
new String[] { "com/merit/resource/applicationContext.xml" });
ProxyJMSConsumer proxyJMSConsumer = (ProxyJMSConsumer) applicationContext
.getBean("messageReceiver");
System.out.println("初始化消息消费者");
proxyJMSConsumer.recive();
}
private JmsTemplate jmsTemplate;
public JmsTemplate getJmsTemplate() {
return jmsTemplate;
}
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
/**
* 监听到消息目的有消息后自动调用onMessage(Message message)方法
*/
public void recive() {
ApplicationContext applicationContext = new ClassPathXmlApplicationContext(
new String[] { "com/merit/resource/applicationContext.xml" });
Destination destination = (Destination) applicationContext
.getBean("destination");
while (true) {
try {
TextMessage txtmsg = (TextMessage) jmsTemplate
.receive(destination);
if (null != txtmsg) {
System.out.println("[DB Proxy] " + txtmsg);
System.out.println("[DB Proxy] 收到消息内容为: "
+ txtmsg.getText());
} else
break;
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
4 按照顺序启动完成后:
最终结果展示
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/D:/workspace/j2ee/sim-mq/WebContent/WEB-INF/lib/slf4j-log4j12-1.6.4.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/D:/workspace/j2ee/sim-mq/WebContent/WEB-INF/lib/activemq-all-5.9.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
初始化消息消费者
[DB Proxy] ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:wangguilong-5451-1388629375011-1:2:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:wangguilong-5451-1388629375011-1:2:1:1, destination = topic://subject, transactionId = null, expiration = 0, timestamp = 1388629375199, arrival = 0, brokerInTime = 1388629375199, brokerOutTime = 1388629375199, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@761d63, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = 发送消息:Hello ActiveMQ Text Message!}
[DB Proxy] 收到消息内容为: 发送消息:Hello ActiveMQ Text Message!
2
主配置文件
applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:oxm="http://www.springframework.org/schema/oxm"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/oxm http://www.springframework.org/schema/oxm/spring-oxm-3.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd" >
<context:annotation-config/>
<context:component-scan base-package="com.merit"/>
<import resource="spring-mq.xml"/>
</beans>
引用的spring-mq.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:oxm="http://www.springframework.org/schema/oxm"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/oxm http://www.springframework.org/schema/oxm/spring-oxm-3.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd" >
<!-- 配置connectionFactory -->
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL">
<value>tcp://127.0.0.1:61616</value>
</property>
</bean>
</property>
<property name="maxConnections" value="100"></property>
</bean>
<!-- Spring JMS Template -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory">
<ref local="jmsFactory" />
</property>
<property name="defaultDestinationName" value="subject" />
<!-- 区别它采用的模式为false是p2p为true是订阅 -->
<property name="pubSubDomain" value="true" />
</bean>
<!-- 发送消息的目的地(一个队列) -->
<bean id="destination" class="org.apache.activemq.command.ActiveMQTopic">
<!-- 设置消息队列的名字 -->
<constructor-arg index="0" value="subject" />
</bean>
<!-- 消息监听 -->
<bean id="listenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="jmsFactory" />
<property name="destinationName" value="subject" />
<!-- <property name="concurrentConsumers" value="10" />
<property name="messageListener" ref="messageReceiver" />
<property name="pubSubNoLocal" value="false"></property> -->
</bean>
<bean id="messageReceiver"
class="com.merit.mq.test.ProxyJMSConsumer">
<property name="jmsTemplate" ref="jmsTemplate"></property>
</bean>
</beans>
3 下面写生产者和消费者,使用的时候要先启动消费者,消费者就是接收者,生产者就是发送者,这个程序写的不灵活,所以要先执行消费者。
生产者代码:
package com.merit.mq.test;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.Message;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
public class HelloSender {
/**
* @param args
*/
public static void main(String[] args) {
ApplicationContext applicationContext = new ClassPathXmlApplicationContext(
new String[] { "com/merit/resource/applicationContext.xml" });
JmsTemplate template = (JmsTemplate) applicationContext
.getBean("jmsTemplate");
Destination destination = (Destination) applicationContext
.getBean("destination");
template.send(destination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
System.out.println("test");
return session
.createTextMessage("发送消息:Hello ActiveMQ Text Message!");
}
});
System.out.println("成功发送了一条JMS消息");
}
}
消费者代码: (这个main方法要先启动)
package com.merit.mq.test;
import javax.jms.Destination;
import javax.jms.TextMessage;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
public class ProxyJMSConsumer {
public ProxyJMSConsumer() {
}
public static void main(String[] args) {
ApplicationContext applicationContext = new ClassPathXmlApplicationContext(
new String[] { "com/merit/resource/applicationContext.xml" });
ProxyJMSConsumer proxyJMSConsumer = (ProxyJMSConsumer) applicationContext
.getBean("messageReceiver");
System.out.println("初始化消息消费者");
proxyJMSConsumer.recive();
}
private JmsTemplate jmsTemplate;
public JmsTemplate getJmsTemplate() {
return jmsTemplate;
}
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
/**
* 监听到消息目的有消息后自动调用onMessage(Message message)方法
*/
public void recive() {
ApplicationContext applicationContext = new ClassPathXmlApplicationContext(
new String[] { "com/merit/resource/applicationContext.xml" });
Destination destination = (Destination) applicationContext
.getBean("destination");
while (true) {
try {
TextMessage txtmsg = (TextMessage) jmsTemplate
.receive(destination);
if (null != txtmsg) {
System.out.println("[DB Proxy] " + txtmsg);
System.out.println("[DB Proxy] 收到消息内容为: "
+ txtmsg.getText());
} else
break;
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
4 按照顺序启动完成后:
最终结果展示
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/D:/workspace/j2ee/sim-mq/WebContent/WEB-INF/lib/slf4j-log4j12-1.6.4.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/D:/workspace/j2ee/sim-mq/WebContent/WEB-INF/lib/activemq-all-5.9.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
初始化消息消费者
[DB Proxy] ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:wangguilong-5451-1388629375011-1:2:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:wangguilong-5451-1388629375011-1:2:1:1, destination = topic://subject, transactionId = null, expiration = 0, timestamp = 1388629375199, arrival = 0, brokerInTime = 1388629375199, brokerOutTime = 1388629375199, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@761d63, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = 发送消息:Hello ActiveMQ Text Message!}
[DB Proxy] 收到消息内容为: 发送消息:Hello ActiveMQ Text Message!
相关文章推荐
- Java消息队列-Spring整合ActiveMq
- spring整合apache activemq实现消息发送的三种方式代码配置实例
- spring整合activemq发送MQ消息[Topic模式]实例
- 【消息队列】二、Spring整合JMS(消息中间件)实例
- spring整合activemq发送消息[queue类型]实例
- 消息队列 RabbitMQ 与 Spring 整合使用的实例代码
- Spring整合ActiveMQ实现简单的消息队列
- Java消息队列-Spring整合ActiveMq
- MQ消息队列--RabbitMQ整合Spring理论及实例讲解
- spring整合activemq发送MQ消息[queue模式]实例
- Spring整合RabbitMQ进行消息队列开发
- 详解Java消息队列-Spring整合ActiveMq
- Java消息队列-Spring整合ActiveMq
- spring整合activemq消息队列之点对点模式
- Spring整合ActiveMQ完成消息队列MQ编程
- 利用Spring与ActiveMQ整合发送、接收消息实例(Queue与Topic模式)
- Java消息队列-Spring整合ActiveMq
- Java消息队列-Spring整合ActiveMq
- spring整合activemq发送MQ消息[Topic模式]实例,activemqmq
- 消息队列activemq整合spring发送端和接收端配置