oslo.messaging组件的学习之call方法
2016-03-29 00:00
295 查看
2014年4月25日 BY THUANQIN·0 COMMENT
这篇文章会介绍一下oslo.messaging组件的实现原理。相关的学习网站是: 1.简介
nova使用了direct,fanout,topic-based这三种exchange(好像也就这三种),而和其它组件(如Object Store)则是通过RestFUL API沟通。虽然现在的nova使用的RabbitMQ或QPID都是AMQP的,但以后可能会引入其它非AMQP的组件。
nova有两个rpc的方法:rpc.call和rpc.cast。都是基于AMQP实现的。rpc.call是同步的,会等待result返回。rpc.cast是异步的,并且不需要result。
nova还有一个notify的方法。
nova使用了kombu来做RabbitMQ消息的具体底层交互模块。关于kombu可以看
由于支持多种MQ,所以这里的一个框架就是注册driver,具体调用的时候调用具体driver的特定方法。所以我们分析下rabbitmq这个方法就能知道其具体有哪些对象以及对象
2.RabbitMQ实现的基本对象
首先根据kombu的那个文章,我们知道kombu中有两个对象producer和consumer,前者向exchange发送message,后者从queue取出message。oslo这里的rabbitmq实现会的对这些东西做个封装(这一点很重要,记住oslo就是做了个封装,要看原理去看kombu的,否则还是不能很好的理解其实现)。
具体的对象直接的关系如下(不怎么会画UML。。。)
3.rpc的实现例子分析(call方法)
先来看看用法,我们拿nova-api这里的例子来说这个。下面的几行首先是初始化一个用于rpc的client,看下其实现:
[nova/network/rpcapi.py]
1 2 3 4 5 6 7 8 | def __init__( self ,topic = None ): super (NetworkAPI, self ).__init__() topic = topic or CONF.network_topic target = messaging.Target(topic = topic,version = '1.0' ) version_cap = self .VERSION_ALIASES.get(CONF.upgrade_levels.network, CONF.upgrade_levels.network) serializer = objects_base.NovaObjectSerializer() self .client = rpc.get_client(target,version_cap,serializer) |
在这里,是这个东西:
1 2 | # The topicnetwork nodes listen on (string value) #network_topic=network |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 | class Target( object ): """Identifies the destination of messages. A Target encapsulates all the information to identify where amessage should be sent or what messages aserver is listening for. Different subsets of the information encapsulated in aTarget object is relevant to various aspects of the API: creating aserver: topicand server is required; exchange is optional an endpoint's target: namespace and version is optional client sendingamessage: topicis required,all other attributes optional Its attributes are: :param exchange:A scope for topics. Leave unspecified to default to the control_exchange configuration option. :type exchange:str :param topic:A name which identifies the set of interfaces exposed by a server. Multiple servers may listen on atopicand messages will be dispatched to one of the servers in around-robin fashion. :type topic:str :param namespace:Identifies aparticular interface (i.e. set of methods) exposed by aserver. The default interface has no namespace identifier and is referred to as the null namespace. :type namespace:str :param version:Interfaces have amajor.minor version number associated withthem. A minor number increment indicates abackwards compatible change and an incompatible change is indicated by amajor number bump. Servers may implement multiple major versions and clients may require indicate that their message requires aparticular minimum minor version. :type version:str :param server:Clients can request that amessage be directed to aspecific server,rather than just one of apool of servers listening on the topic. :type server:str :param fanout:Clients may request that amessage be directed to all servers listening on atopicby setting fanout to ``True``,rather than just one of them. :type fanout:bool """ def __init__( self ,exchange = None ,topic = None ,namespace = None , version = None ,server = None ,fanout = None ): self .exchange = exchange self .topic = topic self .namespace = namespace self .version = version self .server = server self .fanout = fanout def __call__( self , * * kwargs): kwargs.setdefault( 'exchange' , self .exchange) kwargs.setdefault( 'topic' , self .topic) kwargs.setdefault( 'namespace' , self .namespace) kwargs.setdefault( 'version' , self .version) kwargs.setdefault( 'server' , self .server) kwargs.setdefault( 'fanout' , self .fanout) return Target( * * kwargs) def __eq__( self ,other): return vars ( self ) = = vars (other) def __ne__( self ,other): return not self = = other def __repr__( self ): attrs = [] for a in [ 'exchange' , 'topic' , 'namespace' , 'version' , 'server' , 'fanout' ]: v = getattr ( self ,a) if v: attrs.append((a,v)) values = ',' .join([ '%s=%s' % i for i in attrs]) return '<Target ' + values + '>' |
exchange:用于指定exchange,如果没有指定,那么就是配置文件里的control_exchange
topic:就是binding key。如果多个consumer(在注释里也叫server)监听同一个queue,那么会使用round-robin来发配消息
namespace:某些方法会用到这个,默认是空,具体作用我们后面如果遇到了再看
version:consumer(也就是server)是由版本号的,发送的message的版本要和consumer的版本兼容才行。版本号是major.mirror的形式。major号相同则兼容
server:特定的consumer。相当于direct的转发了,就是发送者指定特定的consumer,然后由它处理,而不是像topic里的解释那样,用round-robin来分发消息
fanout:表明消息会广播,忽略topic,只要consumer允许接收fanout消息即可
在我们的例子里,exchange使用默认的exchange(从
我们继续看下面的代码,这一行就开始获取一个client了,并且由于知道了target,所以我们也知道这个client的信息会发往何处。
1 | self .client = rpc.get_client(target,version_cap,serializer) |
1 2 3 4 5 6 7 | def get_client(target,version_cap = None ,serializer = None ): assert TRANSPORT is not None serializer = RequestContextSerializer(serializer) return messaging.RPCClient(TRANSPORT, target, version_cap = version_cap, serializer = serializer) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 | class RPCClient( object ): """A class for invoking methods on remote servers. The RPCClient class is responsible for sendingmethod invocations to remote servers viaamessaging transport. A default targetis supplied to the RPCClient constructor,but target attributes can be overridden for individual method invocations using the prepare()method. A method invocation consists of arequest contextdictionary,amethod name and adictionary of arguments. A cast()invocation just sends the request and returns immediately. A call()invocation waits for the server to send areturn value. This class is intended to be used by wrapping it in another class which provides methods on the subclass to perform the remote invocation using call()or cast():: class TestClient(object): def __init__(self,transport): target= messaging.Target(topic='testtopic',version='2.0') self._client = messaging.RPCClient(transport,target) def test(self,ctxt,arg): return self._client.call(ctxt,'test',arg=arg) An example of using the prepare()method to override some attributes of the default target:: def test(self,ctxt,arg): cctxt = self._client.prepare(version='2.5') return cctxt.call(ctxt,'test',arg=arg) RPCClient have anumber of other properties - for example,timeoutand version_cap- which may make sense to override for some method invocations, so they too can be passed to prepare():: def test(self,ctxt,arg): cctxt = self._client.prepare(timeout=10) return cctxt.call(ctxt,'test',arg=arg) However,this class can be used directly without wrapping it another class. For example:: transport = messaging.get_transport(cfg.CONF) target= messaging.Target(topic='testtopic',version='2.0') client = messaging.RPCClient(transport,target) client.call(ctxt,'test',arg=arg) but this is probably only useful in limited circumstances as awrapper class will usually help to make the code much more obvious. """ def __init__( self ,transport,target, timeout = None ,version_cap = None ,serializer = None ): """Construct an RPC client. :param transport:amessaging transport handle :type transport:Transport :param target:the default targetfor invocations :type target:Target :param timeout:an optional default timeout(in seconds)for call()s :type timeout:int or float :param version_cap:raise aRPCVersionCapError version exceeds this cap :type version_cap:str :param serializer:an optional entity serializer :type serializer:Serializer """ self .conf = transport.conf self .conf.register_opts(_client_opts) self .transport = transport self .target = target self .timeout = timeout self .version_cap = version_cap self .serializer = serializer or msg_serializer.NoOpSerializer() super (RPCClient, self ).__init__() _marker = _CallContext._marker |
构造函数没什么可讲的,就是赋值。所以我们继续看下面的代码,看下怎么调用rpc的call方法吧(再复习一下,call是同步的,需要等结果返回)。我们挑一个看看:
1 2 3 | def get_fixed_ip_by_address( self ,ctxt,address): return self .client.call(ctxt, 'get_fixed_ip_by_address' , address = address) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 | def call( self ,ctxt,method, * * kwargs): """Invoke amethod and wait for areply. Method arguments must either be primitive types or types supported by the client's serializer(if any). Similarly,the request contextmust be adict unless the client's serializersupports serializing another type. The semantics of how any errors raised by the remote RPC endpoint method are handled are quite subtle. Firstly,if the remote exception is contained in one of the modules listed in the allow_remote_exmods messaging.get_transport()parameter, then it this exception will be re-raised by call(). However,such locally re-raised remote exceptions are distinguishable from the same exception type raised locally because re-raised remote exceptions are modified such that their class name ends withthe '_Remote' suffix so you may do:: if ex.__class__.__name__.endswith('_Remote'): # Some special case for locally re-raised remote exceptions Secondly,if aremote exception is not from amodule listed in the allowed_remote_exmods list,then amessaging.RemoteError exception is raised withall details of the remote exception. :param ctxt:arequest contextdict :type ctxt:dict :param method:the method name :type method:str :param kwargs:adict of method arguments :type kwargs:dict :raises:MessagingTimeout,RemoteError """ return self .prepare().call(ctxt,method, * * kwargs) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 | def prepare( self ,exchange = _marker,topic = _marker,namespace = _marker, version = _marker,server = _marker,fanout = _marker, timeout = _marker,version_cap = _marker): """Prepare amethod invocation context. Use this method to override client properties for an individual method invocation. For example:: def test(self,ctxt,arg): cctxt = self.prepare(version='2.5') return cctxt.call(ctxt,'test',arg=arg) :param exchange:see Target.exchange :type exchange:str :param topic:see Target.topic :type topic:str :param namespace:see Target.namespace :type namespace:str :param version:requirement the server must support,see Target.version :type version:str :param server:send to aspecific server,see Target.server :type server:str :param fanout:send to all servers on topic,see Target.fanout :type fanout:bool :param timeout:an optional default timeout(in seconds)for call()s :type timeout:int or float :param version_cap:raise aRPCVersionCapError version exceeds this cap :type version_cap:str """ return _CallContext._prepare( self , exchange,topic,namespace, version,server,fanout, timeout,version_cap) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | @classmethod def _prepare( cls ,base, exchange = _marker,topic = _marker,namespace = _marker, version = _marker,server = _marker,fanout = _marker, timeout = _marker,version_cap = _marker): """Prepare amethod invocation context. See RPCClient.prepare().""" kwargs = dict ( exchange = exchange, topic = topic, namespace = namespace, version = version, server = server, fanout = fanout) kwargs = dict ([(k,v) for k,v in kwargs.items() if v is not cls ._marker]) target = base.target( * * kwargs) if timeout is cls ._marker: timeout = base.timeout if version_cap is cls ._marker: version_cap = base.version_cap return _CallContext(base.transport,target, base.serializer, timeout,version_cap) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | class _CallContext( object ): _marker = object () def __init__( self ,transport,target,serializer, timeout = None ,version_cap = None ): self .conf = transport.conf self .transport = transport self .target = target self .serializer = serializer self .timeout = timeout self .version_cap = version_cap super (_CallContext, self ).__init__() |
会过去看call吧,call会的最终调用这个上下文的call方法,也就是下面的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | def call( self ,ctxt,method, * * kwargs): """Invoke amethod and wait for areply. See RPCClient.call().""" msg = self ._make_message(ctxt,method,kwargs) msg_ctxt = self .serializer.serialize_context(ctxt) timeout = self .timeout if self .timeout is None : timeout = self .conf.rpc_response_timeout if self .version_cap: self ._check_version_cap(msg.get( 'version' )) try : result = self .transport._send( self .target,msg_ctxt,msg, wait_for_reply = True ,timeout = timeout) except driver_base.TransportDriverError as ex: raise ClientSendError( self .target,ex) return self .serializer.deserialize_entity(ctxt,result) |
1 2 3 4 5 | try : result = self .transport._send( self .target,msg_ctxt,msg, wait_for_reply = True ,timeout = timeout) except driver_base.TransportDriverError as ex: raise ClientSendError( self .target,ex) |
1 2 3 4 5 6 7 | def _send( self ,target,ctxt,message,wait_for_reply = None ,timeout = None ): if not target.topic: raise exceptions.InvalidTarget( 'A topicis required to send' , target) return self ._driver.send(target,ctxt,message, wait_for_reply = wait_for_reply, timeout = timeout) |
1 | <oslo.messaging._drivers.impl_qpid.QpidDriver object at 0x2e38a90> |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | class RabbitDriver(amqpdriver.AMQPDriverBase): def __init__( self ,conf,url,default_exchange = None , allowed_remote_exmods = []): conf.register_opts(rabbit_opts) conf.register_opts(rpc_amqp.amqp_opts) connection_pool = rpc_amqp.get_connection_pool(conf,Connection) super (RabbitDriver, self ).__init__(conf,url, connection_pool, default_exchange, allowed_remote_exmods) def require_features( self ,requeue = True ): pass |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | class AMQPDriverBase(base.BaseDriver): def __init__( self ,conf,url,connection_pool, default_exchange = None ,allowed_remote_exmods = []): super (AMQPDriverBase, self ).__init__(conf,url,default_exchange, allowed_remote_exmods) self ._server_params = self ._server_params_from_url( self ._url) self ._default_exchange = default_exchange # FIXME(markmc):temp hack if self ._default_exchange: self .conf.set_override( 'control_exchange' , self ._default_exchange) self ._connection_pool = connection_pool self ._reply_q_lock = threading.Lock() self ._reply_q = None self ._reply_q_conn = None self ._waiter = None |
1 2 | def send( self ,target,ctxt,message,wait_for_reply = None ,timeout = None ): return self ._send(target,ctxt,message,wait_for_reply,timeout) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 | def _send( self ,target,ctxt,message, wait_for_reply = None ,timeout = None , envelope = True ,notify = False ): # FIXME(markmc):remove this temporary hack class Context( object ): def __init__( self ,d): self .d = d def to_dict( self ): return self .d context = Context(ctxt) msg = message if wait_for_reply: msg_id = uuid.uuid4(). hex msg.update({ '_msg_id' :msg_id}) LOG.debug( 'MSG_ID is %s' % (msg_id)) msg.update({ '_reply_q' : self ._get_reply_q()}) rpc_amqp._add_unique_id(msg) rpc_amqp.pack_context(msg,context) if envelope: msg = rpc_common.serialize_msg(msg) if wait_for_reply: self ._waiter.listen(msg_id) try : with self ._get_connection()as conn: if notify: conn.notify_send(target.topic,msg) elif target.fanout: conn.fanout_send(target.topic,msg) else : topic = target.topic if target.server: topic = '%s.%s' % (target.topic,target.server) conn.topic_send(topic,msg,timeout = timeout) if wait_for_reply: result = self ._waiter.wait(msg_id,timeout) if isinstance (result,Exception): raise result return result finally : if wait_for_reply: self ._waiter.unlisten(msg_id) |
懂了上面的这些后,就开始看代码吧:
首先设置msg_id,同时生成一个返回的queue(就是我们上面讲的那个接收返回消息的queue):
1 2 3 4 5 | if wait_for_reply: msg_id = uuid.uuid4(). hex msg.update({ '_msg_id' :msg_id}) LOG.debug( 'MSG_ID is %s' % (msg_id)) msg.update({ '_reply_q' : self ._get_reply_q()}) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | def _get_reply_q( self ): with self ._reply_q_lock: if self ._reply_q is not None : return self ._reply_q reply_q = 'reply_' + uuid.uuid4(). hex conn = self ._get_connection(pooled = False ) self ._waiter = ReplyWaiter( self .conf,reply_q,conn, self ._allowed_remote_exmods) self ._reply_q = reply_q self ._reply_q_conn = conn return self ._reply_q |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | class ReplyWaiter( object ): def __init__( self ,conf,reply_q,conn,allowed_remote_exmods): self .conf = conf self .conn = conn self .reply_q = reply_q self .allowed_remote_exmods = allowed_remote_exmods self .conn_lock = threading.Lock() self .incoming = [] self .msg_id_cache = rpc_amqp._MsgIdCache() self .waiters = ReplyWaiters() conn.declare_direct_consumer(reply_q, self ) |
1 2 3 4 5 6 | def declare_direct_consumer( self ,topic,callback): """Create a'direct' queue. In nova's use,this is generally amsg_idqueue used for responses for call/multicall """ self .declare_consumer(DirectConsumer,topic,callback) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | def declare_consumer( self ,consumer_cls,topic,callback): """Create aConsumer using the class that was passed in and add it to our list of consumers """ def _connect_error(exc): log_info = { 'topic' :topic, 'err_str' : str (exc)} LOG.error(_( "Failed to declare consumerfor topic'%(topic)s':" "%(err_str)s" ) % log_info) def _declare_consumer(): consumer = consumer_cls( self .conf, self .channel,topic,callback, six. next ( self .consumer_num)) self .consumers.append(consumer) return consumer return self .ensure(_connect_error,_declare_consumer) |
好啦,reply queue看好了,可以继续看_send方法了。现在我们知道的事情是:消息往exchang=nova的exchange发,并且topic=network。同时我建立了一个叫做reply_XXX的队列,用于接收返回值。继续看吧:
1 2 | if wait_for_reply: self ._waiter.listen(msg_id) |
接下来就是发送消息了:
1 2 3 4 5 6 7 8 9 10 11 | try : with self ._get_connection()as conn: if notify: conn.notify_send(target.topic,msg) elif target.fanout: conn.fanout_send(target.topic,msg) else : topic = target.topic if target.server: topic = '%s.%s' % (target.topic,target.server) conn.topic_send(topic,msg,timeout = timeout) |
最后几行就很简单了:
1 2 3 4 5 6 7 8 | if wait_for_reply: result = self ._waiter.wait(msg_id,timeout) if isinstance (result,Exception): raise result return result finally : if wait_for_reply: self ._waiter.unlisten(msg_id) |
最后,我们再来看看这里rpc的callback方法吧。我们知道我们调用了call后,就会在reply queue上监听,那么当消息到达后会如何处理呢?我们来看下:通过result= self._waiter.wait(msg_id,timeout),我们可以得到result,所以wait是我们的切入点:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 | def wait( self ,msg_id,timeout): # # NOTE(markmc):we're waiting for areply for msg_idto come in for on # the reply_q,but there may be other threads also waiting for replies # to other msg_ids # # Only one thread can be consuming from the queue using this connection # and we don't want to hold open aconnection per thread,so instead we # have the first thread take responsibility for passing replies not # intended for itself to the appropriate thread. # final_reply = None while True : if self .conn_lock.acquire( False ): # Ok,we're the thread responsible for polling the connection try : # Check the queue to see if aprevious lock-holding thread # queued up areply already while True : reply,ending,empty = self ._check_queue(msg_id) if empty: break if not ending: final_reply = reply else : return final_reply # Now actually poll the connection while True : reply,ending = self ._poll_connection(msg_id,timeout) if not ending: final_reply = reply else : return final_reply finally : self .conn_lock.release() # We've got our reply,tell the other threads to wake up # so that one of them will take over the responsibility for # polling the connection self .waiters.wake_all(msg_id) else : # We're going to wait for the first thread to pass us our reply reply,ending,trylock = self ._poll_queue(msg_id,timeout) if trylock: # The first thread got its reply,let's try and take over # the responsibility for polling continue if not ending: final_reply = reply else : return final_reply |
1 | reply,ending = self ._poll_connection(msg_id,timeout) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | def _poll_connection( self ,msg_id,timeout): while True : while self .incoming: message_data = self .incoming.pop( 0 ) incoming_msg_id = message_data.pop( '_msg_id' , None ) if incoming_msg_id = = msg_id: return self ._process_reply(message_data) self .waiters.put(incoming_msg_id,message_data) try : self .conn.consume(limit = 1 ,timeout = timeout) except rpc_common.Timeout: raise messaging.MessagingTimeout( 'Timed out waiting for a' 'reply to message ID %s' % msg_id) |
1 2 3 4 5 6 7 8 9 10 11 12 13 | def _process_reply( self ,data): result = None ending = False self .msg_id_cache.check_duplicate_message(data) if data[ 'failure' ]: failure = data[ 'failure' ] result = rpc_common.deserialize_remote_exception( failure, self .allowed_remote_exmods) elif data.get( 'ending' , False ): ending = True else : result = data[ 'result' ] return result,ending |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | class ReplyWaiter( object ): def __init__( self ,conf,reply_q,conn,allowed_remote_exmods): self .conf = conf self .conn = conn self .reply_q = reply_q self .allowed_remote_exmods = allowed_remote_exmods self .conn_lock = threading.Lock() self .incoming = [] self .msg_id_cache = rpc_amqp._MsgIdCache() self .waiters = ReplyWaiters() conn.declare_direct_consumer(reply_q, self ) |
1 2 3 | def __call__( self ,message): message.acknowledge() self .incoming.append(message) |
4.小结一下rpc的call方法
大部分服务,如果其会的监听消息(也就是扮演rpc的接收端角色),那么一般都会提供一个rpcapi.py的文件,里边有一个XXXAPI的类(如ConsoleAuthAPI,NetworkAPI),这个类代表着一个RPCClient,并且初始化的时候会的把对应的topic赋值给它(ruconsole,network)。这个RPCClient有call和cast方法,调用这个RPCClient的call方法就会往对应的topic发送信息。其它服务如果要和某个服务rpc通信,那么只需要建立一个XXXAPI的对象即可。
当第一次使用call的时候,由于需要得到返回数据,因此会的建立一个reply_XXX的exchange,还会的建立一个reply_XXX的queue,其topic就是reply_XXX。注意,只有第一次调用call的时候会建立这些东西,此后再次调用call都不会建立了,而是复用这些exchange,queue,通过msg_id来区分接收到的消息是哪个线程发送的。
RPCClient最终是调用AMQPDriverBase中的_send方法去发送消息(RPCClient拥有TRANSPORT这个属性,后者拥有AMQPDriverBase这个属性,所以一个NetworkAPI含有一个AMQPDriverBase对象),具体的发送消息的动作由TRANSPORT中的_driver实现(比如RabbitMQ、QPID)。由于RPCClient只有一个(因为XXXAPI的类只有一个),所以reply_XXX这个queue就是AMQPDriverBase的一个属性。
AMQPDriverBase有一个属性是ReplyWaiter(含义是reply的等待者,这个等待者是针对RPCClient这个大的东西来说的,类似于manager),后者有一个属性是ReplyWaiters(也是reply的等待者,但这个等待的含义只的是等待的message,或者说一个ReplyWaiters就是一个调用了call方法并在等待reply的线程的集合)。
reply_XXX队列上的Consumer是DirectConsumer,其收到消息后会的把消息放在ReplyWaiter的incoming列表中,同时发送消息的ack确认。
接收消息这个动作是在call方法调用_send后,_send调用ReplyWaiter的wait来实现的。
wait的实现很简单,首先获取一个线程锁,确保某一个时候只有一个线程可以访问reply_XXX。
如果某个线程获取了这个锁:那么其会的查看ReplyWaiter的incoming列表,在里边取出消息,然后比较消息的msg_id是不是我这个线程需要的msg_id,如果不是,那么就把这个消息放到ReplyWaiters的一个字典中,这个字典的key是msg_id,value就是消息。如果取完了incoming中的消息发现都没有这个线程需要的,那么这个这个线程就会调用DirectConsumer的consume方法,参数是(limit=1,timeout=timeout),含义是等待最多timeout秒,并且只取一个消息。如果超时,那么就报错。如果取到了,那么和上面一样判断是不是自己需要的msg,是的话就去做处理,不是的话就放到ReplyWaiters的一个字典中,让别的没有取得线程锁的线程处理。
如果某个线程没有获取这个锁,那么其会的查看ReplyWaiters的字典,看看自己需要的消息是不是已经有人从incoming中取出来并放到这里了。如果是那么就处理,如果不是那么就等一会然后再去尝试获取锁,做类似的操作。
对于消息的处理其实很简单,如果有值那么就返回,如果有错那么就报错。
比如我现在有三个线程,都调用了call方法往network组件发送了一个消息,那么这三个线程都是共用一个NetworkAPI对象的(因为线程的建立在这个对象初始化之后才调用),这个NetworkAPI对象就是一个RPCClient,后者有TRANSPORT对象,知道消息发往的目的地是它的TARGET对象,同时TRANSPORT对象有AMQPDriverBase这个对象,后者有ReplyWaiter对象,ReplyWaiter有ReplyWaiters对象。
这三个线程都调用了call方法,只有第一个call方法会的创建一个reply_XXX exchange和一个topic是reply_XXX的queue,并且这三个线程(或者说这个RPCClient上后续所有的RPC调用)都会的使用这个reply_XXX exchange和reply_XXX queue。这三个线程发送的消息分别是msg_01,msg_02,msg_03(所以reply的消息id也应该是msg_01,msg_02,msg_03)。当他们发送完后,都会调用wait方法。由于只有一个线程可以取到锁,所以这三个线程中有两个线程会去ReplyWaiters的字典中看看那个取到锁的线程有没有把消息取来放到这个字典里。而那个取到锁的线程则是去看incoming列表,从中取消息。如果取到的是自己要的消息(比如msg_01),那么就释放锁并处理消息后返回消息中的返回值。如果不是自己需要的消息那么这个线程会把这个消息(比如msg_02,并不是它想要的msg_01)放到ReplyWaiters的字典中(添加一个条目:msg_01:MSG_DATA),继续从incoming中取消息。如果incoming中已经没有消息了,那么就调用reply_XXX上的consumer的consume方法,从reply_XXX中取消息放到incoming中,然后再去做相同的操作。
。
几个对象的关系(不是UML。。。瞎画的。。。):
相关文章推荐
- 研究openstack中libguestfs密码不能注入到lvm分区镜像中问题
- OpenStack Instance HA Proposal
- openstack ice版availability zones host aggregates 实战详解
- Python 代码风格 和 PEP8
- 为什么选择with 而不是 finally(转)
- 关于python中的setup.py(转)
- python用profile、hotshot、timeit协助程序性能优化
- Python两个内置函数——locals 和globals(转)
- Python yield语法 使用实战详解
- Python单例模式的4种实现方法(转)
- Python性能分析 (Profiling)
- nova中增加数据库表isolated_schedule-openstack-ice
- 资源隔离调度算法测试(isolated_scheduler)及openstack集群开发环境搭建
- nova-scheduler详解 openstack-ice版
- SQLAlchemy实战详解
- 5个简单方式使用strace场景
- qemu-img命令详解
- qemu中KVM硬件虚拟化的初始化分析 (tcg、xen、kvm、qtest)(转)
- openstack nova调用libvirt,跟踪libvirt源码实例详解(cpu_mode及live_migrate 错误解决)
- 采用eclipse + gdb来搭建调试qemu源码的环境