ActiveMQ入门教程(四) - ActiveMQ Pub/Sub版的HelloWorld
2013-09-28 14:46
363 查看
在上一篇中,我们说了P2P版的HelloWorld,在这一篇,我们简要说一下,基于发布,订阅模式的HelloWorld。
基础知识就不在介绍了,需要的会一点一点讲。
消费者:
单击那个Topics连接。
这里显示的是服务器上的主题,这些显示的都没有用,可以都删掉。
Name:主题的名称
Number Of Consumers:正在运行的消费者
Message Enqueued:进入消息队列的
Message Dequeued:出消息队列的
Operations:操作
下面就可以开始运行程序了,
注意顺序:先运行消费者:
这里会产生好几个主题,我们只看我们自己用的那个,(其实,其他几个是干嘛的,暂时还不清楚,以后再研究吧.....)
我们的消费者一直在运行
接下来,运行生产者:
控制台会输出:
再一次,刷新界面:
消费者还在运行,只生产了一条消息,而且已经被消费了。
在这里遇到了一个问题,就是运行顺序的问题,
我们如果先运行生产者,再运行消费者,消费者是接收不到消息的,郁闷了好久
猜想,应该是对概念,规范的理解出了问题,就找了一下,发现了原因:
这是上一篇介绍的JMS消息模型,哎,
,对概念的理解不清晰。
至于,持久的订阅,会在以后的博客中分享,HelloWorld,就到此结束了。
基础知识就不在介绍了,需要的会一点一点讲。
1. pom.xml
这个和上一篇是一样的:<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.ygy</groupId> <artifactId>activemq</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>activemq</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.10</version> <scope>test</scope> </dependency> <!-- activemq,学习中 --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.7.0</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.5.6</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.5.6</version> </dependency> </dependencies> </project>
2. Pub/Sub版的HelloWorld
生产者:package org.ygy.mq.lesson01; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.ygy.mq.constants.MQConstants; public class HelloTopicProducer { public void send(String msg) { // 生产者的主要流程 Connection connection = null; try { // 1.初始化connection工厂,使用默认的URL // failover://tcp://localhost:61616 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); // 2.创建Connection connection = connectionFactory.createConnection(); // 3.打开连接 connection.start(); // 4.创建Session,(是否支持事务) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5.创建消息目标 Destination destination_send = session.createTopic(MQConstants.DESTINATION_SEND); // 6.创建生产者 MessageProducer producer = session.createProducer(destination_send); // 7.配置消息是否持久化 /* * DeliverMode有2种方式: * * public interface DeliveryMode { static final int NON_PERSISTENT = * 1;//不持久化:服务器重启之后,消息销毁 * * static final int PERSISTENT = 2;//持久化:服务器重启之后,该消息仍存在 } */ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 8.初始化要发送的消息 TextMessage message = session.createTextMessage(msg); // 9.发送消息 producer.send(message); connection.close(); } catch (JMSException e) { e.printStackTrace(); } } public static void main(String[] args) { new HelloTopicProducer().send("我来试一试发布/订阅..."); } }
消费者:
package org.ygy.mq.lesson01; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.ygy.mq.constants.MQConstants; public class HelloTopicConsumer implements MessageListener { @Override public void onMessage(Message message) { if (message instanceof TextMessage) { TextMessage txtMsg = (TextMessage) message; try { System.out.println("哈,我接收到了消息:" + txtMsg.getText()); } catch (JMSException e) { e.printStackTrace(); } } } public void receive() { // 消费者的主要流程 Connection connection = null; try { // 1.初始化connection工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); // 2.创建Connection connection = connectionFactory.createConnection(); // 3.打开连接 connection.start(); // 4.创建session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5.创建消息目标 Destination destination = session.createTopic(MQConstants.DESTINATION_SEND); // 6.创建消费者 MessageConsumer consumer = session.createConsumer(destination); // 7.配置监听 consumer.setMessageListener(new HelloTopicConsumer()); } catch (JMSException e) { e.printStackTrace(); } } public static void main(String[] args) { new HelloTopicConsumer().receive(); } }
3.测试
访问网页:http://localhost:8161/admin/topics.jsp单击那个Topics连接。
这里显示的是服务器上的主题,这些显示的都没有用,可以都删掉。
Name:主题的名称
Number Of Consumers:正在运行的消费者
Message Enqueued:进入消息队列的
Message Dequeued:出消息队列的
Operations:操作
下面就可以开始运行程序了,
注意顺序:先运行消费者:
这里会产生好几个主题,我们只看我们自己用的那个,(其实,其他几个是干嘛的,暂时还不清楚,以后再研究吧.....)
我们的消费者一直在运行
接下来,运行生产者:
控制台会输出:
再一次,刷新界面:
消费者还在运行,只生产了一条消息,而且已经被消费了。
在这里遇到了一个问题,就是运行顺序的问题,
我们如果先运行生产者,再运行消费者,消费者是接收不到消息的,郁闷了好久
猜想,应该是对概念,规范的理解出了问题,就找了一下,发现了原因:
这是上一篇介绍的JMS消息模型,哎,
,对概念的理解不清晰。
至于,持久的订阅,会在以后的博客中分享,HelloWorld,就到此结束了。
相关文章推荐
- ActiveMQ入门教程(四) - ActiveMQ Pub/Sub版的HelloWorld
- ActiveMQ入门教程(三) - ActiveMQ P2P版的HelloWorld
- ActiveMQ入门教程(三) - ActiveMQ P2P版的HelloWorld
- ActiveMQ入门教程(二) - ActiveMQ的安装
- Android轻量级ORM框架ActiveAndroid入门教程
- ActiveMQ 入门helloworld
- Java ActiveMQ 讲解(一)理解JMS 和 ActiveMQ基本使用
- andorid jni入门教程一之helloworld
- Understanding ActiveMQ Broker Networks - 理解ActiveMQ 网络模型
- Cocos2d-x入门教程(一)环境搭建与HelloWorld
- ActiveMQ + NodeJS + Stomp 极简入门
- Bootstrap简单HelloWorld入门教程
- Apache activemq入门实例
- ActiveMQ入门教程(六) - ActiveMQ与Spring整合-MessageListener
- bc94 老司机学习MyBatis教程之MyBatis基础篇简单入门HelloWorld
- Java ActiveMQ 讲解(一)理解JMS 和 ActiveMQ基本使用
- [PHP]从HelloWorld开始的基本语法入门教程
- ActiveMQ SSL应用之五 Spring+ActiveMQ开启SSL连接
- ActiveMQ入门教程(五) - ActiveMQ与Spring整合
- 5、ActiveMQ入门教程(五)--Spring和ActiveMQ整合