您的位置:首页 > 其它

activemq的两种基本通信方式的使用及总结

2016-06-15 14:29 337 查看

简介

在前面一篇文章里讨论过几种应用系统集成的方式,发现实际上面向消息队列的集成方案算是一个总体比较合理的选择。这里,我们先针对具体的一个消息队列Activemq的基本通信方式进行探讨。activemq是JMS消息通信规范的一个实现。总的来说,消息规范里面定义最常见的几种消息通信模式主要有发布-订阅、点对点这两种。另外,通过结合这些模式的具体应用,我们在处理某些应用场景的时候也衍生出来了一种请求应答的模式。下面,我们针对这几种方式一一讨论一下。

基础流程

在讨论具体方式的时候,我们先看看使用activemq需要启动服务的主要过程。

按照JMS的规范,我们首先需要获得一个JMSconnectionfactory.,通过这个connectionfactory来创建connection.在这个基础之上我们再创建session,destination,producer和consumer。因此主要的几个步骤如下:

1.获得JMSconnectionfactory.通过我们提供特定环境的连接信息来构造factory。

2.利用factory构造JMSconnection

3.启动connection

4.通过connection创建JMSsession.

5.指定JMSdestination.

6.创建JMSproducer或者创建JMSmessage并提供destination.

7.创建JMSconsumer或注册JMSmessagelistener.

8.发送和接收JMSmessage.

9.关闭所有JMS资源,包括connection,session,producer,consumer等。

publish-subscribe

发布订阅模式有点类似于我们日常生活中订阅报纸。每年到年尾的时候,邮局就会发一本报纸集合让我们来选择订阅哪一个。在这个表里头列了所有出版发行的报纸,那么对于我们每一个订阅者来说,我们可以选择一份或者多份报纸。比如北京日报、潇湘晨报等。那么这些个我们订阅的报纸,就相当于发布订阅模式里的topic。有很多个人订阅报纸,也有人可能和我订阅了相同的报纸。那么,在这里,相当于我们在同一个topic里注册了。对于一份报纸发行方来说,它和所有的订阅者就构成了一个1对多的关系。这种关系如下图所示:



p2p

p2p的过程则理解起来更加简单。它好比是两个人打电话,这两个人是独享这一条通信链路的。一方发送消息,另外一方接收,就这么简单。在实际应用中因为有多个用户对使用p2p的链路,它的通信场景如下图所示:



在p2p的场景里,相互通信的双方是通过一个类似于队列的方式来进行交流。和前面pub-sub的区别在于一个topic有一个发送者和多个接收者,而在p2p里一个queue只有一个发送者和一个接收者。

这两种通信模式的代码实现有很多相同之处,下面我们用一个例子来简单实现这两种通信方式:

发送者

importjava.io.BufferedReader;
importjava.io.IOException;
importjava.io.InputStreamReader;
importjava.io.StringReader;
importjava.util.StringTokenizer;

importjavax.jms.Connection;
importjavax.jms.ConnectionFactory;
importjavax.jms.Destination;
importjavax.jms.JMSException;
importjavax.jms.MessageProducer;
importjavax.jms.Session;
importjavax.jms.TextMessage;

importorg.apache.activemq.ActiveMQConnectionFactory;

publicclassPublisher{
publicstaticfinalStringurl="tcp://localhost:61616";//缺省端口,如果要改,可在apache-activemq-5.13.3\conf中的activemq.xml中更改端口号
ConnectionFactoryfactory;
Connectionconnection;
Sessionsession;
MessageProducerproducer;
Destination[]destinations;
ComunicateModecomunicateMode=ComunicateMode.pubsub;

enumComunicateMode{
p2p,pubsub
}

publicPublisher(ComunicateModemode)throwsJMSException{
this.comunicateMode=mode;
factory=newActiveMQConnectionFactory(url);//这里的url也可以不指定,java代码将默认将端口赋值为61616
connection=factory.createConnection();
try{
connection.start();
}catch(JMSExceptione){
connection.close();
throwe;
}
session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
producer=session.createProducer(null);
}

protectedvoidsetDestinations(String[]stocks)throwsJMSException{
destinations=newDestination[stocks.length];
for(inti=0;i<stocks.length;i++){
destinations[i]=comunicateMode==ComunicateMode.pubsub?session
.createTopic("Topic."+stocks[i]):session
.createQueue("Queue."+stocks[i]);
}
}

protectedvoidsendMessage(Stringmsg)throwsJMSException{
for(Destinationitem:destinations){
TextMessagemsgMessage=session.createTextMessage(msg);
producer.send(item,msgMessage);
System.out.println(String.format("成功向Topic為【%s】发送消息【%s】",
item.toString(),msgMessage.getText()));
}
}

protectedvoidclose()throwsJMSException{
if(connection!=null)
connection.close();
}

publicstaticvoidmain(String[]args)throwsJMSException,
InterruptedException,IOException{
Publisherpublisher=newPublisher(ComunicateMode.p2p);//这里可以修改消息传输方式为pubsub
publisher.setDestinations(newString[]{"1","2","3"});
BufferedReaderreader=null;
StringcontentString="";
do{
System.out.println("请输入要发送的内容(exit退出):");
reader=newBufferedReader(newInputStreamReader(System.in));
contentString=reader.readLine();
if(contentString.equals("exit"))
break;
publisher.sendMessage(contentString);
}while(!contentString.equals("exit"));
reader.close();
publisher.close();
}
}




接收者

importjava.io.IOException;
importjavax.jms.Connection;
importjavax.jms.ConnectionFactory;
importjavax.jms.Destination;
importjavax.jms.JMSException;
importjavax.jms.Message;
importjavax.jms.MessageConsumer;
importjavax.jms.MessageListener;
importjavax.jms.Session;
importjavax.jms.TextMessage;

importorg.apache.activemq.ActiveMQConnectionFactory;

publicclassConsumer{
publicstaticfinalStringurl="tcp://localhost:61616";//缺省端口,如果要改,可在apache-activemq-5.13.3\conf中的activemq.xml中更改端口号
ConnectionFactoryfactory;
Connectionconnection;
Sessionsession;
MessageConsumer[]consumers;
ComunicateModecomunicateMode=ComunicateMode.pubsub;

enumComunicateMode{
p2p,pubsub
}

publicConsumer(ComunicateModemode,String[]destinationNames)
throwsJMSException{
this.comunicateMode=mode;
factory=newActiveMQConnectionFactory(url);//这里的url也可以不指定,java代码将默认将端口赋值为61616
connection=factory.createConnection();
connection.start();
session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
consumers=newMessageConsumer[destinationNames.length];
for(inti=0;i<destinationNames.length;i++){
Destinationdestination=comunicateMode==ComunicateMode.pubsub?session
.createTopic("Topic."+destinationNames[i]):session
.createQueue("Queue."+destinationNames[i]);
consumers[i]=session.createConsumer(destination);
consumers[i].setMessageListener(newMessageListener(){
@Override
publicvoidonMessage(Messagemessage){
try{
System.out.println(String.format("收到消息【%s】",
((TextMessage)message).getText()));
}catch(JMSExceptione){
e.printStackTrace();
}
}
});
}
}

publicvoidclose()throwsJMSException{
if(connection!=null)
connection.close();
}

publicstaticvoidmain(String[]args)throwsJMSException,IOException{
Consumerconsumer=newConsumer(ComunicateMode.p2p,
newString[]{"2"});//这里可以修改消息传输方式为pubsub
System.in.read();
consumer.close();
}

}




request-response

和前面两种方式比较起来,request-response的通信方式很常见,但是不是默认提供的一种模式。在前面的两种模式中都是一方负责发送消息而另外一方负责处理。而我们实际中的很多应用相当于一种一应一答的过程,需要双方都能给对方发送消息。于是请求-应答的这种通信方式也很重要。它也应用的很普遍。

请求-应答方式并不是JMS规范系统默认提供的一种通信方式,而是通过在现有通信方式的基础上稍微运用一点技巧实现的。下图是典型的请求-应答方式的交互过程:


在JMS里面,如果要实现请求/应答的方式,可以利用JMSReplyTo和JMSCorrelationID消息头来将通信的双方关联起来。另外,QueueRequestor和TopicRequestor能够支持简单的请求/应答过程。

現在,如果我们要实现这么一个过程,在发送请求消息并且等待返回结果的client端的流程如下:

Java代码


//clientside
DestinationtempDest=session.createTemporaryQueue();
MessageConsumerresponseConsumer=session.createConsumer(tempDest);
...

//sendarequest..
message.setJMSReplyTo(tempDest)
message.setJMSCorrelationID(myCorrelationID);

producer.send(message);


client端创建一个临时队列并在发送的消息里指定了发送返回消息的destination以及correlationID。那么在处理消息的server端得到这个消息后就知道该发送给谁了。Server端的大致流程如下:

Java代码


publicvoidonMessage(Messagerequest){

Messageresponse=session.createMessage();
response.setJMSCorrelationID(request.getJMSCorrelationID())

producer.send(request.getJMSReplyTo(),response)
}


这里我们是用server端注册MessageListener,通过设置返回信息的CorrelationID和JMSReplyTo将信息返回。

以上就是发送和接收消息的双方的大致程序结构。具体的实现代码如下:

Client:

Java代码


publicClient(){
ActiveMQConnectionFactoryconnectionFactory=newActiveMQConnectionFactory("tcp://localhost:61616");
Connectionconnection;
try{
connection=connectionFactory.createConnection();
connection.start();
Sessionsession=connection.createSession(transacted,ackMode);
DestinationadminQueue=session.createQueue(clientQueueName);

//Setupamessageproducertosendmessagetothequeuetheserverisconsumingfrom
this.producer=session.createProducer(adminQueue);
this.producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

//Createatemporaryqueuethatthisclientwilllistenforresponsesonthencreateaconsumer
//thatconsumesmessagefromthistemporaryqueue...forarealapplicationaclientshouldreuse
//thesametempqueueforeachmessagetotheserver...onetempqueueperclient
DestinationtempDest=session.createTemporaryQueue();
MessageConsumerresponseConsumer=session.createConsumer(tempDest);

//Thisclasswillhandlethemessagestothetempqueueaswell
responseConsumer.setMessageListener(this);

//Nowcreatetheactualmessageyouwanttosend
TextMessagetxtMessage=session.createTextMessage();
txtMessage.setText("MyProtocolMessage");

//Setthereplytofieldtothetempqueueyoucreatedabove,thisisthequeuetheserver
//willrespondto
txtMessage.setJMSReplyTo(tempDest);

//SetacorrelationIDsowhenyougetaresponseyouknowwhichsentmessagetheresponseisfor
//Ifthereisnevermorethanoneoutstandingmessagetotheserverthenthe
//samecorrelationIDcanbeusedforallthemessages...ifthereismorethanoneoutstanding
//messagetotheserveryouwouldpresumablywanttoassociatethecorrelationIDwiththis
//messagesomehow...aMapworksgood
StringcorrelationId=this.createRandomString();
txtMessage.setJMSCorrelationID(correlationId);
this.producer.send(txtMessage);
}catch(JMSExceptione){
//Handletheexceptionappropriately
}
}


这里的代码除了初始化构造函数里的参数还同时设置了两个destination,一个是自己要发送消息出去的destination,在session.createProducer(adminQueue);这一句设置。另外一个是自己要接收的消息destination,通过DestinationtempDest=session.createTemporaryQueue();responseConsumer=session.createConsumer(tempDest);这两句指定了要接收消息的目的地。这里是用的一个临时队列。在前面指定了返回消息的通信队列之后,我们需要通知server端知道发送返回消息给哪个队列。于是txtMessage.setJMSReplyTo(tempDest);指定了这一部分,同时txtMessage.setJMSCorrelationID(correlationId);方法主要是为了保证每次发送回来请求的server端能够知道对应的是哪个请求。这里一个请求和一个应答是相当于对应一个相同的序列号一样。

同时,因为client端在发送消息之后还要接收server端返回的消息,所以它也要实现一个消息receiver的功能。这里采用实现MessageListener接口的方式:

Java代码


publicvoidonMessage(Messagemessage){
StringmessageText=null;
try{
if(messageinstanceofTextMessage){
TextMessagetextMessage=(TextMessage)message;
messageText=textMessage.getText();
System.out.println("messageText="+messageText);
}
}catch(JMSExceptione){
//Handletheexceptionappropriately
}
}


Server:

这里server端要执行的过程和client端相反,它是先接收消息,在接收到消息后根据提供的JMSCorelationID来发送返回的消息:

Java代码


publicvoidonMessage(Messagemessage){
try{
TextMessageresponse=this.session.createTextMessage();
if(messageinstanceofTextMessage){
TextMessagetxtMsg=(TextMessage)message;
StringmessageText=txtMsg.getText();
response.setText(this.messageProtocol.handleProtocolMessage(messageText));
}

//SetthecorrelationIDfromthereceivedmessagetobethecorrelationidoftheresponsemessage
//thisletstheclientidentifywhichmessagethisisaresponsetoifithasmorethan
//oneoutstandingmessagetotheserver
response.setJMSCorrelationID(message.getJMSCorrelationID());

//SendtheresponsetotheDestinationspecifiedbytheJMSReplyTofieldofthereceivedmessage,
//thisispresumablyatemporaryqueuecreatedbytheclient
this.replyProducer.send(message.getJMSReplyTo(),response);
}catch(JMSExceptione){
//Handletheexceptionappropriately
}
}


前面,在replyProducer.send()方法里,message.getJMSReplyTo()就得到了要发送消息回去的destination。

另外,设置这些发送返回信息的replyProducer的信息主要在构造函数相关的方法里实现了:

Java代码

publicServer(){
try{
//Thismessagebrokerisembedded
BrokerServicebroker=newBrokerService();
broker.setPersistent(false);
broker.setUseJmx(false);
broker.addConnector(messageBrokerUrl);
broker.start();
}catch(Exceptione){
//Handletheexceptionappropriately
}

//Delegatingthehandlingofmessagestoanotherclass,instantiateitbeforesettingupJMSsoit
//isreadytohandlemessages
this.messageProtocol=newMessageProtocol();
this.setupMessageQueueConsumer();
}

privatevoidsetupMessageQueueConsumer(){
ActiveMQConnectionFactoryconnectionFactory=newActiveMQConnectionFactory(messageBrokerUrl);
Connectionconnection;
try{
connection=connectionFactory.createConnection();
connection.start();
this.session=connection.createSession(this.transacted,ackMode);
DestinationadminQueue=this.session.createQueue(messageQueueName);

//Setupamessageproducertorespondtomessagesfromclients,wewillgetthedestination
//tosendtofromtheJMSReplyToheaderfieldfromaMessage
this.replyProducer=this.session.createProducer(null);
this.replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

//Setupaconsumertoconsumemessagesoffoftheadminqueue
MessageConsumerconsumer=this.session.createConsumer(adminQueue);
consumer.setMessageListener(this);
}catch(JMSExceptione){
//Handletheexceptionappropriately
}
}


总体来说,整个的交互过程并不复杂,只是比较繁琐。对于请求/应答的方式来说,这种典型交互的过程就是Client端在设定正常发送请求的Queue同时也设定一个临时的Queue。同时在要发送的message里头指定要返回消息的destination以及CorelationID,这些就好比是一封信里面所带的回执。根据这个信息人家才知道怎么给你回信。对于Server端来说则要额外创建一个producer,在处理接收到消息的方法里再利用producer将消息发回去。这一系列的过程看起来很像http协议里面请求-应答的方式,都是一问一答。

一些应用和改进

回顾前面三种基本的通信方式,我们会发现,他们都存在着一定的共同点,比如说都要初始化ConnectionFactory,Connection,Session等。在使用完之后都要将这些资源关闭。如果每一个实现它的通信端都这么写一通的话,其实是一种简单的重复。从工程的角度来看是完全没有必要的。那么,我们有什么办法可以减少这种重复呢?

一种简单的方式就是通过工厂方法封装这些对象的创建和销毁,然后简单的通过调用工厂方法的方式得到他们。另外,既然基本的流程都是在开头创建资源在结尾销毁,我们也可以采用TemplateMethod模式的思路。通过继承一个抽象类,在抽象类里提供了资源的封装。所有继承的类只要实现怎么去使用这些资源的方法就可以了。Spring中间的JMSTemplate就提供了这种类似思想的封装。具体的实现可以参考这篇文章。

总结

activemq默认提供了pub-sub,p2p这两种通信的方式。同时也提供了一些对request-response方式的支持。实际上,不仅仅是activemq,对于所有其他实现JMS规范的产品都能够提供类似的功能。这里每种方式都不太复杂,主要是创建和管理资源的步骤显得比较繁琐。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: