您的位置:首页 > 编程语言 > Java开发

【ActiveMQ教程】发布/订阅(Publish/Subscribe)消息教程

2015-12-27 00:00 696 查看
摘要: 发布/订阅(Publish/Subscribe)消息教程
先订阅后发布

发布/订阅(Publish/Subscribe)消息教程

要先订阅后发布,可以一个发布,多个订阅。

创建消息发布者

/**
* 发布/订阅-消息发布者
*
* @author Edward
*
*/
public class P2sProducer {
public static void main(String[] args) {
ConnectionFactory connectionFactory = null;
Connection conn = null;
Session session = null;
Topic topic = null;
MessageProducer messageProducer = null;
try {
// 创建工厂
// ActiveMQConnection.DEFAULT_USER 默认null
// ActiveMQConnection.DEFAULT_PASSWORD 默认null
// ActiveMQConnection.DEFAULT_BROKER_URL
// 默认failover://tcp://localhost:61616
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
ActiveMQConnection.DEFAULT_BROKER_URL);
// 创建连接
conn = connectionFactory.createConnection();
// 启动连接
conn.start();
// 创建会话 createSession(true, Session.AUTO_ACKNOWLEDGE); true 表示开启事务
// Session.AUTO_ACKNOWLEDGE 消息模式
session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
// 创建Topic
topic = session.createTopic("P2sTopic");
// 创建消息生产者
messageProducer = session.createProducer(topic);
// 创建消息
TextMessage message = session.createTextMessage();
message.setText("我是P2s发布者发布的消息");
// 发送消息
messageProducer.send(message);
// 提交事务
session.commit();
System.out.println("OK");
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
session.close();
conn.close();
} catch (JMSException e) {
e.printStackTrace();
}
}

}

}


创建消息订阅者

/**
* 发布/订阅-消息订阅者
*
* @author Edward
*
*/
public class P2sConsumer {

public static void main(String[] args) {
ConnectionFactory connectionFactory = null;
Connection conn = null;
Session session = null;
Topic topic = null;
MessageConsumer messageConsumer = null;
try {
// 创建工厂
// ActiveMQConnection.DEFAULT_USER 默认null
// ActiveMQConnection.DEFAULT_PASSWORD 默认null
// ActiveMQConnection.DEFAULT_BROKER_URL
// 默认failover://tcp://localhost:61616
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
ActiveMQConnection.DEFAULT_BROKER_URL);
// 创建连接
conn = connectionFactory.createConnection();
// 启动连接
conn.start();
// 创建会话 createSession(true, Session.AUTO_ACKNOWLEDGE); false 表示不开启事务
// Session.AUTO_ACKNOWLEDGE 消息模式
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建Topic
topic = session.createTopic("P2sTopic");
// 创建消息消费者
messageConsumer = session.createConsumer(topic);
// 注册消费消息监听
messageConsumer.setMessageListener(new MessageListener() {

@Override
public void onMessage(Message message) {
try {
System.out.println("订阅者1-订阅的消息:"
+ ((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
} catch (JMSException e) {
e.printStackTrace();
}
}
}


执行结果

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
订阅者1-订阅的消息:我是P2s发布者发布的消息
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Java JMS ActiveMQ