http://my.oschina.net/u/264430/blog/57318
最近老大让我使用ActiveMQ实现这么个东东:1.查询消息队列中还有多少任务没有执行;2.消息队列的持久化;
真是愁杀我也,以前没见过啊,于是又看文档,又百度又google的,最终还是在一天半之后整出来鸟~~
首先向大家介绍一本书籍《ActiveMQ in Action》,我大部分代码都是参考这本书实现的。好了,废话少说,看代码:
1.首先启动activeMQ的服务
view sourceprint?01 | public class RunServer { |
04 | public static void main(String[] args) throws Exception { |
05 | RunServer rs = new RunServer(); |
06 | BrokerService broker = rs.startServer(); |
09 | public BrokerService startServer() throws Exception{ |
10 | // java代码调用activemq相关的类来构造并启动brokerService |
11 | BrokerService broker = new BrokerService(); |
15 | File dataFilterDir = new File( "targer/amq-in-action/kahadb" ); |
16 | KahaDBStore kaha = new KahaDBStore(); |
17 | kaha.setDirectory(dataFilterDir); |
18 | // use a bigger journal file |
19 | kaha.setJournalMaxFileLength( 1024 * 100 ); |
20 | // small batch means more frequent and smaller writes |
21 | kaha.setIndexWriteBatchSize( 100 ); |
22 | // do the index write in a separate thread |
23 | kaha.setEnableIndexWriteAsync( true ); |
25 | broker.setPersistenceAdapter(kaha); |
26 | // create a transport connector |
27 | broker.addConnector( "tcp://localhost:61616" ); |
28 | broker.setUseJmx( true ); |
29 | //broker.setDataDirectory("data/"); |
32 | // 以下是ManagementContext的配置,从这个容器中可以取得消息队列中未执行的消息数、消费者数、出队数等等 |
34 | ManagementContext context = broker.getManagementContext(); |
35 | context.setConnectorPort( 2011 ); |
36 | context.setJmxDomainName( "my-broker" ); |
37 | context.setConnectorPath( "/jmxrmi" ); |
2.发送消息
view sourceprint?02 | private static final int SEND_NUMBER = 1 ; |
04 | public st
20000
atic void main(String[] args){ |
05 | // ConnectionFactory :连接工厂,JMS 用它创建连接 |
06 | ConnectionFactory connectionFactory; |
07 | // Connection :JMS 客户端到JMS Provider 的连接 |
08 | Connection connection = null ; |
09 | // Session: 一个发送或接收消息的线程 |
11 | // Destination :消息的目的地;消息发送给谁. |
12 | Destination destination; |
13 | // MessageProducer:消息发送者 |
14 | MessageProducer producer; |
15 | // TextMessage message; |
16 | // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar |
18 | connectionFactory = new ActiveMQConnectionFactory( |
19 | ActiveMQConnection.DEFAULT_USER, |
20 | ActiveMQConnection.DEFAULT_PASSWORD, |
21 | "tcp://localhost:61616" ); |
24 | connection = connectionFactory.createConnection(); |
28 | session = connection.createSession(Boolean.TRUE, |
29 | Session.AUTO_ACKNOWLEDGE); |
30 | // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置 |
31 | destination = session.createQueue( "test-persistence" ); |
33 | producer = session.createProducer(destination); |
35 | producer.setDeliveryMode(DeliveryMode.PERSISTENT); |
37 | sendMessage(session, producer); |
46 | } catch (Throwable ignore) { |
52 | public static void sendMessage(Session session, MessageProducer producer) |
54 | for ( int i = 1 ; i <= SEND_NUMBER; i++) { |
55 | TextMessage message = session |
56 | .createTextMessage( "ActiveMq 发送的消息" + i); |
58 | System.out.println( "发送消息:" + i); |
59 | producer.send(message); |
3.收消息
view sourceprint?01 | public class Receiver { |
02 | public static void main(String[] args){ |
03 | // ConnectionFactory :连接工厂,JMS 用它创建连接 |
04 | ConnectionFactory connectionFactory; |
05 | // Connection :JMS 客户端到JMS Provider 的连接 |
06 | Connection connection = null ; |
07 | // Session: 一个发送或接收消息的线程 |
09 | // Destination :消息的目的地;消息发送给谁. |
10 | Destination destination; |
12 | MessageConsumer consumer; |
14 | connectionFactory = new ActiveMQConnectionFactory( |
15 | ActiveMQConnection.DEFAULT_USER, |
16 | ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616" ); |
19 | connection = connectionFactory.createConnection(); |
23 | session = connection.createSession(Boolean.TRUE, |
24 | Session.AUTO_ACKNOWLEDGE); |
25 | //test-queue跟sender的保持一致,一个创建一个来接收 |
26 | destination = session.createQueue( "test-persistence" ); |
27 | consumer = session.createConsumer(destination); |
29 | consumer.setMessageListener( new MessageListener() { |
30 | public void onMessage(Message arg0) { |
33 | } catch (InterruptedException e1) { |
36 | System.out.println( "==================" ); |
38 | System.out.println( "RECEIVE1第一个获得者:" |
39 | + ((TextMessage) arg0).getText()); |
40 | } catch (JMSException e) { |
41 | // TODO Auto-generated catch block |
54 | } catch (Throwable ignore) { |
4.获取消息的状态,也就是上面所说的获得消息队列中未执行的消息数、消费者数、出队数等等
view sourceprint?01 | public class StateTest { |
07 | public static void main(String[] args) throws Exception { |
08 | JMXServiceURL url = new JMXServiceURL( "service:jmx:rmi:///jndi/rmi://localhost:2011/jmxrmi" ); |
09 | JMXConnector connector = JMXConnectorFactory.connect(url, null ); |
11 | MBeanServerConnection connection = connector.getMBeanServerConnection(); |
13 | // 需要注意的是,这里的my-broker必须和上面配置的名称相同 |
14 | ObjectName name = new ObjectName( "my-broker:BrokerName=localhost,Type=Broker" ); |
15 | BrokerViewMBean mBean = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(connection, name, BrokerViewMBean. class , true ); |
16 | // System.out.println(mBean.getBrokerName()); |
18 | for (ObjectName queueName : mBean.getQueues()) { |
19 | QueueViewMBean queueMBean = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(connection, queueName, QueueViewMBean. class , true ); |
20 | System.out.println( "\n------------------------------\n" ); |
23 | System.out.println( "States for queue --- " + queueMBean.getName()); |
26 | System.out.println( "Size --- " + queueMBean.getQueueSize()); |
29 | System.out.println( "Number of consumers --- " + queueMBean.getConsumerCount()); |
32 | System.out.println( "Number of dequeue ---" + queueMBean.getDequeueCount() ); |
到此结束,希望可以为大家做个参考~~
再分享一下我老师大神的人工智能教程吧。零基础!通俗易懂!风趣幽默!还带黄段子!希望你也加入到我们人工智能的队伍中来!https://blog.csdn.net/jiangjunshow