您的位置:首页 > 其它

oslo.messaging组件的学习之call方法

2016-03-06 14:39 337 查看
这篇文章会介绍一下oslo.messaging组件的实现原理。相关的学习网站是: http://docs.openstack.org/developer/oslo.messaging/index.html http://docs.openstack.org/developer/nova/devref/rpc.html http://lynnkong.iteye.com/blog/1699299 https://wiki.openstack.org/wiki/Oslo/Messaging http://blog.csdn.net/gaoxingnengjisuan/article/details/11468061 另外可以看下这里,有对AMQP的一些学习。
1.简介

nova使用了direct,fanout,topic-based这三种exchange(好像也就这三种),而和其它组件(如Object Store)则是通过RestFUL API沟通。虽然现在的nova使用的RabbitMQ或QPID都是AMQP的,但以后可能会引入其它非AMQP的组件。
nova有两个rpc的方法:rpc.call和rpc.cast。都是基于AMQP实现的。rpc.call是同步的,会等待result返回。rpc.cast是异步的,并且不需要result。

nova还有一个notify的方法。
nova使用了kombu来做RabbitMQ消息的具体底层交互模块。关于kombu可以看这里。
由于支持多种MQ,所以这里的一个框架就是注册driver,具体调用的时候调用具体driver的特定方法。所以我们分析下rabbitmq这个方法就能知道其具体有哪些对象以及对象
2.RabbitMQ实现的基本对象

首先根据kombu的那个文章,我们知道kombu中有两个对象producer和consumer,前者向exchange发送message,后者从queue取出message。oslo这里的rabbitmq实现会的对这些东西做个封装(这一点很重要,记住oslo就是做了个封装,要看原理去看kombu的,否则还是不能很好的理解其实现)。
具体的对象直接的关系如下(不怎么会画UML。。。)




3.rpc的实现例子分析(call方法)

先来看看用法,我们拿nova-api这里的例子来说这个。下面的几行首先是初始化一个用于rpc的client,看下其实现:

[nova/network/rpcapi.py]

1
2
3
4
5
6
7
8
def
__init__(
self
,
topic
=
None
):

super
(NetworkAPI,
self
).__init__()

topic
=
topic
or
CONF.network_topic

target
=
messaging.Target(topic
=
topic,
version
=
'1.0'
)

version_cap
=
self
.VERSION_ALIASES.get(CONF.upgrade_levels.network,

   
CONF.upgrade_levels.network)

serializer
=
objects_base.NovaObjectSerializer()

self
.client
=
rpc.get_client(target,
version_cap,serializer)


首先是获取一个topic,这个topic就是network的consumer监听的queue的binding-key,从这个图可以看到:




在这里,是这个东西:

1
2
#
The topicnetwork nodes listen on (string value)

#network_topic=network


然后是获取一个target,我们看下这个target是啥,代码不长,我就都复制下来了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
class
Target(
object
):


"""Identifies
the destination of messages.


A
Target encapsulates all the information to identify where amessage

should
be sent or what messages aserver is listening for.


Different
subsets of the information encapsulated in aTarget object is

relevant
to various aspects of the API:


  
creating
aserver:

topic
and server is required; exchange is optional

  
an
endpoint's target:

namespace
and version is optional

  
client
sendingamessage:

topic
is required,all other attributes optional


Its
attributes are:


:param
exchange:A scope for topics. Leave unspecified to default to the

  
control_exchange
configuration option.

:type
exchange:str

:param
topic:A name which identifies the set of interfaces exposed by a

  
server.
Multiple servers may listen on atopicand messages will be

  
dispatched
to one of the servers in around-robin fashion.

:type
topic:str

:param
namespace:Identifies aparticular interface (i.e. set of methods)

  
exposed
by aserver. The default interface has no namespace identifier

  
and
is referred to as the null namespace.

:type
namespace:str

:param
version:Interfaces have amajor.minor version number associated

  
with
them. A minor number increment indicates abackwards compatible

  
change
and an incompatible change is indicated by amajor number bump.

  
Servers
may implement multiple major versions and clients may require

  
indicate
that their message requires aparticular minimum minor version.

:type
version:str

:param
server:Clients can request that amessage be directed to aspecific

  
server,
rather than just one of apool of servers listening on the topic.

:type
server:str

:param
fanout:Clients may request that amessage be directed to all

  
servers
listening on atopicby setting fanout to ``True``,rather than

  
just
one of them.

:type
fanout:bool

"""


def
__init__(
self
,
exchange
=
None
,
topic
=
None
,
namespace
=
None
,

 
version
=
None
,
server
=
None
,
fanout
=
None
):

self
.exchange
=
exchange

self
.topic
=
topic

self
.namespace
=
namespace

self
.version
=
version

self
.server
=
server

self
.fanout
=
fanout


def
__call__(
self
,
*
*
kwargs):

kwargs.setdefault(
'exchange'
,
self
.exchange)

kwargs.setdefault(
'topic'
,
self
.topic)

kwargs.setdefault(
'namespace'
,
self
.namespace)

kwargs.setdefault(
'version'
,
self
.version)

kwargs.setdefault(
'server'
,
self
.server)

kwargs.setdefault(
'fanout'
,
self
.fanout)

return
Target(
*
*
kwargs)


def
__eq__(
self
,
other):

return
vars
(
self
)
=
=
vars
(other)


def
__ne__(
self
,
other):

return
not
self
=
=
other


def
__repr__(
self
):

attrs
=
[]

for
a
in
[
'exchange'
,
'topic'
,
'namespace'
,

  
'version'
,
'server'
,
'fanout'
]:

v
=
getattr
(
self
,
a)

if
v:

attrs.append((a,
v))

values
=
',
'
.join([
'%s=%s'
%
i
for
i
in
attrs])

return
'<Target
'
+
values
+
'>'


从注释可以看到,target有两种含义,对于发送者来说,其表示message应该发送到哪里。对于接受者来说,其表示应该接收何种消息。看下构造函数的几个方法:

exchange:用于指定exchange,如果没有指定,那么就是配置文件里的control_exchange

topic:就是binding key。如果多个consumer(在注释里也叫server)监听同一个queue,那么会使用round-robin来发配消息

namespace:某些方法会用到这个,默认是空,具体作用我们后面如果遇到了再看

version:consumer(也就是server)是由版本号的,发送的message的版本要和consumer的版本兼容才行。版本号是major.mirror的形式。major号相同则兼容

server:特定的consumer。相当于direct的转发了,就是发送者指定特定的consumer,然后由它处理,而不是像topic里的解释那样,用round-robin来分发消息

fanout:表明消息会广播,忽略topic,只要consumer允许接收fanout消息即可
在我们的例子里,exchange使用默认的exchange(从这里我们可以直达,其名字是nova)。topic就是network(配置文件里都会写清楚topic的名字是什么,看下那些XXX_topic的选项就可以了),version就是1.0。所以我们可以知道,现在或之后,broker上应该有一个叫做nova的exchange,并且有至少有一个queue绑定在上面,绑定的key是network。
我们继续看下面的代码,这一行就开始获取一个client了,并且由于知道了target,所以我们也知道这个client的信息会发往何处。

1
self
.client
=
rpc.get_client(target,
version_cap,serializer)


1
2
3
4
5
6
7
def
get_client(target,
version_cap
=
None
,
serializer
=
None
):

assert
TRANSPORT
is
not
None

serializer
=
RequestContextSerializer(serializer)

return
messaging.RPCClient(TRANSPORT,

   
target,

   
version_cap
=
version_cap,

   
serializer
=
serializer)


TRANSPORT的信息可以看这里,serializer可以看这里,总之前者是发送消息的具体driver,后者是个用于格式化消息格式的东西。重点在于RPCClient,我们来看下这个。代码很长,所以我们慢慢看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
class
RPCClient(
object
):


"""A
class for invoking methods on remote servers.


The
RPCClient class is responsible for sendingmethod invocations to remote

servers
viaamessaging transport.


A
default targetis supplied to the RPCClient constructor,but target

attributes
can be overridden for individual method invocations using the

prepare()
method.


A
method invocation consists of arequest contextdictionary,amethod name

and
adictionary of arguments. A cast()invocation just sends the request

and
returns immediately. A call()invocation waits for the server to send

a
return value.


This
class is intended to be used by wrapping it in another class which

provides
methods on the subclass to perform the remote invocation using

call()
or cast()::


class
TestClient(object):


def
__init__(self,transport):

target
= messaging.Target(topic='testtopic',version='2.0')

self._client
= messaging.RPCClient(transport,target)


def
test(self,ctxt,arg):

return
self._client.call(ctxt,'test',arg=arg)


An
example of using the prepare()method to override some attributes of the

default
target::


def
test(self,ctxt,arg):

cctxt
= self._client.prepare(version='2.5')

return
cctxt.call(ctxt,'test',arg=arg)


RPCClient
have anumber of other properties - for example,timeoutand

version_cap
- which may make sense to override for some method invocations,

so
they too can be passed to prepare()::


def
test(self,ctxt,arg):

cctxt
= self._client.prepare(timeout=10)

return
cctxt.call(ctxt,'test',arg=arg)


However,
this class can be used directly without wrapping it another class.

For
example::


transport
= messaging.get_transport(cfg.CONF)

target
= messaging.Target(topic='testtopic',version='2.0')

client
= messaging.RPCClient(transport,target)

client.call(ctxt,
'test',arg=arg)


but
this is probably only useful in limited circumstances as awrapper

class
will usually help to make the code much more obvious.

"""


def
__init__(
self
,
transport,target,

 
timeout
=
None
,
version_cap
=
None
,
serializer
=
None
):

"""Construct
an RPC client.


:param
transport:amessaging transport handle

:type
transport:Transport

:param
target:the default targetfor invocations

:type
target:Target

:param
timeout:an optional default timeout(in seconds)for call()s

:type
timeout:int or float

:param
version_cap:raise aRPCVersionCapError version exceeds this cap

:type
version_cap:str

:param
serializer:an optional entity serializer

:type
serializer:Serializer

"""

self
.conf
=
transport.conf

self
.conf.register_opts(_client_opts)


self
.transport
=
transport

self
.target
=
target

self
.timeout
=
timeout

self
.version_cap
=
version_cap

self
.serializer
=
serializer
or
msg_serializer.NoOpSerializer()


super
(RPCClient,
self
).__init__()


_marker
=
_CallContext._marker


小秦我最喜欢贴注释,所以这次还是老规矩,注释都贴上来了。注释写的很清楚,这个东西用来发消息,发到哪里呢?发到target。怎么个发法呢?通过rpc发。rpc是啥呢?这个就百度去啦。

构造函数没什么可讲的,就是赋值。所以我们继续看下面的代码,看下怎么调用rpc的call方法吧(再复习一下,call是同步的,需要等结果返回)。我们挑一个看看:

1
2
3
def
get_fixed_ip_by_address(
self
,
ctxt,address):

return
self
.client.call(ctxt,
'get_fixed_ip_by_address'
,

address
=
address)


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
def
call(
self
,
ctxt,method,
*
*
kwargs):

"""Invoke
amethod and wait for areply.


Method
arguments must either be primitive types or types supported by

the
client's serializer(if any). Similarly,the request contextmust

be
adict unless the client's serializersupports serializing another

type.


The
semantics of how any errors raised by the remote RPC endpoint

method
are handled are quite subtle.


Firstly,
if the remote exception is contained in one of the modules

listed
in the allow_remote_exmods messaging.get_transport()parameter,

then
it this exception will be re-raised by call(). However,such

locally
re-raised remote exceptions are distinguishable from the same

exception
type raised locally because re-raised remote exceptions are

modified
such that their class name ends withthe '_Remote' suffix so

you
may do::


if
ex.__class__.__name__.endswith('_Remote'):

#
Some special case for locally re-raised remote exceptions


Secondly,
if aremote exception is not from amodule listed in the

allowed_remote_exmods
list,then amessaging.RemoteError exception is

raised
withall details of the remote exception.


:param
ctxt:arequest contextdict

:type
ctxt:dict

:param
method:the method name

:type
method:str

:param
kwargs:adict of method arguments

:type
kwargs:dict

:raises:
MessagingTimeout,RemoteError

"""

return
self
.prepare().call(ctxt,
method,
*
*
kwargs)


注释说了一大堆,主要是关于异常的。干的事情很简单啦,调用prepare生成的对象的call方法,参数中ctxt是restful请求的request的上下文(就把它看成是request好了),method是调用的远程对象要执行的方法,kwargs是传递个method的方法。我们看下prepare是啥:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def
prepare(
self
,
exchange
=
_marker,
topic
=
_marker,
namespace
=
_marker,

version
=
_marker,
server
=
_marker,
fanout
=
_marker,

timeout
=
_marker,
version_cap
=
_marker):

"""Prepare
amethod invocation context.


Use
this method to override client properties for an individual method

invocation.
For example::


def
test(self,ctxt,arg):

cctxt
= self.prepare(version='2.5')

return
cctxt.call(ctxt,'test',arg=arg)


:param
exchange:see Target.exchange

:type
exchange:str

:param
topic:see Target.topic

:type
topic:str

:param
namespace:see Target.namespace

:type
namespace:str

:param
version:requirement the server must support,see Target.version

:type
version:str

:param
server:send to aspecific server,see Target.server

:type
server:str

:param
fanout:send to all servers on topic,see Target.fanout

:type
fanout:bool

:param
timeout:an optional default timeout(in seconds)for call()s

:type
timeout:int or float

:param
version_cap:raise aRPCVersionCapError version exceeds this cap

:type
version_cap:str

"""

return
_CallContext._prepare(
self
,

 
exchange,
topic,namespace,

 
version,
server,fanout,

 
timeout,
version_cap)


继续看_CallContext,从注释我们知道,prepare就是为了准备一个rpc的上下文:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@classmethod

def
_prepare(
cls
,
base,

 
exchange
=
_marker,
topic
=
_marker,
namespace
=
_marker,

 
version
=
_marker,
server
=
_marker,
fanout
=
_marker,

 
timeout
=
_marker,
version_cap
=
_marker):

"""Prepare
amethod invocation context. See RPCClient.prepare()."""

kwargs
=
dict
(

exchange
=
exchange,

topic
=
topic,

namespace
=
namespace,

version
=
version,

server
=
server,

fanout
=
fanout)

kwargs
=
dict
([(k,
v)
for
k,
v
in
kwargs.items()

   
if
v
is
not
cls
._marker])

target
=
base.target(
*
*
kwargs)


if
timeout
is
cls
._marker:

timeout
=
base.timeout

if
version_cap
is
cls
._marker:

version_cap
=
base.version_cap


return
_CallContext(base.transport,
target,

base.serializer,

timeout,
version_cap)


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class
_CallContext(
object
):


_marker
=
object
()


def
__init__(
self
,
transport,target,serializer,

 
timeout
=
None
,
version_cap
=
None
):

self
.conf
=
transport.conf


self
.transport
=
transport

self
.target
=
target

self
.serializer
=
serializer

self
.timeout
=
timeout

self
.version_cap
=
version_cap


super
(_CallContext,
self
).__init__()


可以看到,这也不过是对一些对象在合并一下(所以说,context是个很简单的东西,就是一大堆杂七杂八的大集合)。

会过去看call吧,call会的最终调用这个上下文的call方法,也就是下面的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def
call(
self
,
ctxt,method,
*
*
kwargs):

"""Invoke
amethod and wait for areply. See RPCClient.call()."""

msg
=
self
._make_message(ctxt,
method,kwargs)

msg_ctxt
=
self
.serializer.serialize_context(ctxt)


timeout
=
self
.timeout

if
self
.timeout
is
None
:

timeout
=
self
.conf.rpc_response_timeout


if
self
.version_cap:

self
._check_version_cap(msg.get(
'version'
))


try
:

result
=
self
.transport._send(
self
.target,
msg_ctxt,msg,

  
wait_for_reply
=
True
,
timeout
=
timeout)

except
driver_base.TransportDriverError
as ex:

raise
ClientSendError(
self
.target,
ex)

return
self
.serializer.deserialize_entity(ctxt,
result)


ok了,我们快要接近call的核心了。一开始的两个主要是为了格式化一下消息的格式,然后附加点内容,不是重点。然后是有关于超时的,这个也可以不看。然后会检查一下version,这个也暂时不看。最后的这个就是我们的send了,这个我们要好好看看:

1
2
3
4
5
try
:

result
=
self
.transport._send(
self
.target,
msg_ctxt,msg,

  
wait_for_reply
=
True
,
timeout
=
timeout)

except
driver_base.TransportDriverError
as ex:

raise
ClientSendError(
self
.target,
ex)


transport上面提到过了,所以我们知道它里边的driver就是一个消息的具体实现。我们来看看其send方法:

1
2
3
4
5
6
7
def
_send(
self
,
target,ctxt,message,wait_for_reply
=
None
,
timeout
=
None
):

if
not
target.topic:

raise
exceptions.InvalidTarget(
'A
topicis required to send'
,

   
target)

return
self
._driver.send(target,
ctxt,message,

 
wait_for_reply
=
wait_for_reply,

 
timeout
=
timeout)


通过这里,我们很容易知道具体的_driver是什么:

1
<oslo.messaging._drivers.impl_qpid.QpidDriver
object at 0x2e38a90>


不过小秦我我们假设用的是rabbitmq,我们来看下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class
RabbitDriver(amqpdriver.AMQPDriverBase):


def
__init__(
self
,
conf,url,default_exchange
=
None
,

 
allowed_remote_exmods
=
[]):

conf.register_opts(rabbit_opts)

conf.register_opts(rpc_amqp.amqp_opts)


connection_pool
=
rpc_amqp.get_connection_pool(conf,
Connection)


super
(RabbitDriver,
self
).__init__(conf,
url,

   
connection_pool,

   
default_exchange,

   
allowed_remote_exmods)


def
require_features(
self
,
requeue
=
True
):

pass


也就是说,这里的send方法其实存在在amqpdriver.AMQPDriverBase中。后者是个很重要的方法,其是所有MQ client的一个父类,提供了公共的接口。我们来看下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class
AMQPDriverBase(base.BaseDriver):


def
__init__(
self
,
conf,url,connection_pool,

 
default_exchange
=
None
,
allowed_remote_exmods
=
[]):

super
(AMQPDriverBase,
self
).__init__(conf,
url,default_exchange,

 
allowed_remote_exmods)


self
._server_params
=
self
._server_params_from_url(
self
._url)


self
._default_exchange
=
default_exchange


#
FIXME(markmc):temp hack

if
self
._default_exchange:

self
.conf.set_override(
'control_exchange'
,
self
._default_exchange)


self
._connection_pool
=
connection_pool


self
._reply_q_lock
=
threading.Lock()

self
._reply_q
=
None

self
._reply_q_conn
=
None

self
._waiter
=
None


这里的初始化现在没什么好看的,主要还是赋值。我们看下send吧:

1
2
def
send(
self
,
target,ctxt,message,wait_for_reply
=
None
,
timeout
=
None
):

return
self
._send(target,
ctxt,message,wait_for_reply,timeout)


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def
_send(
self
,
target,ctxt,message,

  
wait_for_reply
=
None
,
timeout
=
None
,

  
envelope
=
True
,
notify
=
False
):


#
FIXME(markmc):remove this temporary hack

class
Context(
object
):

def
__init__(
self
,
d):

self
.d
=
d


def
to_dict(
self
):

return
self
.d


context
=
Context(ctxt)

msg
=
message


if
wait_for_reply:

msg_id
=
uuid.uuid4().
hex

msg.update({
'_msg_id'
:
msg_id})

LOG.debug(
'MSG_ID
is %s'
%
(msg_id))

msg.update({
'_reply_q'
:
self
._get_reply_q()})


rpc_amqp._add_unique_id(msg)

rpc_amqp.pack_context(msg,
context)


if
envelope:

msg
=
rpc_common.serialize_msg(msg)


if
wait_for_reply:

self
._waiter.listen(msg_id)


try
:

with
self
._get_connection()
as conn:

if
notify:

conn.notify_send(target.topic,
msg)

elif
target.fanout:

conn.fanout_send(target.topic,
msg)

else
:

topic
=
target.topic

if
target.server:

topic
=
'%s.%s'
%
(target.topic,
target.server)

conn.topic_send(topic,
msg,timeout
=
timeout)


if
wait_for_reply:

result
=
self
._waiter.wait(msg_id,
timeout)

if
isinstance
(result,
Exception):

raise
result

return
result

finally
:

if
wait_for_reply:

self
._waiter.unlisten(msg_id)


终于,小秦我终于看到真真切切的send了!看之前在明确一下,这里的send会发送一个消息给exchange,然后根据routing key,exchange会转发给具体的queue,在那个queue上有人在等着拿东西。那个人拿完并处理后会返回结果给我们,所以我们也必须建立一个接收返回消息的queue连接在exchange上,并且这个queue的routing key必须放在message里告知对方,让他们知道返回值往哪里发。具体的可以看这里。

懂了上面的这些后,就开始看代码吧:

首先设置msg_id,同时生成一个返回的queue(就是我们上面讲的那个接收返回消息的queue):

1
2
3
4
5
if
wait_for_reply:

msg_id
=
uuid.uuid4().
hex

msg.update({
'_msg_id'
:
msg_id})

LOG.debug(
'MSG_ID
is %s'
%
(msg_id))

msg.update({
'_reply_q'
:
self
._get_reply_q()})


来看看这个reply queue:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def
_get_reply_q(
self
):

with
self
._reply_q_lock:

if
self
._reply_q
is
not
None
:

return
self
._reply_q


reply_q
=
'reply_'
+
uuid.uuid4().
hex


conn
=
self
._get_connection(pooled
=
False
)


self
._waiter
=
ReplyWaiter(
self
.conf,
reply_q,conn,

   
self
._allowed_remote_exmods)


self
._reply_q
=
reply_q

self
._reply_q_conn
=
conn


return
self
._reply_q


可以看到queue的名字就是reply_加上一个随机的东东,并且根据上面的代码,可以知道这个名字铁定会放在msg里(msg.update({‘_reply_q’:self._get_reply_q()}))。看下这个waiter吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class
ReplyWaiter(
object
):


def
__init__(
self
,
conf,reply_q,conn,allowed_remote_exmods):

self
.conf
=
conf

self
.conn
=
conn

self
.reply_q
=
reply_q

self
.allowed_remote_exmods
=
allowed_remote_exmods


self
.conn_lock
=
threading.Lock()

self
.incoming
=
[]

self
.msg_id_cache
=
rpc_amqp._MsgIdCache()

self
.waiters
=
ReplyWaiters()


conn.declare_direct_consumer(reply_q,
self
)


重点是最后一句,这里声明了一个consumer。我们知道从上面的对象图知道,有consumer就有它监听的queue,来看下这个是个什么consumer(具体的实现在impl_XXX中,小秦我这里是impl_rabbit):

1
2
3
4
5
6
def
declare_direct_consumer(
self
,
topic,callback):

"""Create
a'direct' queue.

In
nova's use,this is generally amsg_idqueue used for

responses
for call/multicall

"""

self
.declare_consumer(DirectConsumer,
topic,callback)


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def
declare_consumer(
self
,
consumer_cls,topic,callback):

"""Create
aConsumer using the class that was passed in and

add
it to our list of consumers

"""


def
_connect_error(exc):

log_info
=
{
'topic'
:
topic,
'err_str'
:
str
(exc)}

LOG.error(_(
"Failed
to declare consumerfor topic'%(topic)s':"

  
"%(err_str)s"
)
%
log_info)


def
_declare_consumer():

consumer
=
consumer_cls(
self
.conf,
self
.channel,
topic,callback,

six.
next
(
self
.consumer_num))

self
.consumers.append(consumer)

return
consumer


return
self
.ensure(_connect_error,
_declare_consumer)


ok啦,我们看到,这里的reply queue就是一个DirectConsumer。这个DirectConsumer从上面的对象图中可以知道,就是对kombu的consumer一个封装(简单的说建立一个consumer就是建立一个queue,然后过会会在上面不停的监听罢了(不过这里我们还不会去监听))。
好啦,reply queue看好了,可以继续看_send方法了。现在我们知道的事情是:消息往exchang=nova的exchange发,并且topic=network。同时我建立了一个叫做reply_XXX的队列,用于接收返回值。继续看吧:

1
2
if
wait_for_reply:

self
._waiter.listen(msg_id)


这里就是监听队列了。msg_id就是监听的key。
接下来就是发送消息了:

1
2
3
4
5
6
7
8
9
10
11
try
:

with
self
._get_connection()
as conn:

if
notify:

conn.notify_send(target.topic,
msg)

elif
target.fanout:

conn.fanout_send(target.topic,
msg)

else
:

topic
=
target.topic

if
target.server:

topic
=
'%s.%s'
%
(target.topic,
target.server)

conn.topic_send(topic,
msg,timeout
=
timeout)


在我们这里就是conn.topic_send(topic,msg,timeout=timeout)了。另外可以看到,如果target明确指定了server,那么就会发送给特定的server!否则则是通过round-robin算法来分配消息。至于conn.topic_send做的事情就很简单啦,创建个Publisher然后发送,具体的可以看看博客里的kombu的那个文章(也就是说,我发消息的时候是先建立接收队列再发送消息。如果是call的调用,会生成一个Topic
Publisher和一个Direct Consumer,同时会生成一个msg_id为名字的exchange和一个msg_id为key的queue)。
最后几行就很简单了:

1
2
3
4
5
6
7
8
if
wait_for_reply:

result
=
self
._waiter.wait(msg_id,
timeout)

if
isinstance
(result,
Exception):

raise
result

return
result

finally
:

if
wait_for_reply:

self
._waiter.unlisten(msg_id)


如果需要等返回,那么我就等。如果返回的是异常,那么就抛出异常。最后别忘了取消监听就好。当然啦,如果压根就不要要等返回值的话,那么也就不用搞这么麻烦了。
最后,我们再来看看这里rpc的callback方法吧。我们知道我们调用了call后,就会在reply queue上监听,那么当消息到达后会如何处理呢?我们来看下:通过result= self._waiter.wait(msg_id,timeout),我们可以得到result,所以wait是我们的切入点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def
wait(
self
,
msg_id,timeout):

#

#
NOTE(markmc):we're waiting for areply for msg_idto come in for on

#
the reply_q,but there may be other threads also waiting for replies

#
to other msg_ids

#

#
Only one thread can be consuming from the queue using this connection

#
and we don't want to hold open aconnection per thread,so instead we

#
have the first thread take responsibility for passing replies not

#
intended for itself to the appropriate thread.

#

final_reply
=
None

while
True
:

if
self
.conn_lock.acquire(
False
):

#
Ok,we're the thread responsible for polling the connection

try
:

#
Check the queue to see if aprevious lock-holding thread

#
queued up areply already

while
True
:

reply,
ending,empty
=
self
._check_queue(msg_id)

if
empty:

break

if
not
ending:

final_reply
=
reply

else
:

return
final_reply


#
Now actually poll the connection

while
True
:

reply,
ending
=
self
._poll_connection(msg_id,
timeout)

if
not
ending:

final_reply
=
reply

else
:

return
final_reply

finally
:

self
.conn_lock.release()

#
We've got our reply,tell the other threads to wake up

#
so that one of them will take over the responsibility for

#
polling the connection

self
.waiters.wake_all(msg_id)

else
:

#
We're going to wait for the first thread to pass us our reply

reply,
ending,trylock
=
self
._poll_queue(msg_id,
timeout)

if
trylock:

#
The first thread got its reply,let's try and take over

#
the responsibility for polling

continue

if
not
ending:

final_reply
=
reply

else
:

return
final_reply


代码很长,很大一部分是由于在reply_q上可能有其它线程在等待不同的消息(原因应该是由于不希望每个线程都开一个connection,而是要开大家公用一个connection。这么说吧,如果我rpclient建立好了,这个时候弄了个多线程,那么会有多个reply_q,但根据我下面会说道的incoming列表,这些reply_q上的消息都会的放到同一个列表中,所以我们这里要做区分。这里的多线程应该是我们的协程,这个在之后的文章里会写到这个)。关键的代码是这句:

1
reply,
ending
=
self
._poll_connection(msg_id,
timeout)


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def
_poll_connection(
self
,
msg_id,timeout):

while
True
:

while
self
.incoming:

message_data
=
self
.incoming.pop(
0
)


incoming_msg_id
=
message_data.pop(
'_msg_id'
,
None
)

if
incoming_msg_id
=
=
msg_id:

return
self
._process_reply(message_data)


self
.waiters.put(incoming_msg_id,
message_data)


try
:

self
.conn.consume(limit
=
1
,
timeout
=
timeout)

except
rpc_common.Timeout:

raise
messaging.MessagingTimeout(
'Timed
out waiting for a'

 
'reply
to message ID %s'

 
%
msg_id)


实现很简单,如果拿到了消息,就看下是不是我们要的那个消息(更具msg_id),如果不是,则放回reply_q(其实这里已经不是reply_q了,具体的看了下面的incoming列表就知道了),如果是,则调用self._process_reply(message_data)进行处理。而后者其实很简单,看看返回的data里有没有错,没错就返回具体的返回值就好了:

1
2
3
4
5
6
7
8
9
10
11
12
13
def
_process_reply(
self
,
data):

result
=
None

ending
=
False

self
.msg_id_cache.check_duplicate_message(data)

if
data[
'failure'
]:

failure
=
data[
'failure'
]

result
=
rpc_common.deserialize_remote_exception(

failure,
self
.allowed_remote_exmods)

elif
data.get(
'ending'
,
False
):

ending
=
True

else
:

result
=
data[
'result'
]

return
result,
ending


那为什么返回的message会的到message_data里呢?通过之前关于kombu的文章里写的东西,我们知道当一个消息来了后consumer会的调用callback,那这个callback是什么呢?我们来看下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class
ReplyWaiter(
object
):


def
__init__(
self
,
conf,reply_q,conn,allowed_remote_exmods):

self
.conf
=
conf

self
.conn
=
conn

self
.reply_q
=
reply_q

self
.allowed_remote_exmods
=
allowed_remote_exmods


self
.conn_lock
=
threading.Lock()

self
.incoming
=
[]

self
.msg_id_cache
=
rpc_amqp._MsgIdCache()

self
.waiters
=
ReplyWaiters()


conn.declare_direct_consumer(reply_q,
self
)


中的在这行代码:conn.declare_direct_consumer(reply_q,self),这里的self的__call__方法就是我们的callback,看下其实现吧:

1
2
3
def
__call__(
self
,
message):

message.acknowledge()

self
.incoming.append(message)


现在清楚啦,当消息来了后其实什么都不做,只是把它放到incoming这个list里等着之后慢慢拿出来罢了。另外这里也会发送ack消息给broker。
4.小结一下rpc的call方法
大部分服务,如果其会的监听消息(也就是扮演rpc的接收端角色),那么一般都会提供一个rpcapi.py的文件,里边有一个XXXAPI的类(如ConsoleAuthAPI,NetworkAPI),这个类代表着一个RPCClient,并且初始化的时候会的把对应的topic赋值给它(ruconsole,network)。这个RPCClient有call和cast方法,调用这个RPCClient的call方法就会往对应的topic发送信息。其它服务如果要和某个服务rpc通信,那么只需要建立一个XXXAPI的对象即可。

当第一次使用call的时候,由于需要得到返回数据,因此会的建立一个reply_XXX的exchange,还会的建立一个reply_XXX的queue,其topic就是reply_XXX。注意,只有第一次调用call的时候会建立这些东西,此后再次调用call都不会建立了,而是复用这些exchange,queue,通过msg_id来区分接收到的消息是哪个线程发送的。

RPCClient最终是调用AMQPDriverBase中的_send方法去发送消息(RPCClient拥有TRANSPORT这个属性,后者拥有AMQPDriverBase这个属性,所以一个NetworkAPI含有一个AMQPDriverBase对象),具体的发送消息的动作由TRANSPORT中的_driver实现(比如RabbitMQ、QPID)。由于RPCClient只有一个(因为XXXAPI的类只有一个),所以reply_XXX这个queue就是AMQPDriverBase的一个属性。

AMQPDriverBase有一个属性是ReplyWaiter(含义是reply的等待者,这个等待者是针对RPCClient这个大的东西来说的,类似于manager),后者有一个属性是ReplyWaiters(也是reply的等待者,但这个等待的含义只的是等待的message,或者说一个ReplyWaiters就是一个调用了call方法并在等待reply的线程的集合)。

reply_XXX队列上的Consumer是DirectConsumer,其收到消息后会的把消息放在ReplyWaiter的incoming列表中,同时发送消息的ack确认。

接收消息这个动作是在call方法调用_send后,_send调用ReplyWaiter的wait来实现的。

wait的实现很简单,首先获取一个线程锁,确保某一个时候只有一个线程可以访问reply_XXX。

如果某个线程获取了这个锁:那么其会的查看ReplyWaiter的incoming列表,在里边取出消息,然后比较消息的msg_id是不是我这个线程需要的msg_id,如果不是,那么就把这个消息放到ReplyWaiters的一个字典中,这个字典的key是msg_id,value就是消息。如果取完了incoming中的消息发现都没有这个线程需要的,那么这个这个线程就会调用DirectConsumer的consume方法,参数是(limit=1,timeout=timeout),含义是等待最多timeout秒,并且只取一个消息。如果超时,那么就报错。如果取到了,那么和上面一样判断是不是自己需要的msg,是的话就去做处理,不是的话就放到ReplyWaiters的一个字典中,让别的没有取得线程锁的线程处理。

如果某个线程没有获取这个锁,那么其会的查看ReplyWaiters的字典,看看自己需要的消息是不是已经有人从incoming中取出来并放到这里了。如果是那么就处理,如果不是那么就等一会然后再去尝试获取锁,做类似的操作。

对于消息的处理其实很简单,如果有值那么就返回,如果有错那么就报错。
比如我现在有三个线程,都调用了call方法往network组件发送了一个消息,那么这三个线程都是共用一个NetworkAPI对象的(因为线程的建立在这个对象初始化之后才调用),这个NetworkAPI对象就是一个RPCClient,后者有TRANSPORT对象,知道消息发往的目的地是它的TARGET对象,同时TRANSPORT对象有AMQPDriverBase这个对象,后者有ReplyWaiter对象,ReplyWaiter有ReplyWaiters对象。

这三个线程都调用了call方法,只有第一个call方法会的创建一个reply_XXX exchange和一个topic是reply_XXX的queue,并且这三个线程(或者说这个RPCClient上后续所有的RPC调用)都会的使用这个reply_XXX exchange和reply_XXX queue。这三个线程发送的消息分别是msg_01,msg_02,msg_03(所以reply的消息id也应该是msg_01,msg_02,msg_03)。当他们发送完后,都会调用wait方法。由于只有一个线程可以取到锁,所以这三个线程中有两个线程会去ReplyWaiters的字典中看看那个取到锁的线程有没有把消息取来放到这个字典里。而那个取到锁的线程则是去看incoming列表,从中取消息。如果取到的是自己要的消息(比如msg_01),那么就释放锁并处理消息后返回消息中的返回值。如果不是自己需要的消息那么这个线程会把这个消息(比如msg_02,并不是它想要的msg_01)放到ReplyWaiters的字典中(添加一个条目:msg_01:MSG_DATA),继续从incoming中取消息。如果incoming中已经没有消息了,那么就调用reply_XXX上的consumer的consume方法,从reply_XXX中取消息放到incoming中,然后再去做相同的操作。

几个对象的关系(不是UML。。。瞎画的。。。):
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: