RabbitMQ入门教程 For Java【3】 - Publish/Subscribe
2016-09-08 14:21
477 查看
RabbitMQ入门教程 For Java【3】 - Publish/Subscribe
我的开发环境: 操作系统: Windows7 64bit
开发环境: JDK 1.7 - 1.7.0_55
开发工具: Eclipse Kepler SR2
RabbitMQ版本: 3.6.0
Elang版本: erl7.2.1
关于Windows7下安装RabbitMQ的教程请先在网上找一下,有空我再补安装教程。
源码地址
https://github.com/chwshuang/rabbitmq.git
在上一章中,我们学习创建了一个消息队列,她的每个任务消息只发送给一个工人。这一章,我们会将同一个任务消息发送给多个工人。这种模式就是“发布/订阅”。
为了说明这种模式,我们将以一个日志系统进行讲解:一个日志发送者,两个日志接收者,接收者1可以把这条日志写入到磁盘上,另外一个接收者2可以将这条日志打印到控制台中。
“发布/订阅”模式的基础是将消息广播到所有的接收器上。
交换器
在之前的教程中,我们都是直接在消息队列中进行发送和接收消息,现在开始要介绍RabbitMQ完整的消息模型了。 首先,我们先来回顾一下之前学到关于RabbitMQ的内容:
生产者是发送消息的应用程序
队列是存储消息的缓冲区
消费者是接收消息的应用程序
实际上,RabbitMQ中消息传递模型的核心思想是:生产者不直接发送消息到队列。实际的运行环境中,生产者是不知道消息会发送到那个队列上,她只会将消息发送到一个交换器,交换器也像一个生产线,她一边接收生产者发来的消息,另外一边则根据交换规则,将消息放到队列中。交换器必须知道她所接收的消息是什么?它应该被放到那个队列中?它应该被添加到多个队列吗?还是应该丢弃?这些规则都是按照交换器的规则来确定的。
交换器的规则有:
direct (直连)topic (主题)
headers (标题)
fanout (分发)也有翻译为扇出的。
我们将使用【fanout】类型创建一个名称为 logs的交换器,
<code class="hljs avrasm has-numbering" style="display: block; padding: 0px; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-top-left-radius: 0px; border-top-right-radius: 0px; border-bottom-right-radius: 0px; border-bottom-left-radius: 0px; word-wrap: normal; background: transparent;">channel<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.exchangeDeclare</span>(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"logs"</span>, <span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"fanout"</span>)<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span></code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right; background-color: rgb(238, 238, 238);"><li style="box-sizing: border-box; padding: 0px 5px;">1</li></ul>
分发交换器很简单,你通过名称也能想到,她是广播所有的消息,
交换器列表
通过rabbitmqctl list_exchanges指令列出服务器上所有可用的交换器
<code class="hljs avrasm has-numbering" style="display: block; padding: 0px; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-top-left-radius: 0px; border-top-right-radius: 0px; border-bottom-right-radius: 0px; border-bottom-left-radius: 0px; word-wrap: normal; background: transparent;">$ sudo rabbitmqctl list_exchanges Listing exchanges ... direct amq<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.direct</span> direct amq<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.fanout</span> fanout amq<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.headers</span> headers amq<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.match</span> headers amq<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.rabbitmq</span><span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.log</span> topic amq<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.rabbitmq</span><span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.trace</span> topic amq<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.topic</span> topic logs fanout ..<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.done</span>.</code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right; background-color: rgb(238, 238, 238);"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li><li style="box-sizing: border-box; padding: 0px 5px;">6</li><li style="box-sizing: border-box; padding: 0px 5px;">7</li><li style="box-sizing: border-box; padding: 0px 5px;">8</li><li style="box-sizing: border-box; padding: 0px 5px;">9</li><li style="box-sizing: border-box; padding: 0px 5px;">10</li><li style="box-sizing: border-box; padding: 0px 5px;">11</li><li style="box-sizing: border-box; padding: 0px 5px;">12</li></ul>
这个列表里面所有以【amq.*】开头的交换器都是RabbitMQ默认创建的。在生产环境中,可以自己定义。
匿名交换器
在之前的教程中,我们知道,发送消息到队列时根本没有使用交换器,但是消息也能发送到队列。这是因为RabbitMQ选择了一个空“”字符串的默认交换器。
来看看我们之前的代码:
<code class="hljs avrasm has-numbering" style="display: block; padding: 0px; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-top-left-radius: 0px; border-top-right-radius: 0px; border-bottom-right-radius: 0px; border-bottom-left-radius: 0px; word-wrap: normal; background: transparent;">channel<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.basicPublish</span>(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">""</span>, <span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"hello"</span>, null, message<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.getBytes</span>())<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span></code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right; background-color: rgb(238, 238, 238);"><li style="box-sizing: border-box; padding: 0px 5px;">1</li></ul>
第一个参数就是交换器的名称。如果输入“”空字符串,表示使用默认的匿名交换器。
第二个参数是【routingKey】路由线索
匿名交换器规则:
发送到routingKey名称对应的队列。
现在,我们可以发送消息到交换器中:
<code class="hljs avrasm has-numbering" style="display: block; padding: 0px; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-top-left-radius: 0px; border-top-right-radius: 0px; border-bottom-right-radius: 0px; border-bottom-left-radius: 0px; word-wrap: normal; background: transparent;">channel<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.basicPublish</span>( <span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"logs"</span>, <span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">""</span>, null, message<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.getBytes</span>())<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span></code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right; background-color: rgb(238, 238, 238);"><li style="box-sizing: border-box; padding: 0px 5px;">1</li></ul>
临时队列
记得前两章中使用的队列指定的名称吗?(Hello World和task_queue). 如果要在生产者和消费者之间创建一个新的队列,又不想使用原来的队列,临时队列就是为这个场景而生的:
首先,每当我们连接到RabbitMQ,我们需要一个新的空队列,我们可以用一个随机名称来创建,或者说让服务器选择一个随机队列名称给我们。
一旦我们断开消费者,队列应该立即被删除。
在Java客户端,提供queuedeclare()为我们创建一个非持久化、独立、自动删除的队列名称。
<code class="hljs avrasm has-numbering" style="display: block; padding: 0px; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-top-left-radius: 0px; border-top-right-radius: 0px; border-bottom-right-radius: 0px; border-bottom-left-radius: 0px; word-wrap: normal; background: transparent;">String queueName = channel<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.queueDeclare</span>()<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.getQueue</span>()<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span></code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right; background-color: rgb(238, 238, 238);"><li style="box-sizing: border-box; padding: 0px 5px;">1</li></ul>
通过上面的代码就能获取到一个随机队列名称。
例如:它可能是:amq.gen-jzty20brgko-hjmujj0wlg。
绑定
如果我们已经创建了一个分发交换器和队列,现在我们就可以就将我们的队列跟交换器进行绑定。
<code class="hljs avrasm has-numbering" style="display: block; padding: 0px; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-top-left-radius: 0px; border-top-right-radius: 0px; border-bottom-right-radius: 0px; border-bottom-left-radius: 0px; word-wrap: normal; background: transparent;">channel<span class="hljs-preprocessor" style="color: rgb(68, 68, 68); box-sizing: border-box;">.queueBind</span>(queueName, <span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"logs"</span>, <span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">""</span>)<span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">;</span></code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right; background-color: rgb(238, 238, 238);"><li style="box-sizing: border-box; padding: 0px 5px;">1</li></ul>
执行完这段代码后,日志交换器会将消息添加到我们的队列中。
绑定列表
如果要查看绑定列表,可以执行【rabbitmqctl list_bindings】命令
全部代码
目录
生产者程序,他负责发送日志消息,与之前不同的是它不是将消息发送到匿名交换器中,而是发送到一个名为【logs】的交换器中。我们提供一个空字符串的routingkey,它的功能被交换器的分发类型代替了。下面是EmitLog.java的代码:
<code class="hljs java has-numbering" style="display: block; padding: 0px; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-top-left-radius: 0px; border-top-right-radius: 0px; border-bottom-right-radius: 0px; border-bottom-left-radius: 0px; word-wrap: normal; background: transparent;"><span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">import</span> com.rabbitmq.client.Channel; <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">import</span> com.rabbitmq.client.Connection; <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">import</span> com.rabbitmq.client.ConnectionFactory; <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">public</span> <span class="hljs-class" style="box-sizing: border-box;"><span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">class</span> <span class="hljs-title" style="box-sizing: border-box; color: rgb(102, 0, 102);">EmitLog</span> {</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">private</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">static</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">final</span> String EXCHANGE_NAME = <span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"logs"</span>; <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">public</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">static</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">void</span> <span class="hljs-title" style="box-sizing: border-box;">main</span>(String[] argv) <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">throws</span> Exception { ConnectionFactory factory = <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">new</span> ConnectionFactory(); factory.setHost(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"localhost"</span>); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, <span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"fanout"</span>); <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">// 分发消息</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">for</span>(<span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">int</span> i = <span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">0</span> ; i < <span class="hljs-number" style="color: rgb(0, 102, 102); box-sizing: border-box;">5</span>; i++){ String message = <span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"Hello World! "</span> + i; channel.basicPublish(EXCHANGE_NAME, <span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">""</span>, <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">null</span>, message.getBytes()); System.out.println(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">" [x] Sent '"</span> + message + <span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"'"</span>); } channel.close(); connection.close(); } }</code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right; background-color: rgb(238, 238, 238);"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li><li style="box-sizing: border-box; padding: 0px 5px;">6</li><li style="box-sizing: border-box; padding: 0px 5px;">7</li><li style="box-sizing: border-box; padding: 0px 5px;">8</li><li style="box-sizing: border-box; padding: 0px 5px;">9</li><li style="box-sizing: border-box; padding: 0px 5px;">10</li><li style="box-sizing: border-box; padding: 0px 5px;">11</li><li style="box-sizing: border-box; padding: 0px 5px;">12</li><li style="box-sizing: border-box; padding: 0px 5px;">13</li><li style="box-sizing: border-box; padding: 0px 5px;">14</li><li style="box-sizing: border-box; padding: 0px 5px;">15</li><li style="box-sizing: border-box; padding: 0px 5px;">16</li><li style="box-sizing: border-box; padding: 0px 5px;">17</li><li style="box-sizing: border-box; padding: 0px 5px;">18</li><li style="box-sizing: border-box; padding: 0px 5px;">19</li><li style="box-sizing: border-box; padding: 0px 5px;">20</li><li style="box-sizing: border-box; padding: 0px 5px;">21</li><li style="box-sizing: border-box; padding: 0px 5px;">22</li><li style="box-sizing: border-box; padding: 0px 5px;">23</li><li style="box-sizing: border-box; padding: 0px 5px;">24</li><li style="box-sizing: border-box; padding: 0px 5px;">25</li><li style="box-sizing: border-box; padding: 0px 5px;">26</li><li style="box-sizing: border-box; padding: 0px 5px;">27</li></ul>
上面的代码中,在建立连接后,我们声明了一个交互。如果当前没有队列被绑定到交换器,消息将被丢弃,因为没有消费者监听,这条消息将被丢弃。
下面的代码是接收日志ReceiveLogs1.java 和ReceiveLogs2.java:
<code class="hljs java has-numbering" style="display: block; padding: 0px; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-top-left-radius: 0px; border-top-right-radius: 0px; border-bottom-right-radius: 0px; border-bottom-left-radius: 0px; word-wrap: normal; background: transparent;"><span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">import</span> com.rabbitmq.client.*; <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">import</span> java.io.IOException; <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">public</span> <span class="hljs-class" style="box-sizing: border-box;"><span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">class</span> <span class="hljs-title" style="box-sizing: border-box; color: rgb(102, 0, 102);">ReceiveLogs1</span> {</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">private</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">static</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">final</span> String EXCHANGE_NAME = <span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"logs"</span>; <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">public</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">static</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">void</span> <span class="hljs-title" style="box-sizing: border-box;">main</span>(String[] argv) <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">throws</span> Exception { ConnectionFactory factory = <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">new</span> ConnectionFactory(); factory.setHost(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"localhost"</span>); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, <span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"fanout"</span>); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, <span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">""</span>); System.out.println(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">" [*] Waiting for messages. To exit press CTRL+C"</span>); Consumer consumer = <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">new</span> DefaultConsumer(channel) { <span class="hljs-annotation" style="color: rgb(155, 133, 157); box-sizing: border-box;">@Override</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">public</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">void</span> <span class="hljs-title" style="box-sizing: border-box;">handleDelivery</span>(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">byte</span>[] body) <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">throws</span> IOException { String message = <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">new</span> String(body, <span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"UTF-8"</span>); System.out.println(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">" [x] Received '"</span> + message + <span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"'"</span>); } }; channel.basicConsume(queueName, <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">true</span>, consumer); } } </code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right; background-color: rgb(238, 238, 238);"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li><li style="box-sizing: border-box; padding: 0px 5px;">6</li><li style="box-sizing: border-box; padding: 0px 5px;">7</li><li style="box-sizing: border-box; padding: 0px 5px;">8</li><li style="box-sizing: border-box; padding: 0px 5px;">9</li><li style="box-sizing: border-box; padding: 0px 5px;">10</li><li style="box-sizing: border-box; padding: 0px 5px;">11</li><li style="box-sizing: border-box; padding: 0px 5px;">12</li><li style="box-sizing: border-box; padding: 0px 5px;">13</li><li style="box-sizing: border-box; padding: 0px 5px;">14</li><li style="box-sizing: border-box; padding: 0px 5px;">15</li><li style="box-sizing: border-box; padding: 0px 5px;">16</li><li style="box-sizing: border-box; padding: 0px 5px;">17</li><li style="box-sizing: border-box; padding: 0px 5px;">18</li><li style="box-sizing: border-box; padding: 0px 5px;">19</li><li style="box-sizing: border-box; padding: 0px 5px;">20</li><li style="box-sizing: border-box; padding: 0px 5px;">21</li><li style="box-sizing: border-box; padding: 0px 5px;">22</li><li style="box-sizing: border-box; padding: 0px 5px;">23</li><li style="box-sizing: border-box; padding: 0px 5px;">24</li><li style="box-sizing: border-box; padding: 0px 5px;">25</li><li style="box-sizing: border-box; padding: 0px 5px;">26</li><li style="box-sizing: border-box; padding: 0px 5px;">27</li><li style="box-sizing: border-box; padding: 0px 5px;">28</li><li style="box-sizing: border-box; padding: 0px 5px;">29</li><li style="box-sizing: border-box; padding: 0px 5px;">30</li></ul>
<code class="hljs java has-numbering" style="display: block; padding: 0px; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-top-left-radius: 0px; border-top-right-radius: 0px; border-bottom-right-radius: 0px; border-bottom-left-radius: 0px; word-wrap: normal; background: transparent;"><span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">import</span> com.rabbitmq.client.*; <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">import</span> java.io.IOException; <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">public</span> <span class="hljs-class" style="box-sizing: border-box;"><span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">class</span> <span class="hljs-title" style="box-sizing: border-box; color: rgb(102, 0, 102);">ReceiveLogs1</span> {</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">private</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">static</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">final</span> String EXCHANGE_NAME = <span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"logs"</span>; <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">public</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">static</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">void</span> <span class="hljs-title" style="box-sizing: border-box;">main</span>(String[] argv) <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">throws</span> Exception { ConnectionFactory factory = <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">new</span> ConnectionFactory(); factory.setHost(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"localhost"</span>); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, <span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"fanout"</span>); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, <span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">""</span>); System.out.println(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">" [*] Waiting for messages. To exit press CTRL+C"</span>); Consumer consumer = <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">new</span> DefaultConsumer(channel) { <span class="hljs-annotation" style="color: rgb(155, 133, 157); box-sizing: border-box;">@Override</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">public</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">void</span> <span class="hljs-title" style="box-sizing: border-box;">handleDelivery</span>(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">byte</span>[] body) <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">throws</span> IOException { String message = <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">new</span> String(body, <span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"UTF-8"</span>); System.out.println(<span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">" [x] Received '"</span> + message + <span class="hljs-string" style="color: rgb(0, 136, 0); box-sizing: border-box;">"'"</span>); } }; channel.basicConsume(queueName, <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">true</span>, consumer); } } </code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right; background-color: rgb(238, 238, 238);"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li><li style="box-sizing: border-box; padding: 0px 5px;">6</li><li style="box-sizing: border-box; padding: 0px 5px;">7</li><li style="box-sizing: border-box; padding: 0px 5px;">8</li><li style="box-sizing: border-box; padding: 0px 5px;">9</li><li style="box-sizing: border-box; padding: 0px 5px;">10</li><li style="box-sizing: border-box; padding: 0px 5px;">11</li><li style="box-sizing: border-box; padding: 0px 5px;">12</li><li style="box-sizing: border-box; padding: 0px 5px;">13</li><li style="box-sizing: border-box; padding: 0px 5px;">14</li><li style="box-sizing: border-box; padding: 0px 5px;">15</li><li style="box-sizing: border-box; padding: 0px 5px;">16</li><li style="box-sizing: border-box; padding: 0px 5px;">17</li><li style="box-sizing: border-box; padding: 0px 5px;">18</li><li style="box-sizing: border-box; padding: 0px 5px;">19</li><li style="box-sizing: border-box; padding: 0px 5px;">20</li><li style="box-sizing: border-box; padding: 0px 5px;">21</li><li style="box-sizing: border-box; padding: 0px 5px;">22</li><li style="box-sizing: border-box; padding: 0px 5px;">23</li><li style="box-sizing: border-box; padding: 0px 5px;">24</li><li style="box-sizing: border-box; padding: 0px 5px;">25</li><li style="box-sizing: border-box; padding: 0px 5px;">26</li><li style="box-sizing: border-box; padding: 0px 5px;">27</li><li style="box-sizing: border-box; padding: 0px 5px;">28</li><li style="box-sizing: border-box; padding: 0px 5px;">29</li><li style="box-sizing: border-box; padding: 0px 5px;">30</li></ul>
运行
先运行ReceiveLogs1和ReceiveLogs2可以看到日志:<code class="hljs vbnet has-numbering" style="display: block; padding: 0px; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-top-left-radius: 0px; border-top-right-radius: 0px; border-bottom-right-radius: 0px; border-bottom-left-radius: 0px; word-wrap: normal; background: transparent;"> [*] Waiting <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">for</span> messages. <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">To</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">exit</span> press CTRL+C</code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right; background-color: rgb(238, 238, 238);"><li style="box-sizing: border-box; padding: 0px 5px;">1</li></ul>
然后运行EmitLog:
<code class="hljs vbnet has-numbering" style="display: block; padding: 0px; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-top-left-radius: 0px; border-top-right-radius: 0px; border-bottom-right-radius: 0px; border-bottom-left-radius: 0px; word-wrap: normal; background: transparent;">EmitLog日志: [x] Sent <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">'Hello World! 0'</span> [x] Sent <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">'Hello World! 1'</span> [x] Sent <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">'Hello World! 2'</span> [x] Sent <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">'Hello World! 3'</span> [x] Sent <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">'Hello World! 4'</span> ReceiveLogs1和ReceiveLogs2日志 [*] Waiting <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">for</span> messages. <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">To</span> <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">exit</span> press CTRL+C [x] Received <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">'Hello World! 0'</span> [x] Received <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">'Hello World! 1'</span> [x] Received <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">'Hello World! 2'</span> [x] Received <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">'Hello World! 3'</span> [x] Received <span class="hljs-comment" style="color: rgb(136, 0, 0); box-sizing: border-box;">'Hello World! 4'</span></code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right; background-color: rgb(238, 238, 238);"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li><li style="box-sizing: border-box; padding: 0px 5px;">6</li><li style="box-sizing: border-box; padding: 0px 5px;">7</li><li style="box-sizing: border-box; padding: 0px 5px;">8</li><li style="box-sizing: border-box; padding: 0px 5px;">9</li><li style="box-sizing: border-box; padding: 0px 5px;">10</li><li style="box-sizing: border-box; padding: 0px 5px;">11</li><li style="box-sizing: border-box; padding: 0px 5px;">12</li><li style="box-sizing: border-box; padding: 0px 5px;">13</li><li style="box-sizing: border-box; padding: 0px 5px;">14</li></ul>
看到这里,说明我们的程序运行正常,消费者通过声明【logs】交换器和【fanout】类型,接收到了来自【logs】交换器的所有消息。
使用【rabbitmqctl list_bindings】命令可以看到两个临时队列的名称
<code class="hljs r has-numbering" style="display: block; padding: 0px; color: inherit; box-sizing: border-box; font-family: 'Source Code Pro', monospace;font-size:undefined; white-space: pre; border-top-left-radius: 0px; border-top-right-radius: 0px; border-bottom-right-radius: 0px; border-bottom-left-radius: 0px; word-wrap: normal; background: transparent;">$ sudo rabbitmqctl list_bindings Listing bindings <span class="hljs-keyword" style="color: rgb(0, 0, 136); box-sizing: border-box;">...</span> logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue [] logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue [] ...done.</code><ul class="pre-numbering" style="box-sizing: border-box; position: absolute; width: 50px; top: 0px; left: 0px; margin: 0px; padding: 6px 0px 40px; border-right-width: 1px; border-right-style: solid; border-right-color: rgb(221, 221, 221); list-style: none; text-align: right; background-color: rgb(238, 238, 238);"><li style="box-sizing: border-box; padding: 0px 5px;">1</li><li style="box-sizing: border-box; padding: 0px 5px;">2</li><li style="box-sizing: border-box; padding: 0px 5px;">3</li><li style="box-sizing: border-box; padding: 0px 5px;">4</li><li style="box-sizing: border-box; padding: 0px 5px;">5</li></ul>
以上就是这一章讲的发布/订阅模式,下一章将介绍消息路由(Routing)
本教程所有文章:
RabbitMQ入门教程 For Java【1】 - Hello World - 你好世界!
RabbitMQ入门教程 For Java【2】 - Work Queues - 工作队列
RabbitMQ入门教程 For Java【3】 - Publish/Subscribe - 发布/订阅
RabbitMQ入门教程 For Java【4】 - Routing - 消息路由
RabbitMQ入门教程 For Java【5】 - Topic - 模糊匹配
RabbitMQ入门教程 For Java【6】 - Remote procedure call (RPC) - 远程调用
相关文章推荐
- RabbitMQ入门教程 For Java【3】 - Publish/Subscribe
- RabbitMQ入门教程(五):扇形交换机发布/订阅(Publish/Subscribe)
- RabbitMQ入门教程 For Java【2】 - Work Queues
- RabbitMQ入门教程 For Java【1】 - Hello World
- RabbitMQ Java官方教程(三)----Publish/Subscribe
- RabbitMQ入门教程 For Java【5】 - Topic
- RabbitMQ入门教程 For Java【7】 - Window下的安装与配置
- RabbitMQ入门教程 For Java【5】 - Topic
- RabbitMQ入门教程 For Java【2】 - Work Queues
- RabbitMQ入门教程 For Java【7】 - Window下的安装与配置
- RabbitMQ入门教程 For Java【1】 - Hello World
- RabbitMQ入门教程 For Java【1】 - Hello World
- RabbitMQ官方中文入门教程(PHP版) 第三部分:发布/订阅(Publish/Subscribe)
- RabbitMQ官方中文入门教程(PHP版) 第三部分:发布/订阅(Publish/Subscribe)
- RabbitMQ官方中文入门教程(PHP版) 第三部分:发布/订阅(Publish/Subscribe)
- RabbitMQ入门教程 For Java【8】 - 与Spring集成
- RabbitMQ入门教程 For Java【4】 - Routing
- RabbitMQ官方中文入门教程(PHP版) 第三部分:发布/订阅(Publish/Subscribe)
- RabbitMQ入门教程 For Java【4】 -Routing
- RabbitMQ入门教程 For Java【6】 - Remote procedure call (RPC)