openstack的公共库(oslo)的使用
2016-09-14 14:50
579 查看
为了降低代码冗余度,openstack社区开发了很多公共库。通过这些公共库,可以很容易弄出一个完善鉴权、分布式、易配置、带调用链日志的REST服务。
oslo库的缺点是需要的背景知识比较多,英文文档写的又很简单,要真正用起来,没有几个demo会寸步难行。本文的目的就是通过demo,降低大家使用oslo库的难度。
1. oslo常用组件的一览表
2. 配置文件 oslo.config
它把配置项直接融入你的代码内,例子如下:
app.conf
[python]
view plain
copy
[DEFAULT]
username=app
[rabbit]
host = 192.168.1.7
port = 5672
myconfig.py
[python]
view plain
copy
# -*- coding: utf-8 -*-
import sys
from oslo_config import cfg
# Options that are registered without a group (the [DEFAULT] section of app.conf).
service_opts = [
cfg.StrOpt('username',
default='default',
help='user name'),
cfg.StrOpt('password',
help='password')
]
# A custom option group named "rabbit" (the [rabbit] section of app.conf).
rabbit_group = cfg.OptGroup(
name='rabbit',
title='RabbitMQ options'
)
# Several options that are registered together under the "rabbit" group below.
rabbit_Opts = [
cfg.StrOpt('host',
default='localhost',
help='IP/hostname to listen on.'),
cfg.IntOpt('port',
default=5672,
help='Port number to listen on.')
]
CONF = cfg.CONF
# Register the ungrouped options.
CONF.register_opts(service_opts)
# The group itself is registered before the options that belong to it.
CONF.register_group(rabbit_group)
# Register the list of options, naming the group they belong to.
CONF.register_opts(rabbit_Opts, rabbit_group)
# Parse the command line; app.conf is the default *configuration* file
# (not a log file name).
CONF(sys.argv[1:], default_config_files=['app.conf'])
# Read option values as attributes: CONF.<option> and CONF.<group>.<option>.
print ("username=%s rabbitmq.host=%s " % (CONF.username, CONF.rabbit.host))
3. 工具库 oslo.utils
4. REST服务器 oslo.service
oslo.service比较复杂,因为它透传了很多wsgi的参数,这些其实是开发者不希望直接看到的。下面的例子在oslo.service的基础上再封装了一个小的MiniService,这样用起来会比较方便。
[python]
view plain
copy
# -*- coding: utf-8 -*-
import sys
from webob import Request
#引入配置文件
from oslo_config import cfg
#引入带调用链的日志
from oslo_log import log as logging
from oslo_context import context
#引入REST服务
from oslo_service import service
from oslo_service import wsgi
# oslo.config's global configuration object.
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
# Register oslo.log's options on CONF first, then set logging up under the name "m19k".
logging.register_options(CONF)
logging.setup(CONF, "m19k")
# Minimal REST service wrapped around oslo.service's WSGI server.
class MiniService(object):
    """Small path-dispatching WSGI application served by ``wsgi.Server``.

    Handlers ("actions") are registered per URL path with ``add_action`` and
    are called as ``action(environ, method, path_info, query_string, body)``.
    A handler may return a ``(status, body)`` pair, or any other object, which
    is then sent as the body of a ``200 OK`` response.
    """

    def __init__(self, host="0.0.0.0", port="9000", workers=1, use_ssl=False,
                 cert_file=None, ca_file=None):
        """Store the listen address, worker count and SSL settings."""
        self.host = host
        self.port = port
        self.workers = workers
        self.use_ssl = use_ssl
        # NOTE(review): cert_file / ca_file are stored but never read by this
        # class; confirm how the certificates are meant to reach the server.
        self.cert_file = cert_file
        self.ca_file = ca_file
        # Maps a URL path (or the literal key "default") to its handler.
        self._actions = {}

    def add_action(self, url_path, action):
        """Register ``action`` for ``url_path``.

        ``""``, ``"/"`` and ``"default"`` (any case) register the fallback
        handler; any other path gets a leading ``/`` if it lacks one.
        """
        if url_path.lower() == "default" or url_path in ("/", ""):
            url_path = "default"
        elif not url_path.startswith("/"):
            url_path = "/" + url_path
        self._actions[url_path] = action

    @staticmethod
    def _normalize_result(result):
        """Turn a handler's return value into ``(status, body_bytes)``."""
        # Only a real 2+ element tuple/list is treated as (status, body).
        # Probing result[1] would misread a plain string such as "ok" as
        # status "o" and body "k".
        if isinstance(result, (tuple, list)) and len(result) >= 2:
            status, body = result[0], result[1]
        else:
            status, body = "200 OK", result
        if not isinstance(body, bytes):
            text = body if isinstance(body, type(u"")) else str(body)
            body = text.encode("utf-8")
        return status, body

    def _app(self, environ, start_response):
        """WSGI entry point: dispatch on PATH_INFO and send the handler's reply."""
        # Per the surrounding article, creating the context is what gives the
        # log records of this request a shared request_id.
        context.RequestContext()
        LOG.debug("start action.")
        request = Request(environ)
        action = self._actions.get(environ['PATH_INFO'])
        if action is None:
            action = self._actions.get("default")
        if action is None:
            # Nothing registered at all: answer with a fixed health message.
            start_response("200 OK", [('Content-type', 'text/html')])
            return [b"mini service is ok\n"]
        result = action(environ, request.method, request.path_info,
                        request.query_string, request.body)
        status, body = self._normalize_result(result)
        start_response(status, [('Content-Type', 'text/plain')])
        # WSGI expects an iterable of byte strings, not a bare string.
        return [body]

    def start(self):
        """Create the WSGI server, launch it with ``workers`` processes and wait."""
        self.server = wsgi.Server(CONF,
                                  "m19k",
                                  self._app,
                                  host=self.host,
                                  port=self.port,
                                  use_ssl=self.use_ssl)
        launcher = service.ProcessLauncher(CONF)
        launcher.launch_service(self.server, workers=self.workers)
        LOG.debug("launch service (%s:%s).", self.host, self.port)
        launcher.wait()
使用上述miniservice即可创建一个REST服务器,代码如下:
[python]
view plain
copy
# -*- coding: utf-8 -*-
import sys
from oslo_config import cfg
from oslo_log import log as logging
import miniservice
# oslo.config's global configuration object and this module's logger.
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
def default_action(env, method, path, query, body):
    """Fallback handler: log the request and answer ``200 OK`` with "default".

    Takes the five positional arguments MiniService passes to every action
    (WSGI environ, HTTP method, path, query string, request body).
    """
    # Hand the values to the logger as arguments so the message is only
    # formatted when the record is actually emitted.
    LOG.info("demo action (method:%s, path:%s, query:%s, body:%s)",
             method, path, query, body)
    return ("200 OK", "default")
def test_action(env, method, path, query, body):
    """Handler for the "test" path: log the request and answer ``200 OK`` with "test".

    Takes the five positional arguments MiniService passes to every action
    (WSGI environ, HTTP method, path, query string, request body).
    """
    # Hand the values to the logger as arguments so the message is only
    # formatted when the record is actually emitted.
    LOG.info("test (method:%s, path:%s, query:%s, body:%s)",
             method, path, query, body)
    return ("200 OK", "test")
if __name__ == "__main__":
    # Parse the command-line arguments into the global CONF.
    CONF(sys.argv[1:])
    # Use CONF.host / CONF.port when those attributes exist, else the literals.
    listen_host = getattr(CONF, "host", "0.0.0.0")
    listen_port = getattr(CONF, "port", "8001")
    rest_server = miniservice.MiniService(listen_host, listen_port)
    # "" registers the fallback handler; "test" is served at /test.
    for url_path, handler in (("", default_action), ("test", test_action)):
        rest_server.add_action(url_path, handler)
    rest_server.start()
通过curl即可测试
[python]
view plain
copy
curl http://localhost:8001/test -H "content-type:application/json" -X POST -d "{'a':'b', 'c':'1'}"
当然还可以通过自定义的python的httpclient,代码如下:
[python]
view plain
copy
# -*- coding: utf-8 -*-
import uuid
import socket
import functools
import requests
from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils
# Options this client registers on the global CONF (ungrouped).
client_opts = [
cfg.BoolOpt('debug',
default=False,
help="Print log in every request"),
]
CONF = cfg.CONF
CONF.register_opts(client_opts)
LOG = logging.getLogger(__name__)
class HttpClient(object):
    """Thin wrapper around a ``requests`` session.

    Adds optional JSON encoding of the request body, a default client
    certificate and timeout, and retries with exponential back-off on
    connection errors and timeouts.
    """

    def __init__(self, cert=None, timeout=None, session=None):
        """Store defaults and create a keep-alive session if none is given.

        :param cert: default ``cert`` value passed to every request.
        :param timeout: default ``timeout`` value passed to every request.
        :param session: an existing session object to reuse.
        """
        self.cert = cert
        # Previously this was always set to None, silently dropping the
        # caller's timeout.
        self.timeout = timeout
        if not session:
            session = requests.Session()
            # Use TCPKeepAliveAdapter to fix bug 1323862
            for scheme in list(session.adapters):
                session.mount(scheme, TCPKeepAliveAdapter())
        self.session = session

    def request(self, url, method, json=None, connect_retries=0, **kwargs):
        """Send one HTTP request and return the response object.

        :param json: if not None, serialized with ``jsonutils.dumps`` and sent
            as the body with ``Content-Type: application/json``.
        :param connect_retries: how many times to retry after a connection
            error or timeout.
        :param kwargs: forwarded to ``session.request``.
        """
        # Headers usually carry authentication and format information.
        headers = kwargs.setdefault('headers', dict())
        if self.cert:
            kwargs.setdefault('cert', self.cert)
        if self.timeout is not None:
            kwargs.setdefault('timeout', self.timeout)
        # A random hex string is used as User-Agent unless the caller set one.
        headers.setdefault('User-Agent', uuid.uuid4().hex)
        if json is not None:
            headers['Content-Type'] = 'application/json'
            kwargs['data'] = jsonutils.dumps(json)
        return self._send_request(url, method, connect_retries, **kwargs)

    def _send_request(self, url, method, connect_retries, connect_retry_delay=0.5, **kwargs):
        """Issue the request, retrying with a doubling delay between attempts.

        Re-raises the last ``Timeout`` / ``ConnectionError`` once
        ``connect_retries`` is exhausted.
        """
        # Imported here because the module's import list does not include
        # ``time``, which the retry sleep needs.
        import time

        while True:
            if CONF.debug:
                LOG.debug("REQ:{url:%s, method:%s}", url, method)
            try:
                resp = self.session.request(method, url, **kwargs)
            except (requests.exceptions.Timeout, requests.exceptions.ConnectionError):
                if connect_retries <= 0:
                    raise
                time.sleep(connect_retry_delay)
                connect_retries -= 1
                connect_retry_delay *= 2
                continue
            break
        if CONF.debug:
            LOG.debug("RESP:{url:%s, method:%s, status:%s}", url, method, resp.status_code)
        return resp

    def head(self, url, **kwargs):
        """Shortcut for ``request(url, 'HEAD', ...)``."""
        return self.request(url, 'HEAD', **kwargs)

    def get(self, url, **kwargs):
        """Shortcut for ``request(url, 'GET', ...)``."""
        return self.request(url, 'GET', **kwargs)

    def post(self, url, **kwargs):
        """Shortcut for ``request(url, 'POST', ...)``."""
        return self.request(url, 'POST', **kwargs)

    def put(self, url, **kwargs):
        """Shortcut for ``request(url, 'PUT', ...)``."""
        return self.request(url, 'PUT', **kwargs)

    def delete(self, url, **kwargs):
        """Shortcut for ``request(url, 'DELETE', ...)``."""
        return self.request(url, 'DELETE', **kwargs)

    def patch(self, url, **kwargs):
        """Shortcut for ``request(url, 'PATCH', ...)``."""
        return self.request(url, 'PATCH', **kwargs)
# Adapter used to work around the TCP keep-alive problem.
class TCPKeepAliveAdapter(requests.adapters.HTTPAdapter):
    """HTTPAdapter that supplies keep-alive ``socket_options`` unless the caller passed its own."""

    def init_poolmanager(self, *args, **kwargs):
        """Inject default socket options, then defer to the parent implementation."""
        if 'socket_options' not in kwargs:
            options = [
                # Keep Nagle's algorithm off
                (socket.IPPROTO_TCP, socket.TCP_NODELAY, 1),
                # Turn on TCP Keep-Alive
                (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
            ]
            # Added only when this platform's socket module defines the constant:
            # 60 s idle before probing, at most 4 probes, one probe every 15 s.
            tunables = (
                ('TCP_KEEPIDLE', 60),
                ('TCP_KEEPCNT', 4),
                ('TCP_KEEPINTVL', 15),
            )
            options.extend(
                (socket.IPPROTO_TCP, getattr(socket, opt_name), opt_value)
                for opt_name, opt_value in tunables
                if hasattr(socket, opt_name)
            )
            kwargs['socket_options'] = options
        super(TCPKeepAliveAdapter, self).init_poolmanager(*args, **kwargs)
httpclient = HttpClient()
# Pass the payload as a dict: request() serializes its ``json`` argument itself,
# so a pre-built string would be encoded a second time.
# print(...) with one argument works as both a statement and a function call.
print(httpclient.request("http://localhost:8001/test", "POST", {'a': 'b'}))
5. 日志和调用链 oslo.log + oslo.context
纯粹的oslo.log是很容易使用的,参见下面的例子:
[python]
view plain
copy
from oslo_config import cfg
from oslo_log import log as logging
# Module-level logger obtained through oslo.log.
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
DOMAIN = "demo"
# Register the logging options on CONF, then set logging up using DOMAIN.
logging.register_options(CONF)
logging.setup(CONF, DOMAIN)
# Oslo Logging uses INFO as default
LOG.info("Oslo Logging")
LOG.warning("Oslo Logging")
LOG.error("Oslo Logging")
而oslo.context(所谓的调用链),指的是每个Rest请求里面,在打印日志的时候都会带一个不变的request_id,由此可以分离出单次操作的日志。
在上述miniservice中,在REST的入口处,通过
[python]
view plain
copy
# Instantiate a request context at the REST entry point (see MiniService._app).
context.RequestContext()
即生成了这样的request_id,之后每次log都会自动带上它。
6. RPC调用 oslo.messaging
一个服务对外是REST接口,而服务内部的多个组件走的是RPC。Openstack中,RPC一般用rabbitmq来实现,oslo.messaging就是封装它的。可惜的是,它也要让读者有amqp的背景知识。
server.py
[python]
view plain
copy
from oslo_config import cfg
import oslo_messaging
from oslo_log import log as logging
import time
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
# Register oslo.log's options, set logging up under the name "myservice",
# then load configuration with app.conf as the default config file.
logging.register_options(CONF)
logging.setup(CONF, "myservice")
CONF(default_config_files=['app.conf'])
class ServerControlEndpoint(object):
    """RPC endpoint exposing ``stop`` under namespace "control", version "2.0"."""

    target = oslo_messaging.Target(namespace='control',
                                   version='2.0')

    def __init__(self, server):
        """Remember the server object that ``stop`` should act on (may be None)."""
        self.server = server

    def stop(self, ctx):
        """Stop the wrapped server; does nothing when no server was supplied."""
        if not self.server:
            return
        self.server.stop()
class TestEndpoint(object):
def test(self, ctx, arg):
print "test"
print arg
return arg
# Build the transport from configuration and listen on topic "test123" as "server1".
transport = oslo_messaging.get_transport(cfg.CONF)
target = oslo_messaging.Target(topic='test123', server='server1')
# NOTE: ServerControlEndpoint is given None, so its stop() call is a no-op here.
endpoints = [ServerControlEndpoint(None), TestEndpoint()]
server = oslo_messaging.get_rpc_server(
    transport, target, endpoints, executor='blocking')

try:
    server.start()
    # Idle until interrupted with Ctrl-C.
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print("Stopping server")

# Shut the server down and wait for it to finish.
server.stop()
server.wait()
client.py
[python]
view plain
copy
import oslo_messaging as messaging
from oslo_context import context
from oslo_config import cfg
from oslo_log import log as logging
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
# Register oslo.log's options, set logging up under the name "myservice",
# then load configuration with app.conf as the default config file.
logging.register_options(CONF)
logging.setup(CONF, "myservice")
CONF(default_config_files=['app.conf'])
# Empty request context and the payload sent to the remote method.
ctxt = {}
arg = {'a':'b'}
# Same topic as the server side ("test123").
transport = messaging.get_transport(cfg.CONF)
target = messaging.Target(topic='test123')
client = messaging.RPCClient(transport, target)
# Invoke the remote method "test" with keyword argument arg.
client.call(ctxt, 'test', arg=arg)
oslo库的缺点是需要的背景知识比较多,英文文档写的又很简单,要真正用起来,没有几个demo会寸步难行。本文的目的就是通过demo,降低大家使用oslo库的难度。
1. oslo常用组件的一览表
库名 | 作用 | 背景知识 |
oslo.config | 配置文件 | 无 |
oslo.utils | 工具库 | 无 |
oslo.service | 带ssl的REST服务器 | wsgi |
oslo.log + oslo.context | 带调用链的日志系统 | 无 |
oslo.messaging | RPC调用 | amqp |
oslo.db | 数据库 | sqlalchemy |
oslo.rootwrap | Linux的sudo | 无 |
oslo.serialization | 序列化 | 无 |
oslo.i18n | 国际化 | 无 |
oslo.policy | 权限系统 | paste deploy |
oslo.middleware | pipeline | paste deploy |
keystonemiddleware | 用户系统 | paste deploy + keystone |
oslo_test | 测试 | unittest |
它把配置项直接融入你的代码内,例子如下:
app.conf
[python]
view plain
copy
[DEFAULT]
username=app
[rabbit]
host = 192.168.1.7
port = 5672
myconfig.py
[python]
view plain
copy
# -*- coding: utf-8 -*-
import sys
from oslo_config import cfg
# Options that are registered without a group (the [DEFAULT] section of app.conf).
service_opts = [
cfg.StrOpt('username',
default='default',
help='user name'),
cfg.StrOpt('password',
help='password')
]
# A custom option group named "rabbit" (the [rabbit] section of app.conf).
rabbit_group = cfg.OptGroup(
name='rabbit',
title='RabbitMQ options'
)
# Several options that are registered together under the "rabbit" group below.
rabbit_Opts = [
cfg.StrOpt('host',
default='localhost',
help='IP/hostname to listen on.'),
cfg.IntOpt('port',
default=5672,
help='Port number to listen on.')
]
CONF = cfg.CONF
# Register the ungrouped options.
CONF.register_opts(service_opts)
# The group itself is registered before the options that belong to it.
CONF.register_group(rabbit_group)
# Register the list of options, naming the group they belong to.
CONF.register_opts(rabbit_Opts, rabbit_group)
# Parse the command line; app.conf is the default *configuration* file
# (not a log file name).
CONF(sys.argv[1:], default_config_files=['app.conf'])
# Read option values as attributes: CONF.<option> and CONF.<group>.<option>.
print ("username=%s rabbitmq.host=%s " % (CONF.username, CONF.rabbit.host))
3. 工具库 oslo.utils
函数名 | 作用 |
oslo_utils.encodeutils.exception_to_unicode(exc) | 异常消息转unicode |
oslo_utils.encodeutils.safe_decode(text, incoming=None, errors='strict') | 其他编码转unicode |
oslo_utils.encodeutils.safe_encode(text, incoming=None, encoding='utf-8', errors='strict') | unicode转其他编码,默认utf-8 |
oslo_utils.encodeutils.to_utf8(text) | unicode转utf-8 |
oslo_utils.eventletutils.fetch_current_thread_functor() | 获取当前线程的结构体 |
oslo_utils.fileutils.delete_if_exists(path) | 删除文件 |
oslo_utils.fileutils.ensure_tree(path, mode=511) | 创建文件夹 |
oslo_utils.fileutils.remove_path_on_error(path) | 上下文管理器,代码块抛出异常时删除path |
oslo_utils.fileutils.write_to_tempfile(content, path=None, suffix='', prefix='tmp') | 写入临时文件 |
oslo_utils.importutils.import_any(module, *modules) | 动态导入一个python包 |
oslo_utils.importutils.import_class(import_str) | 动态导入一个python类 |
oslo_utils.importutils.import_object(import_str, *args, **kwargs) | 动态导入一个python对象 |
oslo_utils.importutils.try_import(import_str, default=None) | 尝试导入一个包,失败了用default |
oslo_utils.netutils.get_my_ipv4() | 获取本地的ipv4地址 |
oslo_utils.netutils.is_ipv6_enabled() | 查看本地网络是否允许ipv6 |
oslo_utils.netutils.is_valid_cidr(address) | 判断是否是合法的CIDR地址 |
oslo_utils.netutils.is_valid_ip(address) | 判断ip是否合法 |
oslo_utils.netutils.is_valid_ipv4(address) | 判断是否是合法的ipv4地址 |
oslo_utils.netutils.is_valid_ipv6(address) | 判断是否是合法的ipv6地址 |
oslo_utils.netutils.urlsplit(url, scheme='', allow_fragments=True) | 类似urlparse.urlsplit(),切分url |
oslo_utils.reflection.accepts_kwargs(function) | 查看函数是否接受kwargs类似的参数 |
oslo_utils.reflection.get_class_name(obj, fully_qualified=True) | 获取对象的类名 |
oslo_utils.reflection.get_all_class_names(obj, up_to=<type 'object'>) | 获取父类名字 |
oslo_utils.reflection.get_callable_args(function, required_only=False) | 获取函数能传的参数 |
oslo_utils.reflection.get_member_names(obj, exclude_hidden=True) | 获取对象的属性名 |
oslo_utils.reflection.get_members(obj, exclude_hidden=True) | 获取对象的属性 |
oslo_utils.reflection.get_method_self(method) | 获取函数的self |
oslo_utils.reflection.is_subclass(obj, cls) | obj是否是cls的子类 |
oslo_utils.strutils.bool_from_string(subject, strict=False, default=False) | str转bool |
oslo_utils.strutils.check_string_length(value, name=None, min_length=0, max_length=None) | 检查字符串长度 |
oslo_utils.strutils.int_from_bool_as_string(subject) | 将表示bool的字符串转为int(1或0) |
oslo_utils.strutils.is_int_like(val) | 检查是否是数字 |
oslo_utils.strutils.mask_dict_password(dictionary, secret='***') | 将字典中的password替换掉 |
oslo_utils.strutils.mask_password(message, secret='***') | 将字符串中的password替换掉 |
oslo_utils.strutils.string_to_bytes(text, unit_system='IEC', return_int=False) | 带单位的容量字符串(如'1KB')转字节数 |
oslo_utils.timeutils.delta_seconds(before, after) | 计算时间差 |
oslo_utils.timeutils.is_newer_than(after, seconds) | 比较时间 |
oslo_utils.timeutils.isotime(at=None, subsecond=False) | 时间转iso格式 |
oslo_utils.timeutils.parse_strtime(timestr, fmt='%Y-%m-%dT%H:%M:%S.%f') | 字符串转时间 |
oslo_utils.timeutils.strtime(at=None, fmt='%Y-%m-%dT%H:%M:%S.%f') | 时间转字符串 |
oslo_utils.timeutils.utcnow(with_timezone=False) | 获取当前时间 |
oslo_utils.uuidutils.generate_uuid() | 产生一个uuid |
oslo_utils.uuidutils.is_uuid_like(val) | 检查字符串是否是uuid |
oslo_utils.versionutils.convert_version_to_int(version) | version转int |
oslo_utils.versionutils.convert_version_to_str(version_int) | version转字符串 |
oslo.service比较复杂,因为它透传了很多wsgi的参数,这些其实是开发者不希望直接看到的。下面的例子在oslo.service的基础上再封装了一个小的MiniService,这样用起来会比较方便。
[python]
view plain
copy
# -*- coding: utf-8 -*-
import sys
from webob import Request
#引入配置文件
from oslo_config import cfg
#引入带调用链的日志
from oslo_log import log as logging
from oslo_context import context
#引入REST服务
from oslo_service import service
from oslo_service import wsgi
# oslo.config's global configuration object.
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
# Register oslo.log's options on CONF first, then set logging up under the name "m19k".
logging.register_options(CONF)
logging.setup(CONF, "m19k")
# Minimal REST service wrapped around oslo.service's WSGI server.
class MiniService(object):
    """Small path-dispatching WSGI application served by ``wsgi.Server``.

    Handlers ("actions") are registered per URL path with ``add_action`` and
    are called as ``action(environ, method, path_info, query_string, body)``.
    A handler may return a ``(status, body)`` pair, or any other object, which
    is then sent as the body of a ``200 OK`` response.
    """

    def __init__(self, host="0.0.0.0", port="9000", workers=1, use_ssl=False,
                 cert_file=None, ca_file=None):
        """Store the listen address, worker count and SSL settings."""
        self.host = host
        self.port = port
        self.workers = workers
        self.use_ssl = use_ssl
        # NOTE(review): cert_file / ca_file are stored but never read by this
        # class; confirm how the certificates are meant to reach the server.
        self.cert_file = cert_file
        self.ca_file = ca_file
        # Maps a URL path (or the literal key "default") to its handler.
        self._actions = {}

    def add_action(self, url_path, action):
        """Register ``action`` for ``url_path``.

        ``""``, ``"/"`` and ``"default"`` (any case) register the fallback
        handler; any other path gets a leading ``/`` if it lacks one.
        """
        if url_path.lower() == "default" or url_path in ("/", ""):
            url_path = "default"
        elif not url_path.startswith("/"):
            url_path = "/" + url_path
        self._actions[url_path] = action

    @staticmethod
    def _normalize_result(result):
        """Turn a handler's return value into ``(status, body_bytes)``."""
        # Only a real 2+ element tuple/list is treated as (status, body).
        # Probing result[1] would misread a plain string such as "ok" as
        # status "o" and body "k".
        if isinstance(result, (tuple, list)) and len(result) >= 2:
            status, body = result[0], result[1]
        else:
            status, body = "200 OK", result
        if not isinstance(body, bytes):
            text = body if isinstance(body, type(u"")) else str(body)
            body = text.encode("utf-8")
        return status, body

    def _app(self, environ, start_response):
        """WSGI entry point: dispatch on PATH_INFO and send the handler's reply."""
        # Per the surrounding article, creating the context is what gives the
        # log records of this request a shared request_id.
        context.RequestContext()
        LOG.debug("start action.")
        request = Request(environ)
        action = self._actions.get(environ['PATH_INFO'])
        if action is None:
            action = self._actions.get("default")
        if action is None:
            # Nothing registered at all: answer with a fixed health message.
            start_response("200 OK", [('Content-type', 'text/html')])
            return [b"mini service is ok\n"]
        result = action(environ, request.method, request.path_info,
                        request.query_string, request.body)
        status, body = self._normalize_result(result)
        start_response(status, [('Content-Type', 'text/plain')])
        # WSGI expects an iterable of byte strings, not a bare string.
        return [body]

    def start(self):
        """Create the WSGI server, launch it with ``workers`` processes and wait."""
        self.server = wsgi.Server(CONF,
                                  "m19k",
                                  self._app,
                                  host=self.host,
                                  port=self.port,
                                  use_ssl=self.use_ssl)
        launcher = service.ProcessLauncher(CONF)
        launcher.launch_service(self.server, workers=self.workers)
        LOG.debug("launch service (%s:%s).", self.host, self.port)
        launcher.wait()
使用上述miniservice即可创建一个REST服务器,代码如下:
[python]
view plain
copy
# -*- coding: utf-8 -*-
import sys
from oslo_config import cfg
from oslo_log import log as logging
import miniservice
# oslo.config's global configuration object and this module's logger.
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
def default_action(env, method, path, query, body):
    """Fallback handler: log the request and answer ``200 OK`` with "default".

    Takes the five positional arguments MiniService passes to every action
    (WSGI environ, HTTP method, path, query string, request body).
    """
    # Hand the values to the logger as arguments so the message is only
    # formatted when the record is actually emitted.
    LOG.info("demo action (method:%s, path:%s, query:%s, body:%s)",
             method, path, query, body)
    return ("200 OK", "default")
def test_action(env, method, path, query, body):
    """Handler for the "test" path: log the request and answer ``200 OK`` with "test".

    Takes the five positional arguments MiniService passes to every action
    (WSGI environ, HTTP method, path, query string, request body).
    """
    # Hand the values to the logger as arguments so the message is only
    # formatted when the record is actually emitted.
    LOG.info("test (method:%s, path:%s, query:%s, body:%s)",
             method, path, query, body)
    return ("200 OK", "test")
if __name__ == "__main__":
    # Parse the command-line arguments into the global CONF.
    CONF(sys.argv[1:])
    # Use CONF.host / CONF.port when those attributes exist, else the literals.
    listen_host = getattr(CONF, "host", "0.0.0.0")
    listen_port = getattr(CONF, "port", "8001")
    rest_server = miniservice.MiniService(listen_host, listen_port)
    # "" registers the fallback handler; "test" is served at /test.
    for url_path, handler in (("", default_action), ("test", test_action)):
        rest_server.add_action(url_path, handler)
    rest_server.start()
通过curl即可测试
[python]
view plain
copy
curl http://localhost:8001/test -H "content-type:application/json" -X POST -d "{'a':'b', 'c':'1'}"
当然还可以通过自定义的python的httpclient,代码如下:
[python]
view plain
copy
# -*- coding: utf-8 -*-
import uuid
import socket
import functools
import requests
from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils
# Options this client registers on the global CONF (ungrouped).
client_opts = [
cfg.BoolOpt('debug',
default=False,
help="Print log in every request"),
]
CONF = cfg.CONF
CONF.register_opts(client_opts)
LOG = logging.getLogger(__name__)
class HttpClient(object):
    """Thin wrapper around a ``requests`` session.

    Adds optional JSON encoding of the request body, a default client
    certificate and timeout, and retries with exponential back-off on
    connection errors and timeouts.
    """

    def __init__(self, cert=None, timeout=None, session=None):
        """Store defaults and create a keep-alive session if none is given.

        :param cert: default ``cert`` value passed to every request.
        :param timeout: default ``timeout`` value passed to every request.
        :param session: an existing session object to reuse.
        """
        self.cert = cert
        # Previously this was always set to None, silently dropping the
        # caller's timeout.
        self.timeout = timeout
        if not session:
            session = requests.Session()
            # Use TCPKeepAliveAdapter to fix bug 1323862
            for scheme in list(session.adapters):
                session.mount(scheme, TCPKeepAliveAdapter())
        self.session = session

    def request(self, url, method, json=None, connect_retries=0, **kwargs):
        """Send one HTTP request and return the response object.

        :param json: if not None, serialized with ``jsonutils.dumps`` and sent
            as the body with ``Content-Type: application/json``.
        :param connect_retries: how many times to retry after a connection
            error or timeout.
        :param kwargs: forwarded to ``session.request``.
        """
        # Headers usually carry authentication and format information.
        headers = kwargs.setdefault('headers', dict())
        if self.cert:
            kwargs.setdefault('cert', self.cert)
        if self.timeout is not None:
            kwargs.setdefault('timeout', self.timeout)
        # A random hex string is used as User-Agent unless the caller set one.
        headers.setdefault('User-Agent', uuid.uuid4().hex)
        if json is not None:
            headers['Content-Type'] = 'application/json'
            kwargs['data'] = jsonutils.dumps(json)
        return self._send_request(url, method, connect_retries, **kwargs)

    def _send_request(self, url, method, connect_retries, connect_retry_delay=0.5, **kwargs):
        """Issue the request, retrying with a doubling delay between attempts.

        Re-raises the last ``Timeout`` / ``ConnectionError`` once
        ``connect_retries`` is exhausted.
        """
        # Imported here because the module's import list does not include
        # ``time``, which the retry sleep needs.
        import time

        while True:
            if CONF.debug:
                LOG.debug("REQ:{url:%s, method:%s}", url, method)
            try:
                resp = self.session.request(method, url, **kwargs)
            except (requests.exceptions.Timeout, requests.exceptions.ConnectionError):
                if connect_retries <= 0:
                    raise
                time.sleep(connect_retry_delay)
                connect_retries -= 1
                connect_retry_delay *= 2
                continue
            break
        if CONF.debug:
            LOG.debug("RESP:{url:%s, method:%s, status:%s}", url, method, resp.status_code)
        return resp

    def head(self, url, **kwargs):
        """Shortcut for ``request(url, 'HEAD', ...)``."""
        return self.request(url, 'HEAD', **kwargs)

    def get(self, url, **kwargs):
        """Shortcut for ``request(url, 'GET', ...)``."""
        return self.request(url, 'GET', **kwargs)

    def post(self, url, **kwargs):
        """Shortcut for ``request(url, 'POST', ...)``."""
        return self.request(url, 'POST', **kwargs)

    def put(self, url, **kwargs):
        """Shortcut for ``request(url, 'PUT', ...)``."""
        return self.request(url, 'PUT', **kwargs)

    def delete(self, url, **kwargs):
        """Shortcut for ``request(url, 'DELETE', ...)``."""
        return self.request(url, 'DELETE', **kwargs)

    def patch(self, url, **kwargs):
        """Shortcut for ``request(url, 'PATCH', ...)``."""
        return self.request(url, 'PATCH', **kwargs)
# Adapter used to work around the TCP keep-alive problem.
class TCPKeepAliveAdapter(requests.adapters.HTTPAdapter):
    """HTTPAdapter that supplies keep-alive ``socket_options`` unless the caller passed its own."""

    def init_poolmanager(self, *args, **kwargs):
        """Inject default socket options, then defer to the parent implementation."""
        if 'socket_options' not in kwargs:
            options = [
                # Keep Nagle's algorithm off
                (socket.IPPROTO_TCP, socket.TCP_NODELAY, 1),
                # Turn on TCP Keep-Alive
                (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
            ]
            # Added only when this platform's socket module defines the constant:
            # 60 s idle before probing, at most 4 probes, one probe every 15 s.
            tunables = (
                ('TCP_KEEPIDLE', 60),
                ('TCP_KEEPCNT', 4),
                ('TCP_KEEPINTVL', 15),
            )
            options.extend(
                (socket.IPPROTO_TCP, getattr(socket, opt_name), opt_value)
                for opt_name, opt_value in tunables
                if hasattr(socket, opt_name)
            )
            kwargs['socket_options'] = options
        super(TCPKeepAliveAdapter, self).init_poolmanager(*args, **kwargs)
httpclient = HttpClient()
# Pass the payload as a dict: request() serializes its ``json`` argument itself,
# so a pre-built string would be encoded a second time.
# print(...) with one argument works as both a statement and a function call.
print(httpclient.request("http://localhost:8001/test", "POST", {'a': 'b'}))
5. 日志和调用链 oslo.log + oslo.context
纯粹的oslo.log是很容易使用的,参见下面的例子:
[python]
view plain
copy
from oslo_config import cfg
from oslo_log import log as logging
# Module-level logger obtained through oslo.log.
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
DOMAIN = "demo"
# Register the logging options on CONF, then set logging up using DOMAIN.
logging.register_options(CONF)
logging.setup(CONF, DOMAIN)
# Oslo Logging uses INFO as default
LOG.info("Oslo Logging")
LOG.warning("Oslo Logging")
LOG.error("Oslo Logging")
而oslo.context(所谓的调用链),指的是每个Rest请求里面,在打印日志的时候都会带一个不变的request_id,由此可以分离出单次操作的日志。
在上述miniservice中,在REST的入口处,通过
[python]
view plain
copy
# Instantiate a request context at the REST entry point (see MiniService._app).
context.RequestContext()
即生成了这样的request_id,之后每次log都会自动带上它。
6. RPC调用 oslo.messaging
一个服务对外是REST接口,而服务内部的多个组件走的是RPC。Openstack中,RPC一般用rabbitmq来实现,oslo.messaging就是封装它的。可惜的是,它也要让读者有amqp的背景知识。
server.py
[python]
view plain
copy
from oslo_config import cfg
import oslo_messaging
from oslo_log import log as logging
import time
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
# Register oslo.log's options, set logging up under the name "myservice",
# then load configuration with app.conf as the default config file.
logging.register_options(CONF)
logging.setup(CONF, "myservice")
CONF(default_config_files=['app.conf'])
class ServerControlEndpoint(object):
    """RPC endpoint exposing ``stop`` under namespace "control", version "2.0"."""

    target = oslo_messaging.Target(namespace='control',
                                   version='2.0')

    def __init__(self, server):
        """Remember the server object that ``stop`` should act on (may be None)."""
        self.server = server

    def stop(self, ctx):
        """Stop the wrapped server; does nothing when no server was supplied."""
        if not self.server:
            return
        self.server.stop()
class TestEndpoint(object):
def test(self, ctx, arg):
print "test"
print arg
return arg
# Build the transport from configuration and listen on topic "test123" as "server1".
transport = oslo_messaging.get_transport(cfg.CONF)
target = oslo_messaging.Target(topic='test123', server='server1')
# NOTE: ServerControlEndpoint is given None, so its stop() call is a no-op here.
endpoints = [ServerControlEndpoint(None), TestEndpoint()]
server = oslo_messaging.get_rpc_server(
    transport, target, endpoints, executor='blocking')

try:
    server.start()
    # Idle until interrupted with Ctrl-C.
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print("Stopping server")

# Shut the server down and wait for it to finish.
server.stop()
server.wait()
client.py
[python]
view plain
copy
import oslo_messaging as messaging
from oslo_context import context
from oslo_config import cfg
from oslo_log import log as logging
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
# Register oslo.log's options, set logging up under the name "myservice",
# then load configuration with app.conf as the default config file.
logging.register_options(CONF)
logging.setup(CONF, "myservice")
CONF(default_config_files=['app.conf'])
# Empty request context and the payload sent to the remote method.
ctxt = {}
arg = {'a':'b'}
# Same topic as the server side ("test123").
transport = messaging.get_transport(cfg.CONF)
target = messaging.Target(topic='test123')
client = messaging.RPCClient(transport, target)
# Invoke the remote method "test" with keyword argument arg.
client.call(ctxt, 'test', arg=arg)
相关文章推荐
- openstack的公共库(oslo)的使用
- openstack的公共库(oslo)的使用
- OpenStack配置解析库oslo.config的使用方法
- OpenStack配置解析库oslo.config的使用方法
- OpenStack配置解析库oslo.config的使用方法
- OpenStack公共组件oslo之十六——stevedore
- OpenStack公共组件oslo之十五——taskflow
- OpenStack公共组件oslo之五——oslo.service
- OpenStack公共组件oslo之四——oslo.context
- OpenStack公共组件oslo之十一——oslo.serialization
- OpenStack公共组件oslo之三——oslo.log
- OpenStack公共组件oslo之八——oslo.i18n
- OpenStack公共组件oslo之十四——pbr
- OpenStack公共组件oslo之二——oslo.utils
- OpenStack公共组件oslo之九——oslo.db
- OpenStack的oslo_messaging组件使用
- OpenStack oslo.messaging使用
- OpenStack公共组件oslo之十三——oslo.cache
- OpenStack公共组件oslo之七——oslo.middleware
- sas使用公共变量传值