您的位置:首页 > 编程语言 > Python开发

python系列之 RabbitMQ -- Routing

2016-03-21 21:36 671 查看
在前面的章节我们创建了一个简单日志系统,可以广播日志消息到多个接收方

在本 教程中我们添加一个功能 -- 我们将要实现一种可能性即让它只接收已订阅的消息。我们将仅严重错误的日志信息保存到日志文件(以节约磁盘空间),同时仍然能够将所有日志信息打印到控制台上。

绑定(Bindings)

前面的例子我们已经创建了bindding,我们可以回忆一下代码:

channel.queue_bind(exchange=exchange_name,
queue=queue_name)


绑定(Binding) 是exchange与队列queue之间的关系。这个可以简单的理解为:交换队列(exchange)只将队列(queue)绑定的感兴趣的消息转发到该队列

绑定可以设置一个额外的 routing_key 参数。 为了避免和 basic_publish 参数混淆, 我们将这个称谓 绑定key。以下是我们如何使用key创建一个绑定:

channel.queue_bind(exchange=exchange_name,
queue=queue_name,
routing_key='black')


这个意味着一个绑定key 依赖于 exchange类型. 我们前面使用的 fanout exchange时是完全 忽略该值的。

直接交换(Direct exchange)

我们前面教程中的日志系统广播所有消息到所有的消费方(consumers)。我们希望扩展一下允许基于日志消息的严重性进行过滤。比如:我们可能希望写日志消息到磁盘的脚本仅仅接收严重错误,而不接收警告(warning)或者信息(info)日志以节约磁盘空间。
我们使用过没有给我们带来太多灵活性,而只是盲目广播的 fanout 交换(exchange).

我们将要使用一个 direct exchange代替fanout。 这个基于 直接交换(direct exchange)的路由算法很简单 -- 消息发送到绑定键值(binding key) 刚好完全符合路由键值( routing key) 的消息队列中。
为了阐明这个,考虑如下设置:



在这个设置中,我们可以看到直接交换 X 上绑定了两个队列,第一个队列使用绑定键值(binding key)orange进行绑定 , 第二个有两个绑定键值:black和green
这样的设置中一个消息使用路由键值 orange 发布到exchange中将会路由到Q1队列, 使用black 或者 green 的路由键值的消息将发送的Q2队列。所有其它的消息将被丢弃。

多绑定(Multiple bindings)



这个是一个完美的合法的使用同一个绑定键值(binding key)绑定到多个队列上, 在我们的例子中我们可以在 X 和 Q1之间使用 绑定键 black 添加一个绑定,。 在这种情况下,直接交换( direct exchange ) 将像fanout 一样广播消息到所有匹配的队列中。有路由键值(black)的消息将被分发到 Q1 和 Q2 队列上。

分发日志(Emitting logs)

我们将为我们的日志系统使用这种模式, 代替fanout我们将要发送消息到一个直接交换(direct exchange)中, 我们根据日志的严重等级作为路由键值(routing key). 这种方式,接收脚本将会选择它要接收的严重等级的日志。让我们首先集中发送日志:
一如往常我们首先需要创建一个交换:
channel.exchange_declare(exchange='direct_logs', type='direct')
然后我们准备发送消息:
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)


为简单起见,我们假设严重可以是 “info”,‘warning“ 或”error“中的一种

订阅(Subscribing)

接收消息将和前面章节工作一样,只有一个例外 -- 我们要为每个我们感兴趣的严重等级创建一个新的绑定(binding)
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)


综上



emit_log_direct.py
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()


receive_logs_direct.py

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)

for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
queue=queue_name,
no_ack=True)

channel.start_consuming()


如果你仅想要保存’warning‘ 和 ’error‘日志消息到一个文件,开启一个终端:
$ python receive_logs_direct.py warning error > logs_from_rabbit.log


如果你想要输出到屏幕查看所有的日志消息,执行:
$ python receive_logs_direct.py info warning error
[*] Waiting for logs. To exit press CTRL+C
例如想要发送日志消息:
$ python emit_log_direct.py error "Run. Run. Or it will explode."
[x] Sent 'error':'Run. Run. Or it will explode.'
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: