python系列之 RabbitMQ -- Routing
2016-03-21 21:36
671 查看
在前面的章节我们创建了一个简单日志系统,可以广播日志消息到多个接收方
在本 教程中我们添加一个功能 -- 我们将要实现一种可能性即让它只接收已订阅的消息。我们将仅严重错误的日志信息保存到日志文件(以节约磁盘空间),同时仍然能够将所有日志信息打印到控制台上。
绑定(Binding) 是exchange与队列queue之间的关系。这个可以简单的理解为:交换队列(exchange)只将队列(queue)绑定的感兴趣的消息转发到该队列
绑定可以设置一个额外的 routing_key 参数。 为了避免和 basic_publish 参数混淆, 我们将这个称谓 绑定key。以下是我们如何使用key创建一个绑定:
这个意味着一个绑定key 依赖于 exchange类型. 我们前面使用的 fanout exchange时是完全 忽略该值的。
我们使用过没有给我们带来太多灵活性,而只是盲目广播的 fanout 交换(exchange).
我们将要使用一个 direct exchange代替fanout。 这个基于 直接交换(direct exchange)的路由算法很简单 -- 消息发送到绑定键值(binding key) 刚好完全符合路由键值( routing key) 的消息队列中。
为了阐明这个,考虑如下设置:
在这个设置中,我们可以看到直接交换 X 上绑定了两个队列,第一个队列使用绑定键值(binding key)orange进行绑定 , 第二个有两个绑定键值:black和green
这样的设置中一个消息使用路由键值 orange 发布到exchange中将会路由到Q1队列, 使用black 或者 green 的路由键值的消息将发送的Q2队列。所有其它的消息将被丢弃。
这个是一个完美的合法的使用同一个绑定键值(binding key)绑定到多个队列上, 在我们的例子中我们可以在 X 和 Q1之间使用 绑定键 black 添加一个绑定,。 在这种情况下,直接交换( direct exchange ) 将像fanout 一样广播消息到所有匹配的队列中。有路由键值(black)的消息将被分发到 Q1 和 Q2 队列上。
一如往常我们首先需要创建一个交换:
为简单起见,我们假设严重可以是 “info”,‘warning“ 或”error“中的一种
emit_log_direct.py
receive_logs_direct.py
如果你仅想要保存’warning‘ 和 ’error‘日志消息到一个文件,开启一个终端:
如果你想要输出到屏幕查看所有的日志消息,执行:
在本 教程中我们添加一个功能 -- 我们将要实现一种可能性即让它只接收已订阅的消息。我们将仅严重错误的日志信息保存到日志文件(以节约磁盘空间),同时仍然能够将所有日志信息打印到控制台上。
绑定(Bindings)
前面的例子我们已经创建了bindding,我们可以回忆一下代码:channel.queue_bind(exchange=exchange_name, queue=queue_name)
绑定(Binding) 是exchange与队列queue之间的关系。这个可以简单的理解为:交换队列(exchange)只将队列(queue)绑定的感兴趣的消息转发到该队列
绑定可以设置一个额外的 routing_key 参数。 为了避免和 basic_publish 参数混淆, 我们将这个称谓 绑定key。以下是我们如何使用key创建一个绑定:
channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key='black')
这个意味着一个绑定key 依赖于 exchange类型. 我们前面使用的 fanout exchange时是完全 忽略该值的。
直接交换(Direct exchange)
我们前面教程中的日志系统广播所有消息到所有的消费方(consumers)。我们希望扩展一下允许基于日志消息的严重性进行过滤。比如:我们可能希望写日志消息到磁盘的脚本仅仅接收严重错误,而不接收警告(warning)或者信息(info)日志以节约磁盘空间。我们使用过没有给我们带来太多灵活性,而只是盲目广播的 fanout 交换(exchange).
我们将要使用一个 direct exchange代替fanout。 这个基于 直接交换(direct exchange)的路由算法很简单 -- 消息发送到绑定键值(binding key) 刚好完全符合路由键值( routing key) 的消息队列中。
为了阐明这个,考虑如下设置:
在这个设置中,我们可以看到直接交换 X 上绑定了两个队列,第一个队列使用绑定键值(binding key)orange进行绑定 , 第二个有两个绑定键值:black和green
这样的设置中一个消息使用路由键值 orange 发布到exchange中将会路由到Q1队列, 使用black 或者 green 的路由键值的消息将发送的Q2队列。所有其它的消息将被丢弃。
多绑定(Multiple bindings)
这个是一个完美的合法的使用同一个绑定键值(binding key)绑定到多个队列上, 在我们的例子中我们可以在 X 和 Q1之间使用 绑定键 black 添加一个绑定,。 在这种情况下,直接交换( direct exchange ) 将像fanout 一样广播消息到所有匹配的队列中。有路由键值(black)的消息将被分发到 Q1 和 Q2 队列上。
分发日志(Emitting logs)
我们将为我们的日志系统使用这种模式, 代替fanout我们将要发送消息到一个直接交换(direct exchange)中, 我们根据日志的严重等级作为路由键值(routing key). 这种方式,接收脚本将会选择它要接收的严重等级的日志。让我们首先集中发送日志:一如往常我们首先需要创建一个交换:
channel.exchange_declare(exchange='direct_logs', type='direct')然后我们准备发送消息:
channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message)
为简单起见,我们假设严重可以是 “info”,‘warning“ 或”error“中的一种
订阅(Subscribing)
接收消息将和前面章节工作一样,只有一个例外 -- 我们要为每个我们感兴趣的严重等级创建一个新的绑定(binding)result = channel.queue_declare(exclusive=True) queue_name = result.method.queue for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)
综上
emit_log_direct.py
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
receive_logs_direct.py
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
如果你仅想要保存’warning‘ 和 ’error‘日志消息到一个文件,开启一个终端:
$ python receive_logs_direct.py warning error > logs_from_rabbit.log
如果你想要输出到屏幕查看所有的日志消息,执行:
$ python receive_logs_direct.py info warning error [*] Waiting for logs. To exit press CTRL+C例如想要发送日志消息:
$ python emit_log_direct.py error "Run. Run. Or it will explode." [x] Sent 'error':'Run. Run. Or it will explode.'
相关文章推荐
- Python之logging模块
- Win7下 python安装Scrapy
- python学习常用到的模块
- 连连看游戏辅助工具python版的实现
- windows安装python和xgboost
- python的时间处理
- 整理的Python3数据类型
- Spider 之 爬虫 基本工作原理
- python中主线程等待子线程完成的实现(join())
- 初入python
- python 线程池创建
- Python基础--模块
- Python基础--模块
- Python学习笔记-闭包
- 合并排序数组
- 插入排序
- 【python】习题3.21
- 通过pyenv进行多版本python管理
- python中的map、filter、reduce函数
- 冒泡排序与选择排序