OpenStack建立实例完整过程源码详细分析(13)----依据AMQP通信架构实现消息发送机制解析之二
2013-10-02 08:07
1136 查看
感谢朋友支持本博客,欢迎共同探讨交流,由于能力和时间有限,错误之处在所难免,欢迎指正!
如果转载,请保留作者信息。
博客地址:http://blog.csdn.net/gaoxingnengjisuan
邮箱地址:dong.liu@siat.ac.cn
这篇博文来继续对方法def cast(conf, context, topic, msg)进行解析,首先来看这个方法的源码:
publisher = cls(self.conf, self.channel, topic, **kwargs)和publisher.send(msg, timeout);
1.语句publisher = cls(self.conf, self.channel, topic, **kwargs)的解析
从前面的代码中可以知道,cls传进来的是主题式消息发布者类TopicPublisher,所以这条语句实现的功能就是对类TopicPublisher进行初始化,并获取其实例化对象。实际上,在类TopicPublisher的初始化过程中,实现了很多功能,我们来看类TopicPublisher初始化的具体的实现代码:
可以看见,类TopicPublisher的初始化过程中,获取了交换器的名称,并且对其父类Publisher进行了进一步的初始化操作。
这里也已经实现了前面所提到的步骤中获得channel和创建交换器两个步骤。
我们再回到方法reconnect中,解析语句self.producer = kombu.messaging.Producer(exchange=self.exchange, channel=channel, routing_key=self.routing_key),根据指定exchange与routing_key来创建Producer的功能,也就是前面所提到的步骤中的第(4)步。具体我们来看类Producer的初始化方法的代码实现:
至此,我们已经实现了前面所提到的步骤中的第(4)步,即根据指定exchange与routing_key来创建Producer。
2.语句publisher.send(msg, timeout)的解析
我们再回到方法publisher_send中,对语句publisher.send(msg, timeout)进行解析。这条语句实现了将指定消息发送到指定的队列中。
先来看方法send的代码实现:
我们直接来看类exchange下的方法Message的具体实现代码:
mandatory=False, immediate=False, exchange=None)的具体代码实现:
我们先来看方法_lookup的代码实现:
至此,实现了把消息插入到合适的队列之中,最终实现了基于AMQP框架下的消息的发送端操作。当然,这里还有很多细节问题,我会在代码分析完成之后进行一些总结。
之前有个朋友提到了,Nova之中是怎样通过AMQP协议进行消息接受并执行的,我将会在下一篇博文中进行解析。
如果转载,请保留作者信息。
博客地址:http://blog.csdn.net/gaoxingnengjisuan
邮箱地址:dong.liu@siat.ac.cn
这篇博文来继续对方法def cast(conf, context, topic, msg)进行解析,首先来看这个方法的源码:
def cast(conf, context, topic, msg, connection_pool): """ 发送一个topic主题的消息,不用等待任何信息的返回; """ LOG.debug(_('Making asynchronous cast on %s...'), topic) # 为消息msg添加随机生成的unique_id; _add_unique_id(msg) # 打包上下文信息context到msg中; pack_context(msg, context) # ConnectionContext(conf, connection_pool):建立一个新连接,或者从连接池中获取一个连接; # serialize_msg(msg):消息msg的序列化; # topic_send:实现发送一个'topic'主题的消息; with ConnectionContext(conf, connection_pool) as conn: conn.topic_send(topic, rpc_common.serialize_msg(msg))前一篇博文已经对建立连接的实现进行了分析,下面来看语句topic_send(topic, rpc_common.serialize_msg(msg)),方法topic_send实现了通过主题发布者类,通过主题式的交换器,发送一个消息到指定队列。来看方法topic_send的具体代码:
def topic_send(self, topic, msg, timeout=None): """ 通过主题发布者类,通过主题式的交换器,发送一个消息到指定队列; # msg:序列化后的msg消息; # topic:主题信息; # timeout:=None 定义了等待程序响应时间期限; """ self.publisher_send(TopicPublisher, topic, msg, timeout)
def publisher_send(self, cls, topic, msg, timeout=None, **kwargs): def _error_callback(exc): log_info = {'topic': topic, 'err_str': str(exc)} LOG.exception(_("Failed to publish message to topic " "'%(topic)s': %(err_str)s") % log_info) def _publish(): # 调用发布者类(cls传进来的是TopicPublisher),初始化这个类,获取这个类的对象; publisher = cls(self.conf, self.channel, topic, **kwargs) # 发送一个信息,即给定的msg; publisher.send(msg, timeout) self.ensure(_error_callback, _publish)可以知道,在方法publisher_send中,最重要的两条语句就是:
publisher = cls(self.conf, self.channel, topic, **kwargs)和publisher.send(msg, timeout);
1.语句publisher = cls(self.conf, self.channel, topic, **kwargs)的解析
从前面的代码中可以知道,cls传进来的是主题式消息发布者类TopicPublisher,所以这条语句实现的功能就是对类TopicPublisher进行初始化,并获取其实例化对象。实际上,在类TopicPublisher的初始化过程中,实现了很多功能,我们来看类TopicPublisher初始化的具体的实现代码:
class TopicPublisher(Publisher): """ 主题式生产者类; """ def __init__(self, conf, channel, topic, **kwargs): """ 初始化一个topic发布者类; """ # rabbit_durable_queues:这个参数定义了在RabbitMQ中是否使用持久性的队列; # 参数的默认值为False; options = {'durable': conf.rabbit_durable_queues, 'auto_delete': False, 'exclusive': False} options.update(kwargs) # 获取交换器的名称,默认值为“openstack”; # get_control_exchange:获取配置参数control_exchange; exchange_name = rpc_amqp.get_control_exchange(conf) super(TopicPublisher, self).__init__(channel, exchange_name, topic, type='topic', **options)
可以看见,类TopicPublisher的初始化过程中,获取了交换器的名称,并且对其父类Publisher进行了进一步的初始化操作。
class Publisher(object): """ 消息生产者基类; """ def __init__(self, channel, exchange_name, routing_key, **kwargs): """ 初始化类Publisher; """ # 交换器名称,默认值为“openstack”; self.exchange_name = exchange_name self.routing_key = routing_key self.kwargs = kwargs # 重新建立连接后,重新建立生产者; self.reconnect(channel)来看方法reconnect,在这个方法中,实现了交换器的声明,以及队列的绑定等重要的功能。
def reconnect(self, channel): """ 重新建立连接后,重新建立生产者; """ # 交换器的声明和绑定; self.exchange = kombu.entity.Exchange(name=self.exchange_name, **self.kwargs) self.producer = kombu.messaging.Producer(exchange=self.exchange, channel=channel, routing_key=self.routing_key)在方法reconnect中,先来看语句self.exchange = kombu.entity.Exchange(name=self.exchange_name, **self.kwargs),这条语句实现了根据指定的参数进行交换器的声明。具体实现来看类Exchange的初始化方法:
class Exchange(MaybeChannelBound): """ 交换器声明; """ # 短暂性消息传递方式; TRANSIENT_DELIVERY_MODE = TRANSIENT_DELIVERY_MODE # 持久性消息传递方式; PERSISTENT_DELIVERY_MODE = PERSISTENT_DELIVERY_MODE # 交换器名称; name = "" # 如果交换器类型没有指定,则默认为直接型交换器; type = "direct" # 默认交换器是持久类型的; durable = True auto_delete = False # 持久性消息传递方式; delivery_mode = PERSISTENT_DELIVERY_MODE attrs = (("name", None), ("type", None), ("arguments", None), ("durable", bool), ("auto_delete", bool), ("delivery_mode", lambda m: DELIVERY_MODES.get(m) or m)) def __init__(self, name="", type="", channel=None, **kwargs): super(Exchange, self).__init__(**kwargs) self.name = name or self.name self.type = type or self.type # maybe_bind:如果没有绑定,则绑定实例到channel; self.maybe_bind(channel)
def maybe_bind(self, channel): """ 如果没有绑定,则绑定实例到channel; """ if not self.is_bound and channel: self._channel = channel self.when_bound() self._is_bound = True return self在类Exchange的初始化方法我们可以知道,这个类的初始化方法实现了声明一个指定名称指定类型的交换器,在这段代码中,交换器的名称为“openstack”,交换器的类型为主题式交换器。
这里也已经实现了前面所提到的步骤中获得channel和创建交换器两个步骤。
我们再回到方法reconnect中,解析语句self.producer = kombu.messaging.Producer(exchange=self.exchange, channel=channel, routing_key=self.routing_key),根据指定exchange与routing_key来创建Producer的功能,也就是前面所提到的步骤中的第(4)步。具体我们来看类Producer的初始化方法的代码实现:
class Producer(object): """ 消息生产者; """ # 所使用的channel; channel = None # 默认的交换器; exchange = None # 默认的路由key; routing_key = "" # 所使用的默认的序列化格式;默认为JSON; serializer = None # 默认的压缩方法,默认为禁用; compression = None auto_declare = True on_return = None def __init__(self, channel, exchange=None, routing_key=None, serializer=None, auto_declare=None, compression=None, on_return=None): from kombu.connection import BrokerConnection if isinstance(channel, BrokerConnection): channel = channel.default_channel self.channel = channel self.connection = self.channel.connection.client self.exchange = exchange or self.exchange if self.exchange is None: self.exchange = Exchange("") self.routing_key = routing_key or self.routing_key self.serializer = serializer or self.serializer self.compression = compression or self.compression self.on_return = on_return or self.on_return if auto_declare is not None: self.auto_declare = auto_declare self.exchange = self.exchange(self.channel) if self.auto_declare: self.declare() if self.on_return: self.channel.events["basic_return"].append(self.on_return)我们可以看到,其实类Producer的初始化方法中主要就是进行了一些变量的初始化赋值操作,重要的就是交换器exchange和routing_key的确定。其中,如果系统定义了要自动声明交换器,则要调用方法declare()来实现对交换器的验证操作,具体就不进行展开了。
至此,我们已经实现了前面所提到的步骤中的第(4)步,即根据指定exchange与routing_key来创建Producer。
2.语句publisher.send(msg, timeout)的解析
我们再回到方法publisher_send中,对语句publisher.send(msg, timeout)进行解析。这条语句实现了将指定消息发送到指定的队列中。
先来看方法send的代码实现:
def send(self, msg, timeout=None): """ 发送一个信息,即给定的msg; # msg:序列化后的msg消息; # timeout=None 定义了等待程序响应时间期限; """ # 如果定义了timeout值,则在publish头部定义ttl的值; # 发送信息; if timeout: # AMQP TTL以毫秒为单位,它定义在publish头部分; #注:TTL是Time-To-Live的缩写,为消息生存时间的意思,具体值由timeout来确定; self.producer.publish(msg, headers={'ttl': (timeout * 1000)}) else: self.producer.publish(msg)
def publish(self, body, routing_key=None, delivery_mode=None, mandatory=False, immediate=False, priority=0, content_type=None, content_encoding=None, serializer=None, headers=None, compression=None, exchange=None, **properties): """ 发布消息到指定的队列(与相应的交换器进行绑定); """ headers = headers or {} if routing_key is None: routing_key = self.routing_key if compression is None: compression = self.compression if isinstance(exchange, Exchange): exchange = exchange.name # 准备用于存储要发送消息的数据结构; body, content_type, content_encoding = self._prepare( body, serializer, content_type, content_encoding, compression, headers) # 建立消息实例用于发送信息; message = self.exchange.Message(body, delivery_mode, priority, content_type, content_encoding, headers=headers, properties=properties) # 实现消息的发送; return self.exchange.publish(message, routing_key, mandatory, immediate, exchange=exchange)我们可以看到在方法publish中主要实现了准备用于发送消息的数据结构,把要发送的消息存储到相应的数据结构中,最后实现消息的发送。
我们直接来看类exchange下的方法Message的具体实现代码:
def Message(self, body, delivery_mode=None, priority=None, content_type=None, content_encoding=None, properties=None, headers=None): """ 建立消息实例用于发送信息; """ properties = properties or {} delivery_mode = delivery_mode or self.delivery_mode properties["delivery_mode"] = DELIVERY_MODES.get(delivery_mode, delivery_mode) return self.channel.prepare_message(body, properties=properties, priority=priority, content_type=content_type, content_encoding=content_encoding, headers=headers)
def prepare_message(self, message_data, priority=None, content_type=None, content_encoding=None, headers=None, properties=None): """Prepare message data.""" properties = properties or {} info = properties.setdefault("delivery_info", {}) info["priority"] = priority or 0 return {"body": message_data, "content-encoding": content_encoding, "content-type": content_type, "headers": headers or {}, "properties": properties or {}}我们再来看方法publish中的语句self.exchange.publish(message, routing_key, mandatory, immediate, exchange=exchange),看看如何实现消息的发送操作。具体来看方法def publish(self, message, routing_key=None,
mandatory=False, immediate=False, exchange=None)的具体代码实现:
def publish(self, message, routing_key=None, mandatory=False, immediate=False, exchange=None): """ 发布消息; """ exchange = exchange or self.name return self.channel.basic_publish(message, exchange=exchange, routing_key=routing_key, mandatory=mandatory, immediate=immediate)
def basic_publish(self, message, exchange, routing_key, **kwargs): """ 发布消息; """ props = message["properties"] message["body"], props["body_encoding"] = \ self.encode_body(message["body"], self.body_encoding) props["delivery_info"]["exchange"] = exchange props["delivery_info"]["routing_key"] = routing_key props["delivery_tag"] = self._next_delivery_tag() self.typeof(exchange).deliver(message, exchange, routing_key, **kwargs)这里方法basic_publish实现了发送消息到指定的交换器和routing_key所决定的队列中去,具体是由方法deliver所实现的,由于交换器的不同,三种交换器共实现了三个deliver方法,分别处于类FanoutExchange、TopicExchange和DirectExchange之下。由于这里实现的交换器是主题式交换器,所以这里执行的是类TopicExchange之下的deliver方法,具体来看代码:
def deliver(self, message, exchange, routing_key, **kwargs): _lookup = self.channel._lookup _put = self.channel._put deadletter = self.channel.deadletter_queue for queue in [q for q in _lookup(exchange, routing_key) if q and q != deadletter]: _put(queue, message, **kwargs)这个方法调用了方法_lookup实现了寻找到所有满足`routing_key`与给定`exchange`相匹配的队列;再调用方法_put实现了把要发送的消息插入到相应合适的队列之中。
我们先来看方法_lookup的代码实现:
def _lookup(self, exchange, routing_key, default=None): """ 寻找到所有满足`routing_key`与给定`exchange`相匹配的队列; """ if default is None: default = self.deadletter_queue try: return self.typeof(exchange).lookup(self.get_table(exchange), exchange, routing_key, default) except KeyError: self._new_queue(default) return [default]可以看到这里又根据几种不同的交换器,调用相应的lookup方法具体实现满足条件队列的搜索。而且,如果没有找到合适的队列,将会抛出异常,并调用方法新建队列用于存储消息。之所以不同的交换器有不同的lookup方法,是因为不同类型的交换器,有不同的匹配算法(这个在之前的一篇博文中已经说过)。因为这里是主题式交换器,所以调用相应的lookup方法,具体来看代码:
def lookup(self, table, exchange, routing_key, default): return [queue for rkey, pattern, queue in table if self._match(pattern, routing_key)] or [default]搜索到所有的合适的队列之后,就要实现把消息插入队列,这里我们回到方法def deliver(self, message, exchange, routing_key, **kwargs),对语句_put(queue, message, **kwargs)进行解析。我们来看方法_put的代码实现:
def _put(self, queue, message, **kwargs): self._queue_for(queue).put(message)
def put(self, item, block=True, timeout=None): """ Put an item into the queue. """ self.not_full.acquire() try: if self.maxsize > 0: if not block: if self._qsize() == self.maxsize: raise Full elif timeout is None: while self._qsize() == self.maxsize: self.not_full.wait() elif timeout < 0: raise ValueError("'timeout' must be a positive number") else: endtime = _time() + timeout while self._qsize() == self.maxsize: remaining = endtime - _time() if remaining <= 0.0: raise Full self.not_full.wait(remaining) self._put(item) self.unfinished_tasks += 1 self.not_empty.notify() finally: self.not_full.release()
def _put(self, item): self.queue.append(item)
至此,实现了把消息插入到合适的队列之中,最终实现了基于AMQP框架下的消息的发送端操作。当然,这里还有很多细节问题,我会在代码分析完成之后进行一些总结。
之前有个朋友提到了,Nova之中是怎样通过AMQP协议进行消息接受并执行的,我将会在下一篇博文中进行解析。
相关文章推荐
- OpenStack建立实例完整过程源码详细分析(12)----依据AMQP通信架构实现消息发送机制解析之一
- OpenStack建立实例完整过程源码详细分析(15)----依据AMQP通信架构实现消息接收机制解析之二
- OpenStack建立实例完整过程源码详细分析(14)----依据AMQP通信架构实现消息接收机制解析之一
- OpenStack建立实例完整过程源码详细分析(5)
- OpenStack建立实例完整过程源码详细分析(8)
- OpenStack建立实例完整过程源码详细分析(6)
- OpenStack建立实例完整过程源码详细分析(11)
- OpenStack建立实例完整过程源码详细分析(4)
- OpenStack建立实例完整过程源码详细分析(2)
- OpenStack建立实例完整过程源码详细分析(3)
- OpenStack建立实例完整过程源码详细分析(10)
- OpenStack建立实例完整过程源码详细分析(7)
- OpenStack建立实例完整过程源码详细分析(9)
- OpenStack建立实例完整过程源码详细分析(1)
- 消息传递机制的具体实现过程(分析源码之后的总结)
- Openstack Cinder中建立volume过程的源码解析(5)----以及taskflow相关解析
- ceph源码分析之消息通信机制
- [Android实例] android的消息处理机制(图+源码分析)——Looper,Handler,Message
- Android消息处理机制源码分析(二):本地实现
- Openstack Cinder中建立volume过程的源码解析(4)----以及taskflow相关解析