【ActiveMQ教程】发布/订阅(Publish/Subscribe)消息教程
2015-12-27 00:00
696 查看
摘要: 发布/订阅(Publish/Subscribe)消息教程
先订阅后发布
发布/订阅(Publish/Subscribe)消息教程
要先订阅后发布,可以一个发布,多个订阅。
先订阅后发布
发布/订阅(Publish/Subscribe)消息教程
要先订阅后发布,可以一个发布,多个订阅。
创建消息发布者
/** * 发布/订阅-消息发布者 * * @author Edward * */ public class P2sProducer { public static void main(String[] args) { ConnectionFactory connectionFactory = null; Connection conn = null; Session session = null; Topic topic = null; MessageProducer messageProducer = null; try { // 创建工厂 // ActiveMQConnection.DEFAULT_USER 默认null // ActiveMQConnection.DEFAULT_PASSWORD 默认null // ActiveMQConnection.DEFAULT_BROKER_URL // 默认failover://tcp://localhost:61616 connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL); // 创建连接 conn = connectionFactory.createConnection(); // 启动连接 conn.start(); // 创建会话 createSession(true, Session.AUTO_ACKNOWLEDGE); true 表示开启事务 // Session.AUTO_ACKNOWLEDGE 消息模式 session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE); // 创建Topic topic = session.createTopic("P2sTopic"); // 创建消息生产者 messageProducer = session.createProducer(topic); // 创建消息 TextMessage message = session.createTextMessage(); message.setText("我是P2s发布者发布的消息"); // 发送消息 messageProducer.send(message); // 提交事务 session.commit(); System.out.println("OK"); } catch (JMSException e) { e.printStackTrace(); } finally { try { session.close(); conn.close(); } catch (JMSException e) { e.printStackTrace(); } } } }
创建消息订阅者
/** * 发布/订阅-消息订阅者 * * @author Edward * */ public class P2sConsumer { public static void main(String[] args) { ConnectionFactory connectionFactory = null; Connection conn = null; Session session = null; Topic topic = null; MessageConsumer messageConsumer = null; try { // 创建工厂 // ActiveMQConnection.DEFAULT_USER 默认null // ActiveMQConnection.DEFAULT_PASSWORD 默认null // ActiveMQConnection.DEFAULT_BROKER_URL // 默认failover://tcp://localhost:61616 connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL); // 创建连接 conn = connectionFactory.createConnection(); // 启动连接 conn.start(); // 创建会话 createSession(true, Session.AUTO_ACKNOWLEDGE); false 表示不开启事务 // Session.AUTO_ACKNOWLEDGE 消息模式 session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建Topic topic = session.createTopic("P2sTopic"); // 创建消息消费者 messageConsumer = session.createConsumer(topic); // 注册消费消息监听 messageConsumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { System.out.println("订阅者1-订阅的消息:" + ((TextMessage) message).getText()); } catch (JMSException e) { e.printStackTrace(); } } }); } catch (JMSException e) { e.printStackTrace(); } } }
执行结果
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. 订阅者1-订阅的消息:我是P2s发布者发布的消息
相关文章推荐
- java对世界各个时区(TimeZone)的通用转换处理方法(转载)
- java-注解annotation
- java-模拟tomcat服务器
- java-用HttpURLConnection发送Http请求.
- java-WEB中的监听器Lisener
- Android IPC进程间通讯机制
- Android Native 绘图方法
- Android java 与 javascript互访(相互调用)的方法例子
- 介绍一款信息管理系统的开源框架---jeecg
- 聚类算法之kmeans算法java版本
- java实现 PageRank算法
- PropertyChangeListener简单理解
- c++11 + SDL2 + ffmpeg +OpenAL + java = Android播放器
- 插入排序
- 冒泡排序
- 堆排序
- 快速排序
- 二叉查找树