架构设计:系统间通信(21)——ActiveMQ的安装与使用
2016-04-08 08:48
519 查看
来源:http://blog.csdn.net/yinwenjie
目录(?)[+]
下载软件
您可以Apache ActiveMQ的官网下载安装包:https://activemq.apache.org/download-archives.html。这里我们示例在CentOS下的安装过程,所以下载Linux下的压缩包即可(http://www.apache.org/dyn/closer.cgi?path=/activemq/5.13.2/apache-activemq-5.13.2-bin.tar.gz)。
解压安装
将下载的安装包放置在root用户的home目录内,解压即可(当然您可以根据自己的需要加压到不同的文件路径下)。如下所示:
以上解压使用的是root用户,这是为了演示方便。正式环境中还是建议禁用root用户,为activeMQ的运行专门创建一个用户和用户组。
配置环境变量(不是必须的)
如果您只是在测试环境使用Apache ActiveMQ,以便熟悉消息中间件本身的特性和使用方式。那么您无需对解压后的软件进行任何配置,所有可运行的命令都在软件安装目录的./bin目录下。为了使用方便,最好配置一下环境变量,如下所示(注意,根据您自己的软件安装位置,环境变量的设置是不一样的,请不要盲目粘贴复制):
在ActiveMQ Version 5.9+的版本中,Apache ActiveMQ 针对操作系统进行了更深入的优化,所以您可以看到./bin目录下,有一个针对32位Linux运行命令的./linux-x86-32目录,和针对64位Linux运行命令的./linux-x86-64目录。请按照您自己的情况进行环境变量设置和命令运行。
运行程序
现在您可以在任何目录,运行activemq命令了。注意activemq命令一共有6个参数(console | start | stop | restart | status | dump),启动Apache ActiveMQ使用的命令是activemq start:
如果启动成功,就可以在浏览器上访问服务节点在8161端口的管理页面了(例如http://localhost:8161):
点击‘manage ActiveMQ broker’连接,可以进入管理主界面(默认的用户和密码都是admin)。以上就是Apache ActiveMQ消息中间件最简的安装和运行方式。在后续的文章中,我们会陆续讨论ActiveMQ的集群和高性能优化,那时会介绍对应的ActiveMQ的配置问题。
如同上文讲到的,activemq命令除了start参数用于启动activemq程序以外,还有另外5个参数可以使用:console | stop | restart | status | dump。他们代表的使用意义是:
stop:停止当前ActiveMQ节点的运行。
restart:重新启动当前ActiveMQ节点。
status:查看当前ActiveMQ节点的运行状态。如果当前ActiveMQ节点没有运行,那么将返回“ActiveMQ Broker is not running”的提示信息。注意,status命令只能告诉开发人员当前节点时停止的还是运行的,除此之外不能从status命令获取更多的信息。例如,ActiveMQ为什么创建Queue失败?当前ActiveMQ使用了多少内存?而要获取这些信息,需要使用以下参数启动ActiveMQ节点。
console:使用控制台模式启动ActiveMQ节点;在这种模式下,开发人员可以调试、监控当前ActivieMQ节点的实时情况,并获取实时状态。
dump:如果您采用console模式运行ActiveMQ,那么就可以使用dump参数,在console控制台上获取当前ActiveMQ节点的线程状态快照。
好吧,既然我们已经讨论过如何安装和运行ActiveMQ,也讨论了Stomp协议的组织结构,为什么我们不立即动手试一试操作ActiveMQ承载Stomp协议的消息呢?
下面我们使用ActiveMQ提供的JAVA 客户端(实际上就是ActiveMQ对JMS规范的实现),向ActiveMQ中的Queue(示例代码中将这个Queue命名为’test’)发送一条Stomp协议消息,然后再使用JAVA语言的客户端,从ActiveMQ上接受这条消息:
使用ActiveMQ的API发送Stomp协议消息:
使用ActiveMQ的API接收Stomp协议消息:
以上分别是使用Activie提供的Stomp协议的消息生产端和Stomp协议的消息消费端的代码(如果您不清楚Stomp协议的细节,可以参考我另一篇文章:《架构设计:系统间通信(19)——MQ:消息协议(上)》)。请注意在代码片段中,并没有出现任何一个带有jms名称的包或者类——这是因为ActiveMQ为Stomp协议提供的JAVA
API在内部进行了JMS规范的封装。
您可以查看activemq-stomp中关于协议转换部分的源代码:org.apache.activemq.transport.stomp.JmsFrameTranslator和其父级接口:org.apache.activemq.transport.stomp.FrameTranslator来验证这件事情(关于ActiveMQ对JMS规范的实现设计,如果后续有时间再回头进行讲解)。
以下是Stomp协议的消费者端的运行效果(在生产者端已经向ActiveMQ插入了一条消息之后):
注意,由于消息体中插入了一个时间戳,所以您复制粘贴代码后运行效果并不会和我的演示程序完全一致。
如果您细心的话,在ActiveMQ提供的管理页面上已经看到有两个功能页面:Queue和Topic。Queue和Topic是JMS为开发人员提供的两种不同工作机制的消息队列。 在ActiveMQ官方的解释是:
Topics
In JMS a Topic implements publish and subscribe semantics. When you publish a message it goes to all the subscribers who are interested - so zero to many subscribers will receive a copy of the message. Only subscribers who had an active subscription at the
time the broker receives the message will get a copy of the message.
中文的可以译做:JMS-Topic 队列基于“订阅-发布”模式,当操作者发布一条消息后,所有对这条消息感兴趣的订阅者都可以收到它——也就是说这条消息会被拷贝成多份,进行分发。只有当前“活动的”订阅者能够收到消息(换句话说,如果当前JMS-Topic队列中没有订阅者,这条消息将被丢弃)。
Queue
A JMS Queue implements load balancer semantics. A single message will be received by exactly one consumer. If there are no consumers available at the time the message is sent it will be kept until a consumer is available that can process the message. If a consumer
receives a message and does not acknowledge it before closing then the message will be redelivered to another consumer. A queue can have many consumers with messages load balanced across the available consumers.
So Queues implement a reliable load balancer in JMS.
中文的可以译做:JMS-Queue是一种“负载均衡模式”的实现。一个消息能且只能被一个消费者接受。如果当前JMS-Queue中没有任何的消费者,那么这条消息将会被Queue存储起来(实际应用中可以存储在磁盘上,也可以存储在数据库中,看软件的配置),直到有一个消费者连接上。另外,如果消费者在接受到消息后,在他断开与JMS-Queue连接之前,没有发送ack信息(可以是客户端手动发送,也可以是自动发送),那么这条消息将被发送给其他消费者。
以下表格摘自互联网上的资料,基本上把Queue和Topic这两种队列的不同特性说清楚了:
首先您使用的MQ消息中间件需要实现了JMS规范;那么通过JMS规范,开发人员可以忽略各种消息协议的细节,只要消息在同一队列中,就能够保证各种消息协议间实现互相转换。下面我们首先来看一个使用JMS API在ActiveMQ中操作openwire协议消息的简单示例,然后再给出一个通过JMS,实现Stomp消息协议和Openwire消息协议间的互转示例。
当以上代码运行到“start”的位置时,我们可以通过观察ActiveMQ管理界面中connection列表中的连接信息,发现消息生产者已经建立了一个Openwire协议的连接:
从而确定我们通过JMS API建立了一个openwire协议的通讯连接。接着我们使用以下代码,建立一个基于openwire协议的“消费者”。注意:消息生产者和消息消费者,映射的队列必须一致。(在示例代码中,它们都映射名称为test的JMS-Queue)
以下代码使用JMS从某个Queue中接收消息:
当以上“消费者”代码运行到start的位置时,我们通过ActiveMQ提供的管理界面可以看到,基于Openwire协议的连接增加到了两条:
注意,您在运行以上测试代码时,不用和我的运行顺序一致。由于Queue模式的队列是要进行消息状态保存的,所以无论您是先运行“消费者”端,还是先运行“生产者”端,最后“消费者”都会收到一条消息。类似如下的效果:
Stomp协议的消息消费者(消息接收者):
当您同时运行Openwire消息发送者和Stomp消息接收者时,您可以在ActiveMQ的管理界面看到这两种协议的连接信息:
以下是Stomp协议消费者接收到的消息内容(经过转换的openwire协议消息):
目录(?)[+]
1、前言
之前我们通过两篇文章(架构设计:系统间通信(19)——MQ:消息协议(上)、架构设计:系统间通信(20)——MQ:消息协议(下))从理论层面上为大家介绍了消息协议的基本定义,并花了较大篇幅向读者介绍了三种典型的消息协议:XMPP协议、Stomp协议和AMQP协议。本小节开始,我们基于之前的知识点讲解这些协议在具体的“消息队列中间件”中是如何被我们操作的。由于本人在实际工作中经常使用ActiveMQ和RabbitMQ,所以就选取这两个“消息队列中间件”进行讲解。如果读者可以补充其他“消息队列中间件”的使用,那当然是再好不过了。2、ActiveMQ的安装和使用
ActiveMQ是Apache软件基金会的开源产品,支持AMQP协议、MQTT协议(和XMPP协议作用类似)、Openwire协议和Stomp协议等多种消息协议。并且ActiveMQ完整支持JMS API接口规范(当然Apache也提供多种其他语言的客户端,例如:C、C++、C#、Ruby、Perl)。2-1、ActiveMQ的安装
在本文发布之时,ActiveMQ最新的版本号是5.13.2(版本号升级很快,不过并不推荐使用最新的版本)。由ActiveMQ的安装是很简单,所以这个过程并不值得我们花很大篇幅进行讨论。具体的过程就是:下载->解压->配置环境变量->运行:下载软件
您可以Apache ActiveMQ的官网下载安装包:https://activemq.apache.org/download-archives.html。这里我们示例在CentOS下的安装过程,所以下载Linux下的压缩包即可(http://www.apache.org/dyn/closer.cgi?path=/activemq/5.13.2/apache-activemq-5.13.2-bin.tar.gz)。
解压安装
将下载的安装包放置在root用户的home目录内,解压即可(当然您可以根据自己的需要加压到不同的文件路径下)。如下所示:
<code class="hljs ruby has-numbering" style="display: block; padding: 0px; background-color: transparent; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-top-left-radius: 0px; border-top-right-radius: 0px; border-bottom-right-radius: 0px; border-bottom-left-radius: 0px; word-wrap: normal; background-position: initial initial; background-repeat: initial initial;">[root<span class="hljs-variable" style="color: rgb(102, 0, 102); box-sizing: border-box;">@localhost</span> ~]<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;"># tar -zxvf ./apache-activemq-5.13.2-bin.tar.gz</span></code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li></ul>
以上解压使用的是root用户,这是为了演示方便。正式环境中还是建议禁用root用户,为activeMQ的运行专门创建一个用户和用户组。
配置环境变量(不是必须的)
如果您只是在测试环境使用Apache ActiveMQ,以便熟悉消息中间件本身的特性和使用方式。那么您无需对解压后的软件进行任何配置,所有可运行的命令都在软件安装目录的./bin目录下。为了使用方便,最好配置一下环境变量,如下所示(注意,根据您自己的软件安装位置,环境变量的设置是不一样的,请不要盲目粘贴复制):
<code class="hljs ruby has-numbering" style="display: block; padding: 0px; background-color: transparent; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-top-left-radius: 0px; border-top-right-radius: 0px; border-bottom-right-radius: 0px; border-bottom-left-radius: 0px; word-wrap: normal; background-position: initial initial; background-repeat: initial initial;">设置该次会话的环境变量: [root<span class="hljs-variable" style="color: rgb(102, 0, 102); box-sizing: border-box;">@localhost</span> ~]<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;"># export PATH=/usr/apache-activemq-5.13.1/bin/linux-x86-64:$PATH;</span> 永久设置环境变量: [root<span class="hljs-variable" style="color: rgb(102, 0, 102); box-sizing: border-box;">@localhost</span> ~]<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;"># echo "export PATH=/usr/apache-activemq-5.13.1/bin/linux-x86-64:$PATH;" >> /etc/profile</span></code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li></ul>
在ActiveMQ Version 5.9+的版本中,Apache ActiveMQ 针对操作系统进行了更深入的优化,所以您可以看到./bin目录下,有一个针对32位Linux运行命令的./linux-x86-32目录,和针对64位Linux运行命令的./linux-x86-64目录。请按照您自己的情况进行环境变量设置和命令运行。
运行程序
现在您可以在任何目录,运行activemq命令了。注意activemq命令一共有6个参数(console | start | stop | restart | status | dump),启动Apache ActiveMQ使用的命令是activemq start:
<code class="hljs ruby has-numbering" style="display: block; padding: 0px; background-color: transparent; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-top-left-radius: 0px; border-top-right-radius: 0px; border-bottom-right-radius: 0px; border-bottom-left-radius: 0px; word-wrap: normal; background-position: initial initial; background-repeat: initial initial;">[root<span class="hljs-variable" style="color: rgb(102, 0, 102); box-sizing: border-box;">@localhost</span> ~]<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;"># activemq start</span></code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li></ul>
如果启动成功,就可以在浏览器上访问服务节点在8161端口的管理页面了(例如http://localhost:8161):
点击‘manage ActiveMQ broker’连接,可以进入管理主界面(默认的用户和密码都是admin)。以上就是Apache ActiveMQ消息中间件最简的安装和运行方式。在后续的文章中,我们会陆续讨论ActiveMQ的集群和高性能优化,那时会介绍对应的ActiveMQ的配置问题。
2-2、ActiveMQ的其他命令参数
如同上文讲到的,activemq命令除了start参数用于启动activemq程序以外,还有另外5个参数可以使用:console | stop | restart | status | dump。他们代表的使用意义是:stop:停止当前ActiveMQ节点的运行。
restart:重新启动当前ActiveMQ节点。
status:查看当前ActiveMQ节点的运行状态。如果当前ActiveMQ节点没有运行,那么将返回“ActiveMQ Broker is not running”的提示信息。注意,status命令只能告诉开发人员当前节点时停止的还是运行的,除此之外不能从status命令获取更多的信息。例如,ActiveMQ为什么创建Queue失败?当前ActiveMQ使用了多少内存?而要获取这些信息,需要使用以下参数启动ActiveMQ节点。
console:使用控制台模式启动ActiveMQ节点;在这种模式下,开发人员可以调试、监控当前ActivieMQ节点的实时情况,并获取实时状态。
dump:如果您采用console模式运行ActiveMQ,那么就可以使用dump参数,在console控制台上获取当前ActiveMQ节点的线程状态快照。
2-3、在ActiveMQ中传递Stomp消息
好吧,既然我们已经讨论过如何安装和运行ActiveMQ,也讨论了Stomp协议的组织结构,为什么我们不立即动手试一试操作ActiveMQ承载Stomp协议的消息呢?下面我们使用ActiveMQ提供的JAVA 客户端(实际上就是ActiveMQ对JMS规范的实现),向ActiveMQ中的Queue(示例代码中将这个Queue命名为’test’)发送一条Stomp协议消息,然后再使用JAVA语言的客户端,从ActiveMQ上接受这条消息:
使用ActiveMQ的API发送Stomp协议消息:
<code class="hljs java has-numbering" style="display: block; padding: 0px; background-color: transparent; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-top-left-radius: 0px; border-top-right-radius: 0px; border-bottom-right-radius: 0px; border-bottom-left-radius: 0px; word-wrap: normal; background-position: initial initial; background-repeat: initial initial;"><span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">package</span> mq.test.stomp; <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">import</span> java.net.Socket; <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">import</span> java.util.Date; <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">import</span> org.apache.activemq.transport.stomp.StompConnection; <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">// 消息生产者</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">public</span> <span class="hljs-class" style="box-sizing: border-box;"><span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">class</span> <span class="hljs-title" style="box-sizing: border-box; color: rgb(102, 0, 102);">TestProducer</span> {</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">public</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">static</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">void</span> <span class="hljs-title" style="box-sizing: border-box;">main</span>(String[] args) { <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">try</span> { <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">// 建立Stomp协议的连接</span> StompConnection con = <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">new</span> StompConnection(); Socket so = <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">new</span> Socket(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"192.168.61.138"</span>, <span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">61613</span>); con.open(so); <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">// 注意,协议版本可以是1.2,也可以是1.1</span> con.setVersion(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"1.2"</span>); <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">// 用户名和密码,这个不必多说了</span> con.connect(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"admin"</span>, <span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"admin"</span>); <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">// 以下发送一条信息(您也可以使用“事务”方式)</span> con.send(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"/test"</span>, <span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"234543"</span> + <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">new</span> Date().getTime()); } <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">catch</span>(Exception e) { e.printStackTrace(System.out); } } }</code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li><li style="box-sizing: border-box; padding: 0px 5px;">6</li><li style="box-sizing: border-box; padding: 0px 5px;">7</li><li style="box-sizing: border-box; padding: 0px 5px;">8</li><li style="box-sizing: border-box; padding: 0px 5px;">9</li><li style="box-sizing: border-box; padding: 0px 5px;">10</li><li style="box-sizing: border-box; padding: 0px 5px;">11</li><li style="box-sizing: border-box; padding: 0px 5px;">12</li><li style="box-sizing: border-box; padding: 0px 5px;">13</li><li style="box-sizing: border-box; padding: 0px 5px;">14</li><li style="box-sizing: border-box; padding: 0px 5px;">15</li><li style="box-sizing: border-box; padding: 0px 5px;">16</li><li style="box-sizing: border-box; padding: 0px 5px;">17</li><li style="box-sizing: border-box; padding: 0px 5px;">18</li><li style="box-sizing: border-box; padding: 0px 5px;">19</li><li style="box-sizing: border-box; padding: 0px 5px;">20</li><li style="box-sizing: border-box; padding: 0px 5px;">21</li><li style="box-sizing: border-box; padding: 0px 5px;">22</li><li style="box-sizing: border-box; padding: 0px 5px;">23</li><li style="box-sizing: border-box; padding: 0px 5px;">24</li><li style="box-sizing: border-box; padding: 0px 5px;">25</li><li style="box-sizing: border-box; padding: 0px 5px;">26</li><li style="box-sizing: border-box; padding: 0px 5px;">27</li></ul>
使用ActiveMQ的API接收Stomp协议消息:
<code class="hljs avrasm has-numbering" style="display: block; padding: 0px; background-color: transparent; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-top-left-radius: 0px; border-top-right-radius: 0px; border-bottom-right-radius: 0px; border-bottom-left-radius: 0px; word-wrap: normal; background-position: initial initial; background-repeat: initial initial;">package mq<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.test</span><span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.stomp</span><span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> import java<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.net</span><span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.Socket</span><span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> import java<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.net</span><span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.SocketTimeoutException</span><span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> import java<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.util</span><span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.Map</span><span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> import org<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.apache</span><span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.activemq</span><span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.transport</span><span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.stomp</span><span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.StompConnection</span><span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> import org<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.apache</span><span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.activemq</span><span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.transport</span><span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.stomp</span><span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.StompFrame</span><span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> public class TestConsumer { public static void main(String[] args) throws Exception { // 建立连接 StompConnection con = new StompConnection()<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> Socket so = new Socket(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"192.168.61.138"</span>, <span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">61613</span>)<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> con<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.open</span>(so)<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> con<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.setVersion</span>(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"1.2"</span>)<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> con<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.connect</span>(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"admin"</span>, <span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"admin"</span>)<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> String ack = <span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"client"</span><span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> con<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.subscribe</span>(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"/test"</span>, <span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"client"</span>)<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> // 接受消息(使用循环进行) for(<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;;) {</span> StompFrame frame = null<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> try { // 注意,如果没有接收到消息, // 这个消费者线程会停在这里,直到本次等待超时 frame = con<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.receive</span>()<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> } catch(SocketTimeoutException e) { continue<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> } // 打印本次接收到的消息 System<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.out</span><span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.println</span>(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"frame.getAction() = "</span> + frame<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.getAction</span>())<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> Map<String, String> headers = frame<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.getHeaders</span>()<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> String meesage_id = headers<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.get</span>(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"message-id"</span>)<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> System<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.out</span><span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.println</span>(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"frame.getBody() = "</span> + frame<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.getBody</span>())<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> System<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.out</span><span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.println</span>(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"frame.getCommandId() = "</span> + frame<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.getCommandId</span>())<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> // 在ack是client标记的情况下,确认消息 if(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"client"</span><span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.equals</span>(ack)) { con<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.ack</span>(meesage_id)<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> } } } }</code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li><li style="box-sizing: border-box; padding: 0px 5px;">6</li><li style="box-sizing: border-box; padding: 0px 5px;">7</li><li style="box-sizing: border-box; padding: 0px 5px;">8</li><li style="box-sizing: border-box; padding: 0px 5px;">9</li><li style="box-sizing: border-box; padding: 0px 5px;">10</li><li style="box-sizing: border-box; padding: 0px 5px;">11</li><li style="box-sizing: border-box; padding: 0px 5px;">12</li><li style="box-sizing: border-box; padding: 0px 5px;">13</li><li style="box-sizing: border-box; padding: 0px 5px;">14</li><li style="box-sizing: border-box; padding: 0px 5px;">15</li><li style="box-sizing: border-box; padding: 0px 5px;">16</li><li style="box-sizing: border-box; padding: 0px 5px;">17</li><li style="box-sizing: border-box; padding: 0px 5px;">18</li><li style="box-sizing: border-box; padding: 0px 5px;">19</li><li style="box-sizing: border-box; padding: 0px 5px;">20</li><li style="box-sizing: border-box; padding: 0px 5px;">21</li><li style="box-sizing: border-box; padding: 0px 5px;">22</li><li style="box-sizing: border-box; padding: 0px 5px;">23</li><li style="box-sizing: border-box; padding: 0px 5px;">24</li><li style="box-sizing: border-box; padding: 0px 5px;">25</li><li style="box-sizing: border-box; padding: 0px 5px;">26</li><li style="box-sizing: border-box; padding: 0px 5px;">27</li><li style="box-sizing: border-box; padding: 0px 5px;">28</li><li style="box-sizing: border-box; padding: 0px 5px;">29</li><li style="box-sizing: border-box; padding: 0px 5px;">30</li><li style="box-sizing: border-box; padding: 0px 5px;">31</li><li style="box-sizing: border-box; padding: 0px 5px;">32</li><li style="box-sizing: border-box; padding: 0px 5px;">33</li><li style="box-sizing: border-box; padding: 0px 5px;">34</li><li style="box-sizing: border-box; padding: 0px 5px;">35</li><li style="box-sizing: border-box; padding: 0px 5px;">36</li><li style="box-sizing: border-box; padding: 0px 5px;">37</li><li style="box-sizing: border-box; padding: 0px 5px;">38</li><li style="box-sizing: border-box; padding: 0px 5px;">39</li><li style="box-sizing: border-box; padding: 0px 5px;">40</li><li style="box-sizing: border-box; padding: 0px 5px;">41</li><li style="box-sizing: border-box; padding: 0px 5px;">42</li><li style="box-sizing: border-box; padding: 0px 5px;">43</li><li style="box-sizing: border-box; padding: 0px 5px;">44</li><li style="box-sizing: border-box; padding: 0px 5px;">45</li></ul>
以上分别是使用Activie提供的Stomp协议的消息生产端和Stomp协议的消息消费端的代码(如果您不清楚Stomp协议的细节,可以参考我另一篇文章:《架构设计:系统间通信(19)——MQ:消息协议(上)》)。请注意在代码片段中,并没有出现任何一个带有jms名称的包或者类——这是因为ActiveMQ为Stomp协议提供的JAVA
API在内部进行了JMS规范的封装。
您可以查看activemq-stomp中关于协议转换部分的源代码:org.apache.activemq.transport.stomp.JmsFrameTranslator和其父级接口:org.apache.activemq.transport.stomp.FrameTranslator来验证这件事情(关于ActiveMQ对JMS规范的实现设计,如果后续有时间再回头进行讲解)。
以下是Stomp协议的消费者端的运行效果(在生产者端已经向ActiveMQ插入了一条消息之后):
<code class="hljs avrasm has-numbering" style="display: block; padding: 0px; background-color: transparent; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-top-left-radius: 0px; border-top-right-radius: 0px; border-bottom-right-radius: 0px; border-bottom-left-radius: 0px; word-wrap: normal; background-position: initial initial; background-repeat: initial initial;">frame<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.getAction</span>() = MESSAGE frame<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.getBody</span>() = <span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">2345431458460073204</span> frame<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.getCommandId</span>() = <span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">0</span></code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li></ul>
注意,由于消息体中插入了一个时间戳,所以您复制粘贴代码后运行效果并不会和我的演示程序完全一致。
2-4、ActiveMQ中的Queue和Topics
如果您细心的话,在ActiveMQ提供的管理页面上已经看到有两个功能页面:Queue和Topic。Queue和Topic是JMS为开发人员提供的两种不同工作机制的消息队列。 在ActiveMQ官方的解释是:Topics
In JMS a Topic implements publish and subscribe semantics. When you publish a message it goes to all the subscribers who are interested - so zero to many subscribers will receive a copy of the message. Only subscribers who had an active subscription at the
time the broker receives the message will get a copy of the message.
中文的可以译做:JMS-Topic 队列基于“订阅-发布”模式,当操作者发布一条消息后,所有对这条消息感兴趣的订阅者都可以收到它——也就是说这条消息会被拷贝成多份,进行分发。只有当前“活动的”订阅者能够收到消息(换句话说,如果当前JMS-Topic队列中没有订阅者,这条消息将被丢弃)。
Queue
A JMS Queue implements load balancer semantics. A single message will be received by exactly one consumer. If there are no consumers available at the time the message is sent it will be kept until a consumer is available that can process the message. If a consumer
receives a message and does not acknowledge it before closing then the message will be redelivered to another consumer. A queue can have many consumers with messages load balanced across the available consumers.
So Queues implement a reliable load balancer in JMS.
中文的可以译做:JMS-Queue是一种“负载均衡模式”的实现。一个消息能且只能被一个消费者接受。如果当前JMS-Queue中没有任何的消费者,那么这条消息将会被Queue存储起来(实际应用中可以存储在磁盘上,也可以存储在数据库中,看软件的配置),直到有一个消费者连接上。另外,如果消费者在接受到消息后,在他断开与JMS-Queue连接之前,没有发送ack信息(可以是客户端手动发送,也可以是自动发送),那么这条消息将被发送给其他消费者。
以下表格摘自互联网上的资料,基本上把Queue和Topic这两种队列的不同特性说清楚了:
比较项目 | Topic 模式队列 | Queue 模式队列 |
---|---|---|
工作模式 | “订阅-发布”模式,如果当前没有订阅者,消息将会被丢弃。如果有多个订阅者,那么这些订阅者都会收到消息 | “负载均衡”模式,如果当前没有消费者,消息也不会丢弃;如果有多个消费者,那么一条消息也只会发送给其中一个消费者,并且要求消费者ack信息。 |
有无状态 | 无状态 | Queue数据默认会在mq服务器上以文件形式保存,比如Active MQ一般保存在$AMQ_HOME\data\kr-store\data下面。也可以配置成DB存储。 |
传递完整性 | 如果没有订阅者,消息会被丢弃 | 消息不会丢弃 |
处理效率 | 由于消息要按照订阅者的数量进行复制,所以处理性能会随着订阅者的增加而明显降低,并且还要结合不同消息协议自身的性能差异 | 由于一条消息只发送给一个消费者,所以就算消费者再多,性能也不会有明显降低。当然不同消息协议的具体性能也是有差异的 |
2-5、JMS和协议间转换
上文已经说到,JMS这套面向消息通信的 JAVA API 是一个和厂商无关的规范。通过JMS,我们能实现不同消息中间件厂商、不同协议间的转换和交互。这一小节我们就来讨论一下这个问题。如果用一张图来表示JMS在消息中间件中的作用话,那么就可以这么来画:首先您使用的MQ消息中间件需要实现了JMS规范;那么通过JMS规范,开发人员可以忽略各种消息协议的细节,只要消息在同一队列中,就能够保证各种消息协议间实现互相转换。下面我们首先来看一个使用JMS API在ActiveMQ中操作openwire协议消息的简单示例,然后再给出一个通过JMS,实现Stomp消息协议和Openwire消息协议间的互转示例。
2-5-1、JMS操作
以下代码使用向某个Queue(命名为test)中发送一条消息:<code class="hljs java has-numbering" style="display: block; padding: 0px; background-color: transparent; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-top-left-radius: 0px; border-top-right-radius: 0px; border-bottom-right-radius: 0px; border-bottom-left-radius: 0px; word-wrap: normal; background-position: initial initial; background-repeat: initial initial;"><span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">package</span> jms; <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">import</span> javax.jms.Connection; <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">import</span> javax.jms.Destination; <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">import</span> javax.jms.MessageProducer; <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">import</span> javax.jms.Session; <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">import</span> javax.jms.TextMessage; <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">import</span> org.apache.activemq.ActiveMQConnectionFactory; <span class="hljs-javadoc" style="color: rgb(136, 0, 0); box-sizing: border-box;">/** * 测试使用JMS API连接ActiveMQ *<span class="hljs-javadoctag" style="color: rgb(102, 0, 102); box-sizing: border-box;"> @author</span> yinwenjie */</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">public</span> <span class="hljs-class" style="box-sizing: border-box;"><span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">class</span> <span class="hljs-title" style="box-sizing: border-box; color: rgb(102, 0, 102);">JMSProducer</span> {</span> <span class="hljs-javadoc" style="color: rgb(136, 0, 0); box-sizing: border-box;">/** * 由于是测试代码,这里忽略了异常处理。 * 正是代码可不能这样做 *<span class="hljs-javadoctag" style="color: rgb(102, 0, 102); box-sizing: border-box;"> @param</span> args *<span class="hljs-javadoctag" style="color: rgb(102, 0, 102); box-sizing: border-box;"> @throws</span> RuntimeException */</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">public</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">static</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">void</span> <span class="hljs-title" style="box-sizing: border-box;">main</span> (String[] args) <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">throws</span> Exception { <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">// 定义JMS-ActiveMQ连接信息(默认为Openwire协议)</span> ActiveMQConnectionFactory connectionFactory = <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">new</span> ActiveMQConnectionFactory(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"tcp://192.168.61.138:61616"</span>); Session session = <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">null</span>; Destination sendQueue; Connection connection = <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">null</span>; <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">//进行连接</span> connection = connectionFactory.createQueueConnection(); connection.start(); <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">//建立会话(设置一个带有事务特性的会话)</span> session = connection.createSession(<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">true</span>, Session.SESSION_TRANSACTED); <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">//建立queue(当然如果有了就不会重复建立)</span> sendQueue = session.createQueue(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"/test"</span>); <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">//建立消息发送者对象</span> MessageProducer sender = session.createProducer(sendQueue); TextMessage outMessage = session.createTextMessage(); outMessage.setText(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"这是发送的消息内容"</span>); <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">//发送(JMS是支持事务的)</span> sender.send(outMessage); session.commit(); <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">//关闭</span> sender.close(); connection.close(); } }</code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li><li style="box-sizing: border-box; padding: 0px 5px;">6</li><li style="box-sizing: border-box; padding: 0px 5px;">7</li><li style="box-sizing: border-box; padding: 0px 5px;">8</li><li style="box-sizing: border-box; padding: 0px 5px;">9</li><li style="box-sizing: border-box; padding: 0px 5px;">10</li><li style="box-sizing: border-box; padding: 0px 5px;">11</li><li style="box-sizing: border-box; padding: 0px 5px;">12</li><li style="box-sizing: border-box; padding: 0px 5px;">13</li><li style="box-sizing: border-box; padding: 0px 5px;">14</li><li style="box-sizing: border-box; padding: 0px 5px;">15</li><li style="box-sizing: border-box; padding: 0px 5px;">16</li><li style="box-sizing: border-box; padding: 0px 5px;">17</li><li style="box-sizing: border-box; padding: 0px 5px;">18</li><li style="box-sizing: border-box; padding: 0px 5px;">19</li><li style="box-sizing: border-box; padding: 0px 5px;">20</li><li style="box-sizing: border-box; padding: 0px 5px;">21</li><li style="box-sizing: border-box; padding: 0px 5px;">22</li><li style="box-sizing: border-box; padding: 0px 5px;">23</li><li style="box-sizing: border-box; padding: 0px 5px;">24</li><li style="box-sizing: border-box; padding: 0px 5px;">25</li><li style="box-sizing: border-box; padding: 0px 5px;">26</li><li style="box-sizing: border-box; padding: 0px 5px;">27</li><li style="box-sizing: border-box; padding: 0px 5px;">28</li><li style="box-sizing: border-box; padding: 0px 5px;">29</li><li style="box-sizing: border-box; padding: 0px 5px;">30</li><li style="box-sizing: border-box; padding: 0px 5px;">31</li><li style="box-sizing: border-box; padding: 0px 5px;">32</li><li style="box-sizing: border-box; padding: 0px 5px;">33</li><li style="box-sizing: border-box; padding: 0px 5px;">34</li><li style="box-sizing: border-box; padding: 0px 5px;">35</li><li style="box-sizing: border-box; padding: 0px 5px;">36</li><li style="box-sizing: border-box; padding: 0px 5px;">37</li><li style="box-sizing: border-box; padding: 0px 5px;">38</li><li style="box-sizing: border-box; padding: 0px 5px;">39</li><li style="box-sizing: border-box; padding: 0px 5px;">40</li><li style="box-sizing: border-box; padding: 0px 5px;">41</li><li style="box-sizing: border-box; padding: 0px 5px;">42</li><li style="box-sizing: border-box; padding: 0px 5px;">43</li><li style="box-sizing: border-box; padding: 0px 5px;">44</li><li style="box-sizing: border-box; padding: 0px 5px;">45</li><li style="box-sizing: border-box; padding: 0px 5px;">46</li><li style="box-sizing: border-box; padding: 0px 5px;">47</li><li style="box-sizing: border-box; padding: 0px 5px;">48</li><li style="box-sizing: border-box; padding: 0px 5px;">49</li></ul>
当以上代码运行到“start”的位置时,我们可以通过观察ActiveMQ管理界面中connection列表中的连接信息,发现消息生产者已经建立了一个Openwire协议的连接:
从而确定我们通过JMS API建立了一个openwire协议的通讯连接。接着我们使用以下代码,建立一个基于openwire协议的“消费者”。注意:消息生产者和消息消费者,映射的队列必须一致。(在示例代码中,它们都映射名称为test的JMS-Queue)
以下代码使用JMS从某个Queue中接收消息:
<code class="hljs java has-numbering" style="display: block; padding: 0px; background-color: transparent; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-top-left-radius: 0px; border-top-right-radius: 0px; border-bottom-right-radius: 0px; border-bottom-left-radius: 0px; word-wrap: normal; background-position: initial initial; background-repeat: initial initial;"><span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">package</span> jms; <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">import</span> javax.jms.Connection; <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">import</span> javax.jms.Destination; <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">import</span> javax.jms.Message; <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">import</span> javax.jms.MessageConsumer; <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">import</span> javax.jms.MessageListener; <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">import</span> javax.jms.Session; <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">import</span> org.apache.activemq.ActiveMQConnectionFactory; <span class="hljs-javadoc" style="color: rgb(136, 0, 0); box-sizing: border-box;">/** * 测试使用JMS API连接ActiveMQ *<span class="hljs-javadoctag" style="color: rgb(102, 0, 102); box-sizing: border-box;"> @author</span> yinwenjie */</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">public</span> <span class="hljs-class" style="box-sizing: border-box;"><span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">class</span> <span class="hljs-title" style="box-sizing: border-box; color: rgb(102, 0, 102);">JMSConsumer</span> {</span> <span class="hljs-javadoc" style="color: rgb(136, 0, 0); box-sizing: border-box;">/** * 由于是测试代码,这里忽略了异常处理。 * 正是代码可不能这样做 *<span class="hljs-javadoctag" style="color: rgb(102, 0, 102); box-sizing: border-box;"> @param</span> args *<span class="hljs-javadoctag" style="color: rgb(102, 0, 102); box-sizing: border-box;"> @throws</span> RuntimeException */</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">public</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">static</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">void</span> <span class="hljs-title" style="box-sizing: border-box;">main</span> (String[] args) <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">throws</span> Exception { <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">// 定义JMS-ActiveMQ连接信息</span> ActiveMQConnectionFactory connectionFactory = <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">new</span> ActiveMQConnectionFactory(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"tcp://192.168.61.138:61616"</span>); Session session = <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">null</span>; Destination sendQueue; Connection connection = <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">null</span>; <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">//进行连接</span> connection = connectionFactory.createQueueConnection(); connection.start(); <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">//建立会话(设置为自动ack)</span> session = connection.createSession(<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">false</span>, Session.AUTO_ACKNOWLEDGE); <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">//建立Queue(当然如果有了就不会重复建立)</span> sendQueue = session.createQueue(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"/test"</span>); <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">//建立消息发送者对象</span> MessageConsumer consumer = session.createConsumer(sendQueue); consumer.setMessageListener(<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">new</span> MessageListener() { <span class="hljs-annotation" style="color: rgb(155, 133, 157); box-sizing: border-box;">@Override</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">public</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">void</span> <span class="hljs-title" style="box-sizing: border-box;">onMessage</span>(Message arg0) { <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">// 接收到消息后,不需要再发送ack了。</span> System.out.println(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"Message = "</span> + arg0); } }); <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">synchronized</span> (JMSConsumer.class) { JMSConsumer.class.wait(); } <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">//关闭</span> consumer.close(); connection.close(); } }</code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li><li style="box-sizing: border-box; padding: 0px 5px;">6</li><li style="box-sizing: border-box; padding: 0px 5px;">7</li><li style="box-sizing: border-box; padding: 0px 5px;">8</li><li style="box-sizing: border-box; padding: 0px 5px;">9</li><li style="box-sizing: border-box; padding: 0px 5px;">10</li><li style="box-sizing: border-box; padding: 0px 5px;">11</li><li style="box-sizing: border-box; padding: 0px 5px;">12</li><li style="box-sizing: border-box; padding: 0px 5px;">13</li><li style="box-sizing: border-box; padding: 0px 5px;">14</li><li style="box-sizing: border-box; padding: 0px 5px;">15</li><li style="box-sizing: border-box; padding: 0px 5px;">16</li><li style="box-sizing: border-box; padding: 0px 5px;">17</li><li style="box-sizing: border-box; padding: 0px 5px;">18</li><li style="box-sizing: border-box; padding: 0px 5px;">19</li><li style="box-sizing: border-box; padding: 0px 5px;">20</li><li style="box-sizing: border-box; padding: 0px 5px;">21</li><li style="box-sizing: border-box; padding: 0px 5px;">22</li><li style="box-sizing: border-box; padding: 0px 5px;">23</li><li style="box-sizing: border-box; padding: 0px 5px;">24</li><li style="box-sizing: border-box; padding: 0px 5px;">25</li><li style="box-sizing: border-box; padding: 0px 5px;">26</li><li style="box-sizing: border-box; padding: 0px 5px;">27</li><li style="box-sizing: border-box; padding: 0px 5px;">28</li><li style="box-sizing: border-box; padding: 0px 5px;">29</li><li style="box-sizing: border-box; padding: 0px 5px;">30</li><li style="box-sizing: border-box; padding: 0px 5px;">31</li><li style="box-sizing: border-box; padding: 0px 5px;">32</li><li style="box-sizing: border-box; padding: 0px 5px;">33</li><li style="box-sizing: border-box; padding: 0px 5px;">34</li><li style="box-sizing: border-box; padding: 0px 5px;">35</li><li style="box-sizing: border-box; padding: 0px 5px;">36</li><li style="box-sizing: border-box; padding: 0px 5px;">37</li><li style="box-sizing: border-box; padding: 0px 5px;">38</li><li style="box-sizing: border-box; padding: 0px 5px;">39</li><li style="box-sizing: border-box; padding: 0px 5px;">40</li><li style="box-sizing: border-box; padding: 0px 5px;">41</li><li style="box-sizing: border-box; padding: 0px 5px;">42</li><li style="box-sizing: border-box; padding: 0px 5px;">43</li><li style="box-sizing: border-box; padding: 0px 5px;">44</li><li style="box-sizing: border-box; padding: 0px 5px;">45</li><li style="box-sizing: border-box; padding: 0px 5px;">46</li><li style="box-sizing: border-box; padding: 0px 5px;">47</li><li style="box-sizing: border-box; padding: 0px 5px;">48</li><li style="box-sizing: border-box; padding: 0px 5px;">49</li><li style="box-sizing: border-box; padding: 0px 5px;">50</li><li style="box-sizing: border-box; padding: 0px 5px;">51</li><li style="box-sizing: border-box; padding: 0px 5px;">52</li><li style="box-sizing: border-box; padding: 0px 5px;">53</li><li style="box-sizing: border-box; padding: 0px 5px;">54</li><li style="box-sizing: border-box; padding: 0px 5px;">55</li><li style="box-sizing: border-box; padding: 0px 5px;">56</li><li style="box-sizing: border-box; padding: 0px 5px;">57</li></ul>
当以上“消费者”代码运行到start的位置时,我们通过ActiveMQ提供的管理界面可以看到,基于Openwire协议的连接增加到了两条:
注意,您在运行以上测试代码时,不用和我的运行顺序一致。由于Queue模式的队列是要进行消息状态保存的,所以无论您是先运行“消费者”端,还是先运行“生产者”端,最后“消费者”都会收到一条消息。类似如下的效果:
<code class="hljs ruby has-numbering" style="display: block; padding: 0px; background-color: transparent; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-top-left-radius: 0px; border-top-right-radius: 0px; border-bottom-right-radius: 0px; border-bottom-left-radius: 0px; word-wrap: normal; background-position: initial initial; background-repeat: initial initial;"><span class="hljs-constant" style="box-sizing: border-box;">Message</span> = <span class="hljs-constant" style="box-sizing: border-box;">ActiveMQTextMessage</span> {commandId = <span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">6</span>, responseRequired = <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">false</span>, messageId = <span class="hljs-constant" style="box-sizing: border-box;">ID</span><span class="hljs-symbol" style="color: rgb(0, 102, 102); box-sizing: border-box;">:yinwenjie-</span><span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">240</span>-<span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">60482</span>-<span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">1458616972423</span>-<span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">1</span><span class="hljs-symbol" style="color: rgb(0, 102, 102); box-sizing: border-box;">:</span><span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">1</span><span class="hljs-symbol" style="color: rgb(0, 102, 102); box-sizing: border-box;">:</span><span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">1</span><span class="hljs-symbol" style="color: rgb(0, 102, 102); box-sizing: border-box;">:</span><span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">1</span><span class="hljs-symbol" style="color: rgb(0, 102, 102); box-sizing: border-box;">:</span><span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">1</span>, originalDestination = null, originalTransactionId = null, producerId = <span class="hljs-constant" style="box-sizing: border-box;">ID</span><span class="hljs-symbol" style="color: rgb(0, 102, 102); box-sizing: border-box;">:yinwenjie-</span><span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">240</span>-<span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">60482</span>-<span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">1458616972423</span>-<span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">1</span><span class="hljs-symbol" style="color: rgb(0, 102, 102); box-sizing: border-box;">:</span><span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">1</span><span class="hljs-symbol" style="color: rgb(0, 102, 102); box-sizing: border-box;">:</span><span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">1</span><span class="hljs-symbol" style="color: rgb(0, 102, 102); box-sizing: border-box;">:</span><span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">1</span>, destination = <span class="hljs-symbol" style="color: rgb(0, 102, 102); box-sizing: border-box;">queue:</span>/<span class="hljs-regexp" style="color: rgb(0, 136, 0); box-sizing: border-box;">//test</span>, transactionId = <span class="hljs-constant" style="box-sizing: border-box;">TX</span><span class="hljs-symbol" style="color: rgb(0, 102, 102); box-sizing: border-box;">:ID</span><span class="hljs-symbol" style="color: rgb(0, 102, 102); box-sizing: border-box;">:yinwenjie-</span><span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">240</span>-<span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">60482</span>-<span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">1458616972423</span>-<span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">1</span><span class="hljs-symbol" style="color: rgb(0, 102, 102); box-sizing: border-box;">:</span><span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">1</span><span class="hljs-symbol" style="color: rgb(0, 102, 102); box-sizing: border-box;">:</span><span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">1</span>, expiration = <span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">0</span>, timestamp = <span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">1458617840154</span>, arrival = <span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">0</span>, brokerInTime = <span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">1458617840166</span>, brokerOutTime = <span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">1458617840187</span>, correlationId = null, replyTo = null, persistent = <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">true</span>, type = null, priority = <span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">4</span>, groupID = null, groupSequence = <span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">0</span>, targetConsumerId = null, compressed = <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">false</span>, userID = null, content = org.apache.activemq.util.<span class="hljs-constant" style="box-sizing: border-box;">ByteSequence</span><span class="hljs-variable" style="color: rgb(102, 0, 102); box-sizing: border-box;">@66968df8</span>, marshalledProperties = null, dataStructure = null, redeliveryCounter = <span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">0</span>, size = <span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">0</span>, properties = null, readOnlyProperties = <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">true</span>, readOnlyBody = <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">true</span>, droppable = <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">false</span>, jmsXGroupFirstForConsumer = <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">false</span>, text = 这是发送的消息内容}</code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li></ul>
2-5-2、协议间转换
下面我们将Openwire协议的消息通过JMS送入Queue队列,并且让基于Stomp协议的消费者接收到这条消息。为了节约篇幅,基于Openwire协议的生产者的代码请参考上一小节2-5-1中“生产者”的代码片段。这里只列出Stomp消息的接受者代码(实际上这段代码在上文中也可以找到):Stomp协议的消息消费者(消息接收者):
<code class="hljs avrasm has-numbering" style="display: block; padding: 0px; background-color: transparent; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-top-left-radius: 0px; border-top-right-radius: 0px; border-bottom-right-radius: 0px; border-bottom-left-radius: 0px; word-wrap: normal; background-position: initial initial; background-repeat: initial initial;">package mq<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.test</span><span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.stomp</span><span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> import java<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.net</span><span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.Socket</span><span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> import java<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.net</span><span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.SocketTimeoutException</span><span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> import java<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.util</span><span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.Map</span><span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> import org<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.apache</span><span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.activemq</span><span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.transport</span><span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.stomp</span><span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.StompConnection</span><span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> import org<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.apache</span><span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.activemq</span><span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.transport</span><span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.stomp</span><span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.StompFrame</span><span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> public class TestConsumer { public static void main(String[] args) throws Exception { // 建立连接(注意,Stomp协议的连接端口是<span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">61613</span>) StompConnection con = new StompConnection()<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> Socket so = new Socket(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"192.168.61.138"</span>, <span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">61613</span>)<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> con<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.open</span>(so)<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> con<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.setVersion</span>(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"1.2"</span>)<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> con<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.connect</span>(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"admin"</span>, <span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"admin"</span>)<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> String ack = <span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"client"</span><span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> con<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.subscribe</span>(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"/test"</span>, <span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"client"</span>)<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> // 接受消息(使用循环进行) for(<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;;) {</span> StompFrame frame = null<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> try { // 注意,如果没有接收到消息, // 这个消费者线程会停在这里,直到本次等待超时 frame = con<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.receive</span>()<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> } catch(SocketTimeoutException e) { continue<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> } // 打印本次接收到的消息 System<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.out</span><span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.println</span>(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"frame.getAction() = "</span> + frame<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.getAction</span>())<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> Map<String, String> headers = frame<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.getHeaders</span>()<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> String meesage_id = headers<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.get</span>(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"message-id"</span>)<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> System<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.out</span><span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.println</span>(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"frame.getBody() = "</span> + frame<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.getBody</span>())<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> System<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.out</span><span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.println</span>(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"frame.getCommandId() = "</span> + frame<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.getCommandId</span>())<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> // 在ack是client模式的情况下,确认消息 if(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"client"</span><span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.equals</span>(ack)) { con<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.ack</span>(meesage_id)<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span> } } } }</code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; background-color: rgb(238, 238, 238); top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right;"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li><li style="box-sizing: border-box; padding: 0px 5px;">6</li><li style="box-sizing: border-box; padding: 0px 5px;">7</li><li style="box-sizing: border-box; padding: 0px 5px;">8</li><li style="box-sizing: border-box; padding: 0px 5px;">9</li><li style="box-sizing: border-box; padding: 0px 5px;">10</li><li style="box-sizing: border-box; padding: 0px 5px;">11</li><li style="box-sizing: border-box; padding: 0px 5px;">12</li><li style="box-sizing: border-box; padding: 0px 5px;">13</li><li style="box-sizing: border-box; padding: 0px 5px;">14</li><li style="box-sizing: border-box; padding: 0px 5px;">15</li><li style="box-sizing: border-box; padding: 0px 5px;">16</li><li style="box-sizing: border-box; padding: 0px 5px;">17</li><li style="box-sizing: border-box; padding: 0px 5px;">18</li><li style="box-sizing: border-box; padding: 0px 5px;">19</li><li style="box-sizing: border-box; padding: 0px 5px;">20</li><li style="box-sizing: border-box; padding: 0px 5px;">21</li><li style="box-sizing: border-box; padding: 0px 5px;">22</li><li style="box-sizing: border-box; padding: 0px 5px;">23</li><li style="box-sizing: border-box; padding: 0px 5px;">24</li><li style="box-sizing: border-box; padding: 0px 5px;">25</li><li style="box-sizing: border-box; padding: 0px 5px;">26</li><li style="box-sizing: border-box; padding: 0px 5px;">27</li><li style="box-sizing: border-box; padding: 0px 5px;">28</li><li style="box-sizing: border-box; padding: 0px 5px;">29</li><li style="box-sizing: border-box; padding: 0px 5px;">30</li><li style="box-sizing: border-box; padding: 0px 5px;">31</li><li style="box-sizing: border-box; padding: 0px 5px;">32</li><li style="box-sizing: border-box; padding: 0px 5px;">33</li><li style="box-sizing: border-box; padding: 0px 5px;">34</li><li style="box-sizing: border-box; padding: 0px 5px;">35</li><li style="box-sizing: border-box; padding: 0px 5px;">36</li><li style="box-sizing: border-box; padding: 0px 5px;">37</li><li style="box-sizing: border-box; padding: 0px 5px;">38</li><li style="box-sizing: border-box; padding: 0px 5px;">39</li><li style="box-sizing: border-box; padding: 0px 5px;">40</li><li style="box-sizing: border-box; padding: 0px 5px;">41</li><li style="box-sizing: border-box; padding: 0px 5px;">42</li><li style="box-sizing: border-box; padding: 0px 5px;">43</li><li style="box-sizing: border-box; padding: 0px 5px;">44</li><li style="box-sizing: border-box; padding: 0px 5px;">45</li></ul>
当您同时运行Openwire消息发送者和Stomp消息接收者时,您可以在ActiveMQ的管理界面看到这两种协议的连接信息:
以下是Stomp协议消费者接收到的消息内容(经过转换的openwire协议消息):
<code class="hljs avrasm has-numbering" style="display: block; padding: 0px; background-color: transparent; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-top-left-radius: 0px; border-top-right-radius: 0px; border-bottom-right-radius: 0px; border-bottom-left-radius: 0px; word-wrap: normal; background-position: initial initial; background-repeat: initial initial;">frame<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.getAction</span>() = MESSAGE frame<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.getBody</span>() = 这是发送的消息内容 frame<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.getCommandId</span>() = <span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">0</span></code>
相关文章推荐
- 架构设计:系统间通信(20)——MQ:消息协议(下)
- 架构设计:系统间通信(19)——MQ:消息协议(上)
- 网站在线留言
- NopCommerce架构分析之(七)主题Theme皮肤管理器
- NopCommerce架构分析之(八)多语言支持
- 分布式系统架构的基本原则和实践概述
- 如何自建一个网站?
- 网站收集ing....
- 网站收集ing....
- 网站消息推送
- 什么是对象存储?OSD架构及原理
- 学习网站
- 解决网站访问过多而导致缓慢的问题
- 架构高性能网站秘笈(三)——浏览器缓存
- 架构高性能网站秘笈(三)——浏览器缓存
- 好的架构不是设计出来的,而是演进出来的
- 第76课:Spark SQL基于网站Log的综合案例实战之Hive数据导入、Spark SQL对数据操作每天晚上20:00YY频道现场授课频道68917580
- 架构高性能网站秘笈(二)——动态内容缓存
- 架构高性能网站秘笈(二)——动态内容缓存
- 做了个工具类的小网站---tool.admaster.club