您的位置:首页 > 运维架构

openstack的RPC消息通信,oslo.messagin实现

2017-10-21 20:21 537 查看
oslo.messaging库就是把rabbitmq的python库做了封装,考虑到了编程友好、性能、可靠性、异常的捕获等诸多因素。让各个项目的开发者聚焦于业务代码的编写,而不用考虑消息如何发送和接收。

openstack的RPC消息通信,提供了同步call和异步cast的两种调用方法,对每一组消息通信建立一个消息通道。

oslo.messaging是应用于python的库,为微服务直接或者server和client直接建立消息通信网络。

环境搭建:

1.linux操作系统运行环境

2.安装pip,可以使用源码安装,地址为https://pypi.python.org/pypi/pip/,解压后进入文件目录执行python setup.py install即可

3.安装pbr,这个组件是oslo库的依赖,可以使用命令pip install pbr,在联网环境下即可安装

4.安装oslo_config库,选择命令安装pip install oslo_config,或者下载oslo.config源码安装地址https://pypi.python.org/pypi/oslo.config/1.9.3,解压后进入目录执行python setup.py install即可

5.安装oslo_messaging库,使用pip install oslo_messaging,或者下载oslo.messaging源码进行安装,下载地址https://pypi.python.org/pypi/oslo.messaging,解压后进入目录执行python
setup.py install即可

下面就可以使用oslo库了,下面分享一下client和server的样例代码:

client.py

from oslo_config import cfg

import oslo_messaging as messaging

transport= messaging.get_transport(cfg.CONF)

target= messaging.Target(topic='my_test_topic')

client= messaging.RPCClient(transport, target)

ret= client.call(ctxt = {},

                 method = 'test',

                 arg = 'myarg')

cctxt= client.prepare(namespace='control', version='2.0')

cctxt.cast({},'stop')

server.py

from oslo_config import cfg

import oslo_messaging

import time

classServerControlEndpoint(object):

 

    target =oslo_messaging.Target(namespace='control',

                                  version='2.0')

    def __init__(self, server):

        self.server = server

    def stop(self, ctx):

        print“------ServerControlEndpoint. stop --------”

        if self.server:

            self.server.stop()

 

classTestEndpoint(object):

    def test(self, ctx, arg):

        print“------ TestEndpoint.test --------”

        return arg

transport= oslo_messaging.get_transport(cfg.CONF)

target= oslo_messaging.Target(topic='my_test_topic', server='server1')

endpoints= [

    ServerControlEndpoint(None),

    TestEndpoint(),

]

server= oslo_messaging.get_rpc_server(transport, target, endpoints,

                                      executor='blocking')

try:

    server.start()

    while True:

        time.sleep(1)

exceptKeyboardInterrupt:

   print("Stopping server")

 

server.stop()

server.wait()

最后执行python server.py使得服务端处于监听状态,执行python client.py发送RPC请求消息

注1:

#构造Target时有以下选择,oslo_messaging.Target(argument[i]):

#构造RPC_Server 的Target:

#topic and server is required; exchange is optional

#构造RPC_endpoint的Target:

#namespace and version optional

#构造RPC_client sending a message:

#topic is required, all other attributes optional

#构造Notification_Server的Target:

#topic is required, exchange is optional; all other attributes ignored

#构造Notifier的Target:

#topic is required, exchange is optional; all other attributes ignored

注2:

transport= oslo_messaging.get_transport(cfg.CONF)

#get_transport()中的cfg.CONF,是从cfg对象中,读取transport_url,rpc_backend和control_exchange信息构造Transport对象,其中rpc_backend和control_exchange的默认值分别为:rabbit和openstack。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
相关文章推荐