您的位置:首页 > 编程语言 > Java开发

activemq+spring 持久化发送消息

2012-12-16 11:56 603 查看
最近有项目,需要用到 jms来提高性能。在网上查了下 ,发现 activeMQ 不错,所以就在晚上搜了些资料。

在网上找的资料如下(相当给力):


1、先是下载activeMq。

2、由于默认的activemq是日志文件的持久订阅,需要修改activemq的配置文件才能持久化到数据库里,在activemq的conf目录下的activemq.xml,找到

Xml代码


<persistenceAdapter>

<kahaDB directory="${activemq.base}/data/kahadb"/>

</persistenceAdapter>

把它注释掉换成

Xml代码


<persistenceAdapter>

<jdbcPersistenceAdapter dataSource="#oracle-ds"/>

</persistenceAdapter>

还要在activemq.xml的<bean></beans>里面增加数据库的配置,并且把oracle的驱动jar包,复制到activemq下的lib目录里,由于我使用的是oracle我的配置如下:

Xml代码


<bean id="oracle-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">

<property name="driverClassName" value="oracle.jdbc.driver.OracleDriver"/>

<property name="url" value="jdbc:oracle:thin:@10.60.30.31:1521:orcl"/>

<property name="username" value="activemq"/>

<property name="password" value="activemq"/>

<property name="maxActive" value="200"/>

<property name="poolPreparedStatements" value="true"/>

;/bean>

到此你的activemq发送消息就可以持久化了(注:前提是你在spring里配置activemq时要配置消息的持久化)

3、接下来是spring+activemq 使用的jar包,都在activemq目录下的lib里



4、下来是具体一些配置文件和类文件

发送消息类

Java代码


/**

* message sender

* @description <p></p>

* @author quzishen

* @project NormandyPositionII

* @class MessageSender.java

* @version 1.0

* @time 2011-1-11

*/

public class MessageSender {

// ~~~ jmsTemplate

public JmsTemplate jmsTemplate;

/**

* send message

*/

public void sendMessage(){

jmsTemplate.convertAndSend("hello jms!");

}

public void setJmsTemplate(JmsTemplate jmsTemplate) {

this.jmsTemplate = jmsTemplate;

}

}

接收消息类

Java代码


/**

* message receiver

* @description <p></p>

* @author quzishen

* @project NormandyPositionII

* @class MessageReceiver.java

* @version 1.0

* @time 2011-1-11

*/

public class MessageReceiver implements MessageListener {

/* (non-Javadoc)

* @see javax.jms.MessageListener#onMessage(javax.jms.Message)

*/

public void onMessage(Message m) {

System.out.println("[receive message]");

ObjectMessage om = (ObjectMessage) m;

try {

String key1 = om.getStringProperty("key1");

System.out.println(key1);

System.out.println("model:"+om.getJMSDeliveryMode());

System.out.println("destination:"+om.getJMSDestination());

System.out.println("type:"+om.getJMSType());

System.out.println("messageId:"+om.getJMSMessageID());

System.out.println("time:"+om.getJMSTimestamp());

System.out.println("expiredTime:"+om.getJMSExpiration());

System.out.println("priority:"+om.getJMSPriority());

} catch (JMSException e) {

e.printStackTrace();

}

}

}

在发送消息和接收消息前可以做一些自定的处理,就是这个类

Java代码


/**

* message converter

* @description <p></p>

* @author quzishen

* @project NormandyPositionII

* @class MessageConvertForSys.java

* @version 1.0

* @time 2011-1-11

*/

public class MessageConvertForSys implements MessageConverter {

/* (non-Javadoc)

* @see org.springframework.jms.support.converter.MessageConverter#toMessage(java.lang.Object, javax.jms.Session)

*/

public Message toMessage(Object object, Session session)

throws JMSException, MessageConversionException {

System.out.println("[toMessage]");

ObjectMessage objectMessage = session.createObjectMessage();

objectMessage.setJMSExpiration(1000);

objectMessage.setStringProperty("key1", object+"_add");

return objectMessage;

}

/* (non-Javadoc)

* @see org.springframework.jms.support.converter.MessageConverter#fromMessage(javax.jms.Message)

*/

public Object fromMessage(Message message) throws JMSException,

MessageConversionException {

System.out.println("[fromMessage]");

ObjectMessage objectMessage = (ObjectMessage) message;

return objectMessage.getObjectProperty("key1");

}

}

第一种,PTP方式的配置:

Java代码


<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"

xmlns:context="http://www.springframework.org/schema/context"

xmlns:aop="http://www.springframework.org/schema/aop"

xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd "

default-autowire="byName">

<!-- JMS PTP MODEL -->

<!-- PTP链接工厂 -->

<bean id="queueConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">

<property name="brokerURL" value="tcp://127.0.0.1:61616" />

<!-- <property name="brokerURL" value="vm://normandy.notify" /> -->

<property name="useAsyncSend" value="true" />

</bean>

<!-- 定义消息队列 -->

<bean id="dest" class="org.apache.activemq.command.ActiveMQQueue">

<constructor-arg value="queueDest" />

</bean>

<!-- PTP jms模板 -->

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">

<property name="connectionFactory" ref="queueConnectionFactory"></property>

<property name="defaultDestination" ref="dest" />

<property name="messageConverter" ref="messageConvertForSys" />

<property name="pubSubDomain" value="false" />

</bean>

<!-- 消息转换器 -->

<bean id="messageConvertForSys" class="com.normandy.tech.test.MessageConvertForSys"></bean>

<!-- 消息发送方 -->

<bean id="messageSender" class="com.normandy.tech.test.MessageSender"></bean>

<!-- 消息接收方 -->

<bean id="messageReceiver" class="com.normandy.tech.test.MessageReceiver"></bean>

<!-- 消息监听容器 -->

<bean id="listenerContainer"

class="org.springframework.jms.listener.DefaultMessageListenerContainer">

<property name="connectionFactory" ref="queueConnectionFactory" />

<property name="destination" ref="dest" />

<property name="messageListener" ref="messageReceiver" />

</bean>

</beans>

第二种:PUB/SUB方式的配置

我们配置两个消息订阅者,分别订阅不同的消息,这样用于判断是否成功执行了消息的发布和消息的订阅

Java代码


<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"

xmlns:context="http://www.springframework.org/schema/context"

xmlns:aop="http://www.springframework.org/schema/aop"

xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd "

default-autowire="byName">

<!-- JMS TOPIC MODEL -->

<!-- TOPIC链接工厂 -->

<bean id="topicSendConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">

<property name="brokerURL" value="tcp://127.0.0.1:61616" />

<property name="useAsyncSend" value="true" />

</bean>

<bean id="topicListenConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">

<property name="brokerURL" value="tcp://127.0.0.1:61616" />

</bean>

<!-- 定义主题 -->

<bean id="myTopic" class="org.apache.activemq.command.ActiveMQTopic">

<constructor-arg value="normandy.topic"/>

</bean>

<bean id="myTopic2" class="org.apache.activemq.command.ActiveMQTopic">

<constructor-arg value="normandy.topic2"/>

</bean>

<!-- 消息转换器 -->

<bean id="messageConvertForSys" class="com.normandy.tech.test.MessageConvertForSys"></bean>

<!-- TOPIC send jms模板 -->

<bean id="topicSendJmsTemplate" class="org.springframework.jms.core.JmsTemplate">

<property name="connectionFactory" ref="topicSendConnectionFactory"></property>

<property name="defaultDestination" ref="myTopic" />

<property name="messageConverter" ref="messageConvertForSys" />

<!-- 开启订阅模式 -->

<property name="pubSubDomain" value="true"/>

</bean>

<!-- 消息发送方 -->

<bean id="topicMessageSender" class="com.normandy.tech.test.MessageSender">

<property name="jmsTemplate" ref="topicSendJmsTemplate"></property>

</bean>

<!-- 消息接收方 -->

<bean id="topicMessageReceiver" class="com.normandy.tech.test.MessageReceiver">

</bean>

<!-- 主题消息监听容器 -->

<bean id="listenerContainer"

class="org.springframework.jms.listener.DefaultMessageListenerContainer">

<property name="connectionFactory" ref="topicListenConnectionFactory" />

<property name="pubSubDomain" value="true"/><!-- default is false -->

<property name="destination" ref="myTopic" /> <!-- listen topic: myTopic -->

<property name="subscriptionDurable" value="true"/>

<property name="clientId" value="clientId_001"/><!---这里是设置接收客户端的ID,在持久化时,但这个客户端不在线时,消息就存在数据库里,知道被这个ID的客户端消费掉-->

<property name="messageListener" ref="topicMessageReceiver" />

</bean>

<!-- 主题消息监听容器2 -->

<bean id="listenerContainer2"

class="org.springframework.jms.listener.DefaultMessageListenerContainer">

<property name="connectionFactory" ref="topicListenConnectionFactory" />

<property name="pubSubDomain" value="true"/><!-- default is false -->

<property name="destination" ref="myTopic2" /> <!-- listen topic: myTopic2 -->

<property name="subscriptionDurable" value="true"/>

<property name="clientId" value="clientId_002"/>!---这里是设置接收客户端的ID,在持久化时,但这个客户端不在线时,消息就存在数据库里,知道被这个ID的客户端消费掉-->

<property name="messageListener" ref="topicMessageReceiver" />

</bean>

</beans>

测试一下是否能发送和接收消息,我是在main方法里测试的

Java代码


public static void main(String[] args) throws HttpException, IOException {

System.out.println("初始化spring!准备开始接收!");

ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext-jms-topic-receiver.xml");

MessageSender t=(MessageSender) context.getBean("topicMessageSender");

t.sendMessage();

}

Activemq支持两种消息传送模式:PERSISTENT (持久消息)和 NON_PERSISTENT(非持久消息)

从字面意思就可以了解,这是两种正好相反的模式。

1、PERSISTENT 持久消息

是activemq默认的传送方式,此方式下的消息在配合activemq.xml中配置的消息存储方式,会被存储在特定的地方,直到有消费者将消息消费或者消息过期进入DLQ队列,消息生命周期才会结束。

此模式下可以保证消息只会被成功传送一次和成功使用一次,消息具有可靠性。在消息传递到目标消费者,在消费者没有成功应答前,消息不会丢失。所以很自然的,需要一个地方来持久性存储。

如果消息消费者在进行消费过程发生失败,则消息会被再次投递。

2、NON_PERSISTENT 非持久消息

非持久的消息适用于不重要的,可以接受消息丢失的哪一类消息,这种消息只会被投递一次,消息不会在持久性存储中存储,也不会保证消息丢失后的重新投递。

在spring提供的JmsTemplate中,同样提供了针对于当前功能的配置选项:

Xml代码


<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">

<property name="connectionFactory" ref="cachingConnectionFactory"></property>

<property name="defaultDestination" ref="dest" />

<property name="messageConverter" ref="messageConverter" />

<property name="pubSubDomain" value="false" />

<property name="explicitQosEnabled" value="true" /> <!-- deliveryMode, priority, timeToLive 的开关,要生效,必须配置为true,默认false-->

<property name="deliveryMode" value="1" /> <!-- 发送模式 DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久-->

</bean>

消息的签收模式:

客户端成功接收一条消息的标志是一条消息被签收,成功应答。

消息的签收情形分两种:

1、带事务的session

如果session带有事务,并且事务成功提交,则消息被自动签收。如果事务回滚,则消息会被再次传送。

2、不带事务的session

不带事务的session的签收方式,取决于session的配置。

Activemq支持一下三种模式:

Session.AUTO_ACKNOWLEDGE 消息自动签收

Session.CLIENT_ACKNOWLEDGE 客户端调用acknowledge方法手动签收

Session.DUPS_OK_ACKNOWLEDGE 不必必须签收,消息可能会重复发送。在第二次重新传递消息的时候,消息头的JmsDelivered会被置为true标示当前消息已经传送过一次,客户端需要进行消息的重复处理控制。

spring提供的JmsTemplate中的配置方式:

Xml代码


<!-- PTP jms模板 -->

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">

<property name="connectionFactory" ref="cachingConnectionFactory"></property>

<property name="defaultDestination" ref="dest" />

<property name="messageConverter" ref="messageConverter" />

<property name="pubSubDomain" value="false" />

<property name="sessionAcknowledgeMode" value="1" />

<!-- 消息应答方式

Session.AUTO_ACKNOWLEDGE 消息自动签收

Session.CLIENT_ACKNOWLEDGE 客户端调用acknowledge方法手动签收

Session.DUPS_OK_ACKNOWLEDGE 不必必须签收,消息可能会重复发送

-->

</bean>

以上资料来自:

http://blog.csdn.net/quzishen/article/details/6128781

http://blog.csdn.net/quzishen/article/details/6131222

如果你看到这里。基本已经掌握了 activeMQ的 基本用法

ps: 我在弄持久化的时候, 碰到一个小小的尴尬的地方。因为我用的 是 异步发送 信息. 当我向 activeMQ服务器发送信息的时候。

总是在我的 数据库里看不到我发送的信息记录。当时我就以为自己没有配好。。在网上又找了不少资料。发现可能是我已经

消费了信息,所以可能没有,这点和 quartz框架差不多。所以我就已发送信息,就关闭服务器,怎奈还是没有计算器快,还是没有信息。

后来我就自己写了个钩子程序,当一旦发送,我就关闭服务器,造成一个 down机 的假设环境。发现确实有用。呵呵。

JMS 才研究没有多久,但是感觉真的非常有用.
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: