ActiveMQ发消息、收消息、持久化,查询队列剩余消息数、出队数的实现
2012-05-16 00:14
721 查看
http://my.oschina.net/u/264430/blog/57318
最近老大让我使用ActiveMQ实现这么个东东:1.查询消息队列中还有多少任务没有执行;2.消息队列的持久化;
真是愁杀我也,以前没见过啊,于是又看文档,又百度又google的,最终还是在一天半之后整出来鸟~~
首先向大家介绍一本书籍《ActiveMQ in Action》,我大部分代码都是参考这本书实现的。好了,废话少说,看代码:
1.首先启动activeMQ的服务
view source
print?
2.发送消息
view source
print?
3.收消息
view source
print?
4.获取消息的状态,也就是上面所说的获得消息队列中未执行的消息数、消费者数、出队数等等
view source
print?
到此结束,希望可以为大家做个参考~~
最近老大让我使用ActiveMQ实现这么个东东:1.查询消息队列中还有多少任务没有执行;2.消息队列的持久化;
真是愁杀我也,以前没见过啊,于是又看文档,又百度又google的,最终还是在一天半之后整出来鸟~~
首先向大家介绍一本书籍《ActiveMQ in Action》,我大部分代码都是参考这本书实现的。好了,废话少说,看代码:
1.首先启动activeMQ的服务
view source
print?
01 | public class RunServer { |
02 |
03 | /** 启动activeMQ服务 */ |
04 | public static void main(String[] args) throws Exception { |
05 | RunServer rs = new RunServer(); |
06 | BrokerService broker = rs.startServer(); |
07 | } |
08 |
09 | public BrokerService startServer() throws Exception{ |
10 | // java代码调用activemq相关的类来构造并启动brokerService |
11 | BrokerService broker = new BrokerService(); |
12 |
13 | // 以下是持久化的配置 |
14 | // 持久化文件存储位置 |
15 | File dataFilterDir = new File( "targer/amq-in-action/kahadb" ); |
16 | KahaDBStore kaha = new KahaDBStore(); |
17 | kaha.setDirectory(dataFilterDir); |
18 | // use a bigger journal file |
19 | kaha.setJournalMaxFileLength( 1024 * 100 ); |
20 | // small batch means more frequent and smaller writes |
21 | kaha.setIndexWriteBatchSize( 100 ); |
22 | // do the index write in a separate thread |
23 | kaha.setEnableIndexWriteAsync( true ); |
24 |
25 | broker.setPersistenceAdapter(kaha); |
26 | // create a transport connector |
27 | broker.addConnector( "tcp://localhost:61616" ); |
28 | broker.setUseJmx( true ); |
29 | //broker.setDataDirectory("data/"); |
30 |
31 |
32 | // 以下是ManagementContext的配置,从这个容器中可以取得消息队列中未执行的消息数、消费者数、出队数等等 |
33 | // 设置ManagementContext |
34 | ManagementContext context = broker.getManagementContext(); |
35 | context.setConnectorPort( 2011 ); |
36 | context.setJmxDomainName( "my-broker" ); |
37 | context.setConnectorPath( "/jmxrmi" ); |
38 | broker.start(); |
39 | System.in.read(); |
40 | return broker; |
41 | } |
view source
print?
01 | public class Sender { |
02 | private static final int SEND_NUMBER = 1 ; |
03 |
04 | public static void main(String[] args){ |
05 | // ConnectionFactory :连接工厂,JMS 用它创建连接 |
06 | ConnectionFactory connectionFactory; |
07 | // Connection :JMS 客户端到JMS Provider 的连接 |
08 | Connection connection = null ; |
09 | // Session: 一个发送或接收消息的线程 |
10 | Session session; |
11 | // Destination :消息的目的地;消息发送给谁. |
12 | Destination destination; |
13 | // MessageProducer:消息发送者 |
14 | MessageProducer producer; |
15 | // TextMessage message; |
16 | // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar |
17 |
18 | connectionFactory = new ActiveMQConnectionFactory( |
19 | ActiveMQConnection.DEFAULT_USER, |
20 | ActiveMQConnection.DEFAULT_PASSWORD, |
21 | "tcp://localhost:61616" ); |
22 | try { |
23 | // 构造从工厂得到连接对象 |
24 | connection = connectionFactory.createConnection(); |
25 | // 启动 |
26 | connection.start(); |
27 | // 获取操作连接 |
28 | session = connection.createSession(Boolean.TRUE, |
29 | Session.AUTO_ACKNOWLEDGE); |
30 | // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置 |
31 | destination = session.createQueue( "test-persistence" ); |
32 | // 得到消息生成者【发送者】 |
33 | producer = session.createProducer(destination); |
34 | // 设置不持久化,可以更改 |
35 | producer.setDeliveryMode(DeliveryMode.PERSISTENT); |
36 | // 构造消息 |
37 | sendMessage(session, producer); |
38 | session.commit(); |
39 |
40 | } catch (Exception e) { |
41 | e.printStackTrace(); |
42 | } finally { |
43 | try { |
44 | if ( null != connection) |
45 | connection.close(); |
46 | } catch (Throwable ignore) { |
47 | } |
48 | } |
49 |
50 | } |
51 |
52 | public static void sendMessage(Session session, MessageProducer producer) |
53 | throws Exception { |
54 | for ( int i = 1 ; i <= SEND_NUMBER; i++) { |
55 | TextMessage message = session |
56 | .createTextMessage( "ActiveMq 发送的消息" + i); |
57 | // 发送消息到目的地方 |
58 | System.out.println( "发送消息:" + i); |
59 | producer.send(message); |
60 | } |
61 | } |
view source
print?
01 | public class Receiver { |
02 | public static void main(String[] args){ |
03 | // ConnectionFactory :连接工厂,JMS 用它创建连接 |
04 | ConnectionFactory connectionFactory; |
05 | // Connection :JMS 客户端到JMS Provider 的连接 |
06 | Connection connection = null ; |
07 | // Session: 一个发送或接收消息的线程 |
08 | Session session; |
09 | // Destination :消息的目的地;消息发送给谁. |
10 | Destination destination; |
11 | // 消费者,消息接收者 |
12 | MessageConsumer consumer; |
13 |
14 | connectionFactory = new ActiveMQConnectionFactory( |
15 | ActiveMQConnection.DEFAULT_USER, |
16 | ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616" ); |
17 | try { |
18 | // 构造从工厂得到连接对象 |
19 | connection = connectionFactory.createConnection(); |
20 | // 启动 |
21 | connection.start(); |
22 | // 获取操作连接 |
23 | session = connection.createSession(Boolean.TRUE, |
24 | Session.AUTO_ACKNOWLEDGE); |
25 | //test-queue跟sender的保持一致,一个创建一个来接收 |
26 | destination = session.createQueue( "test-persistence" ); |
27 | consumer = session.createConsumer(destination); |
28 |
29 | consumer.setMessageListener( new MessageListener() { |
30 | public void onMessage(Message arg0) { |
31 | try { |
32 | Thread.sleep( 1000 ); |
33 | } catch (InterruptedException e1) { |
34 | e1.printStackTrace(); |
35 | } |
36 | System.out.println( "==================" ); |
37 | try { |
38 | System.out.println( "RECEIVE1第一个获得者:" |
39 | + ((TextMessage) arg0).getText()); |
40 | } catch (JMSException e) { |
41 | // TODO Auto-generated catch block |
42 | e.printStackTrace(); |
43 | } |
44 |
45 | } |
46 | }); |
47 | } catch (Exception e) { |
48 | e.printStackTrace(); |
49 | } |
50 | finally { |
51 | try { |
52 | if ( null != connection) |
53 | connection.close(); |
54 | } catch (Throwable ignore) { |
55 | } |
56 | } |
57 | } |
58 | } |
view source
print?
01 | public class StateTest { |
02 |
03 | /** |
04 | * 获取状态 |
05 | * @throws Exception |
06 | */ |
07 | public static void main(String[] args) throws Exception { |
08 | JMXServiceURL url = new JMXServiceURL( "service:jmx:rmi:///jndi/rmi://localhost:2011/jmxrmi" ); |
09 | JMXConnector connector = JMXConnectorFactory.connect(url, null ); |
10 | connector.connect(); |
11 | MBeanServerConnection connection = connector.getMBeanServerConnection(); |
12 |
13 | // 需要注意的是,这里的my-broker必须和上面配置的名称相同 |
14 | ObjectName name = new ObjectName( "my-broker:BrokerName=localhost,Type=Broker" ); |
15 | BrokerViewMBean mBean = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(connection, name, BrokerViewMBean. class , true ); |
16 | // System.out.println(mBean.getBrokerName()); |
17 |
18 | for (ObjectName queueName : mBean.getQueues()) { |
19 | QueueViewMBean queueMBean = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(connection, queueName, QueueViewMBean. class , true ); |
20 | System.out.println( "\n------------------------------\n" ); |
21 |
22 | // 消息队列名称 |
23 | System.out.println( "States for queue --- " + queueMBean.getName()); |
24 |
25 | // 队列中剩余的消息数 |
26 | System.out.println( "Size --- " + queueMBean.getQueueSize()); |
27 |
28 | // 消费者数 |
29 | System.out.println( "Number of consumers --- " + queueMBean.getConsumerCount()); |
30 |
31 | // 出队数 |
32 | System.out.println( "Number of dequeue ---" + queueMBean.getDequeueCount() ); |
33 | } |
34 |
35 | } |
36 |
37 | } |
相关文章推荐
- ActiveMQ发消息、收消息、持久化,查询队列剩余消息数、出队数的实现
- ActiveMQ发消息、收消息、持久化,查询队列剩余消息数、出队数的实现
- ActiveMQ发消息、收消息、持久化,查询队列剩余消息数、出队数的实现
- ActiveMQ发消息、收消息、持久化,查询队列剩余消息数、出队数的实现
- Java ActiveMQ简介以及springboot集成activeMQ实现消息队列监听以及实现MQ延迟
- java通过ActiveMQ实现JMS的消息队列实例
- PHP中使用ActiveMQ实现消息队列
- [置顶] spring boot 使用activeMQ实现消息队列简单应用
- activemq 支持mysql持久化 消息队列使用
- MSMQ?不,太弱了。使用ActiveMQ实现消息队列服务
- ActiveMQ实现延迟消息队列
- Activemq 消息发送、接收java代码实现队列模式
- activemq读取剩余消息队列中消息的数量
- ActiveMQ实现消息队列
- 深入浅出JMS(六)--ActiveMQ实现消息队列发送邮件
- 消息队列入门(五)ActiveMQ的JDBC消息持久化机制
- SpringMVC4整合ActiveMQ实现消息队列
- ActiveMQ的消息持久化---JDBC的实现方式
- ActiveMQ实现消息队列发送邮件
- activeMq消息队列查询统计