您的位置:首页 > 其它

JMS&MQ系列之JMS的请求和回应

2012-09-17 10:17 417 查看
代理类:

/**
*
* @author geloin
* @date 2012-9-14 下午5:57:36
*/
package com.geloin.activemq.test4;

import org.apache.activemq.broker.BrokerService;

/**
*
* @author geloin
* @date 2012-9-14 下午5:57:36
*/
public class Broker {

/**
* 创建并启动代理
*
* @author geloin
* @date 2012-9-14 下午5:58:35
* @throws Exception
*/
private void createBroker() throws Exception {
BrokerService broker = new BrokerService();
broker.setPersistent(false);
broker.setUseJmx(false);
broker.addConnector("tcp://localhost:61616");
broker.start();

System.out.println("\nPress any key to stop broker\n");
System.in.read();

broker.start();
}

/**
*
*
* @author geloin
* @date 2012-9-14 下午5:59:30
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
Broker broker = new Broker();
broker.createBroker();
}
}


服务端:

/**
*
* @author geloin
* @date 2012-9-14 下午5:37:38
*/
package com.geloin.activemq.test4;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
* 服务进程
*
* @author geloin
* @date 2012-9-14 下午5:37:38
*/
public class Server implements MessageListener {

private String brokerURL = "tcp://localhost:61616";
private Connection conn;
private Session session;
private String requestQueue = "TEST.QUEUE";
private MessageProducer producer;
private MessageConsumer consumer;

/**
* 消息处理
*
* @author geloin
* @date 2012-9-14 下午5:53:46
* @param messageText
* @return
*/
private String handleRequest(String messageText) {
return "Response to '" + messageText + "'";
}

/*
* (non-Javadoc)
*
* @see javax.jms.MessageListener#onMessage(javax.jms.Message)
*/
@Override
public void onMessage(Message message) {
// 监听到有消息传送至此时,执行onMessage
try {
// 若有消息传送到服务时,先创建一个文本消息
TextMessage response = this.session.createTextMessage();
// 若从客户端传送到服务端的消息为文本消息
if (message instanceof TextMessage) {
// 先将传送到服务端的消息转化为文本消息
TextMessage txtMsg = (TextMessage) message;
// 取得文本消息的内容
String messageText = txtMsg.getText();
// 将客户端传送过来的文本消息进行处理后,设置到回应消息里面
response.setText(handleRequest(messageText));
}
// 设置回应消息的关联ID,关联ID来自于客户端传送过来的关联ID
response.setJMSCorrelationID(message.getJMSCorrelationID());
// 生产者发送回应消息,目的由客户端的JMSReplyTo定义,内容即刚刚定义的回应消息
producer.send(message.getJMSReplyTo(), response);

} catch (Exception e) {
e.printStackTrace();
}
}

/**
*
* @author geloin
* @date 2012-9-14 下午5:37:39
* @param args
*/
public static void main(String[] args) throws Exception {
// 定义并启动服务
Server server = new Server();
server.start();

// 监听控制器输入
System.out.println("\nPress any key to stop the server\n");
System.in.read();

// 停止服务
server.stop();
}

/**
* 服务
*
* @author geloin
* @date 2012-9-14 下午5:55:49
* @throws Exception
*/
public void start() throws Exception {
// 使用tcp协议创建连接通道并启动
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
brokerURL);
conn = factory.createConnection();
conn.start();

// 创建session及消息的目的地,并设定交互时使用的存储方式,同时定义队列名称,客户端通过此名称连接
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination dest = session.createQueue(requestQueue);

// 创建生产者,并设定分配模式,生产者的目的地为null,因为它的目的地由JMSReplyTo定义
producer = session.createProducer(null);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

// 消费者,及消费者的临听程序
consumer = session.createConsumer(dest);
consumer.setMessageListener(this);
}

/**
* 回收资源
*
* @author geloin
* @date 2012-9-13 上午9:42:06
* @throws Exception
*/
private void stop() {
try {
if (null != producer) {
producer.close();
}

if (null != consumer) {
consumer.close();
}

if (null != session) {
session.close();
}

if (null != conn) {
conn.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}


客户端:

/**
*
* @author geloin
* @date 2012-9-14 下午6:00:15
*/
package com.geloin.activemq.test4;

import java.util.UUID;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
*
* @author geloin
* @date 2012-9-14 下午6:00:15
*/
public class Client implements MessageListener {

private String brokerURL = "tcp://localhost:61616";
private Connection conn;
private Session session;
private String requestQueue = "TEST.QUEUE";
private MessageProducer producer;
private MessageConsumer consumer;
private Destination tempDest;

/* (non-Javadoc)
* @see javax.jms.MessageListener#onMessage(javax.jms.Message)
*/
@Override
public void onMessage(Message message) {
try {
System.out.println("Receive response for: "
+ ((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}

/**
* 启动
*
* @author geloin
* @date 2012-9-13 下午7:49:00
*/
private void start() {
try {
// 使用tcp协议创建连接通道并启动
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
brokerURL);
conn = factory.createConnection();
conn.start();

// 创建session及消息的目的地,并设定交互时使用的存储方式,同时定义队列名称,此名称与服务端相同
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination dest = session.createQueue(requestQueue);

// 创建生产者,并设定分配模式
producer = session.createProducer(dest);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

// 使用临时目的地创建消费者,及消费者的临听程序
tempDest = session.createTemporaryQueue();
consumer = session.createConsumer(tempDest);
consumer.setMessageListener(this);
} catch (Exception e) {
e.printStackTrace();
}
}

/**
* 停止
*
* @author geloin
* @date 2012-9-13 下午7:49:06
*/
public void stop() {
try {
if (null != producer) {
producer.close();
}

if (null != consumer) {
consumer.close();
}

if (null != session) {
session.close();
}

if (null != conn) {
conn.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}

/**
* 请求
*
* @author geloin
* @date 2012-9-13 下午7:52:54
* @param request
* @throws Exception
*/
public void request(String request) throws Exception {
System.out.println("Requesting:" + request);
TextMessage txtMsg = session.createTextMessage();
txtMsg.setText(request);

txtMsg.setJMSReplyTo(tempDest);

String correlationId = UUID.randomUUID().toString();
txtMsg.setJMSCorrelationID(correlationId);

producer.send(txtMsg);
}

/**
*
* @author geloin
* @date 2012-9-14 下午6:00:15
* @param args
*/
public static void main(String[] args) throws Exception {
Client client = new Client();
client.start();
int i = 0;
while (i++ < 10) {
client.request("REQUEST-" + i);
}
Thread.sleep(3000);
client.stop();
}

}


代码比较简单,是实现JMS的基本过程,从以上代码中,我们可以得出以下结论:

1. 要实现交互,必须有代理,即Broker,该代理必须设定Connector,也即是我们所谓brokerURL。在服务端与客户端交互过程中,broker必须被启动;

2. 要向外发送消息,需要执行以下步骤:

(1) 通过第一步中的brokerURL创建ConnectionFactory;

(2) 通过ConnectionFactory创建Connection并启动;

(3) 通过Connection创建Session;

(4) 通过Session创建发送目标;

(5) 通过Session创建MessageProducer;

(6) 通过Session创建Message;

(7) 通过MessageProducer发送Message。

3. 要接收消息,需要执行以下步骤:

(1) 通过第一步中的brokerURL创建ConnectionFactory;

(2) 通过ConnectionFactory创建Connection并启动;

(3) 通过Connection创建Session;

(4) 通过Session创建发送目标;

(5) 通过Session创建MessageConsumer;
(6) 通过MessageConsumer取得Message并分析处理。

4. 若要实现交互,则注意以下几点:

(1) 服务端和客户端使用同一个brokerURL;

(2) 通常情况下,服务端和客户端各有一个MessageProducer和MessageConsumer;

(3) 为使服务端能够回应多个客户端,通常将其MessageProducer的Destination设置为null,即不设定,而由JMSReplyTo定义;

(4) 对应于服务端的MessageProducer的为null的Destination,若确定服务端与客户端能够交互,则在客户端可设置其MessageConsumer的Destination为临时Destination;

(5) 为使服务端能够正常回应客户端,客户端需设置消息的JMSReplyTo属性及JMSCorrelationID,服务端需设置消息的JMSCorrelationID为客户端设定的JMSCorrelationID,producer的send的Destination为客户端设定的JMSReplyTo。

5. 两个需要交互的MessageProducer和MessageConsumer之间除需要使用同一brokerURL外,还需要保障其Destination的对应,即保持在创建Destination时使用的queueName相同。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: