您的位置:首页 > 运维架构 > 网站架构

OpenStack建立实例完整过程源码详细分析(13)----依据AMQP通信架构实现消息发送机制解析之二

2013-10-02 08:07 1136 查看
感谢朋友支持本博客,欢迎共同探讨交流,由于能力和时间有限,错误之处在所难免,欢迎指正!

如果转载,请保留作者信息。

博客地址:http://blog.csdn.net/gaoxingnengjisuan

邮箱地址:dong.liu@siat.ac.cn

这篇博文来继续对方法def cast(conf, context, topic, msg)进行解析,首先来看这个方法的源码:

def cast(conf, context, topic, msg, connection_pool):
    """
    发送一个topic主题的消息,不用等待任何信息的返回;
    """
    LOG.debug(_('Making asynchronous cast on %s...'), topic)
    
    # 为消息msg添加随机生成的unique_id;
    _add_unique_id(msg)
    # 打包上下文信息context到msg中;
    pack_context(msg, context)
    
    # ConnectionContext(conf, connection_pool):建立一个新连接,或者从连接池中获取一个连接;
    # serialize_msg(msg):消息msg的序列化;
    # topic_send:实现发送一个'topic'主题的消息;
    with ConnectionContext(conf, connection_pool) as conn:
        conn.topic_send(topic, rpc_common.serialize_msg(msg))
前一篇博文已经对建立连接的实现进行了分析,下面来看语句topic_send(topic, rpc_common.serialize_msg(msg)),方法topic_send实现了通过主题发布者类,通过主题式的交换器,发送一个消息到指定队列。来看方法topic_send的具体代码:

def topic_send(self, topic, msg, timeout=None):
        """
        通过主题发布者类,通过主题式的交换器,发送一个消息到指定队列;
                     
        # msg:序列化后的msg消息;
        # topic:主题信息;
        # timeout:=None 定义了等待程序响应时间期限;
        """
        self.publisher_send(TopicPublisher, topic, msg, timeout)
def publisher_send(self, cls, topic, msg, timeout=None, **kwargs):

    def _error_callback(exc):
        log_info = {'topic': topic, 'err_str': str(exc)}
        LOG.exception(_("Failed to publish message to topic "
                      "'%(topic)s': %(err_str)s") % log_info)

    def _publish():
        # 调用发布者类(cls传进来的是TopicPublisher),初始化这个类,获取这个类的对象;
        publisher = cls(self.conf, self.channel, topic, **kwargs)
        # 发送一个信息,即给定的msg;
        publisher.send(msg, timeout)

    self.ensure(_error_callback, _publish)
可以知道,在方法publisher_send中,最重要的两条语句就是:

publisher = cls(self.conf, self.channel, topic, **kwargs)和publisher.send(msg, timeout);

1.语句publisher = cls(self.conf, self.channel, topic, **kwargs)的解析

从前面的代码中可以知道,cls传进来的是主题式消息发布者类TopicPublisher,所以这条语句实现的功能就是对类TopicPublisher进行初始化,并获取其实例化对象。实际上,在类TopicPublisher的初始化过程中,实现了很多功能,我们来看类TopicPublisher初始化的具体的实现代码:

class TopicPublisher(Publisher):
    """
    主题式生产者类;
    """
    def __init__(self, conf, channel, topic, **kwargs):
        """
        初始化一个topic发布者类;
        """
        
        # rabbit_durable_queues:这个参数定义了在RabbitMQ中是否使用持久性的队列;
        # 参数的默认值为False;
        options = {'durable': conf.rabbit_durable_queues,
                   'auto_delete': False,
                   'exclusive': False}
        options.update(kwargs)
        
        # 获取交换器的名称,默认值为“openstack”;
        # get_control_exchange:获取配置参数control_exchange;
        exchange_name = rpc_amqp.get_control_exchange(conf)
        
        super(TopicPublisher, self).__init__(channel,
                                             exchange_name,
                                             topic,
                                             type='topic',
                                             **options)

可以看见,类TopicPublisher的初始化过程中,获取了交换器的名称,并且对其父类Publisher进行了进一步的初始化操作。

class Publisher(object):
    """
    消息生产者基类;
    """

    def __init__(self, channel, exchange_name, routing_key, **kwargs):
        """
        初始化类Publisher;
        """
        # 交换器名称,默认值为“openstack”;
        self.exchange_name = exchange_name
        self.routing_key = routing_key
        self.kwargs = kwargs
        # 重新建立连接后,重新建立生产者;
        self.reconnect(channel)
来看方法reconnect,在这个方法中,实现了交换器的声明,以及队列的绑定等重要的功能。

def reconnect(self, channel):
    """
    重新建立连接后,重新建立生产者;
    """        
    # 交换器的声明和绑定;
    self.exchange = kombu.entity.Exchange(name=self.exchange_name, **self.kwargs)
    
    self.producer = kombu.messaging.Producer(exchange=self.exchange, channel=channel, routing_key=self.routing_key)
在方法reconnect中,先来看语句self.exchange = kombu.entity.Exchange(name=self.exchange_name, **self.kwargs),这条语句实现了根据指定的参数进行交换器的声明。具体实现来看类Exchange的初始化方法:

class Exchange(MaybeChannelBound):
    """
    交换器声明;
    """
    # 短暂性消息传递方式;
    TRANSIENT_DELIVERY_MODE = TRANSIENT_DELIVERY_MODE
    # 持久性消息传递方式;
    PERSISTENT_DELIVERY_MODE = PERSISTENT_DELIVERY_MODE

    # 交换器名称;
    name = ""
    # 如果交换器类型没有指定,则默认为直接型交换器;
    type = "direct"
    # 默认交换器是持久类型的;
    durable = True
    auto_delete = False
    # 持久性消息传递方式;
    delivery_mode = PERSISTENT_DELIVERY_MODE

    attrs = (("name", None),
             ("type", None),
             ("arguments", None),
             ("durable", bool),
             ("auto_delete", bool),
             ("delivery_mode", lambda m: DELIVERY_MODES.get(m) or m))

    def __init__(self, name="", type="", channel=None, **kwargs):
        super(Exchange, self).__init__(**kwargs)
        self.name = name or self.name
        self.type = type or self.type
        # maybe_bind:如果没有绑定,则绑定实例到channel;
        self.maybe_bind(channel)
def maybe_bind(self, channel):
        """
        如果没有绑定,则绑定实例到channel;
        """
        if not self.is_bound and channel:
            self._channel = channel
            self.when_bound()
            self._is_bound = True
        return self
在类Exchange的初始化方法我们可以知道,这个类的初始化方法实现了声明一个指定名称指定类型的交换器,在这段代码中,交换器的名称为“openstack”,交换器的类型为主题式交换器。
这里也已经实现了前面所提到的步骤中获得channel和创建交换器两个步骤。

我们再回到方法reconnect中,解析语句self.producer = kombu.messaging.Producer(exchange=self.exchange, channel=channel, routing_key=self.routing_key),根据指定exchange与routing_key来创建Producer的功能,也就是前面所提到的步骤中的第(4)步。具体我们来看类Producer的初始化方法的代码实现:

class Producer(object):
    """
    消息生产者;
    """
    # 所使用的channel;
    channel = None
    # 默认的交换器;
    exchange = None
    # 默认的路由key;
    routing_key = ""
    # 所使用的默认的序列化格式;默认为JSON;
    serializer = None
    # 默认的压缩方法,默认为禁用;
    compression = None
    auto_declare = True
    on_return = None

    def __init__(self, channel, exchange=None, routing_key=None,
            serializer=None, auto_declare=None, compression=None,
            on_return=None):
        from kombu.connection import BrokerConnection
        if isinstance(channel, BrokerConnection):
            channel = channel.default_channel
        self.channel = channel
        self.connection = self.channel.connection.client

        self.exchange = exchange or self.exchange
        if self.exchange is None:
            self.exchange = Exchange("")
        self.routing_key = routing_key or self.routing_key
        self.serializer = serializer or self.serializer
        self.compression = compression or self.compression
        self.on_return = on_return or self.on_return
        if auto_declare is not None:
            self.auto_declare = auto_declare

        self.exchange = self.exchange(self.channel)
        if self.auto_declare:
            self.declare()

        if self.on_return:
            self.channel.events["basic_return"].append(self.on_return)
我们可以看到,其实类Producer的初始化方法中主要就是进行了一些变量的初始化赋值操作,重要的就是交换器exchange和routing_key的确定。其中,如果系统定义了要自动声明交换器,则要调用方法declare()来实现对交换器的验证操作,具体就不进行展开了。
至此,我们已经实现了前面所提到的步骤中的第(4)步,即根据指定exchange与routing_key来创建Producer。

2.语句publisher.send(msg, timeout)的解析

我们再回到方法publisher_send中,对语句publisher.send(msg, timeout)进行解析。这条语句实现了将指定消息发送到指定的队列中。

先来看方法send的代码实现:

def send(self, msg, timeout=None):
        """
        发送一个信息,即给定的msg;
                     
        # msg:序列化后的msg消息;
        # timeout=None 定义了等待程序响应时间期限;
        """
        
        # 如果定义了timeout值,则在publish头部定义ttl的值;
        # 发送信息;
        if timeout:
            # AMQP TTL以毫秒为单位,它定义在publish头部分;
            #注:TTL是Time-To-Live的缩写,为消息生存时间的意思,具体值由timeout来确定;
            self.producer.publish(msg, headers={'ttl': (timeout * 1000)})
        else:
            self.producer.publish(msg)
def publish(self, body, routing_key=None, delivery_mode=None,
            mandatory=False, immediate=False, priority=0, content_type=None,
            content_encoding=None, serializer=None, headers=None,
            compression=None, exchange=None, **properties):
        """
        发布消息到指定的队列(与相应的交换器进行绑定);
        """
        headers = headers or {}
        if routing_key is None:
            routing_key = self.routing_key
        if compression is None:
            compression = self.compression

        if isinstance(exchange, Exchange):
            exchange = exchange.name

        # 准备用于存储要发送消息的数据结构;
        body, content_type, content_encoding = self._prepare(
                body, serializer, content_type, content_encoding,
                compression, headers)
        # 建立消息实例用于发送信息;
        message = self.exchange.Message(body,
                                        delivery_mode,
                                        priority,
                                        content_type,
                                        content_encoding,
                                        headers=headers,
                                        properties=properties)
        # 实现消息的发送;
        return self.exchange.publish(message, routing_key, mandatory,
                                     immediate, exchange=exchange)
我们可以看到在方法publish中主要实现了准备用于发送消息的数据结构,把要发送的消息存储到相应的数据结构中,最后实现消息的发送。

我们直接来看类exchange下的方法Message的具体实现代码:

def Message(self, body, delivery_mode=None, priority=None,
            content_type=None, content_encoding=None, properties=None,
            headers=None):
        """
        建立消息实例用于发送信息;
        """
        properties = properties or {}
        delivery_mode = delivery_mode or self.delivery_mode
        properties["delivery_mode"] = DELIVERY_MODES.get(delivery_mode,
                                                         delivery_mode)
        return self.channel.prepare_message(body,
                                            properties=properties,
                                            priority=priority,
                                            content_type=content_type,
                                            content_encoding=content_encoding,
                                            headers=headers)
def prepare_message(self, message_data, priority=None,
            content_type=None, content_encoding=None, headers=None,
            properties=None):
        """Prepare message data."""
        properties = properties or {}
        info = properties.setdefault("delivery_info", {})
        info["priority"] = priority or 0

        return {"body": message_data,
                "content-encoding": content_encoding,
                "content-type": content_type,
                "headers": headers or {},
                "properties": properties or {}}
我们再来看方法publish中的语句self.exchange.publish(message, routing_key, mandatory, immediate, exchange=exchange),看看如何实现消息的发送操作。具体来看方法def publish(self, message, routing_key=None,
mandatory=False, immediate=False, exchange=None)的具体代码实现:

def publish(self, message, routing_key=None, mandatory=False,
            immediate=False, exchange=None):
        """
        发布消息;
        """
        exchange = exchange or self.name
        return self.channel.basic_publish(message,
                                          exchange=exchange,
                                          routing_key=routing_key,
                                          mandatory=mandatory,
                                          immediate=immediate)
def basic_publish(self, message, exchange, routing_key, **kwargs):
    """
    发布消息;
    """
    props = message["properties"]
    message["body"], props["body_encoding"] = \
                self.encode_body(message["body"], self.body_encoding)
    props["delivery_info"]["exchange"] = exchange
    props["delivery_info"]["routing_key"] = routing_key
    props["delivery_tag"] = self._next_delivery_tag()
    self.typeof(exchange).deliver(message, exchange, routing_key, **kwargs)
这里方法basic_publish实现了发送消息到指定的交换器和routing_key所决定的队列中去,具体是由方法deliver所实现的,由于交换器的不同,三种交换器共实现了三个deliver方法,分别处于类FanoutExchange、TopicExchange和DirectExchange之下。由于这里实现的交换器是主题式交换器,所以这里执行的是类TopicExchange之下的deliver方法,具体来看代码:

def deliver(self, message, exchange, routing_key, **kwargs):
    _lookup = self.channel._lookup
    _put = self.channel._put
    deadletter = self.channel.deadletter_queue
    for queue in [q for q in _lookup(exchange, routing_key)
        if q and q != deadletter]:
            _put(queue, message, **kwargs)
这个方法调用了方法_lookup实现了寻找到所有满足`routing_key`与给定`exchange`相匹配的队列;再调用方法_put实现了把要发送的消息插入到相应合适的队列之中。
我们先来看方法_lookup的代码实现:

def _lookup(self, exchange, routing_key, default=None):
    """
    寻找到所有满足`routing_key`与给定`exchange`相匹配的队列;
    """
    if default is None:
        default = self.deadletter_queue
    try:
        return self.typeof(exchange).lookup(self.get_table(exchange),
                                            exchange, routing_key, default)
    except KeyError:
        self._new_queue(default)
        return [default]
可以看到这里又根据几种不同的交换器,调用相应的lookup方法具体实现满足条件队列的搜索。而且,如果没有找到合适的队列,将会抛出异常,并调用方法新建队列用于存储消息。之所以不同的交换器有不同的lookup方法,是因为不同类型的交换器,有不同的匹配算法(这个在之前的一篇博文中已经说过)。因为这里是主题式交换器,所以调用相应的lookup方法,具体来看代码:

def lookup(self, table, exchange, routing_key, default):
        return [queue for rkey, pattern, queue in table
                        if self._match(pattern, routing_key)] or [default]
搜索到所有的合适的队列之后,就要实现把消息插入队列,这里我们回到方法def deliver(self, message, exchange, routing_key, **kwargs),对语句_put(queue, message, **kwargs)进行解析。我们来看方法_put的代码实现:

def _put(self, queue, message, **kwargs):
        self._queue_for(queue).put(message)
def put(self, item, block=True, timeout=None):
    """
    Put an item into the queue.
    """
    self.not_full.acquire()
    try:
        if self.maxsize > 0:
            if not block:
                if self._qsize() == self.maxsize:
                    raise Full
            elif timeout is None:
                while self._qsize() == self.maxsize:
                    self.not_full.wait()
            elif timeout < 0:
                raise ValueError("'timeout' must be a positive number")
            else:
                endtime = _time() + timeout
                while self._qsize() == self.maxsize:
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Full
                    self.not_full.wait(remaining)
        self._put(item)
        self.unfinished_tasks += 1
        self.not_empty.notify()
    finally:
        self.not_full.release()
def _put(self, item):
        self.queue.append(item)

至此,实现了把消息插入到合适的队列之中,最终实现了基于AMQP框架下的消息的发送端操作。当然,这里还有很多细节问题,我会在代码分析完成之后进行一些总结。

之前有个朋友提到了,Nova之中是怎样通过AMQP协议进行消息接受并执行的,我将会在下一篇博文中进行解析。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: