您的位置:首页 > 产品设计 > UI/UE

ActiveMq-Queue开发

2015-08-24 10:16 453 查看
1.   生产者开发

1.1.  Spring配置

    在beans里xsi:schemaLocation引入:
    http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.5.0.xsd
    <!-- ActiveMQ destinations -->
    <!-- QUEUE:点对点消息数据被持久化,每条消息都能被消费没有监听QUEUE地址也能被消费,数据不会丢失一对一的发布接受策略,保证数据完整
-->
<bean
id="QUEUE"class="org.apache.activemq.command.ActiveMQQueue">
    <property
name="physicalName">
           <value>JMS-QUEUE</value>
    </property>
</bean>
 
<!-- 生产消息配置 -->
<bean
id="queueProducer"class="com.tuan.common.activemq.impl.QueueProducer">
       <property
name="template"
ref="jmsTemplate"/>
       <property
name="destination"ref="QUEUE"
/>
</bean>
   
    <!-- ConnectionFactory -->
<bean
id="jmsConnectionFactory"class="org.apache.activemq.ActiveMQConnectionFactory"> 

<property
name="brokerURL">
<value>failover:(tcp://ip1:61616,tcp://ip2:61616)</value> 

         </property>
         <property
name="alwaysSessionAsync">
                <value>true</value>
         </property> 

</bean>
 
    <!-- Spring JmsTemplate config-->
<bean
id="jmsTemplate"class="org.springframework.jms.core.JmsTemplate">
       <property
name="connectionFactory"ref="jmsConnectionFactory"
/>
       <property
name="messageConverter"ref="defaultMessageConverter"
/>
       <property
name="sessionTransacted"
value="true"
/>
</bean>
 
    <!-- 自定义converter
,实现key-value的Map结构,value为序列化的POJO-->
    <!-- bean id="defaultMessageConverter"class="com.tuan.common.activemq.util.DefaultMessageConverter" /-->
   
    <!—Spring 标准converter,可以操作字符串-->
    <bean id="defaultMessageConverter"class=
4000
"org.springframework.jms.support.converter.SimpleMessageConverter"/>
 
 
1.2.  业务程序调用

传输Map:

QueueProducer queueProducer = (QueueProducer) wac.getBean("queueProducer");

         Map<String, Object> message = newLinkedHashMap<String, Object>();

    message.put(“key”,“value”);

    queueProducer.send(message);

   

    传输String:

    QueueProducerqueueProducer = (QueueProducer) wac.getBean("queueProducer");

    queueProducer.send("Hello World!");

 
2.  消费者开发

2.1.  Spring配置

<bean
id="QUEUE"
class="org.apache.activemq.command.ActiveMQQueue">
       <property
name="physicalName">
           <value>JMS-QUEUE</value>
       </property>
</bean>
   
    <!-- ConnectionFactory -->
<bean
id="jmsConnectionFactory"class="org.apache.activemq.ActiveMQConnectionFactory"> 

<property
name="brokerURL">
<value>failover:(tcp://ip1:61616,tcp://ip2:61616)</value> 

         </property>
         <property
name="alwaysSessionAsync">
                <value>true</value>
         </property> 

    </bean>
   
    <!-- 添加事务 -->
<bean
id="jmsTransactionManager"      
class="org.springframework.jms.connection.JmsTransactionManager">
       <property
name="connectionFactory"ref="jmsConnectionFactory"
/>
</bean>
 
<!—指定消息接收的业务类 -->
<bean id="queueConsumer" class="com.xxx.QueueConsumer">
       <propertyname="destination" value="QUEUE" />
</bean>
   
<bean
id="queueListener"class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
       <constructor-arg
ref="queueConsumer"/>
       <!--
指定消费消息的方法 -->
       <property
name="defaultListenerMethod"value="receive"
/>
       <property
name="messageConverter"ref="defaultMessageConverter"
/>
</bean>
<bean
id="queueListenerContainer"class="org.springframework.jms.listener.DefaultMessageListenerContainer">
       <property
name="connectionFactory"ref="jmsConnectionFactory"
/>
       <property
name="destination"
ref="QUEUE"
/>
       <property
name="messageListener"ref="queueListener"
/>
       <property
name="transactionManager"ref="jmsTransactionManager"
/>
       <property
name="sessionTransacted"
value="true"
/>
       <property
name="concurrentConsumers"value="5"
/><!-- 控制同时启几个concurrent listener threads -->

</bean>
   
<!-- 自定义converter
,实现key-value的Map结构,value为序列化的POJO-->
    <!-- beanid="defaultMessageConverter"class="com.tuan.common.activemq.util.DefaultMessageConverter" /-->
<!—Spring 标准converter,可以操作字符串-->
    <bean id="defaultMessageConverter"

class="org.springframework.jms.support.converter.SimpleMessageConverter"/>

2.2. 程序调用

接收Map:
public
void
execute(Object message){
       Map<String, Object> mapMsg = (Map<String,Object>)message;
       Set<String> set = mapMsg.keySet();
       for (String key : set) {
           String str = "queue:key=" +key +
", value=" + mapMsg.get(key);
           System.out.println(str);
       }
}
接收字符串:

public
void
execute(Object message){
    String msg = (String)message;
}

 

 
3.  POM文件配置

<dependency>
           <groupId>org.apache.activemq</groupId>
           <artifactId>activemq-core</artifactId>
           <version>5.5.1</version>
       </dependency>
       <dependency>
           <groupId>org.apache.xbean</groupId>
           <artifactId>xbean-spring</artifactId>
           <version>3.8</version>
       </dependency>

       <dependency>
           <groupId>org.springframework</groupId>
           <artifactId>spring</artifactId>
           <version>2.5.5</version>
       </dependency>
       <dependency>
           <groupId>org.springframework</groupId>
           <artifactId>spring-core</artifactId>
           <version>2.5.5</version>
       </dependency>
       <dependency>
           <groupId>org.springframework</groupId>
           <artifactId>spring-beans</artifactId>
           <version>2.5.5</version>
       </dependency>
       <dependency>
           <groupId>org.springframework</groupId>
           <artifactId>spring-context</artifactId>
           <version>2.5.5</version>
       </dependency>
        <dependency>
           <groupId>com.tuan.cozy</groupId>
           <artifactId>cozy-common-activemq</artifactId>
           <version>1.0.1.20120428</version>
       </dependency>
 

 
4.  特殊说明

4.1.TCP Transport:允许客户端通过TCP socket连接到远程的broker。以下是配置语法:

tcp://hostname:port?transportOptions

4.2.Failover Transport:是一种重新连接的机制,用于建立可靠的传输。它的配置语法允许制定任意多个复合的URI。Failover transport会自动选择其中的一个URI来尝试建立连接。如果没有成功,那么会选择一个其它的URI来建立一个新的连接。以下是配置语法:failover:(uri1,...,uriN)

4.3. 在log4j的配置文件中加入:

log4j.logger.org.apache.activemq.transport.failover=ERROR

关闭info时的连接信息
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: