ActiveMQ实战(四)--ActiveMQ的通信方式之request-response(请求响应模式)
2017-07-23 12:44
549 查看
一、简介
在前面的两种模式中都是一方负责发送消息而另外一方负责处理。而我们实际中的很多应用相当于一种一应一答的过程,需要双方都能给对方发送消息。于是请求-应答的这种通信方式也很重要。它也应用的很普遍。
请求-应答方式并不是JMS规范系统默认提供的一种通信方式,而是通过在现有通信方式的基础上稍微运用一点技巧实现的。
下图是典型的请求-应答方式的交互过程:
在JMS里面,如果要实现请求/应答的方式,可以利用JMSReplyTo和JMSCorrelationID消息头来将通信的双方关联起来。另外,QueueRequestor和TopicRequestor能够支持简单的请求/应答过程。
現在,如果我们要实现这么一个过程,在发送请求消息并且等待返回结果的client端的流程如下:
client端创建一个临时队列并在发送的消息里指定了发送返回消息的destination以及correlationID。那么在处理消息的server端得到这个消息后就知道该发送给谁了。
Server端的大致流程如下:
这里我们是用server端注册MessageListener,通过设置返回信息的CorrelationID和JMSReplyTo将信息返回。
以上就是发送和接收消息的双方的大致程序结构。
二、实战request-response(请求响应模式)通信
具体的实现代码如下:
Client:
这里的代码除了初始化构造函数里的参数还同时设置了两个destination,一个是自己要发送消息出去的destination,在session.createProducer(adminQueue);这一句设置。
另外一个是自己要接收的消息destination, 通过Destination tempDest = session.createTemporaryQueue(); responseConsumer = session.createConsumer(tempDest); 这两句指定了要接收消息的目的地。这里是用的一个临时队列。在前面指定了返回消息的通信队列之后,我们需要通知server端知道发送返回消息给哪个队列。于是txtMessage.setJMSReplyTo(tempDest);指定了这一部分,同时txtMessage.setJMSCorrelationID(correlationId);方法主要是为了保证每次发送回来请求的server端能够知道对应的是哪个请求。这里一个请求和一个应答是相当于对应一个相同的序列号一样。
同时,因为client端在发送消息之后还要接收server端返回的消息,所以它也要实现一个消息receiver的功能。这里采用实现MessageListener接口的方式:
Server:
这里server端要执行的过程和client端相反,它是先接收消息,在接收到消息后根据提供的JMSCorelationID来发送返回的消息:
前面,在replyProducer.send()方法里,message.getJMSReplyTo()就得到了要发送消息回去的destination。
另外,设置这些发送返回信息的replyProducer的信息主要在构造函数相关的方法里实现了:
总体来说,整个的交互过程并不复杂,只是比较繁琐。对于请求/应答的方式来说,这种典型交互的过程就是Client端在设定正常发送请求的Queue同时也设定一个临时的Queue。同时在要发送的message里头指定要返回消息的destination以及CorelationID,这些就好比是一封信里面所带的回执。根据这个信息人家才知道怎么给你回信。对于Server端来说则要额外创建一个producer,在处理接收到消息的方法里再利用producer将消息发回去。这一系列的过程看起来很像http协议里面请求-应答的方式,都是一问一答。
MessageProtocol
三、运行测试
先运行客户端(请求),在ActiveMQ控制台进行查看,会多出一个记录
"Number Of Pending Messages "等待消费的数量为"1",进入队列的消息为"1",然后运行服务端(响应),会收到信息
再查看控制面板信息会变成如下所示
"Number Of Consumers"消费者为"1","Messages Enqueued"入队数量为"1","Messages Dequeued"出队数量为"1"。
完整示例: http://download.csdn.net/detail/u011781521/9907835
参考:http://activemq.apache.org/how-should-i-implement-request-response-with-jms.html
在前面的两种模式中都是一方负责发送消息而另外一方负责处理。而我们实际中的很多应用相当于一种一应一答的过程,需要双方都能给对方发送消息。于是请求-应答的这种通信方式也很重要。它也应用的很普遍。
请求-应答方式并不是JMS规范系统默认提供的一种通信方式,而是通过在现有通信方式的基础上稍微运用一点技巧实现的。
下图是典型的请求-应答方式的交互过程:
在JMS里面,如果要实现请求/应答的方式,可以利用JMSReplyTo和JMSCorrelationID消息头来将通信的双方关联起来。另外,QueueRequestor和TopicRequestor能够支持简单的请求/应答过程。
現在,如果我们要实现这么一个过程,在发送请求消息并且等待返回结果的client端的流程如下:
// client side Destination tempDest = session.createTemporaryQueue(); MessageConsumer responseConsumer = session.createConsumer(tempDest); ... // send a request.. message.setJMSReplyTo(tempDest) message.setJMSCorrelationID(myCorrelationID); producer.send(message);
client端创建一个临时队列并在发送的消息里指定了发送返回消息的destination以及correlationID。那么在处理消息的server端得到这个消息后就知道该发送给谁了。
Server端的大致流程如下:
public void onMessage(Message request) { Message response = session.createMessage(); response.setJMSCorrelationID(request.getJMSCorrelationID()) producer.send(request.getJMSReplyTo(), response) }
这里我们是用server端注册MessageListener,通过设置返回信息的CorrelationID和JMSReplyTo将信息返回。
以上就是发送和接收消息的双方的大致程序结构。
二、实战request-response(请求响应模式)通信
具体的实现代码如下:
Client:
package com.fendo.mq; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; import java.util.Random; /** * 客户端 * @author fendo * */ public class Client implements MessageListener { private static int ackMode; private static String clientQueueName; private boolean transacted = false; private MessageProducer producer; static { clientQueueName = "client.messages"; ackMode = Session.AUTO_ACKNOWLEDGE; } public Client() { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection; try { connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(transacted, ackMode); Destination adminQueue = session.createQueue(clientQueueName); //设置消息生成器将消息发送到服务器正在消耗的队列 this.producer = session.createProducer(adminQueue); this.producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //创建一个临时队列,该客户端将侦听响应, //然后创建一个从该临时队列消耗消息的消费者...对于真正的应用程序, //客户端应该为服务器的每个消息重新使用相同的临时队列...一个临时队列 每个客户端 Destination tempDest = session.createTemporaryQueue(); MessageConsumer responseConsumer = session.createConsumer(tempDest); //此类也将处理到临时队列的消息 responseConsumer.setMessageListener(this); //现在创建您要发送的实际消息 TextMessage txtMessage = session.createTextMessage(); // 设置信息 txtMessage.setText("MyProtocolMessage"); //将回复字段设置为上面创建的临时队列,这是服务器应答的队列... txtMessage.setJMSReplyTo(tempDest); //设置相关ID,以便当您收到响应时,您知道响应是哪个发送消息 //如果没有多个未完成的消息给服务器,那么 //相同的相关ID可以用于所有的消息...如果有多个未完成 //消息到您可能想要将相关ID与此关联的服务器 //消息不知何故...一个地图很好 String correlationId = this.createRandomString(); txtMessage.setJMSCorrelationID(correlationId); this.producer.send(txtMessage); } catch (JMSException e) { //妥善处理异常 } } private String createRandomString() { Random random = new Random(System.currentTimeMillis()); long randomLong = random.nextLong(); return Long.toHexString(randomLong); } public void onMessage(Message message) { String messageText = null; try { if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; messageText = textMessage.getText(); System.out.println("响应内容 = " + messageText); } } catch (JMSException e) { //妥善处理异常 } } public static void main(String[] args) { new Client(); } }
这里的代码除了初始化构造函数里的参数还同时设置了两个destination,一个是自己要发送消息出去的destination,在session.createProducer(adminQueue);这一句设置。
另外一个是自己要接收的消息destination, 通过Destination tempDest = session.createTemporaryQueue(); responseConsumer = session.createConsumer(tempDest); 这两句指定了要接收消息的目的地。这里是用的一个临时队列。在前面指定了返回消息的通信队列之后,我们需要通知server端知道发送返回消息给哪个队列。于是txtMessage.setJMSReplyTo(tempDest);指定了这一部分,同时txtMessage.setJMSCorrelationID(correlationId);方法主要是为了保证每次发送回来请求的server端能够知道对应的是哪个请求。这里一个请求和一个应答是相当于对应一个相同的序列号一样。
同时,因为client端在发送消息之后还要接收server端返回的消息,所以它也要实现一个消息receiver的功能。这里采用实现MessageListener接口的方式:
public void onMessage(Message message) { String messageText = null; try { if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; messageText = textMessage.getText(); System.out.println("messageText = " + messageText); } } catch (JMSException e) { //Handle the exception appropriately } }
Server:
这里server端要执行的过程和client端相反,它是先接收消息,在接收到消息后根据提供的JMSCorelationID来发送返回的消息:
public void onMessage(Message message) { try { TextMessage response = this.session.createTextMessage(); if (message instanceof TextMessage) { TextMessage txtMsg = (TextMessage) message; String messageText = txtMsg.getText(); response.setText(this.messageProtocol.handleProtocolMessage(messageText)); } //从接收到的消息中设置相关ID为响应消息的相关ID //这可以让客户端识别该消息的响应 //向服务器发送的一个未完成的消息 response.setJMSCorrelationID(message.getJMSCorrelationID()); //将响应发送到接收消息的JMSReplyTo字段指定的目的地, //这大概是客户创建的临时队列 this.replyProducer.send(message.getJMSReplyTo(), response); } catch (JMSException e) { //妥善处理异常 } }
前面,在replyProducer.send()方法里,message.getJMSReplyTo()就得到了要发送消息回去的destination。
另外,设置这些发送返回信息的replyProducer的信息主要在构造函数相关的方法里实现了:
public Server() { try { //这个消息代理是嵌入的 BrokerService broker = new BrokerService(); broker.setPersistent(false); broker.setUseJmx(false); broker.addConnector(messageBrokerUrl); broker.start(); } catch (Exception e) { //妥善处理异常 } //将消息的处理委托给另一个类,在设置JMS之前实例化它,这样它就可以处理消息了 this.messageProtocol = new MessageProtocol(); this.setupMessageQueueConsumer(); } private void setupMessageQueueConsumer() { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(messageBrokerUrl); Connection connection; try { connection = connectionFactory.createConnection(); connection.start(); this.session = connection.createSession(this.transacted, ackMode); Destination adminQueue = this.session.createQueue(messageQueueName); //设置一个消息生成器响应来自客户端的消息,我们将从一个消息发送到从jmsreplytoheader字段发送到的目的地 this.replyProducer = this.session.createProducer(null); this.replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //设置消费者从管理队列中消费消息 MessageConsumer consumer = this.session.createConsumer(adminQueue); consumer.setMessageListener(this); } catch (JMSException e) { //妥善处理异常 } }
总体来说,整个的交互过程并不复杂,只是比较繁琐。对于请求/应答的方式来说,这种典型交互的过程就是Client端在设定正常发送请求的Queue同时也设定一个临时的Queue。同时在要发送的message里头指定要返回消息的destination以及CorelationID,这些就好比是一封信里面所带的回执。根据这个信息人家才知道怎么给你回信。对于Server端来说则要额外创建一个producer,在处理接收到消息的方法里再利用producer将消息发回去。这一系列的过程看起来很像http协议里面请求-应答的方式,都是一问一答。
MessageProtocol
package com.fendo.mq; /** * 此类需要运行上面的客户端/服务器示例。 将消息处理委托给单独的类仅仅是个人喜好。 * @author fendo * */ public class MessageProtocol { public String handleProtocolMessage(String messageText) { String responseText; // 判断是否是client传过来的信息,在这里就可以做些解析 if ("MyProtocolMessage".equalsIgnoreCase(messageText)) { responseText = "我收到了信息"; } else { responseText = "我不知道你传的是什么: " + messageText; } return responseText; } }
三、运行测试
先运行客户端(请求),在ActiveMQ控制台进行查看,会多出一个记录
"Number Of Pending Messages "等待消费的数量为"1",进入队列的消息为"1",然后运行服务端(响应),会收到信息
再查看控制面板信息会变成如下所示
"Number Of Consumers"消费者为"1","Messages Enqueued"入队数量为"1","Messages Dequeued"出队数量为"1"。
完整示例: http://download.csdn.net/detail/u011781521/9907835
参考:http://activemq.apache.org/how-should-i-implement-request-response-with-jms.html
相关文章推荐
- 稳扎稳打Silverlight(21) - 2.0通信之WebRequest和WebResponse, 对指定的URI发出请求以及接收响应
- 稳扎稳打Silverlight(21) - 2.0通信之WebRequest和WebResponse, 对指定的URI发出请求以及接收响应
- 稳扎稳打Silverlight(21) - 2.0通信之WebRequest和WebResponse, 对指定的URI发出请求以及接收响应
- asp.net 请求request,处理handler,响应response
- servlet3-客户端请求与服务端响应(request,response)
- ActiveMQ实战(二)--ActiveMQ的通信方式之P2P点对点通信(point-to-point)
- 从XMLHttpRequest请求响应里getResponseHeader(header)报错:Refused to get unsafe header "**" 问题解决
- JSP网络编程-请求和响应-HttpServletRequest-HttpServletResponse-学习笔记
- 理解HTTP协议的Request/Response(请求响应)模型
- Servlet中的Request和Response的请求和响应概述
- Web 的请求HttpServletrequest和响应HttpServletResponse
- 网络层架构设计与实战七框架拓展设计之支持原生的HttpUrlConnection方式请求和响应
- 使用java的自定义过滤器Filter 处理请求request 并响应response
- 在线聊天项目1.4版 使用Gson方法解析Json字符串以便重构request和response的各种请求和响应 解决聊天不畅问题 Gson包下载地址
- JavaEE:response响应和request请求
- TC608——Servlet中的Request和response请求响应技术
- 在filter中使用包装类包装request、response分别修改请求参数和修改响应结果
- 在线聊天项目1.4版 使用Gson方法解析Json字符串以便重构request和response的各种请求和响应 解决聊天不畅问题 Gson包下载地址
- asp.net 中请求-响应(request-response)的几种方法
- Scrapy爬虫入门教程十一 Request和Response(请求和响应)