activemq 支持mysql持久化 消息队列使用
2011-10-17 15:04
316 查看
目前官网上的5.x.x版本都不能下载,故使用4.1.2版本进行测试
下载地址
下载下来的包解压后为绿色版的,除了配置java_home环境变量,其他不需要安装,可以直接启动。
启动命令: sudo ./apache-activemq-path/bin/acticemq
修改配置文件支持对mysql数据库的支持,打开/apache-activemq-path/conf/activemq.xml
配置为如下
根据自己的数据库要素,修改配置文件中数据库连接部分
注意:这里一定要用这种模式 jdbcPersistenceAdapter ,这样持久化时跟数据库同步,不过会影响性能,但对于需要双击热备的系统,使用mysql主从模式,只好这样配置,防止数据丢失
重启下,开始使用代码测试队列,当然,需要把/apache-activemq-path/apache-activemq-4.1.2.jar先引入环境中
入列部分
下载地址
下载下来的包解压后为绿色版的,除了配置java_home环境变量,其他不需要安装,可以直接启动。
启动命令: sudo ./apache-activemq-path/bin/acticemq
修改配置文件支持对mysql数据库的支持,打开/apache-activemq-path/conf/activemq.xml
配置为如下
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. --> <!-- START SNIPPET: example --> <beans> <!-- Allows us to use system properties as variables in this configuration file --> <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/> <broker brokerName="localhost" useJmx="true" xmlns="http://activemq.org/config/1.0"> <!-- Use the following to set the broker memory limit <memoryManager> <usageManager id="memory-manager" limit="20 MB"/> </memoryManager> --> <!-- Use the following to configure how ActiveMQ is exposed in JMX <managementContext> <managementContext connectorPort="1099" jmxDomainName="org.apache.activemq"/> </managementContext> --> <!-- In ActiveMQ 4, you can setup destination policies --> <destinationPolicy> <policyMap><policyEntries> <policyEntry topic="FOO.>"> <dispatchPolicy> <strictOrderDispatchPolicy /> </dispatchPolicy> <subscriptionRecoveryPolicy> <lastImageSubscriptionRecoveryPolicy /> </subscriptionRecoveryPolicy> </policyEntry> </policyEntries></policyMap> </destinationPolicy> <persistenceAdapter> <!-- <journaledJDBC journalLogFiles="5" dataDirectory="${activemq.base}/activemq-data"/> --> <!-- To use a different datasource, use the following syntax : --> <jdbcPersistenceAdapter dataSource="#mysql-ds"/> </persistenceAdapter> <transportConnectors> <transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/> <transportConnector name="ssl" uri="ssl://localhost:61617"/> <transportConnector name="stomp" uri="stomp://localhost:61613"/> </transportConnectors> <networkConnectors> <!-- by default just auto discover the other brokers --> <networkConnector name="default-nc" uri="multicast://default"/> <!-- <networkConnector name="host1 and host2" uri="static://(tcp://host1:61616,tcp://host2:61616)" failover="true"/> --> </networkConnectors> </broker> <!-- This xbean configuration file supports all the standard spring xml configuration options --> <!-- Postgres DataSource Sample Setup --> <!-- <bean id="postgres-ds" class="org.postgresql.ds.PGPoolingDataSource"> <property name="serverName" value="localhost"/> <property name="databaseName" value="activemq"/> <property name="portNumber" value="0"/> <property name="user" value="activemq"/> <property name="password" value="activemq"/> <property name="dataSourceName" value="postgres"/> <property name="initialConnections" value="1"/> <property name="maxConnections" value="10"/> </bean> --> <!-- MySql DataSource Sample Setup --> <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://localhost:3306/activedata?relaxAutoCommit=true"/> <property name="username" value="root"/> <property name="password" value="bobo1314"/> <property name="poolPreparedStatements" value="true"/> </bean> <!-- Oracle DataSource Sample Setup --> <!-- <bean id="oracle-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="oracle.jdbc.driver.OracleDriver"/> <property name="url" value="jdbc:oracle:thin:@localhost:1521:AMQDB"/> <property name="username" value="scott"/> <property name="password" value="tiger"/> <property name="poolPreparedStatements" value="true"/> </bean> --> <!-- Embedded Derby DataSource Sample Setup --> <!-- <bean id="derby-ds" class="org.apache.derby.jdbc.EmbeddedDataSource"> <property name="databaseName" value="derbydb"/> <property name="createDatabase" value="create"/> </bean> --> </beans> <!-- END SNIPPET: example -->
根据自己的数据库要素,修改配置文件中数据库连接部分
注意:这里一定要用这种模式 jdbcPersistenceAdapter ,这样持久化时跟数据库同步,不过会影响性能,但对于需要双击热备的系统,使用mysql主从模式,只好这样配置,防止数据丢失
重启下,开始使用代码测试队列,当然,需要把/apache-activemq-path/apache-activemq-4.1.2.jar先引入环境中
入列部分
ConnectionFactory connectionFactory; // Connection :JMS 客户端到JMS Provider 的连接 Connection connection = null; // Session: 一个发送或接收消息的线程 Session session; // Destination :消息的目的地;消息发送给谁. Destination destination; connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); try { // 构造从工厂得到连接对象 connection = connectionFactory.createConnection(); // 启动 connection.start(); // 获取操作连接 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 获取session0的queue destination = session.createQueue("villasy-queue"); //消息生产者 MessageProducer msgProducer = session.createProducer(destination); //设置持久化 msgProducer.setDeliveryMode(DeliveryMode.PERSISTENT); TextMessage txtMsg = session.createTextMessage("发送消息:"+i); msgProducer.send(txtMsg); //session.commit(); //可要可不用 } } } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != connection) connection.close(); } catch (Throwable ignore) { } }读取队列内容
ConnectionFactory connectionFactory; // Connection :JMS 客户端到JMS Provider 的连接 Connection connection = null; // Session: 一个发送或接收消息的线程 Session session; // 消费者,消息接收者 // Destination :消息的目的地;消息发送给谁. Destination destination; MessageConsumer consumer; connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); try { // 构造从工厂得到连接对象 connection = connectionFactory.createConnection(); // 启动 connection.start(); // 获取操作连接 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 获取session0的queue destination = session.createQueue("villasy-queue"); consumer = session.createConsumer(destination); TextMessage message = (TextMessage) consumer.receive(1000); //这里的1000表示读取延迟1秒 if (null != message) { System.out.println("收到消息" + message.getText()); } } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != connection) connection.close(); } catch (Throwable ignore) { } }
相关文章推荐
- ActiveMQ使用MySQL进行消息持久化的那些坑
- ActiveMQ(5.10.0) - 使用 JDBC 持久化消息
- Mysql对空间数据库的支持及使用Hibernate Spatial对空间数据的持久化操作
- ActiveMQ发消息、收消息、持久化,查询队列剩余消息数、出队数的实现
- 【ActiveMQ】持久化消息队列的三种方式
- ActiveMQ使用笔记(二)ActiveMQ消息持久化(1)
- ActiveMQ消息队列的使用及应用
- 关于消息队列的使用----ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ
- 消息队列:快速上手ActiveMQ消息队列的JMS方式使用(两种模式:Topic和Queue的消息推送和订阅)
- ActiveMQ发消息、收消息、持久化,查询队列剩余消息数、出队数的实现
- ActiveMQ消息队列的使用及应用
- 开源消息队列ActiveMQ使用 .net window
- PHP使用MySQL实现消息队列
- ActiveMQ消息队列的使用及应用
- 消息队列入门(五)ActiveMQ的JDBC消息持久化机制
- ActiveMQ发消息、收消息、持久化,查询队列剩余消息数、出队数的实现
- PHP中使用ActiveMQ实现消息队列
- ActiveMQ消息队列的单机使用及应用(一)
- ActiveMQ消息队列的使用及应用
- 消息队列篇—详谈ActiveMQ消息队列模式的分析及使用