ActiveMq-Queue开发
2015-08-24 10:16
453 查看
1. 生产者开发
1.1. Spring配置
在beans里xsi:schemaLocation引入:
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.5.0.xsd
<!-- ActiveMQ destinations -->
<!-- QUEUE:点对点模式。消息数据被持久化,每条消息都能被消费;即使发送时没有监听者监听该QUEUE地址,消息也会保留到之后被消费,数据不会丢失。一对一的发布/接收策略,保证数据完整。
-->
<bean
id="QUEUE" class="org.apache.activemq.command.ActiveMQQueue">
<property
name="physicalName">
<value>JMS-QUEUE</value>
</property>
</bean>
<!-- 生产消息配置 -->
<bean
id="queueProducer" class="com.tuan.common.activemq.impl.QueueProducer">
<property
name="template"
ref="jmsTemplate"/>
<property
name="destination" ref="QUEUE"
/>
</bean>
<!-- ConnectionFactory -->
<bean
id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property
name="brokerURL">
<value>failover:(tcp://ip1:61616,tcp://ip2:61616)</value>
</property>
<property
name="alwaysSessionAsync">
<value>true</value>
</property>
</bean>
<!-- Spring JmsTemplate config-->
<bean
id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property
name="connectionFactory" ref="jmsConnectionFactory"
/>
<property
name="messageConverter" ref="defaultMessageConverter"
/>
<property
name="sessionTransacted"
value="true"
/>
</bean>
<!-- 自定义converter,实现key-value的Map结构,value为序列化的POJO -->
<!-- bean id="defaultMessageConverter" class="com.tuan.common.activemq.util.DefaultMessageConverter" /-->
<!-- Spring 标准converter,可以操作字符串 -->
<bean id="defaultMessageConverter"
class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
1.2. 业务程序调用
传输Map:
QueueProducer queueProducer = (QueueProducer) wac.getBean("queueProducer");
Map<String, Object> message = new LinkedHashMap<String, Object>();
message.put("key", "value");
queueProducer.send(message);
传输String:
QueueProducer queueProducer = (QueueProducer) wac.getBean("queueProducer");
queueProducer.send("Hello World!");
2. 消费者开发
2.1. Spring配置
<bean
id="QUEUE"
class="org.apache.activemq.command.ActiveMQQueue">
<property
name="physicalName">
<value>JMS-QUEUE</value>
</property>
</bean>
<!-- ConnectionFactory -->
<bean
id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property
name="brokerURL">
<value>failover:(tcp://ip1:61616,tcp://ip2:61616)</value>
</property>
<property
name="alwaysSessionAsync">
<value>true</value>
</property>
</bean>
<!-- 添加事务 -->
<bean
id="jmsTransactionManager"
class="org.springframework.jms.connection.JmsTransactionManager">
<property
name="connectionFactory" ref="jmsConnectionFactory"
/>
</bean>
<!-- 指定消息接收的业务类 -->
<bean id="queueConsumer" class="com.xxx.QueueConsumer">
<property name="destination" value="QUEUE" />
</bean>
<bean
id="queueListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg
ref="queueConsumer"/>
<!-- 指定消费消息的方法 -->
<property
name="defaultListenerMethod" value="receive"
/>
<property
name="messageConverter" ref="defaultMessageConverter"
/>
</bean>
<bean
id="queueListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property
name="connectionFactory" ref="jmsConnectionFactory"
/>
<property
name="destination"
ref="QUEUE"
/>
<property
name="messageListener" ref="queueListener"
/>
<property
name="transactionManager" ref="jmsTransactionManager"
/>
<property
name="sessionTransacted"
value="true"
/>
<property
name="concurrentConsumers" value="5"
/><!-- 控制同时启动几个并发的 listener 线程 -->
</bean>
<!-- 自定义converter,实现key-value的Map结构,value为序列化的POJO -->
<!-- bean id="defaultMessageConverter" class="com.tuan.common.activemq.util.DefaultMessageConverter" /-->
<!-- Spring 标准converter,可以操作字符串 -->
<bean id="defaultMessageConverter"
class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
2.2. 程序调用
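接收Map(注意:方法名必须与 queueListener 中 defaultListenerMethod 的配置一致;上文配置的是 receive,若沿用该配置,请将下面的 execute 改名为 receive):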
public
void execute(Object message){
Map<String, Object> mapMsg = (Map<String,Object>)message;
Set<String> set = mapMsg.keySet();
for (String key : set) {
String str = "queue:key=" +key +
", value=" + mapMsg.get(key);
System.out.println(str);
}
}
接收字符串:
public
void execute(Object message){
String msg = (String)message;
}
3. POM文件配置
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.5.1</version>
</dependency>
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>3.8</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring</artifactId>
<version>2.5.5</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>2.5.5</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>2.5.5</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>2.5.5</version>
</dependency>
<dependency>
<groupId>com.tuan.cozy</groupId>
<artifactId>cozy-common-activemq</artifactId>
<version>1.0.1.20120428</version>
</dependency>
4. 特殊说明
4.1.TCP Transport:允许客户端通过TCP socket连接到远程的broker。以下是配置语法:
tcp://hostname:port?transportOptions
4.2. Failover Transport:是一种重新连接的机制,用于建立可靠的传输。它的配置语法允许指定任意多个复合的URI。Failover transport会自动选择其中的一个URI来尝试建立连接;如果没有成功,则会选择另一个URI来建立新的连接。以下是配置语法:failover:(uri1,...,uriN)
4.3. 在log4j的配置文件中加入:
log4j.logger.org.apache.activemq.transport.failover=ERROR
这样可以关闭 failover 传输在 INFO 级别下输出的连接日志信息。
1.1. Spring配置
在beans里xsi:schemaLocation引入:
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.5.0.xsd
<!-- ActiveMQ destinations -->
<!-- QUEUE:点对点模式。消息数据被持久化,每条消息都能被消费;即使发送时没有监听者监听该QUEUE地址,消息也会保留到之后被消费,数据不会丢失。一对一的发布/接收策略,保证数据完整。
-->
<bean
id="QUEUE" class="org.apache.activemq.command.ActiveMQQueue">
<property
name="physicalName">
<value>JMS-QUEUE</value>
</property>
</bean>
<!-- 生产消息配置 -->
<bean
id="queueProducer" class="com.tuan.common.activemq.impl.QueueProducer">
<property
name="template"
ref="jmsTemplate"/>
<property
name="destination" ref="QUEUE"
/>
</bean>
<!-- ConnectionFactory -->
<bean
id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property
name="brokerURL">
<value>failover:(tcp://ip1:61616,tcp://ip2:61616)</value>
</property>
<property
name="alwaysSessionAsync">
<value>true</value>
</property>
</bean>
<!-- Spring JmsTemplate config-->
<bean
id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property
name="connectionFactory" ref="jmsConnectionFactory"
/>
<property
name="messageConverter" ref="defaultMessageConverter"
/>
<property
name="sessionTransacted"
value="true"
/>
</bean>
<!-- 自定义converter,实现key-value的Map结构,value为序列化的POJO -->
<!-- bean id="defaultMessageConverter" class="com.tuan.common.activemq.util.DefaultMessageConverter" /-->
<!-- Spring 标准converter,可以操作字符串 -->
<bean id="defaultMessageConverter"
class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
1.2. 业务程序调用
传输Map:
QueueProducer queueProducer = (QueueProducer) wac.getBean("queueProducer");
Map<String, Object> message = new LinkedHashMap<String, Object>();
message.put("key", "value");
queueProducer.send(message);
传输String:
QueueProducer queueProducer = (QueueProducer) wac.getBean("queueProducer");
queueProducer.send("Hello World!");
2. 消费者开发
2.1. Spring配置
<bean
id="QUEUE"
class="org.apache.activemq.command.ActiveMQQueue">
<property
name="physicalName">
<value>JMS-QUEUE</value>
</property>
</bean>
<!-- ConnectionFactory -->
<bean
id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property
name="brokerURL">
<value>failover:(tcp://ip1:61616,tcp://ip2:61616)</value>
</property>
<property
name="alwaysSessionAsync">
<value>true</value>
</property>
</bean>
<!-- 添加事务 -->
<bean
id="jmsTransactionManager"
class="org.springframework.jms.connection.JmsTransactionManager">
<property
name="connectionFactory" ref="jmsConnectionFactory"
/>
</bean>
<!-- 指定消息接收的业务类 -->
<bean id="queueConsumer" class="com.xxx.QueueConsumer">
<property name="destination" value="QUEUE" />
</bean>
<bean
id="queueListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg
ref="queueConsumer"/>
<!-- 指定消费消息的方法 -->
<property
name="defaultListenerMethod" value="receive"
/>
<property
name="messageConverter" ref="defaultMessageConverter"
/>
</bean>
<bean
id="queueListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property
name="connectionFactory" ref="jmsConnectionFactory"
/>
<property
name="destination"
ref="QUEUE"
/>
<property
name="messageListener" ref="queueListener"
/>
<property
name="transactionManager" ref="jmsTransactionManager"
/>
<property
name="sessionTransacted"
value="true"
/>
<property
name="concurrentConsumers" value="5"
/><!-- 控制同时启动几个并发的 listener 线程 -->
</bean>
<!-- 自定义converter,实现key-value的Map结构,value为序列化的POJO -->
<!-- bean id="defaultMessageConverter" class="com.tuan.common.activemq.util.DefaultMessageConverter" /-->
<!-- Spring 标准converter,可以操作字符串 -->
<bean id="defaultMessageConverter"
class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
2.2. 程序调用
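接收Map(注意:方法名必须与 queueListener 中 defaultListenerMethod 的配置一致;上文配置的是 receive,若沿用该配置,请将下面的 execute 改名为 receive):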
public
void execute(Object message){
Map<String, Object> mapMsg = (Map<String,Object>)message;
Set<String> set = mapMsg.keySet();
for (String key : set) {
String str = "queue:key=" +key +
", value=" + mapMsg.get(key);
System.out.println(str);
}
}
接收字符串:
public
void execute(Object message){
String msg = (String)message;
}
3. POM文件配置
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.5.1</version>
</dependency>
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>3.8</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring</artifactId>
<version>2.5.5</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>2.5.5</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>2.5.5</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>2.5.5</version>
</dependency>
<dependency>
<groupId>com.tuan.cozy</groupId>
<artifactId>cozy-common-activemq</artifactId>
<version>1.0.1.20120428</version>
</dependency>
4. 特殊说明
4.1.TCP Transport:允许客户端通过TCP socket连接到远程的broker。以下是配置语法:
tcp://hostname:port?transportOptions
4.2. Failover Transport:是一种重新连接的机制,用于建立可靠的传输。它的配置语法允许指定任意多个复合的URI。Failover transport会自动选择其中的一个URI来尝试建立连接;如果没有成功,则会选择另一个URI来建立新的连接。以下是配置语法:failover:(uri1,...,uriN)
4.3. 在log4j的配置文件中加入:
log4j.logger.org.apache.activemq.transport.failover=ERROR
这样可以关闭 failover 传输在 INFO 级别下输出的连接日志信息。
相关文章推荐
- UINavigatonController遮挡内容
- tableview 取消选中状态
- 【POJ3159】【差分系统】【dij+priority_queue】
- android-如何关闭AlertDialog.Builder对话框
- poj2031-Building a Space Station(最小生成树,kruskal,prime)
- TLD(Tracking-Learning-Detection)学习与源码理解之(build)
- Building a Space Station 1718 (三维的 最小生成树)
- Building a Space Station 2031 (三维的 最小生成树 prim)
- 第2组UI-ImageView及子类
- Web开发工具之HBuilder初探
- 使用requirejs实现模块化编程
- [LeetCode] Missing Number (A New Questions Added Today)
- UIScrollView_UIPageControl
- Android开发之Volley定制自己的Request
- css3的媒体查询(Media Queries)
- Java中SerialVersionUID的作用
- UIScrollView
- Windows GUI 开发历史
- Windows GUI 开发历史
- POJ2778 DNA Sequence AC自动机+快速幂+DP