activemq的两种基本通信方式的使用及总结
2016-06-15 14:29
337 查看
简介
在前面一篇文章里讨论过几种应用系统集成的方式,发现实际上面向消息队列的集成方案算是一个总体比较合理的选择。这里,我们先针对具体的一个消息队列Activemq的基本通信方式进行探讨。activemq是JMS消息通信规范的一个实现。总的来说,消息规范里面定义最常见的几种消息通信模式主要有发布-订阅、点对点这两种。另外,通过结合这些模式的具体应用,我们在处理某些应用场景的时候也衍生出来了一种请求应答的模式。下面,我们针对这几种方式一一讨论一下。基础流程
在讨论具体方式的时候,我们先看看使用activemq需要启动服务的主要过程。按照JMS的规范,我们首先需要获得一个JMSconnectionfactory.,通过这个connectionfactory来创建connection.在这个基础之上我们再创建session,destination,producer和consumer。因此主要的几个步骤如下:
1.获得JMSconnectionfactory.通过我们提供特定环境的连接信息来构造factory。
2.利用factory构造JMSconnection
3.启动connection
4.通过connection创建JMSsession.
5.指定JMSdestination.
6.创建JMSproducer或者创建JMSmessage并提供destination.
7.创建JMSconsumer或注册JMSmessagelistener.
8.发送和接收JMSmessage.
9.关闭所有JMS资源,包括connection,session,producer,consumer等。
publish-subscribe
发布订阅模式有点类似于我们日常生活中订阅报纸。每年到年尾的时候,邮局就会发一本报纸集合让我们来选择订阅哪一个。在这个表里头列了所有出版发行的报纸,那么对于我们每一个订阅者来说,我们可以选择一份或者多份报纸。比如北京日报、潇湘晨报等。那么这些个我们订阅的报纸,就相当于发布订阅模式里的topic。有很多个人订阅报纸,也有人可能和我订阅了相同的报纸。那么,在这里,相当于我们在同一个topic里注册了。对于一份报纸发行方来说,它和所有的订阅者就构成了一个1对多的关系。这种关系如下图所示:p2p
p2p的过程则理解起来更加简单。它好比是两个人打电话,这两个人是独享这一条通信链路的。一方发送消息,另外一方接收,就这么简单。在实际应用中因为有多个用户对使用p2p的链路,它的通信场景如下图所示:在p2p的场景里,相互通信的双方是通过一个类似于队列的方式来进行交流。和前面pub-sub的区别在于一个topic有一个发送者和多个接收者,而在p2p里一个queue只有一个发送者和一个接收者。
这两种通信模式的代码实现有很多相同之处,下面我们用一个例子来简单实现这两种通信方式:
发送者
importjava.io.BufferedReader; importjava.io.IOException; importjava.io.InputStreamReader; importjava.io.StringReader; importjava.util.StringTokenizer; importjavax.jms.Connection; importjavax.jms.ConnectionFactory; importjavax.jms.Destination; importjavax.jms.JMSException; importjavax.jms.MessageProducer; importjavax.jms.Session; importjavax.jms.TextMessage; importorg.apache.activemq.ActiveMQConnectionFactory; publicclassPublisher{ publicstaticfinalStringurl="tcp://localhost:61616";//缺省端口,如果要改,可在apache-activemq-5.13.3\conf中的activemq.xml中更改端口号 ConnectionFactoryfactory; Connectionconnection; Sessionsession; MessageProducerproducer; Destination[]destinations; ComunicateModecomunicateMode=ComunicateMode.pubsub; enumComunicateMode{ p2p,pubsub } publicPublisher(ComunicateModemode)throwsJMSException{ this.comunicateMode=mode; factory=newActiveMQConnectionFactory(url);//这里的url也可以不指定,java代码将默认将端口赋值为61616 connection=factory.createConnection(); try{ connection.start(); }catch(JMSExceptione){ connection.close(); throwe; } session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE); producer=session.createProducer(null); } protectedvoidsetDestinations(String[]stocks)throwsJMSException{ destinations=newDestination[stocks.length]; for(inti=0;i<stocks.length;i++){ destinations[i]=comunicateMode==ComunicateMode.pubsub?session .createTopic("Topic."+stocks[i]):session .createQueue("Queue."+stocks[i]); } } protectedvoidsendMessage(Stringmsg)throwsJMSException{ for(Destinationitem:destinations){ TextMessagemsgMessage=session.createTextMessage(msg); producer.send(item,msgMessage); System.out.println(String.format("成功向Topic為【%s】发送消息【%s】", item.toString(),msgMessage.getText())); } } protectedvoidclose()throwsJMSException{ if(connection!=null) connection.close(); } publicstaticvoidmain(String[]args)throwsJMSException, InterruptedException,IOException{ Publisherpublisher=newPublisher(ComunicateMode.p2p);//这里可以修改消息传输方式为pubsub publisher.setDestinations(newString[]{"1","2","3"}); BufferedReaderreader=null; StringcontentString=""; do{ System.out.println("请输入要发送的内容(exit退出):"); reader=newBufferedReader(newInputStreamReader(System.in)); contentString=reader.readLine(); if(contentString.equals("exit")) break; publisher.sendMessage(contentString); }while(!contentString.equals("exit")); reader.close(); publisher.close(); } }
接收者
importjava.io.IOException;
importjavax.jms.Connection;
importjavax.jms.ConnectionFactory;
importjavax.jms.Destination;
importjavax.jms.JMSException;
importjavax.jms.Message;
importjavax.jms.MessageConsumer;
importjavax.jms.MessageListener;
importjavax.jms.Session;
importjavax.jms.TextMessage;
importorg.apache.activemq.ActiveMQConnectionFactory;
publicclassConsumer{
publicstaticfinalStringurl="tcp://localhost:61616";//缺省端口,如果要改,可在apache-activemq-5.13.3\conf中的activemq.xml中更改端口号
ConnectionFactoryfactory;
Connectionconnection;
Sessionsession;
MessageConsumer[]consumers;
ComunicateModecomunicateMode=ComunicateMode.pubsub;
enumComunicateMode{
p2p,pubsub
}
publicConsumer(ComunicateModemode,String[]destinationNames)
throwsJMSException{
this.comunicateMode=mode;
factory=newActiveMQConnectionFactory(url);//这里的url也可以不指定,java代码将默认将端口赋值为61616
connection=factory.createConnection();
connection.start();
session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
consumers=newMessageConsumer[destinationNames.length];
for(inti=0;i<destinationNames.length;i++){
Destinationdestination=comunicateMode==ComunicateMode.pubsub?session
.createTopic("Topic."+destinationNames[i]):session
.createQueue("Queue."+destinationNames[i]);
consumers[i]=session.createConsumer(destination);
consumers[i].setMessageListener(newMessageListener(){
@Override
publicvoidonMessage(Messagemessage){
try{
System.out.println(String.format("收到消息【%s】",
((TextMessage)message).getText()));
}catch(JMSExceptione){
e.printStackTrace();
}
}
});
}
}
publicvoidclose()throwsJMSException{
if(connection!=null)
connection.close();
}
publicstaticvoidmain(String[]args)throwsJMSException,IOException{
Consumerconsumer=newConsumer(ComunicateMode.p2p,
newString[]{"2"});//这里可以修改消息传输方式为pubsub
System.in.read();
consumer.close();
}
}
request-response
和前面两种方式比较起来,request-response的通信方式很常见,但是不是默认提供的一种模式。在前面的两种模式中都是一方负责发送消息而另外一方负责处理。而我们实际中的很多应用相当于一种一应一答的过程,需要双方都能给对方发送消息。于是请求-应答的这种通信方式也很重要。它也应用的很普遍。请求-应答方式并不是JMS规范系统默认提供的一种通信方式,而是通过在现有通信方式的基础上稍微运用一点技巧实现的。下图是典型的请求-应答方式的交互过程:
在JMS里面,如果要实现请求/应答的方式,可以利用JMSReplyTo和JMSCorrelationID消息头来将通信的双方关联起来。另外,QueueRequestor和TopicRequestor能够支持简单的请求/应答过程。
現在,如果我们要实现这么一个过程,在发送请求消息并且等待返回结果的client端的流程如下:
Java代码
//clientside
DestinationtempDest=session.createTemporaryQueue();
MessageConsumerresponseConsumer=session.createConsumer(tempDest);
...
//sendarequest..
message.setJMSReplyTo(tempDest)
message.setJMSCorrelationID(myCorrelationID);
producer.send(message);
client端创建一个临时队列并在发送的消息里指定了发送返回消息的destination以及correlationID。那么在处理消息的server端得到这个消息后就知道该发送给谁了。Server端的大致流程如下:
Java代码
publicvoidonMessage(Messagerequest){
Messageresponse=session.createMessage();
response.setJMSCorrelationID(request.getJMSCorrelationID())
producer.send(request.getJMSReplyTo(),response)
}
这里我们是用server端注册MessageListener,通过设置返回信息的CorrelationID和JMSReplyTo将信息返回。
以上就是发送和接收消息的双方的大致程序结构。具体的实现代码如下:
Client:
Java代码publicClient(){
ActiveMQConnectionFactoryconnectionFactory=newActiveMQConnectionFactory("tcp://localhost:61616");
Connectionconnection;
try{
connection=connectionFactory.createConnection();
connection.start();
Sessionsession=connection.createSession(transacted,ackMode);
DestinationadminQueue=session.createQueue(clientQueueName);
//Setupamessageproducertosendmessagetothequeuetheserverisconsumingfrom
this.producer=session.createProducer(adminQueue);
this.producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
//Createatemporaryqueuethatthisclientwilllistenforresponsesonthencreateaconsumer
//thatconsumesmessagefromthistemporaryqueue...forarealapplicationaclientshouldreuse
//thesametempqueueforeachmessagetotheserver...onetempqueueperclient
DestinationtempDest=session.createTemporaryQueue();
MessageConsumerresponseConsumer=session.createConsumer(tempDest);
//Thisclasswillhandlethemessagestothetempqueueaswell
responseConsumer.setMessageListener(this);
//Nowcreatetheactualmessageyouwanttosend
TextMessagetxtMessage=session.createTextMessage();
txtMessage.setText("MyProtocolMessage");
//Setthereplytofieldtothetempqueueyoucreatedabove,thisisthequeuetheserver
//willrespondto
txtMessage.setJMSReplyTo(tempDest);
//SetacorrelationIDsowhenyougetaresponseyouknowwhichsentmessagetheresponseisfor
//Ifthereisnevermorethanoneoutstandingmessagetotheserverthenthe
//samecorrelationIDcanbeusedforallthemessages...ifthereismorethanoneoutstanding
//messagetotheserveryouwouldpresumablywanttoassociatethecorrelationIDwiththis
//messagesomehow...aMapworksgood
StringcorrelationId=this.createRandomString();
txtMessage.setJMSCorrelationID(correlationId);
this.producer.send(txtMessage);
}catch(JMSExceptione){
//Handletheexceptionappropriately
}
}
这里的代码除了初始化构造函数里的参数还同时设置了两个destination,一个是自己要发送消息出去的destination,在session.createProducer(adminQueue);这一句设置。另外一个是自己要接收的消息destination,通过DestinationtempDest=session.createTemporaryQueue();responseConsumer=session.createConsumer(tempDest);这两句指定了要接收消息的目的地。这里是用的一个临时队列。在前面指定了返回消息的通信队列之后,我们需要通知server端知道发送返回消息给哪个队列。于是txtMessage.setJMSReplyTo(tempDest);指定了这一部分,同时txtMessage.setJMSCorrelationID(correlationId);方法主要是为了保证每次发送回来请求的server端能够知道对应的是哪个请求。这里一个请求和一个应答是相当于对应一个相同的序列号一样。
同时,因为client端在发送消息之后还要接收server端返回的消息,所以它也要实现一个消息receiver的功能。这里采用实现MessageListener接口的方式:
Java代码
publicvoidonMessage(Messagemessage){
StringmessageText=null;
try{
if(messageinstanceofTextMessage){
TextMessagetextMessage=(TextMessage)message;
messageText=textMessage.getText();
System.out.println("messageText="+messageText);
}
}catch(JMSExceptione){
//Handletheexceptionappropriately
}
}
Server:
这里server端要执行的过程和client端相反,它是先接收消息,在接收到消息后根据提供的JMSCorelationID来发送返回的消息:Java代码
publicvoidonMessage(Messagemessage){
try{
TextMessageresponse=this.session.createTextMessage();
if(messageinstanceofTextMessage){
TextMessagetxtMsg=(TextMessage)message;
StringmessageText=txtMsg.getText();
response.setText(this.messageProtocol.handleProtocolMessage(messageText));
}
//SetthecorrelationIDfromthereceivedmessagetobethecorrelationidoftheresponsemessage
//thisletstheclientidentifywhichmessagethisisaresponsetoifithasmorethan
//oneoutstandingmessagetotheserver
response.setJMSCorrelationID(message.getJMSCorrelationID());
//SendtheresponsetotheDestinationspecifiedbytheJMSReplyTofieldofthereceivedmessage,
//thisispresumablyatemporaryqueuecreatedbytheclient
this.replyProducer.send(message.getJMSReplyTo(),response);
}catch(JMSExceptione){
//Handletheexceptionappropriately
}
}
前面,在replyProducer.send()方法里,message.getJMSReplyTo()就得到了要发送消息回去的destination。
另外,设置这些发送返回信息的replyProducer的信息主要在构造函数相关的方法里实现了:
Java代码
publicServer(){
try{
//Thismessagebrokerisembedded
BrokerServicebroker=newBrokerService();
broker.setPersistent(false);
broker.setUseJmx(false);
broker.addConnector(messageBrokerUrl);
broker.start();
}catch(Exceptione){
//Handletheexceptionappropriately
}
//Delegatingthehandlingofmessagestoanotherclass,instantiateitbeforesettingupJMSsoit
//isreadytohandlemessages
this.messageProtocol=newMessageProtocol();
this.setupMessageQueueConsumer();
}
privatevoidsetupMessageQueueConsumer(){
ActiveMQConnectionFactoryconnectionFactory=newActiveMQConnectionFactory(messageBrokerUrl);
Connectionconnection;
try{
connection=connectionFactory.createConnection();
connection.start();
this.session=connection.createSession(this.transacted,ackMode);
DestinationadminQueue=this.session.createQueue(messageQueueName);
//Setupamessageproducertorespondtomessagesfromclients,wewillgetthedestination
//tosendtofromtheJMSReplyToheaderfieldfromaMessage
this.replyProducer=this.session.createProducer(null);
this.replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
//Setupaconsumertoconsumemessagesoffoftheadminqueue
MessageConsumerconsumer=this.session.createConsumer(adminQueue);
consumer.setMessageListener(this);
}catch(JMSExceptione){
//Handletheexceptionappropriately
}
}
总体来说,整个的交互过程并不复杂,只是比较繁琐。对于请求/应答的方式来说,这种典型交互的过程就是Client端在设定正常发送请求的Queue同时也设定一个临时的Queue。同时在要发送的message里头指定要返回消息的destination以及CorelationID,这些就好比是一封信里面所带的回执。根据这个信息人家才知道怎么给你回信。对于Server端来说则要额外创建一个producer,在处理接收到消息的方法里再利用producer将消息发回去。这一系列的过程看起来很像http协议里面请求-应答的方式,都是一问一答。
一些应用和改进
回顾前面三种基本的通信方式,我们会发现,他们都存在着一定的共同点,比如说都要初始化ConnectionFactory,Connection,Session等。在使用完之后都要将这些资源关闭。如果每一个实现它的通信端都这么写一通的话,其实是一种简单的重复。从工程的角度来看是完全没有必要的。那么,我们有什么办法可以减少这种重复呢?一种简单的方式就是通过工厂方法封装这些对象的创建和销毁,然后简单的通过调用工厂方法的方式得到他们。另外,既然基本的流程都是在开头创建资源在结尾销毁,我们也可以采用TemplateMethod模式的思路。通过继承一个抽象类,在抽象类里提供了资源的封装。所有继承的类只要实现怎么去使用这些资源的方法就可以了。Spring中间的JMSTemplate就提供了这种类似思想的封装。具体的实现可以参考
总结
activemq默认提供了pub-sub,p2p这两种通信的方式。同时也提供了一些对request-response方式的支持。实际上,不仅仅是activemq,对于所有其他实现JMS规范的产品都能够提供类似的功能。这里每种方式都不太复杂,主要是创建和管理资源的步骤显得比较繁琐。相关文章推荐
- 合并多sheet 带sheet名
- nginx: [alert] kill(1668, 1) failed (3: No such process)的解决办法及nginx服务常用命令总结
- 贩妖记 > 第九章,不同维度
- Android eclipse项目导入AS问题汇总。
- 运动状态最佳心率计算器 (Target-Heart-Rate Calculator)
- 贩妖记 > 第七章,走阴规矩
- 一个订阅发布者模式
- 阅读开源引擎源代码的方式学习游戏引擎好吗?
- Google File System阅读笔记
- Mongo服务器集群配置——主从复制
- 待做
- redis+php实现微博功能(三)
- Google Android开发者文档系列-开发企业App
- 抓包工具fiddler的使用
- 【shell】read
- puppet2D 基础教程
- 哇!原来这样的文案,才能在朋友圈流传!
- 有容云:梁胜-如何让Docker容器在企业中投产(上)
- 使用Gridview控件的链接实现页面跳转并且传值
- HttpUtils请求 和XlistView 实现下拉刷新和上拉加载