您的位置:首页 > 运维架构 > 网站架构

OpenStack建立实例完整过程源码详细分析(15)----依据AMQP通信架构实现消息接收机制解析之二

2013-10-02 14:28 1171 查看
感谢朋友支持本博客,欢迎共同探讨交流,由于能力和时间有限,错误之处在所难免,欢迎指正!

如果转载,请保留作者信息。

博客地址:http://blog.csdn.net/gaoxingnengjisuan

邮箱地址:dong.liu@siat.ac.cn

此片博文继续上一篇博文的工作,继续对/nova/server.py中类Service下的方法def start(self)进行解析,来实现对Nova下的AMQP的消息消费者机制的解析工作。

2.3 语句self.conn.consume_in_thread()的解析

这条语句实现了从队列中获取消息,并最终实现了对消息的处理和执行操作。

def consume_in_thread(self):
    """
    启动消费者线程;
    处理一个绿色线程中的所有的队列/消费者信息;
    """
    def _consumer_thread():
        try:
            self.consume()
        except greenlet.GreenletExit:
            return
    # 启动一个绿色线程来运行方法_consumer_thread;
    # 实现获取下一个消费者;
    if self.consumer_thread is None:
        self.consumer_thread = eventlet.spawn(_consumer_thread)
    return self.consumer_thread
这个方法中启动了一个绿色线程来运行方法_consumer_thread,进而运行方法consume,我们来看方法consume的代码实现:

def consume(self, limit=None):
    it = self.iterconsume(limit=limit)
    while True:
        try:
            it.next()
        except StopIteration:
            return
def iterconsume(self, limit=None, timeout=None):
    """
    返回一个迭代器,它实现了处理所有的队列/消费者信息;
    """

    info = {'do_consume': True}

    def _error_callback(exc):
        if isinstance(exc, socket.timeout):
            LOG.debug(_('Timed out waiting for RPC response: %s') % str(exc))
            raise rpc_common.Timeout()
        else:
            LOG.exception(_('Failed to consume message from queue: %s') %
                              str(exc))
            info['do_consume'] = True

    def _consume():
        if info['do_consume']:
            queues_head = self.consumers[:-1]
            queues_tail = self.consumers[-1]
            for queue in queues_head:
               queue.consume(nowait=True)
            queues_tail.consume(nowait=False)
            info['do_consume'] = False
        return self.connection.drain_events(timeout=timeout)

    for iteration in itertools.count(0):
        if limit and iteration >= limit:
            raise StopIteration
        yield self.ensure(_error_callback, _consume)
在这个方法中最重要的语句就是queue.consume(nowait=True)和queues_tail.consume(nowait=False),所以我们进一步来看这里的方法consume的具体实现:

def consume(self, *args, **kwargs):
    options = {'consumer_tag': self.tag}
    options['nowait'] = kwargs.get('nowait', False)
    callback = kwargs.get('callback', self.callback)
    if not callback:
        raise ValueError("No callback defined")

    def _callback(raw_message):
        message = self.channel.message_to_python(raw_message)
        try:
            # 这里将获取的消息进行反序列化(相对于消息发送前进行的序列化操作);
            msg = rpc_common.deserialize_msg(message.payload)
            callback(msg)
        except Exception:
            LOG.exception(_("Failed to process message... skipping it."))
        finally:
            message.ack()

    self.queue.consume(*args, callback=_callback, **options)
我们可以看到这个方法最后执行了语句self.queue.consume(*args, callback=_callback, **options),可以看到这里传入的参数callback就是上面代码中的方法_callback,可以预见,在后面的方法执行过程中,最终将会调用到这个方法def _callback(raw_message),我们先记住它,稍后合适的地方再进行进一步解析。
来看语句self.queue.consume(*args, callback=_callback, **options)中的方法consume的代码实现:

def consume(self, consumer_tag='', callback=None, no_ack=None,
            nowait=False):
        """
        处理队列中的消息;
        keyword consumer_tag:消费者的唯一标识符;
        keyword no_ack:如果接受的消息没有被确认;(???)
        keyword nowait:不等待回应;
        keyword callback:每个传递消息的回调方法;
        """
        if no_ack is None:
            no_ack = self.no_ack
        return self.channel.basic_consume(queue=self.name,
                                          no_ack=no_ack,
                                          consumer_tag=consumer_tag or '',
                                          callback=callback,
                                          nowait=nowait)
进一步来看方法basic_consume的实现代码:

def basic_consume(self, queue, no_ack, callback, consumer_tag, **kwargs):
        """
        处理队列中的消息;
        """
        self._tag_to_queue[consumer_tag] = queue
        self._active_queues.append(queue)
        if queue in self.auto_delete_queues:
            self.auto_delete_queues[queue] += 1

        def _callback(raw_message):
            message = self.Message(self, raw_message)
            if not no_ack:
                self.qos.append(message, message.delivery_tag)
            return callback(message)

        self.connection._callbacks[queue] = _callback
        self._consumers.add(consumer_tag)

        self._reset_cycle()
我们可以看到这里在方法def _callback(raw_message)的执行过程的最后,终于调用了前面传进来的参数callback,也就是前面我们记住的那个方法,也就是def consume(self, *args, **kwargs)中的def _callback(raw_message),我们来看一下这个方法的具体实现代码:

def _callback(raw_message):
        message = self.channel.message_to_python(raw_message)
        try:
            # 这里将获取的消息进行反序列化(相对于消息发送前进行的序列化操作);
            msg = rpc_common.deserialize_msg(message.payload)
            callback(msg)
        except Exception:
            LOG.exception(_("Failed to process message... skipping it."))
        finally:
            message.ack()
我们可以看到,在这个方法中的语句callback(msg),它应该是最终处理所获得消息的实现程序,具体它是哪个方法呢,其实callback也是一层层传进来的一个参数方法,他就是上一篇博文中我提到的/nova/openstack/common/rpc/amqp.py----class ProxyCallback(_ThreadPoolWithWait)下面的方法def
__call__(self, message_data),但是是如何定位到这个方法的呢,后面我会加以说明。现在来看这个方法def __call__(self, message_data)的实现代码:

def __call__(self, message_data):
    if hasattr(local.store, 'context'):
        del local.store.context
    # 记录日志,但是一些敏感信息,像创建虚拟机时的密码等信息不会
    # 显示在日志中,面是替换为"<SANITIZED>"
    rpc_common._safe_log(LOG.debug, _('received %s'), message_data)
    # 重复消息检测;
    # ack返回前,AMQP消费者可能会出现两次读取相同信息的异常,这个方法可以防止这样的情况出现;
    self.msg_id_cache.check_duplicate_message(message_data)
    # 从message_data中解析出上下文信息;
    ctxt = unpack_context(self.conf, message_data)
    # 从message_data中获取要执行的方法method;
    method = message_data.get('method')
    # 从message_data中获取相关参数args;
    args = message_data.get('args', {})
    # 从message_data中获取版本的相关信息;
    version = message_data.get('version', None)
    if not method:
        LOG.warn(_('no method for message: %s') % message_data)
        ctxt.reply(_('No method for message: %s') % message_data,
                   connection_pool=self.connection_pool)
        return
    # 建立一个新的绿色线程来执行方法method;
    self.pool.spawn_n(self._process_data, ctxt, version, method, args)
前面已经对获取的消息进行了反序列化操作,这里分别直接获取了消息中不同的信息。我们可以看到,在方法的最后,语句self.pool.spawn_n(self._process_data, ctxt, version, method, args)就是我们最想要的东西,它建立了一个绿色线程来执行消息中指定要运行的方法method,比如运行实例等等。
我们在回到方法def _callback(raw_message)中,看到最后执行这样一条语句:message.ack(),这条语句实现了确认获取的消息已经完成执行操作,并从队列中删除这条消息。

至此,语句self.conn.consume_in_thread()解析完成。

下面,我来解释一下,为什么前面说到的callback参数方法是如何定位到方法def __call__(self, message_data)的,当然只是简单的说明一下过程。

def consume(self, *args, **kwargs):

options = {'consumer_tag': self.tag}

options['nowait'] = kwargs.get('nowait', False)

callback = kwargs.get('callback',self.callback)

if not callback:

raise ValueError("No callback defined")

def _callback(raw_message):

message = self.channel.message_to_python(raw_message)

try:

# 这里将获取的消息进行反序列化(相对于消息发送前进行的序列化操作);

msg = rpc_common.deserialize_msg(message.payload)

callback(msg)

except Exception:

LOG.exception(_("Failed to process message... skipping it."))

finally:

message.ack()

self.queue.consume(*args, callback=_callback, **options)

重点看这里的语句callback(msg),这条语句实际实施了对获取消息的处理过程。这里的重点是callback到底调用的是哪个方法。定义这个方法的语句是:

callback= kwargs.get('callback',self.callback)

这里就需要引出python中字典的get用法:

get(key,default=None)

返回键值key对应的值;如果key没有在字典里,则返回default参数的值,默认为None;

所以这里callback的首选值为kwargs[callback'],如果字典kwargs中没有相应的元素,则赋值callback的值为self.callback;

我们继续来探寻这里的callback的源头:

可以看到方法defconsume(self,*args, **kwargs)所处在的类classConsumerBase(object)的初始化方法__init__中有callback的取值:

def __init__(self, channel, callback, tag, **kwargs):

"""

根据相关参数在一个amqp channel上声明产生一个队列;

"""


self.callback = callback

self.tag = str(tag)

self.kwargs = kwargs

self.queue = None

# 重连接到channel;

# 声明队列和交换器,通过routing key绑定队列到交换器;

self.reconnect(channel)

我们可以想一下,什么时候对这个类进行了类的对象初始化,对,就是在执行语句:

self.conn.create_consumer(self.topic,rpc_dispatcher, fanout=False)的过程中,有对类classConsumerBase(object)进行了对象初始化过程。下面简单回顾一下这条语句的执行过程:

self.conn.create_consumer(self.topic,rpc_dispatcher, fanout=False)

def create_consumer(self, topic, proxy, fanout=False):

"""

根据参数建立具体的消息消费者;

"""

# Connection:连接到RabbitMQ的实现类;

# get_connection_pool:获取到RabbitMQ的连接池,并返回这个连接池的对象;

# connection_cls:是一个连接类的对象;

# 获取ProxyCallback类的初始化实例对象;

proxy_cb = rpc_amqp.ProxyCallback(

self.conf, proxy,

rpc_amqp.get_connection_pool(self.conf, Connection))


self.proxy_callbacks.append(proxy_cb)

if fanout:

# 建立一个广播类型的消费者;

# 声明队列和交换器,通过routing key绑定队列到交换器;

self.declare_fanout_consumer(topic, proxy_cb)

else:

# declare_topic_consumer:建立一个主题式的信息消费者;

# 声明队列和交换器,通过routing key绑定队列到交换器;

self.declare_topic_consumer(topic, proxy_cb)

def declare_topic_consumer(self, topic, callback=None, queue_name=None,

exchange_name=None):

"""

建立一个主题式的信息消费者;

声明队列和交换器,通过routing key绑定队列到交换器;

"""



# 根据传入的类建立信息消费者,把建立好的消费者对象加入到consumers列表中;

self.declare_consumer(functools.partial(TopicConsumer,

name=queue_name,

exchange_name=exchange_name,

),

topic, callback)

这里可以看到传入方法declare_topic_consumer中的callback的参数值即为proxy_cb,所以这里分两条线,一条线来看看proxy_cb的获取过程,另一条线来看看方法declare_topic_consumer中的callback是不是我们想找到的那个callback。

首先来看第一条线,我们来看语句:

proxy_cb = rpc_amqp.ProxyCallback(

self.conf, proxy,

rpc_amqp.get_connection_pool(self.conf, Connection))

class ProxyCallback(_ThreadPoolWithWait):

def __init__(self, conf, proxy, connection_pool):

# 这个类实现了启动一个用于处理传入信息的绿色线程;

super(ProxyCallback, self).__init__(

conf=conf,

connection_pool=connection_pool,

)

self.proxy = proxy

self.msg_id_cache = _MsgIdCache()

def __call__(self, message_data):

if hasattr(local.store, 'context'):

del local.store.context

rpc_common._safe_log(LOG.debug, _('received %s'), message_data)

self.msg_id_cache.check_duplicate_message(message_data)

ctxt = unpack_context(self.conf, message_data)

method = message_data.get('method')

args = message_data.get('args', {})

version = message_data.get('version', None)

if not method:

LOG.warn(_('no method for message: %s') % message_data)

ctxt.reply(_('No method for message: %s') % message_data,

connection_pool=self.connection_pool)

return

self.pool.spawn_n(self._process_data, ctxt, version, method, args)

class _ThreadPoolWithWait(object):

"""

这个类实现了启动一个用于处理传入信息的绿色线程;

"""

def __init__(self, conf, connection_pool):

self.pool = greenpool.GreenPool(conf.rpc_thread_pool_size)

self.connection_pool = connection_pool

self.conf = conf

我们可以看到proxy_cb是类ProxyCallback的实例化对象,我们还注意到类 ProxyCallback中实现了一个方法__call__,这个方法就是我们所要找到的目标方法,我们继续解析定位过程。

再来看另一条线,来追踪方法declare_topic_consumer中的callback:

def declare_topic_consumer(self, topic, callback=None, queue_name=None,

exchange_name=None):

"""

建立一个主题式的信息消费者;

声明队列和交换器,通过routing key绑定队列到交换器;

"""

# 根据传入的类建立信息消费者,把建立好的消费者对象加入到consumers列表中;

self.declare_consumer(functools.partial(TopicConsumer,

name=queue_name,

exchange_name=exchange_name,

),

topic, callback)

def declare_consumer(self, consumer_cls, topic, callback):

"""

根据传入的类建立信息消费者,把建立好的消费者对象加入到consumers列表中;

"""

def _connect_error(exc):

log_info = {'topic': topic, 'err_str': str(exc)}

LOG.error(_("Failed to declare consumer for topic '%(topic)s': "

"%(err_str)s") % log_info)

def _declare_consumer():

# 传递进来的类consumer_cls应该是TopicConsumer;

# 声明队列和交换器,通过routing key绑定队列到交换器;

consumer = consumer_cls(self.conf, self.channel, topic,
callback,

self.consumer_num.next())

self.consumers.append(consumer)

return consumer

return self.ensure(_connect_error, _declare_consumer)

class TopicConsumer(ConsumerBase):

"""

主题式消息消费者类;

"""

def __init__(self, conf, channel, topic, callback, tag, name=None,

exchange_name=None, **kwargs):

"""

Init a 'topic' queue.

"""

# 默认选项的设置;



# rabbit_durable_queues:这个参数定义了在RabbitMQ中是否使用持久性的队列;

# 参数的默认值为False;

options = {'durable': conf.rabbit_durable_queues,

'queue_arguments': _get_queue_arguments(conf),

'auto_delete': False,

'exclusive': False}

options.update(kwargs)



# 获取交换器的名称;

exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)

# 根据相关的配置参数声明产生一个交换器;

exchange = kombu.entity.Exchange(name=exchange_name,

type='topic',

durable=options['durable'],

auto_delete=options['auto_delete'])

# 重连接到channel;

# 声明队列和交换器,通过routing key绑定队列到交换器;

super(TopicConsumer, self).__init__(channel,

callback,

tag,

name=name or topic,

exchange=exchange,

routing_key=topic,

**options)

class ConsumerBase(object):

"""

Consumer(消费者)基类;

"""

def __init__(self, channel, callback, tag, **kwargs):

"""

根据相关参数在一个amqp channel上声明产生一个队列;

"""

self.callback = callback

self.tag = str(tag)

self.kwargs = kwargs

self.queue = None

# 重连接到channel;

# 声明队列和交换器,通过routing key绑定队列到交换器;

self.reconnect(channel)

至此我们可以看到,前面提到的参数proxy_cb,即类ProxyCallback的实例化对象就是我们要找的callback的源头。

前面我们看到类ProxyCallback中除了类的初始化方法__init__之外,还有实现了一个方法__call__,这里我们来看看python中所规定的方法__call__的用法。

Python中有一个有趣的语法,只要定义类的时候,实现__call__函数,这个类就成为可调用的。换句话说,我们可以把这个类的对象当作方法来使用,相当于重载了括号运算符。例如:

class Aclass:

def __call__(self):

print 'Hi I am __call__ed';

def __init__(self, *args, **keyargs):

print "Hi I am __init__ed";

x = Aclass() 输出 Hi I am __init__ed

x() 输出 Hi I am __call__ed

现在我们回到最前面的方法defconsume(self, *args, **kwargs),来看语句callback(msg):

def consume(self, *args, **kwargs):

options = {'consumer_tag': self.tag}

options['nowait'] = kwargs.get('nowait', False)

callback = kwargs.get('callback', self.callback)

if not callback:

raise ValueError("No callback defined")

def _callback(raw_message):

message = self.channel.message_to_python(raw_message)

try:

# 这里将获取的消息进行反序列化(相对于消息发送前进行的序列化操作);

msg = rpc_common.deserialize_msg(message.payload)

callback(msg)

except Exception:

LOG.exception(_("Failed to process message... skipping it."))

finally:

message.ack()

现在我们知道callback的值为self.callback,而self.callback的值为proxy_cb,即类ProxyCallback的实例化对象。而类ProxyCallback中实现了方法__call__,根据前面我们提到的相关的python语法可以知道,这里callback执行的就是方法__call__,具体就是__call__(msg)。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: