您的位置:首页 > 运维架构

ActiveMQ(二)———使用Topic来发送消息

2016-12-19 23:20 399 查看
摘要:每个消息可以有多个消费者,发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息,而且为了消费消息,订阅者必须保持运行的状态。为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。

引入三个jar包,这些jar包在activeMQ的安装包中都有:

geronimo-j2ee-management_1.1_spec-1.0.1.jar

geronimo-jms_1.1_spec-1.1.1.jar

activemq-core-5.5.0.jar

消息生产者:

package com.ds.test.activemq.topic;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Sender {
private static final int SEND_NUMBER = 5;

public static void main(String[] args) {
Connection connection = null;
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
try {
connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic("FirstTopic1");
MessageProducer producer = session.createProducer(destination);
sendMessage(session, producer);
session.commit();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
}

public static void sendMessage(Session session, MessageProducer producer)
throws Exception {
for (int i = 1; i <= SEND_NUMBER; i++) {
TextMessage message = session
.createTextMessage("ActiveMq 发送的消息" + i);
// 发送消息到目的地方
System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
producer.send(message);
}
}
}


消息消费者:

package com.ds.test.activemq.topic;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Receiver {
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
try {
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.FALSE,
Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic("FirstTopic1");
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage tm = (TextMessage) message;
try {
System.out.println("Received message: " + tm.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}catch (JMSException e) {
e.printStackTrace();
}
}
}


查看消息情况:http://localhost:8161/admin/topics.jsp
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: