您的位置:首页 > 产品设计 > UI/UE

RabbitMQ(part3广播发消息到不同消费者)----Publish/Subscribe

2017-10-26 14:17 309 查看
一、首先前面的第二部分介绍的是好几个工作者依次从名称都是某个名的队列中轮流接收不同的消息。现在下面将介绍所有的工作者接收相同的最近的消息。
代码如下:

Putting it all together


The producer program, which emits log messages, doesn't look much different from the previous tutorial. The most important change is that we now want to publish messages to our logsexchange instead of the nameless one. We need to supply a routing_key when sending, but its value is ignored for fanout exchanges. Here goes the code for emit_log.py script:
emit_log.py:
import pika
import sys
connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'));
channel=connection.channel();

channel.exchange_declare(exchange='logs',exchange_type='fanout');#the fanout. Let's create an exchange of that type, and call it logs
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
print(" [x] Sent %r" % message)
connection.close()receive_log.py:
import pika
import time
connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'));
channel=connection.channel();

channel.exchange_declare(exchange='logs',exchange_type='fanout');
result=channel.queue_declare(exclusive=True);#once we disconnect the consumer the queue should be deleted
queue_name=result.method.queue;
channel.queue_bind(exchange='logs',queue=queue_name);
print("[*] Waiting for logs. To exit press CTRL+C");
def callback(ch,method,properties,body):
print("[x] %r "%body);
channel.basic_consume(callback,queue=queue_name,no_ack=True);
channel.start_consuming();

(1)

Bindings


We've already created a fanout exchange and a queue. Now we need to tell the exchange to send messages to our queue. That relationship between exchange and a queue is called a binding.channel.queue_bind(exchange='logs',
queue=result.method.queue)(2)result = channel.queue_declare()

''whenever we connect to Rabbit we need a fresh, empty queue. To do it we could create a queue with a random name, or, even better - let the server choose a random queue name for us. We can do this by not supplying the queue parameter to queue_declare
At this point result.method.queue contains a random queue name. For example it may look like amq.gen-JzTY20BRgKO-HjmUJj0wLg.
Secondly, once we disconnect the consumer the queue should be deleted. There's an exclusiveflag for that:
result = channel.queue_declare(exclusive=True)

运行结果为:
cmd1:





cmd2:



cmd3:



二、知识点补充:
2.1、交换机
前面的教程中,我们发送消息到队列,并从中取出消息。现在是时候介绍 RabbitMQ 中完成的消息模型了。
让我们简单的概括一下之前的教程:

发布者(producer):发布消息的应用程序
队列(queue):用于消息存储的缓冲
消费者(consumer):接收消息的应用程序
RabbitMQ 消息模型的核心理念是:发布者(producer)不会直接发送任何消息给队列。事实上,发布者(producer)甚至不知道消息是否已经被投递到队列。

发布者(producer)只需把消息发送给一个交换机(exchange )。交换机非常简单,它一边从发布者接收消息,一边把消息消息推送到队列。交换机必须知道如何处理它接收的消息,是应该推送到指定的队列还是多个队列,或者是直接忽略消息。这些规则是通过交换机类型(exchange type)来定义的。



有几个可供选择的交换机类型:直连交换机(direct),主题交换机(topic),头交换机(headers)和 扇形交换机(fanout)。我们在这里主要说明最后一个,扇形交换机。先创建一个 fanout 类型的交换机,命令为 logs :

channel.exchange_declare(exchange='logs',type='fanout')
扇形交换机(fanout)很简单,你可能从名字上就能猜测出来,它把消息发送给所有的队列,这正是我们日志系统所需要的。2.2、 交换机列表
rabbitmqctl 能够列出服务器上所有的交换机:



这个列表中有一些叫做 amq.* 的交换机。这些都是默认创建的,不过这时候你还不需要它们。

2.3、匿名的交换机
前面的教程中我们对交换机一无所知,但仍然能够发送消息到队列中。因为我们使用了命名为空字符串("")默认的交换机。 之前我们是这么发送消息的:

channel.basic_publish(exchange='',routing_key='hello',body=message)exchange 参数就是交换机的名字。空字符串代表默认或者匿名交换机:消息将会根据指定的 routing_key 分发到指定的队列。 现在,我们就可以发送消息到一个具名的交换机了:
channel.basic_publish( exchange = 'logs', routing_key='',body=message)
2.3、临时队列
你还记得我们使用的队列名吗(hello 和 task_queue)?给一个队列命名是很重要的。我们需要把工作者(workers)指向正确的队列。如果你打算在发布者(producers)和消费者(consumers)之前共享队列的话,给队列命名是十分重要的。

但是这并不适合我们的日志系统。我们打算接收所有的日志消息,而不仅仅是一小部分。我们关心的是最近的消息而不是久的。为了解决这个问题,我们需要做两件事情。

第一步,当我们连接上 RabbitMQ 的时候,我们需要一个全新的,空的队列。我们可以手动创建一个随机的队列名,或者让服务器为我们选择一个随机的队列名(推荐)。我们只需要在调用 queue_declare 方法的时候,不提供 queue 参数就可以了:

result = channel.queue_declare()这时候我们可以通过 result.method.queue 获得已经生成的随机队列名。它可能是这样子的:amq.gen-U0srCoW8TsaXjNh73pnVAw==。

第二步,当与消费者(consumer)断开连接的时候,这个队列应当被立即删除。 exclusive 标识符即可达到此目的。
2.4、绑定(Bindings)
此处输入图片的描述



我们已经创建了一个扇形交换机( fanout )和一个队列。现在我们需要告诉交换机如何发送消息给我们的队列。

交换机和队列之间的联系我们称之为绑定( binding )。

channel.queue_bind(exchange='logs',queue=result.method.queue)
现在,logs 交换机将会把消息加到我们的队列中。
2.5、 绑定列表
我们可以使用一下命名绑定所有现存的绑定。



2.6、代码整合:



发布日志消息的程序看起来和之前的没有太大区别。最重要的改变就是我们把消息发送给 logs 交换机而不是匿名交换机。在发送的时候我们需要提供 routing_key 参数,但是它的值会被扇形交换机(fanout exchange)忽略。以下是 emit_log.py 脚本:
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
print " [x] Sent %r" % (message,)
connection.close()正如你看到的那样,在连接成功以后,我们声明了一个交换机,这一个很重要的,因为不允许发布消息到不存在的交换机。

如果没有绑定队列到交换器,消息将会丢失。但这个没有所谓,如果没有消费者监听,那么消息就会被忽略。

receive_logs.py 的代码:

#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
                         type='fanout')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs',
                   queue=queue_name)
print ' [*] Waiting for logs. To exit press CTRL+C'
def callback(ch, method, properties, body):
    print " [x] %r" % (body,)
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
channel.start_consuming()

这样我们就完成了。
打开三个终端,进入代码的目录,分别输入以下三个命令:

sudo service rabbitmq-server start   #确保服务已开启 
python receive_logs.py > logs_from_rabbit.log #把日志保存到 log 文件里
python receive_logs.py    #在屏幕上查看日志
python emit_log.py    # 发送日志

运行 rabbitmqctl list_bindings 你可确认已经创建的队列绑定。2.7、实验总结:
通过绑定交换机,我们实现了将一个消息发送给多个队列。那如何监听消息的子集呢?让我们来继续看下一节教程。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  RabbitMQ queue exchange
相关文章推荐