您的位置:首页 > 数据库 > MySQL

activemq 支持mysql持久化 消息队列使用

2011-10-17 15:04 316 查看
目前官网上的5.x.x版本都不能下载,故使用4.1.2版本进行测试
下载地址

下载下来的包解压后为绿色版的,除了配置java_home环境变量,其他不需要安装,可以直接启动。

启动命令: sudo ./apache-activemq-path/bin/activemq

修改配置文件以支持mysql数据库持久化,打开/apache-activemq-path/conf/activemq.xml

配置为如下

<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements.  See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License.  You may obtain a copy of the License at
 http://www.apache.org/licenses/LICENSE-2.0 
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- START SNIPPET: example -->
<beans>

<!-- Allows us to use system properties as variables in this configuration file -->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>

<broker brokerName="localhost" useJmx="true" xmlns="http://activemq.org/config/1.0">

<!--  Use the following to set the broker memory limit
<memoryManager>
<usageManager id="memory-manager" limit="20 MB"/>
</memoryManager>
-->

<!-- Use the following to configure how ActiveMQ is exposed in JMX
<managementContext>
<managementContext connectorPort="1099" jmxDomainName="org.apache.activemq"/>
</managementContext>
-->

<!-- In ActiveMQ 4, you can setup destination policies -->
<destinationPolicy>
<policyMap><policyEntries>

<policyEntry topic="FOO.>">
<dispatchPolicy>
<strictOrderDispatchPolicy />
</dispatchPolicy>
<subscriptionRecoveryPolicy>
<lastImageSubscriptionRecoveryPolicy />
</subscriptionRecoveryPolicy>
</policyEntry>

</policyEntries></policyMap>
</destinationPolicy>

<!-- Message store used by the broker. The journaledJDBC adapter is left
     commented out below; the plain JDBC adapter is enabled instead. -->
<persistenceAdapter>
<!--
<journaledJDBC journalLogFiles="5" dataDirectory="${activemq.base}/activemq-data"/>
-->
<!-- To use a different datasource, use the following syntax : -->
<!-- "#mysql-ds" points at the bean whose id is "mysql-ds", declared further down in this file. -->

<jdbcPersistenceAdapter   dataSource="#mysql-ds"/>

</persistenceAdapter>

<transportConnectors>
<transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/>
<transportConnector name="ssl"     uri="ssl://localhost:61617"/>
<transportConnector name="stomp"   uri="stomp://localhost:61613"/>
</transportConnectors>

<networkConnectors>
<!-- by default just auto discover the other brokers -->
<networkConnector name="default-nc" uri="multicast://default"/>
<!--
<networkConnector name="host1 and host2" uri="static://(tcp://host1:61616,tcp://host2:61616)" failover="true"/>
-->
</networkConnectors>

</broker>

<!--  This xbean configuration file supports all the standard spring xml configuration options -->

<!-- Postgres DataSource Sample Setup -->
<!--
<bean id="postgres-ds" class="org.postgresql.ds.PGPoolingDataSource">
<property name="serverName" value="localhost"/>
<property name="databaseName" value="activemq"/>
<property name="portNumber" value="0"/>
<property name="user" value="activemq"/>
<property name="password" value="activemq"/>
<property name="dataSourceName" value="postgres"/>
<property name="initialConnections" value="1"/>
<property name="maxConnections" value="10"/>
</bean>
-->

<!-- MySql DataSource Sample Setup -->
<!-- Commons DBCP datasource that the jdbcPersistenceAdapter references as "#mysql-ds".
     Adjust url, username and password to match your own MySQL instance.
     NOTE(review): the root account and its password are stored here in plain text;
     presumably acceptable only for a local test setup. Confirm before reuse. -->

<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost:3306/activedata?relaxAutoCommit=true"/>
<property name="username" value="root"/>
<property name="password" value="bobo1314"/>
<property name="poolPreparedStatements" value="true"/>
</bean>

<!-- Oracle DataSource Sample Setup -->
<!--
<bean id="oracle-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="oracle.jdbc.driver.OracleDriver"/>
<property name="url" value="jdbc:oracle:thin:@localhost:1521:AMQDB"/>
<property name="username" value="scott"/>
<property name="password" value="tiger"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
-->

<!-- Embedded Derby DataSource Sample Setup -->
<!--
<bean id="derby-ds" class="org.apache.derby.jdbc.EmbeddedDataSource">
<property name="databaseName" value="derbydb"/>
<property name="createDatabase" value="create"/>
</bean>
-->

</beans>
<!-- END SNIPPET: example -->


根据自己的数据库要素,修改配置文件中数据库连接部分

注意:这里一定要使用 jdbcPersistenceAdapter 这种模式,这样消息持久化时会与数据库同步写入。虽然会影响性能,但对于需要双机热备、使用mysql主从模式的系统,只能这样配置,以防止数据丢失

重启下,开始使用代码测试队列,当然,需要把/apache-activemq-path/apache-activemq-4.1.2.jar先引入环境中

入列部分

// Producer example: connects to the local broker and enqueues a batch of
// persistent text messages on the queue "villasy-queue".

// Number of test messages to send.
// NOTE(review): the loop header was missing from the original snippet (the
// counter `i` was used but never declared); 10 is an assumed count.
int messageCount = 10;

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
        ActiveMQConnection.DEFAULT_USER,
        ActiveMQConnection.DEFAULT_PASSWORD,
        "tcp://localhost:61616");
// Connection: the link from this JMS client to the JMS provider.
Connection connection = null;
try {
    // Obtain a connection from the factory and start it.
    connection = connectionFactory.createConnection();
    connection.start();
    // Session: the single-threaded context used to send or receive messages.
    // It is created non-transacted with automatic acknowledgement.
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    // Destination: where the messages are sent to.
    Destination destination = session.createQueue("villasy-queue");
    // Message producer bound to the queue.
    MessageProducer msgProducer = session.createProducer(destination);
    // Mark outgoing messages as persistent.
    msgProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
    for (int i = 0; i < messageCount; i++) {
        TextMessage txtMsg = session.createTextMessage("发送消息:" + i);
        msgProducer.send(txtMsg);
    }
    // No session.commit() here: the session above is not transacted, and
    // commit() is only valid on a transacted session.
} catch (Exception e) {
    e.printStackTrace();
} finally {
    try {
        if (null != connection) {
            connection.close();
        }
    } catch (Throwable ignore) {
        // Best-effort close: nothing useful can be done if closing fails here.
    }
}
读取队列内容

// Consumer example: connects to the local broker and reads at most one
// message from the queue "villasy-queue".

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
        ActiveMQConnection.DEFAULT_USER,
        ActiveMQConnection.DEFAULT_PASSWORD,
        "tcp://localhost:61616");
// Connection: the link from this JMS client to the JMS provider.
Connection connection = null;
try {
    // Obtain a connection from the factory and start it.
    connection = connectionFactory.createConnection();
    connection.start();
    // Session: the single-threaded context used to send or receive messages.
    // It is created non-transacted with automatic acknowledgement.
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    // Destination: the queue to read from.
    Destination destination = session.createQueue("villasy-queue");
    // Consumer: the message receiver bound to the queue.
    MessageConsumer consumer = session.createConsumer(destination);

    // 1000 is the maximum time to wait in milliseconds, not a delay;
    // the result is null when nothing arrives in time.
    Object received = consumer.receive(1000);
    if (received instanceof TextMessage) {
        System.out.println("收到消息" + ((TextMessage) received).getText());
    } else if (null != received) {
        // Guard against non-text messages instead of failing on a blind cast.
        System.out.println("收到非文本消息" + received);
    }
} catch (Exception e) {
    e.printStackTrace();
} finally {
    try {
        if (null != connection) {
            connection.close();
        }
    } catch (Throwable ignore) {
        // Best-effort close: nothing useful can be done if closing fails here.
    }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: