学渣讲消息队列之RabbitMQ从敲门到入门(第五讲)—— "Routing"
2018-02-13 08:57
363 查看
学渣讲消息队列之RabbitMQ从敲门到入门(第五讲)—— “Routing”
学渣讲消息队列之RabbitMQ从敲门到入门第五讲 Routing前提条件
本篇教程的重点
绑定
直接交换
多条绑定
发送的日志
订阅消息
合并代码
本文翻译自RabbitMQ官方教程(若链接失效请访问:http://www.rabbitmq.com/tutorials/tutorial-four-python.html)
前提条件
和其它Python教程一样,我们使用Pika RabbitMQ客户端,版本号0.11.0。本篇教程的重点
在前面的教程里我们建立了一个简单的日志系统。我们能够向多位接收者广播日志消息。在这篇教程里,我们要为它增加一项新的功能——我们将只推送消息的一个子集。比如说,我们只将重要的错误消息写入日志文件(以节省磁盘空间),同时仍然能够在控制台终端上打印所有日志消息。
绑定
在前面的例子中我们已经创建了绑定了。你可以回想一下代码:channel.queue_bind(exchange=exchange_name, queue=queue_name)
绑定是交换器和队列间的关系。这可以简单地理解为:队列对交换器中的消息感兴趣。
绑定可以使用额外的
routing_key参数。为了避免与
basic_publish参数混淆,我们将其称为
binding key。下面的例子演示了如何使用一个key创建一个绑定:
channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key='black')
绑定键的含义取决于交换类型。我们前面用过的
fanout交换会简单地忽略这个值。
直接交换
前面教程中我们地日志系统回向所有的消费者广播所有的消息。我们希望将它的功能扩展,以允许根据消息的严重性来进行过滤。比如说,我们可能希望正在将日志消息写入磁盘的脚本只将严重的错误写入,而不写入警告和提示消息,这样不会浪费磁盘空间。我们之前使用的
fanout交换灵活性很差——它只能进行无意识的广播。
我们要代而使用
direct交换。
direct交换背后的路由算法很简单——消息进入
binding key与消息的
routing key完全匹配的队列。
为了说明这一点,请参考以下设置:
在这个设置中,我们可以看到
direct交换
x带有两个队列。第一个队列绑定了关键字
orange,第二个队列有两条绑定,一个绑定了关键字
black,另一个绑定了关键字
green。
在这样一个设置中,一条被推送到交换器中的带有路由键
orange的消息会被路由到队列
Q1。带有路由键
black和
green的消息会被路由到队列
Q2。其他所有的消息则将被丢弃。
多条绑定
使用相同的
binding key绑定多个队列是完全合法的。在我们的例子中,我们可以在
x和
Q1间增加一条使用绑定键
black的绑定。在这种情况下,
direct交换会像
fanout交换一样,向所有匹配的队列广播消息。带有路由键
black的消息既会被传递到
Q1也会被传递到
Q2。
发送的日志
我们将为我们的日志系统使用这种模式。我们不使用fanout交换,而是将消息发送到
dircet交换。我们将消息的严重性作为
routing key。这样接收脚本就能够接收它想要接收的对应严重性的消息了。我们先关注发送的日志。
就像我们总是需要做的那样,我们先创建一个交换:
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
现在我们准备好发送一条消息了:
channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message)
为了简化,我们假设’severity’是’info’,’warning’,’error’中的一个。
订阅消息
接收消息就像之前教程中的那样,但有一个例外——我们将为每个我们感兴趣的严重性创建新的绑定。result = channel.queue_declare(exclusive=True) queue_name = re 9d1b sult.method.queue for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)
合并代码
emit_log_direct.py脚本的代码:
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
severity = sys.argv[1] if len(sys.argv) > 2 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
receive_logs_direct.py脚本的代码:
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]
if not severities:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)
for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
如果你想要仅将’warning’和’error’(而不是’info’)的日志消息保存到文件,只需要打开一个控制台终端然后输入:
python receive_logs_direct.py warning error > logs_from_rabbit.log
如果你想在屏幕上看到所有的日志消息,打开一个新的控制台终端然后输入:
python receive_logs_direct.py info warning error # => [*] Waiting for logs. To exit press CTRL+C
然后,比如,要发送一条
error日志消息只需要输入:
python emit_log_direct.py error "Run. Run. Or it will explode." # => [x] Sent 'error':'Run. Run. Or it will explode.'
进入本教程的第五部分(若链接失效请访问:http://www.rabbitmq.com/tutorials/tutorial-five-python.html)去了解如何根据模式来收听消息。
相关文章推荐
- 学渣讲消息队列之RabbitMQ从敲门到入门(第二讲)—— "Hello World!"
- 学渣讲消息队列之RabbitMQ从敲门到入门(第一讲)
- 学渣讲消息队列之RabbitMQ从敲门到入门(第三讲)—— “Work Queues”
- SpringBoot的RabbitMQ消息队列: 五、第四模式"Routing"
- SpringBoot的RabbitMQ消息队列: 五、第四模式"Routing"
- 消息队列RabbitMQ入门介绍
- RabbitMQ消息队列(二):"Hello, World"[转]
- RabbitMQ (消息队列)专题学习05 routing(路由)
- SpringBoot的RabbitMQ消息队列: 四、第三模式"Publish/Subscribe"
- 柯南君:看大数据时代下的IT架构(7)消息队列之RabbitMQ--案例(routing 起航)
- SpringBoot的RabbitMQ消息队列: 四、第三模式"Publish/Subscribe"
- 快速入门分布式消息队列之 RabbitMQ(上)
- RabbitMQ消息队列(五):Routing 消息路由 2[原]
- RabbitMQ消息队列(五):Routing 消息路由
- RabbitMQ消息队列(五):Routing 消息路由
- 柯南君:看大数据时代下的IT架构(7)消息队列之RabbitMQ--案例(routing 起航)
- RabbitMQ消息队列入门篇(环境配置+Java实例+基础概念)
- RabbitMQ消息队列(五):Routing 消息路由
- SpringBoot的RabbitMQ消息队列: 三、第二模式"Work queues"
- SpringBoot的RabbitMQ消息队列: 六、第五模式"Topics"