RabbitMQ学习小结(五)—— Topics[Python]
2016-03-02 22:15
691 查看
1. 简介
上节我们提到直连交换机,并基于我们的日志系统使用直连交换机,而本节我们使用rabbitmq内置的另外一种交换机——主题交换机。所谓主题交换机是什么呢?通俗点来讲就是直连交换机的升级版,我们可以认为直连交换机就是她其中的一种形式。
直连交换机需要通过绑定(bindings)和队列建立起关系,而其主要通过的就是routing_key,并且routing_key是要与生产者提供的routing_key完全匹配才能得到消息。主题交换机对其进行了升级,升级的方式就是通过两个符号"#"和"*"以及对routing_key的长度限制和格式限制,完成了模糊匹配,从而达到消费者可以更灵活的匹配要处理的消息,下面再具体针对实例来进行主题交换机的学习。
2. 主题交换机
发送到主题交换机(topic exchange)的消息携带的路由键(routing_key)必须是一个由.分隔开的词语列表。这些单词随便是什么都可以,但是最好是跟携带它们的消息有关系的词汇。以下是几个推荐的例子:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。词语的个数可以随意,但是不要超过255字节。绑定键也必须拥有同样的格式。主题交换机背后的逻辑跟直连交换机很相似 —— 一个携带着特定路由键的消息会被主题交换机投递给绑定键与之想匹配的队列。但是它的绑定键和路由键有两个特殊应用方式:
"*" (星号) 用来表示一个单词。
"#" (井号) 用来表示任意数量(零个或多个)单词。
大家可以根据下图,猜测哪些路由键会被发送到相应的队列,就是类似于正则的匹配,但是比正则简单太多了,有兴趣的可以到官网查看完整内容,我就不再阐述了。
主题交换机
当一个队列的绑定键为 "#"(井号) 的时候,这个队列将会无视消息的路由键,接收所有的消息。当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。
所以主题交换机也就实现了扇形交换机的功能,和直连交换机的功能
总结
我们基于整个流程进行一下总结生产者
创建连接创建交换机(主题交换机)
发送消息到指定交换机,并将消息中指定routing_key(注意key的规则)
消费者
创建连接创建交换机(主题交换机)
创建匿名队列(每个消费者都要创建的独立的队列,用于接收路由后消息)
创建绑定(通过路由键绑定交换机队列关系,确定队列处理交换机中的哪些消息,注意路由键的正则规则)
在指定交换机处理消息
代码整合
emit_logs_topic.py#!/usr/bin/env python # -*- coding: utf-8 -*- # @Date : 2016-02-28 21:28:17 # @Author : mx (mx472756841@gmail.com) # @Link : http://www.shujutiyu.com/ # @Version : $Id$ import os import pika import sys conn = None routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' try: # 获取连接 conn = pika.BlockingConnection(pika.ConnectionParameters('localhost')) # 获取通道 channel = conn.channel() # 创建直连交换机 channel.exchange_declare(exchange='topic_logs', type='topic') # 在RabbitMQ中发送消息,指定交换机(exchange),指定routing ret = channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message,) print " [x] Sent '{0}'".format(message) print ret except Exception, e: raise e finally: if conn: conn.close()recv_logs_topic.py
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Date : 2016-02-29 16:30:21 # @Author : mx (mx472756841@gmail.com) # @Link : http://www.shujutiyu.com/ # @Version : $Id$ import pika import sys conn = None binding_keys = sys.argv[1:] if not binding_keys: print >> sys.stderr, "Usage: %s [binding_key]..." % (sys.argv[0],) sys.exit(1) def callback(ch, method, properties, body): """ @ch: channel 通道,是由外部调用时上送 out body 读取队列内容做不同的操作 """ print " [x] method.routing_key {0}".format(method.routing_key) print " [x] Done %r" % (body, ) try: # get connection conn = pika.BlockingConnection(pika.ConnectionParameters( 'localhost') ) # get channel channel = conn.channel() # 声明直连交换机 channel.exchange_declare(exchange='topic_logs', type='topic') # 声明临时队列 , param exclusive 互斥 tmp_queue = channel.queue_declare(exclusive=True) queue_name = tmp_queue.method.queue # 根据输入routing_key绑定交换机与队列 for binding_key in binding_keys: channel.queue_bind( exchange='topic_logs', queue=queue_name, routing_key=binding_key ) # 在队列中读取信息 channel.basic_consume(callback, queue=queue_name, no_ack=True) print ' [*] Waiting for messages. To exit press CTRL+C' channel.start_consuming() except Exception, e: raise e finally: if conn: conn.close()
疑问
绑定键为 * 的队列会取到一个路由键为空的消息吗?绑定键为 #.* 的队列会获取到一个名为..的路由键的消息吗?它会取到一个路由键为单个单词的消息吗?
a.*.# 和 a.#的区别在哪儿?
之前提到了队列、消息、交换机可以持久化,有兴趣的不妨试一下
3. 参考资料
官网资料:http://www.rabbitmq.com/tutorials/tutorial-five-python.html中文资料:Topics/]http://rabbitmq-into-chinese.readthedocs.org/zh_CN/latest/tutorials_with_python/[5]Topics/
相关文章推荐
- RabbitMQ学习小结(六)—— RPC[Python]
- RabbitMQ学习小结(四)—— Routing[Python]
- RabbitMQ学习小结(三)—— Publish Subscribe[Python]
- RabbitMQ学习小结(二)—— Work Queues[Python]
- RabbitMQ学习小结(一)—— Hello World [Python]
- python切片解析
- python中的map,filter,zip函数
- python爬虫 bs4_4select()教程
- python爬虫 beutifulsoup4_1官网介绍
- python 使用不同的版本之间的切换
- Python中lambda表达式
- Python的简单爬虫原理及实现
- [大数据]spark入门 in python(一)HelloWorld
- 利用python requests库模拟登陆知乎
- python struct、json、pickle模块
- [leetcode] Insertion Sort List(python)
- 基于Python Django技术构建web系统实践
- python脚本:计算某个目录下Code行数
- python学习小记0
- Python练习(4):牛顿拉复生算法求解根