ActiveMQ (二) 使用Queue或者Topic发送/接受消息
2016-07-29 00:00
411 查看
ActiveMQ (
二) 使用Queue或者Topic发送/接受消息
本篇主要讲解在未使用其他框架(Spring)整合情况下,独立基于ActiveMQ,使用JMS规范进行消息通信。
一.JMS回顾
Java Message Service (JMS)是sun提出来的为J2EE提供企业消息处理的一套规范,JMS目前有2套规范还在使用JMS 1.0.2 b和1.1. 1.1已经成为主流的JMS Provider事实上的标准了.
1.1主要在session上面有一些重要改变,比如支持建立同一session上的transaction,让他支持同时发送P2P(Queue)消息和接受
Topic消息。
在JMS中间主要定义了2种消息模式Point-to-Point (点对点), Publich/Subscribe Model (发布/订阅者), 其中在Publich/Subscribe 模式下又有Nondurable subscription和durable subscription (持久化订阅)2种消息处理方式。
下面是JMS规范基本的接口和实现
JMS Common Interface PTP-Specific Interface Pub/Sub-specific interfaces
ConnectionFactory QueueConnectionFactory TopicConnectionFactory
Connection QueueConnection TopicConnection
Destination Queue Topic
Session QueueSession TopiSession
MessageProducer QueueSender TopicPublisher
MessageConsumer QueueReceiver/QueueBrwer TopicSubscriber
二.使用Queue
下面以ActiveMQ example的代码为主进行说明
使用ActiveMQ的Connection,ConnectionFactory 建立连接,注意这里没有用到pool
//建立Connection
protected Connection createConnection() throws JMSException, Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, pwd, url);
Connection connection = connectionFactory.createConnection();
if (durable && clientID!=null) {
connection.setClientID(clientID);
}
connection.start();
return connection;
}
//建立Session
protected Session createSession(Connection connection) throws Exception {
Session session = connection.createSession(transacted, ackMode);
return session;
}
2。发送消息的代码
//建立QueueSession
protected MessageProducer createProducer(Session session) throws JMSException {
Destincation destination = session.createQueue("queue.hello");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
if( timeToLive!=0 )
producer.setTimeToLive(timeToLive);
return producer;
}
//使用Producer发送消息到Queue
producer.send(message);
3。接受消息,在JMS规范里面,你可以使用
QueueReceiver/QueueBrowser直接接受消息,但是更多的情况下我们采用消息通知方式,即实现MessageListener接口
public void onMessage(Message message) {
//process message
}
//set MessageListner ,receive message
Destincation destination = session.createQueue("queue.hello");
consumer = session.createConsumer(destination);
consumer.setMessageListener(this);
以上就是使用jms queue发送接受消息的基本方式
三 Topic
1. 建立连接
java 代码
protected Connection createConnection() throws JMSException, Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, pwd, url);
Connection connection = connectionFactory.createConnection();
//如果你要使用DurableSubScription 方式,你必须为connection设置一个ClientID
if (durable && clientID!=null) {
connection.setClientID(clientID);
}
connection.start();
return connection;
}
2. 建立Session
java 代码
protected Session createSession(Connection connection) throws Exception {
Session session = connection.createSession(transacted, ackMode);
return session;
}
创建Producer 发送消息到Topic
//create topic on session
topic = session.createTopic("topic.hello");
producer = session.createProducer(topic);
//send message
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
producer.send(message);
创建Consumer接受消息(基本上和Queue相同)
Destincation destination = session.createTopic("topic.hello");
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(this);
//如果你使用的是Durable Subscription方式,你必须在建立connection的时候
//设置ClientID,而且建立comsumer的时候使用createDurableSubscriber方法,为他指定一个consumerName。
//connection.setClientID(clientId);
//consumer = session.createDurableSubscriber((Topic) destination, consumerName);
四:连接ActiveMQ的方式
ActiveMQConnectionFactory 提供了多种连接到Broker的方式activemq.apache.org/uri-protocols.html
常见的有
vm://host:port //vm
tcp://host:port //tcp
ssl://host:port //SSL
stomp://host:port //stomp协议可以跨语言,目前有很多种stomp client 库(java,c#,c/c++,ruby,python...);
activemq例子代码发送Message消息
完整的示例程序:
发送TextMessage
public class SendMessage {
private static final String url = "tcp://localhost:61616";;
private static final String QUEUE_NAME = "choice.queue";
protected String expectedBody = "<hello>world!</hello>";
public void sendMessage() throws JMSException{
Connection connection = null;
try{
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(QUEUE_NAME);
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage(expectedBody);
message.setStringProperty("headname", "remoteB");
producer.send(message);
}catch(Exception e){
e.printStackTrace();
}finally{
connection.close();
}
}
*********************************************************************
发送BytesMessage
public class SendMessage {
private String url = "tcp://localhost:61616";
public void sendMessage() throws JMSException{
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("test.queue");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
BytesMessage message = session.createBytesMessage();
byte[] content = getFileByte("d://test.jar");
message.writeBytes(content);
try{
producer.send(message);
System.out.println("successful send message");
}catch(Exception e){
e.printStackTrace();
e.getMessage();
}finally{
session.close();
connection.close();
}
}
private byte[] getFileByte(String filename){
byte[] buffer = null;
FileInputStream fin = null;
try {
File file = new File(filename);
fin = new FileInputStream(file);
buffer = new byte[fin.available()];
fin.read(buffer);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
fin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
return buffer;
}
发送完消息后可以访问
http://localhost:8161/admin/queues.jsp
看到相应的queue中是否有消息
适用收取TextMessage消息
public class ReceiveMessage {
private static final String url = "tcp://172.16.168.167:61616";
private static final String QUEUE_NAME = "szf.queue";
public void receiveMessage(){
Connection connection = null;
try{
try{
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
connection = connectionFactory.createConnection();
}catch(Exception e){
// ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
// connection = connectionFactory.createConnection();
}
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(destination);
consumeMessagesAndClose(connection,session,consumer);
}catch(Exception e){
}
}
protected void consumeMessagesAndClose(Connection connection,
Session session, MessageConsumer consumer) throws JMSException {
for (int i = 0; i < 1;) {
Message message = consumer.receive(1000);
if (message != null) {
i++;
onMessage(message);
}
}
System.out.println("Closing connection");
consumer.close();
session.close();
connection.close();
}
public void onMessage(Message message){
try{
if (message instanceof TextMessage) {
TextMessage txtMsg = (TextMessage)message;
String msg = txtMsg.getText();
System.out.println("Received: " + msg);
}
}catch(Exception e){
e.printStackTrace();
}
}
public static void main(String args[]){
ReceiveMessage rm = new ReceiveMessage();
rm.receiveMessage();
}
}
参考:http://jinguo.javaeye.com/blog/240187
二) 使用Queue或者Topic发送/接受消息
本篇主要讲解在未使用其他框架(Spring)整合情况下,独立基于ActiveMQ,使用JMS规范进行消息通信。
一.JMS回顾
Java Message Service (JMS)是sun提出来的为J2EE提供企业消息处理的一套规范,JMS目前有2套规范还在使用JMS 1.0.2 b和1.1. 1.1已经成为主流的JMS Provider事实上的标准了.
1.1主要在session上面有一些重要改变,比如支持建立同一session上的transaction,让他支持同时发送P2P(Queue)消息和接受
Topic消息。
在JMS中间主要定义了2种消息模式Point-to-Point (点对点), Publich/Subscribe Model (发布/订阅者), 其中在Publich/Subscribe 模式下又有Nondurable subscription和durable subscription (持久化订阅)2种消息处理方式。
下面是JMS规范基本的接口和实现
JMS Common Interface PTP-Specific Interface Pub/Sub-specific interfaces
ConnectionFactory QueueConnectionFactory TopicConnectionFactory
Connection QueueConnection TopicConnection
Destination Queue Topic
Session QueueSession TopiSession
MessageProducer QueueSender TopicPublisher
MessageConsumer QueueReceiver/QueueBrwer TopicSubscriber
二.使用Queue
下面以ActiveMQ example的代码为主进行说明
使用ActiveMQ的Connection,ConnectionFactory 建立连接,注意这里没有用到pool
//建立Connection
protected Connection createConnection() throws JMSException, Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, pwd, url);
Connection connection = connectionFactory.createConnection();
if (durable && clientID!=null) {
connection.setClientID(clientID);
}
connection.start();
return connection;
}
//建立Session
protected Session createSession(Connection connection) throws Exception {
Session session = connection.createSession(transacted, ackMode);
return session;
}
2。发送消息的代码
//建立QueueSession
protected MessageProducer createProducer(Session session) throws JMSException {
Destincation destination = session.createQueue("queue.hello");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
if( timeToLive!=0 )
producer.setTimeToLive(timeToLive);
return producer;
}
//使用Producer发送消息到Queue
producer.send(message);
3。接受消息,在JMS规范里面,你可以使用
QueueReceiver/QueueBrowser直接接受消息,但是更多的情况下我们采用消息通知方式,即实现MessageListener接口
public void onMessage(Message message) {
//process message
}
//set MessageListner ,receive message
Destincation destination = session.createQueue("queue.hello");
consumer = session.createConsumer(destination);
consumer.setMessageListener(this);
以上就是使用jms queue发送接受消息的基本方式
三 Topic
1. 建立连接
java 代码
protected Connection createConnection() throws JMSException, Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, pwd, url);
Connection connection = connectionFactory.createConnection();
//如果你要使用DurableSubScription 方式,你必须为connection设置一个ClientID
if (durable && clientID!=null) {
connection.setClientID(clientID);
}
connection.start();
return connection;
}
2. 建立Session
java 代码
protected Session createSession(Connection connection) throws Exception {
Session session = connection.createSession(transacted, ackMode);
return session;
}
创建Producer 发送消息到Topic
//create topic on session
topic = session.createTopic("topic.hello");
producer = session.createProducer(topic);
//send message
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
producer.send(message);
创建Consumer接受消息(基本上和Queue相同)
Destincation destination = session.createTopic("topic.hello");
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(this);
//如果你使用的是Durable Subscription方式,你必须在建立connection的时候
//设置ClientID,而且建立comsumer的时候使用createDurableSubscriber方法,为他指定一个consumerName。
//connection.setClientID(clientId);
//consumer = session.createDurableSubscriber((Topic) destination, consumerName);
四:连接ActiveMQ的方式
ActiveMQConnectionFactory 提供了多种连接到Broker的方式activemq.apache.org/uri-protocols.html
常见的有
vm://host:port //vm
tcp://host:port //tcp
ssl://host:port //SSL
stomp://host:port //stomp协议可以跨语言,目前有很多种stomp client 库(java,c#,c/c++,ruby,python...);
activemq例子代码发送Message消息
完整的示例程序:
发送TextMessage
public class SendMessage {
private static final String url = "tcp://localhost:61616";;
private static final String QUEUE_NAME = "choice.queue";
protected String expectedBody = "<hello>world!</hello>";
public void sendMessage() throws JMSException{
Connection connection = null;
try{
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(QUEUE_NAME);
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage(expectedBody);
message.setStringProperty("headname", "remoteB");
producer.send(message);
}catch(Exception e){
e.printStackTrace();
}finally{
connection.close();
}
}
*********************************************************************
发送BytesMessage
public class SendMessage {
private String url = "tcp://localhost:61616";
public void sendMessage() throws JMSException{
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("test.queue");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
BytesMessage message = session.createBytesMessage();
byte[] content = getFileByte("d://test.jar");
message.writeBytes(content);
try{
producer.send(message);
System.out.println("successful send message");
}catch(Exception e){
e.printStackTrace();
e.getMessage();
}finally{
session.close();
connection.close();
}
}
private byte[] getFileByte(String filename){
byte[] buffer = null;
FileInputStream fin = null;
try {
File file = new File(filename);
fin = new FileInputStream(file);
buffer = new byte[fin.available()];
fin.read(buffer);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
fin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
return buffer;
}
发送完消息后可以访问
http://localhost:8161/admin/queues.jsp
看到相应的queue中是否有消息
适用收取TextMessage消息
public class ReceiveMessage {
private static final String url = "tcp://172.16.168.167:61616";
private static final String QUEUE_NAME = "szf.queue";
public void receiveMessage(){
Connection connection = null;
try{
try{
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
connection = connectionFactory.createConnection();
}catch(Exception e){
// ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
// connection = connectionFactory.createConnection();
}
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(destination);
consumeMessagesAndClose(connection,session,consumer);
}catch(Exception e){
}
}
protected void consumeMessagesAndClose(Connection connection,
Session session, MessageConsumer consumer) throws JMSException {
for (int i = 0; i < 1;) {
Message message = consumer.receive(1000);
if (message != null) {
i++;
onMessage(message);
}
}
System.out.println("Closing connection");
consumer.close();
session.close();
connection.close();
}
public void onMessage(Message message){
try{
if (message instanceof TextMessage) {
TextMessage txtMsg = (TextMessage)message;
String msg = txtMsg.getText();
System.out.println("Received: " + msg);
}
}catch(Exception e){
e.printStackTrace();
}
}
public static void main(String args[]){
ReceiveMessage rm = new ReceiveMessage();
rm.receiveMessage();
}
}
参考:http://jinguo.javaeye.com/blog/240187
相关文章推荐
- ActiveMQ (二) 使用Queue或者Topic发送/接受消息
- ActiveMQ 使用Queue或者Topic发送/接受消息
- MQ系列3 使用Spring发送,消费topic和queue消息 activeMQ
- 如何使用activemq jms发送和接受消息
- ActiveMQ(二)———使用Topic来发送消息
- ActiveMQ5.0实战三:使用Spring发送,消费topic和queue消息
- spring-jms(activemq实现)使用queue发送消息简单例子
- ActiveMQ5.0实战三:使用Spring发送,消费topic和queue消息
- 消息队列:快速上手ActiveMQ消息队列的JMS方式使用(两种模式:Topic和Queue的消息推送和订阅)
- ActiveMQ(三):使用Topic方式发送消息
- 使用ActiveMQ发送和接受消息(1)
- ActiveMQ5.0实战三:使用Spring发送,消费topic和queue消息
- ActiveMQ(二):使用队列Queue方式发送消息
- ActiveMQ5.0实战二:使用Spring发送,消费topic和queue消息
- ActiveMQ5.0实战:使用Spring发送,消费topic和queue消息
- 利用Spring与ActiveMQ整合发送、接收消息实例(Queue与Topic模式)
- mq 使用Spring发送,消费topic和queue消息
- springboot使用activemq同时接收queue和topic消息
- ActiveMQ5.0实战三:使用Spring发送,消费topic和queue消息
- ActiveMQ使用笔记(三)ActiveMQ消息发送与接收