ActiveMQ 快速入门教程系列 第二章 发布-订阅者模式实现
2016-02-07 17:15
691 查看
第二章我们会介绍怎样实现一个发布者对多个订阅者的消息传递
Topic和queue的最大区别在于topic是以广播的形式,通知所有在线监听的客户端有新的消息,没有监听的客户端将收不到消息;而queue则是以点对点的形式通知多个处于监听状态的客户端中的一个。
首先我们如第一章类似,为订阅者1,2分别创建两个监听器MyMessageListener,MyMessageListener2 实现MessageListener接口
然后我们分别创建订阅者1,2,并创建一个Topic “MyTopic1”用于消息的订阅,订阅者1,2分别设置对应的监听器
接着我们创建发布者,发布10条消息
我们打开activeMq 控制台可以看到有2位消费者,消息被消费了20次(每位订阅者分别消费了10条消息)
Topic和queue的最大区别在于topic是以广播的形式,通知所有在线监听的客户端有新的消息,没有监听的客户端将收不到消息;而queue则是以点对点的形式通知多个处于监听状态的客户端中的一个。
首先我们如第一章类似,为订阅者1,2分别创建两个监听器MyMessageListener,MyMessageListener2 实现MessageListener接口
package cn.com.evan.Jms.activemq; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; public class MyMessageListener implements MessageListener{ @Override public void onMessage(Message msg) { try { System.out.println("订阅者1接受消息:"+((TextMessage)msg).getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
package cn.com.evan.Jms.activemq; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; public class MyMessageListener2 implements MessageListener{ @Override public void onMessage(Message msg) { try { System.out.println("订阅者2接受消息:"+((TextMessage)msg).getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
然后我们分别创建订阅者1,2,并创建一个Topic “MyTopic1”用于消息的订阅,订阅者1,2分别设置对应的监听器
package cn.com.evan.Jms.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class JmsComsumer { private static String USERNAME = ActiveMQConnection.DEFAULT_USER; private static String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; private static String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL; private static Integer SENDNUM = 10; public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection = null; Session session; Destination destination; MessageConsumer messageConsumer; connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKERURL); try { connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); destination = session.createTopic("MyTopic1");//create topic messageConsumer = session.createConsumer(destination);// Create // producer messageConsumer.setMessageListener(new MyMessageListener());// set listener which we created before } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
package cn.com.evan.Jms.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class JmsComsumer2 { private static String USERNAME = ActiveMQConnection.DEFAULT_USER; private static String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; private static String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL; private static Integer SENDNUM = 10; public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection = null; Session session; Destination destination; MessageConsumer messageConsumer; connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKERURL); try { connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); destination = session.createTopic("MyTopic1");//create topic messageConsumer = session.createConsumer(destination);// Create // producer messageConsumer.setMessageListener(new MyMessageListener2());// set listener which we created before } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
接着我们创建发布者,发布10条消息
package cn.com.evan.Jms.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class JmsProducer { private static String USERNAME = ActiveMQConnection.DEFAULT_USER; private static String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; private static String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL; private static Integer SENDNUM = 10; public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection = null; Session session; Destination destination; MessageProducer messageProducer; connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKERURL); try { connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); destination = session.createTopic("MyTopic1");// Create topic messageProducer = session.createProducer(destination);// Create producer sendMessage(session,messageProducer); session.commit(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); }finally{ if(connection!=null){ try { connection.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException{ for(int i=0;i<JmsProducer.SENDNUM;i++){ TextMessage message = session.createTextMessage("ActiveMQ"+i); messageProducer.send(message); System.out.println("发布者发布消息:"+message.getText()); } } }以上发布者和2个订阅者都已经创建完毕,我们先运行订阅者1,2,再运行发布者(就像我们平时要接收到什么新闻,都得先订阅那频道,才可以接收到相关频道发布的消息),运行结果如下
我们打开activeMq 控制台可以看到有2位消费者,消息被消费了20次(每位订阅者分别消费了10条消息)
相关文章推荐
- Onenote如何快速实现首行缩进的功能。
- ajax优点
- 利用简洁的C语言代码解决跳台阶问题与约瑟夫环问题
- 用努力的程度拼别人聪明的程度。
- 机房重构——DataGridView数据显示,窗体显示
- Learn what you can do with Spring Boot
- bzoj1051【HAOI2006】受欢迎的牛
- 错排问题
- HDU-1014
- 自己动手写一个 WordPress 插件
- What is a JavaBean exactly?
- hdoj 1385 Minimum Transport Cost【最短路-->floyd】
- 2015-2016寒假 第一、二周学习总结
- Pure-Highlightjs – WordPress 代码高亮插件
- 新浪云PHP(续)
- poj2778 DNA Sequence(AC自动机+矩阵快速幂)
- C++单例(用static实现)
- 详解C语言编程中预处理器的用法
- mac zsh选择到行首的快捷键
- ios 崩溃信息获取代码