您的位置:首页 > 编程语言 > Java开发

ActiveMQ消息队列和spring进行整合实例

2014-01-02 10:22 776 查看
1所需的jar包



2

主配置文件

applicationContext.xml

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Root Spring application context for the ActiveMQ example.
  Enables annotation processing, scans the com.merit package for components,
  and pulls in the JMS wiring from spring-mq.xml.
-->
<beans
    xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:oxm="http://www.springframework.org/schema/oxm"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/oxm
        http://www.springframework.org/schema/oxm/spring-oxm-3.0.xsd
        http://www.springframework.org/schema/aop
        http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-3.0.xsd">

    <!-- Activate annotation-driven configuration on registered beans. -->
    <context:annotation-config />

    <!-- Register annotated components found under the com.merit package. -->
    <context:component-scan base-package="com.merit" />

    <!-- JMS beans (connection factory, template, destination, consumer) live in a separate file. -->
    <import resource="spring-mq.xml" />

</beans>


引用的spring-mq.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
  JMS wiring for the ActiveMQ example: a pooled connection factory, a JmsTemplate
  working in publish/subscribe mode, the topic used as destination, and the
  consumer bean that polls that topic through the template.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:oxm="http://www.springframework.org/schema/oxm"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/oxm http://www.springframework.org/schema/oxm/spring-oxm-3.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">

    <!-- Connection factory: pools connections to the broker at tcp://127.0.0.1:61616.
         destroy-method="stop" is invoked when the application context is closed. -->
    <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
          destroy-method="stop">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL">
                    <value>tcp://127.0.0.1:61616</value>
                </property>
            </bean>
        </property>
        <property name="maxConnections" value="100"></property>
    </bean>

    <!-- Spring JMS template.
         The reference uses the plain ref attribute: the older ref local="..." form is
         not part of the spring-beans XSD from 4.0 onwards, and this also matches how
         listenerContainer refers to the same factory. -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="jmsFactory" />
        <property name="defaultDestinationName" value="subject" />
        <!-- Messaging domain: false = point-to-point (queue), true = publish/subscribe (topic). -->
        <property name="pubSubDomain" value="true" />
    </bean>

    <!-- Destination the example sends to and receives from: a topic (not a queue). -->
    <bean id="destination" class="org.apache.activemq.command.ActiveMQTopic">
        <!-- Name of the topic. -->
        <constructor-arg index="0" value="subject" />
    </bean>

    <!-- Message listener container.
         NOTE(review): no messageListener is configured and pubSubDomain is left at its
         default, so this container appears to attach to a QUEUE named "subject" and not
         to the topic used by jmsTemplate and the destination bean. The example consumer
         polls through JmsTemplate and does not use this container. The messageReceiver
         bean (ProxyJMSConsumer) does not implement javax.jms.MessageListener, so simply
         re-enabling the commented-out messageListener property would not work as is.
         TODO: confirm this bean is unused, then remove it or wire a real listener. -->
    <bean id="listenerContainer"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="jmsFactory" />
        <property name="destinationName" value="subject" />
        <!-- <property name="concurrentConsumers" value="10" />
        <property name="messageListener" ref="messageReceiver" />
        <property name="pubSubNoLocal" value="false"></property> -->
    </bean>

    <!-- Polling consumer; receives the JmsTemplate by setter injection. -->
    <bean id="messageReceiver"
          class="com.merit.mq.test.ProxyJMSConsumer">
        <property name="jmsTemplate" ref="jmsTemplate"></property>
    </bean>
</beans>


3 下面编写生产者和消费者。消费者即消息的接收者,生产者即消息的发送者。本例使用的是主题(Topic)且为非持久订阅,订阅者上线之前发布的消息不会被保留,所以运行时必须先启动消费者,再启动生产者。
生产者代码:

package com.merit.mq.test;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Session;

import javax.jms.Message;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class HelloSender {

/**
* @param args
*/
public static void main(String[] args) {
ApplicationContext applicationContext = new ClassPathXmlApplicationContext(
new String[] { "com/merit/resource/applicationContext.xml" });

JmsTemplate template = (JmsTemplate) applicationContext
.getBean("jmsTemplate");

Destination destination = (Destination) applicationContext
.getBean("destination");

template.send(destination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
System.out.println("test");
return session
.createTextMessage("发送消息:Hello ActiveMQ Text Message!");
}

});
System.out.println("成功发送了一条JMS消息");

}

}


消费者代码: (这个main方法要先启动)
package com.merit.mq.test;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.JmsException;
import org.springframework.jms.core.JmsTemplate;

/**
 * Polling consumer for the ActiveMQ example.
 *
 * <p>Messages are pulled synchronously with {@link JmsTemplate#receive(Destination)}; this class
 * is not a {@code javax.jms.MessageListener} and no callback is invoked by a container.
 *
 * <p>NOTE(review): {@code JmsTemplate} is expected to open a consumer per {@code receive} call,
 * so on a non-durable topic messages published between two calls may be missed. Confirm whether
 * that is acceptable for the intended use.
 */
public class ProxyJMSConsumer {

    /** Classpath location of the Spring configuration that defines the JMS beans. */
    private static final String CONFIG_LOCATION = "com/merit/resource/applicationContext.xml";

    /** Template used to receive messages; set through {@link #setJmsTemplate(JmsTemplate)}. */
    private JmsTemplate jmsTemplate;

    /** No-argument constructor so the class can be instantiated as a Spring bean. */
    public ProxyJMSConsumer() {
    }

    /**
     * Starts the consumer: loads the Spring context, looks up the {@code messageReceiver} and
     * {@code destination} beans, and polls until no more messages are returned.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        ClassPathXmlApplicationContext applicationContext =
                new ClassPathXmlApplicationContext(new String[] { CONFIG_LOCATION });
        try {
            ProxyJMSConsumer proxyJMSConsumer =
                    applicationContext.getBean("messageReceiver", ProxyJMSConsumer.class);
            // Resolve the destination from the context that is already loaded instead of
            // letting recive() build a second context just for this lookup.
            Destination destination = applicationContext.getBean("destination", Destination.class);

            System.out.println("初始化消息消费者");
            proxyJMSConsumer.recive(destination);
        } finally {
            applicationContext.close();
        }
    }

    /**
     * @return the template used to receive messages, or {@code null} if none has been set
     */
    public JmsTemplate getJmsTemplate() {
        return jmsTemplate;
    }

    /**
     * @param jmsTemplate the template used to receive messages
     */
    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    /**
     * Polls the {@code destination} bean defined in the Spring configuration.
     *
     * <p>Kept for existing callers. The configuration is loaded only to look up the destination
     * and the temporary context is closed before polling starts; the injected
     * {@link JmsTemplate} is what actually talks to the broker.
     *
     * @throws IllegalStateException if no {@code JmsTemplate} has been set
     */
    public void recive() {
        Destination destination;
        ClassPathXmlApplicationContext lookupContext =
                new ClassPathXmlApplicationContext(new String[] { CONFIG_LOCATION });
        try {
            destination = lookupContext.getBean("destination", Destination.class);
        } finally {
            lookupContext.close();
        }
        recive(destination);
    }

    /**
     * Receives messages from {@code destination} one at a time and prints each text message.
     *
     * <p>The loop ends when {@code receive} returns {@code null} or when receiving fails with a
     * {@link JmsException}; retrying immediately after such a failure would only spin while
     * printing stack traces. Messages that are not {@link TextMessage}s are reported and skipped.
     *
     * @param destination the destination to receive from
     * @throws IllegalStateException if no {@code JmsTemplate} has been set
     */
    public void recive(Destination destination) {
        if (jmsTemplate == null) {
            throw new IllegalStateException("jmsTemplate has not been set");
        }

        while (true) {
            Message message;
            try {
                message = jmsTemplate.receive(destination);
            } catch (JmsException e) {
                e.printStackTrace();
                break;
            }

            if (message == null) {
                break;
            }
            if (!(message instanceof TextMessage)) {
                System.out.println("[DB Proxy] skipping non-text message: " + message);
                continue;
            }

            TextMessage txtmsg = (TextMessage) message;
            try {
                System.out.println("[DB Proxy] " + txtmsg);
                System.out.println("[DB Proxy] 收到消息内容为: "
                        + txtmsg.getText());
            } catch (JMSException e) {
                // Only this message's body could not be read; keep consuming.
                e.printStackTrace();
            }
        }
    }
}


4 按照顺序启动完成后:
最终结果展示

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/D:/workspace/j2ee/sim-mq/WebContent/WEB-INF/lib/slf4j-log4j12-1.6.4.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/D:/workspace/j2ee/sim-mq/WebContent/WEB-INF/lib/activemq-all-5.9.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
初始化消息消费者
[DB Proxy] ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:wangguilong-5451-1388629375011-1:2:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:wangguilong-5451-1388629375011-1:2:1:1, destination = topic://subject, transactionId = null, expiration = 0, timestamp = 1388629375199, arrival = 0, brokerInTime = 1388629375199, brokerOutTime = 1388629375199, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@761d63, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = 发送消息:Hello ActiveMQ Text Message!}
[DB Proxy] 收到消息内容为: 发送消息:Hello ActiveMQ Text Message!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: