您的位置:首页 > 移动开发 > Swift

Swift Proxy 调用流程(源码剖析)

2016-07-19 15:38 197 查看
  Swift proxy 以 WSGI 服务的方式运行,请求会按 pipeline 中配置的中间件顺序依次处理:catch_errors、proxy-logging、cache、authtoken、keystone、(slo),最后到达 proxy-server 应用。请求的具体处理和响应由 proxy-server 的服务入口点完成。

  proxy-server服务入口点(/swift-kilo-eol/swift/proxy/server.py)

def __call__(self, env, start_response):
    """
    WSGI entry point for the proxy server.

    On first use the memcache client is fetched from ``env``. The
    environment is then wrapped in a Request, passed through
    ``update_request`` and handed to ``handle_request``, whose return
    value is itself called as a WSGI application.

    :param env: WSGI environment dictionary
    :param start_response: WSGI start_response callable
    :returns: the WSGI response body iterable
    """
    # Bound up front so the UnicodeError handler below never raises a
    # NameError if building/updating the request is what failed.
    req = None
    try:
        if self.memcache is None:
            # NOTE(review): the second argument is presumably allow_none,
            # i.e. a missing 'swift.cache' yields None instead of an
            # error -- confirm against cache_from_env.
            self.memcache = cache_from_env(env, True)
        # NOTE(review): update_request is not shown here; it is believed
        # to copy X-Storage-Token into X-Auth-Token when only the former
        # is present -- confirm.
        req = self.update_request(Request(env))
        return self.handle_request(req)(env, start_response)
    except UnicodeError:
        err = HTTPPreconditionFailed(
            request=req, body='Invalid UTF8 or contains NULL')
        return err(env, start_response)
    except (Exception, Timeout):
        # Last-resort handler: never let an exception escape to the WSGI
        # server; answer with a bare 500 instead.
        start_response('500 Server Error',
                       [('Content-Type', 'text/plain')])
        return ['Internal server error.\n']


  事实上真正调用的是handle_request()来完成请求的具体处理:

1.根据req.path的信息返回对应的控制器类;

2.实例化具体的控制器类对象;

3.获取控制器类中由req.method指定的方法;

4.执行具体的方法并返回。

def handle_request(self, req):
    """
    Entry point for proxy server.
    Should return a WSGI-style callable (such as swob.Response).

    Steps:
    1. Validate the request: Content-Length, a UTF-8 path, the URL shape,
       and denied Host headers.
    2. Resolve the controller class for the path and instantiate it.
    3. Attach a transaction id.
    4. Look up the handler method named by ``req.method``.
    5. Consult ``swift.authorize`` if installed.
    6. Invoke the handler.

    :param req: swob.Request object
    :returns: a WSGI-style callable; failures are returned as error
              responses rather than raised
    """
    try:
        self.logger.set_statsd_prefix('proxy-server')
        if req.content_length and req.content_length < 0:
            self.logger.increment('errors')
            return HTTPBadRequest(request=req,
                                  body='Invalid Content-Length')

        try:
            if not check_utf8(req.path_info):
                self.logger.increment('errors')
                return HTTPPreconditionFailed(
                    request=req, body='Invalid UTF8 or contains NULL')
        except UnicodeError:
            self.logger.increment('errors')
            return HTTPPreconditionFailed(
                request=req, body='Invalid UTF8 or contains NULL')

        try:
            # Resolve the controller class for req.path, plus the keyword
            # arguments (version, account/container/object names) used to
            # instantiate it. Per the original article (get_controller is
            # not shown here):
            #   /info                    -> InfoController
            #   account+container+object -> ObjectController
            #   account+container        -> ContainerController
            #   account only             -> AccountController
            controller, path_parts = self.get_controller(req)
        except APIVersionError:
            self.logger.increment('errors')
            return HTTPBadRequest(request=req)
        except ValueError:
            self.logger.increment('errors')
            return HTTPNotFound(request=req)
        if not controller:
            self.logger.increment('errors')
            return HTTPPreconditionFailed(request=req, body='Bad URL')
        if self.deny_host_headers and \
                req.host.split(':')[0] in self.deny_host_headers:
            return HTTPForbidden(request=req, body='Invalid host header')

        self.logger.set_statsd_prefix('proxy-server.' +
                                      controller.server_type.lower())
        # Instantiate the controller with the parsed path components.
        controller = controller(self, **path_parts)
        if 'swift.trans_id' not in req.environ:
            # if this wasn't set by an earlier middleware, set it now
            trans_id_suffix = self.trans_id_suffix
            trans_id_extra = req.headers.get('x-trans-id-extra')
            if trans_id_extra:
                # The client-supplied extra is capped at 32 characters.
                trans_id_suffix += '-' + trans_id_extra[:32]
            trans_id = generate_trans_id(trans_id_suffix)
            req.environ['swift.trans_id'] = trans_id
            self.logger.txn_id = trans_id
        req.headers['x-trans-id'] = req.environ['swift.trans_id']
        controller.trans_id = req.environ['swift.trans_id']
        self.logger.client_ip = get_remote_client(req)
        try:
            # The controller must have a method named after the HTTP verb,
            # and that method must carry a 'publicly_accessible'
            # attribute; otherwise answer 405 listing the allowed methods.
            handler = getattr(controller, req.method)
            getattr(handler, 'publicly_accessible')
        except AttributeError:
            allowed_methods = getattr(controller, 'allowed_methods', set())
            return HTTPMethodNotAllowed(
                request=req, headers={'Allow': ', '.join(allowed_methods)})
        old_authorize = None
        if 'swift.authorize' in req.environ:
            # We call authorize before the handler, always. If authorized,
            # we remove the swift.authorize hook so it isn't ever called
            # again. If not authorized, we return the denial unless the
            # controller's method indicates it'd like to gather more
            # information and try again later.
            resp = req.environ['swift.authorize'](req)
            if not resp and not req.headers.get('X-Copy-From-Account') \
                    and not req.headers.get('Destination-Account'):
                # No resp means authorized, no delayed recheck required.
                old_authorize = req.environ['swift.authorize']
            elif resp and not getattr(handler, 'delay_denial', None):
                # Denied and the handler does not delay denials: return
                # the error now. (An authorized request that carries a
                # cross-account copy header falls through with the hook
                # still installed; previously it returned None here,
                # which the WSGI caller could not invoke.)
                return resp
        # Save off original request method (GET, POST, etc.) in case it
        # gets mutated during handling.  This way logging can display the
        # method the client actually sent.
        req.environ['swift.orig_req_method'] = req.method
        try:
            if old_authorize:
                req.environ.pop('swift.authorize', None)
            # Run the controller method selected above.
            return handler(req)
        finally:
            # Restore the authorize hook for any later consumer of env.
            if old_authorize:
                req.environ['swift.authorize'] = old_authorize
    except HTTPException as error_response:
        return error_response
    except (Exception, Timeout):
        self.logger.exception(_('ERROR Unhandled exception in request'))
        return HTTPServerError(request=req)

  具体调用过程(以 Account 请求为例):请求到达 proxy-server 的服务入口点之后,handle_request() 获取具体的控制器类(如 AccountController),然后调用 /swift-kilo-eol/swift/proxy/controllers/account.py 中对应的 GET/HEAD/PUT/POST/DELETE 等方法,把请求转发给具体的存储服务(Account Server);Account Server(/swift-kilo-eol/swift/account/server.py)再调用自身的 GET/HEAD/PUT/POST/DELETE 等方法,完成请求的处理和响应。

  下述是GET/HEAD/PUT/POST/DELETE等方法在源码中的具体调用过程,详细过程已在源码中注释

/swift-kilo-eol/swift/proxy/controllers/account.py

def GETorHEAD(self, req):
    """Handler for HTTP GET/HEAD requests.

    Reads the account from the nodes of its account-ring partition.
    - A 404 whose X-Account-Status is 'deleted' becomes 410 Gone.
    - Any other 404, when account autocreation is enabled, is replaced
      by a listing built without a real broker.
    Owner-only headers are stripped unless the request is from the
    account owner, in which case ACLs from system metadata are added.
    """
    if len(self.account_name) > constraints.MAX_ACCOUNT_NAME_LENGTH:
        resp = HTTPBadRequest(request=req)
        resp.body = 'Account name length of %d longer than %d' % \
            (len(self.account_name),
             constraints.MAX_ACCOUNT_NAME_LENGTH)
        return resp

    # Partition holding this account on the *account* ring, and an
    # iterator over candidate nodes for it.
    # NOTE(review): iter_nodes is not shown; presumably primaries first,
    # then handoffs, skipping error-limited nodes -- confirm.
    partition = self.app.account_ring.get_part(self.account_name)
    node_iter = self.app.iter_nodes(self.app.account_ring, partition)
    resp = self.GETorHEAD_base(
        req, _('Account'), node_iter, partition,
        req.swift_entity_path.rstrip('/'))
    if resp.status_int == HTTP_NOT_FOUND:
        if resp.headers.get('X-Account-Status', '').lower() == 'deleted':
            resp.status = HTTP_GONE
        elif self.app.account_autocreate:
            # Listing backed by a FakeAccountBroker (presumably empty),
            # so a not-yet-created account looks like an empty one.
            resp = account_listing_response(self.account_name, req,
                                            get_listing_content_type(req))
    if req.environ.get('swift_owner'):
        self.add_acls_from_sys_metadata(resp)
    else:
        for header in self.app.swift_owner_headers:
            resp.headers.pop(header, None)
    return resp

/swift-kilo-eol/swift/account/server.py

def HEAD(self, req):
    """Handle HTTP HEAD request.

    Returns the account's stats and metadata as response headers on a
    204 No Content response; 404 if the account is deleted.
    """
    drive, part, account = split_and_validate_path(req, 3)
    out_content_type = get_listing_content_type(req)

    if self.mount_check and not check_mount(self.root, drive):
        return HTTPInsufficientStorage(drive=drive, request=req)
    # Broker for the account database (sqlite, per the original
    # article). Stale reads are accepted for this read-only request, and
    # pending_timeout is kept short (presumably the wait for pending
    # updates -- confirm).
    broker = self._get_account_broker(drive, part, account,
                                      pending_timeout=0.1,
                                      stale_reads_ok=True)
    if broker.is_deleted():
        return self._deleted_response(broker, req, HTTPNotFound)
    # NOTE(review): get_response_headers is not shown; per the article it
    # emits X-Account-Container-Count, X-Account-Object-Count,
    # X-Account-Bytes-Used, X-Timestamp and X-PUT-Timestamp -- confirm.
    headers = get_response_headers(broker)
    headers['Content-Type'] = out_content_type
    return HTTPNoContent(request=req, headers=headers, charset='utf-8')

/swift-kilo-eol/swift/account/server.py

def GET(self, req):
    """Handle HTTP GET request.

    Performs the same account lookup as HEAD, but the body carries the
    account's container listing, built by account_listing_response. The
    listing is filtered by the prefix / delimiter / limit / marker /
    end_marker query parameters.
    """
    drive, part, account = split_and_validate_path(req, 3)
    prefix = get_param(req, 'prefix')
    delimiter = get_param(req, 'delimiter')
    # Only a single character with code point <= 254 is accepted.
    if delimiter and (len(delimiter) > 1 or ord(delimiter) > 254):
        # delimiters can be made more flexible later
        return HTTPPreconditionFailed(request=req, body='Bad delimiter')
    limit = constraints.ACCOUNT_LISTING_LIMIT
    given_limit = get_param(req, 'limit')
    # Non-digit values (including negatives) are ignored; the default
    # limit then applies.
    if given_limit and given_limit.isdigit():
        limit = int(given_limit)
        if limit > constraints.ACCOUNT_LISTING_LIMIT:
            return HTTPPreconditionFailed(
                request=req,
                body='Maximum limit is %d' %
                constraints.ACCOUNT_LISTING_LIMIT)
    marker = get_param(req, 'marker', '')
    end_marker = get_param(req, 'end_marker')
    out_content_type = get_listing_content_type(req)

    if self.mount_check and not check_mount(self.root, drive):
        return HTTPInsufficientStorage(drive=drive, request=req)
    broker = self._get_account_broker(drive, part, account,
                                      pending_timeout=0.1,
                                      stale_reads_ok=True)
    if broker.is_deleted():
        return self._deleted_response(broker, req, HTTPNotFound)

    # Listing entries are (name, object_count, bytes_used, is_subdir)
    # tuples, rendered as JSON, XML or plain text.
    return account_listing_response(account, req, out_content_type, broker,
                                    limit, marker, end_marker, prefix,
                                    delimiter)


def account_listing_response(account, req, response_content_type, broker=None,
                             limit='', marker='', end_marker='', prefix='',
                             delimiter=''):
    """
    Build the container-listing response for an account.

    :param account: account name (used as the XML root's name attribute)
    :param req: swob.Request being answered
    :param response_content_type: 'application/json', any type ending in
        '/xml', or anything else for a newline-separated text listing
    :param broker: account broker to list from; a FakeAccountBroker is
        used when None
    :param limit, marker, end_marker, prefix, delimiter: listing options
        passed unchanged to ``broker.list_containers_iter``
    :returns: HTTPOk carrying the listing, or HTTPNoContent when a
        plain-text listing is empty
    """
    if broker is None:
        broker = FakeAccountBroker()

    resp_headers = get_response_headers(broker)

    # Entries are (name, object_count, bytes_used, is_subdir) tuples;
    # is_subdir entries are rendered as 'subdir' records (presumably
    # delimiter roll-ups rather than real containers).
    account_list = broker.list_containers_iter(limit, marker, end_marker,
                                               prefix, delimiter)

    if response_content_type == 'application/json':
        data = [{'subdir': name} if is_subdir else
                {'name': name, 'count': object_count, 'bytes': bytes_used}
                for (name, object_count, bytes_used, is_subdir)
                in account_list]
        account_list = json.dumps(data)
    elif response_content_type.endswith('/xml'):
        output_list = ['<?xml version="1.0" encoding="UTF-8"?>',
                       '<account name=%s>' % saxutils.quoteattr(account)]
        for (name, object_count, bytes_used, is_subdir) in account_list:
            if is_subdir:
                output_list.append(
                    '<subdir name=%s />' % saxutils.quoteattr(name))
            else:
                item = '<container><name>%s</name><count>%s</count>' \
                       '<bytes>%s</bytes></container>' % \
                       (saxutils.escape(name), object_count, bytes_used)
                output_list.append(item)
        output_list.append('</account>')
        account_list = '\n'.join(output_list)
    else:
        # NOTE(review): this emptiness test assumes list_containers_iter
        # returns a list; a generator would always be truthy -- confirm.
        if not account_list:
            resp = HTTPNoContent(request=req, headers=resp_headers)
            resp.content_type = response_content_type
            resp.charset = 'utf-8'
            return resp
        account_list = '\n'.join(r[0] for r in account_list) + '\n'
    ret = HTTPOk(body=account_list, request=req, headers=resp_headers)
    ret.content_type = response_content_type
    ret.charset = 'utf-8'
    return ret


/swift-kilo-eol/swift/proxy/controllers/account.py

def PUT(self, req):
    """HTTP PUT request handler.

    Sends a PUT for the account to every node of its account-ring
    partition and returns the aggregated response. Only permitted when
    account management is enabled; metadata and name length are
    validated first.
    """
    if not self.app.allow_account_management:
        return HTTPMethodNotAllowed(
            request=req,
            headers={'Allow': ', '.join(self.allowed_methods)})
    error_response = check_metadata(req, 'account')
    if error_response:
        return error_response
    if len(self.account_name) > constraints.MAX_ACCOUNT_NAME_LENGTH:
        resp = HTTPBadRequest(request=req)
        resp.body = 'Account name length of %d longer than %d' % \
            (len(self.account_name),
             constraints.MAX_ACCOUNT_NAME_LENGTH)
        return resp
    # Partition for this account and the nodes that hold it.
    account_partition, accounts = \
        self.app.account_ring.get_nodes(self.account_name)
    # Backend request headers derived from the client request.
    # NOTE(review): transfer=True presumably forwards client metadata
    # headers -- confirm against generate_request_headers.
    headers = self.generate_request_headers(req, transfer=True)
    # Drop any cached info for this account before changing it.
    clear_info_cache(self.app, req.environ, self.account_name)
    # One backend request per node; make_requests collects the replies
    # and picks a single best response by quorum.
    resp = self.make_requests(
        req, self.app.account_ring, account_partition, 'PUT',
        req.swift_entity_path, [headers] * len(accounts))
    self.add_acls_from_sys_metadata(resp)
    return resp


def make_requests(self, req, ring, part, method, path, headers,
                  query_string='', overrides=None):
    """
    Sends an HTTP request to multiple nodes and aggregates the results.
    It attempts the primary nodes concurrently, then iterates over the
    handoff nodes as needed.

    One backend request is spawned per entry in ``headers``. Replies are
    collected until a quorum is reached, plus whatever else completes
    within ``post_quorum_timeout``. Missing replies are counted as 503s,
    and ``best_response`` picks the final answer.

    :param req: a request sent by the client
    :param ring: the ring used for finding backend servers
    :param part: the partition number
    :param method: the method to send to the backend
    :param path: the path to send to the backend
                 (full path ends up being  /<$device>/<$part>/<$path>)
    :param headers: a list of dicts, where each dict represents one
                    backend request that should be made.
    :param query_string: optional query string to send to the backend
    :param overrides: optional return status override map used to override
                      the returned status of a request.
    :returns: a swob.Response object
    """
    # Primary nodes for the partition. Their count is the number of
    # replies expected and also sizes the greenthread pile.
    start_nodes = ring.get_part_nodes(part)
    # A single node iterator shared by every spawned request.
    # NOTE(review): presumably yields primaries, then handoffs -- confirm.
    nodes = GreenthreadSafeIterator(self.app.iter_nodes(ring, part))
    pile = GreenAsyncPile(len(start_nodes))
    # Spawn one concurrent backend request per headers entry.
    for head in headers:
        pile.spawn(self._make_request, nodes, part, method, path,
                   head, query_string, self.app.logger.thread_locals)
    response = []
    statuses = []
    # Gather replies as they complete. Each reply is a
    # (status, reason, headers, body) tuple, as unpacked by zip() below.
    # Falsy results are skipped. Stop early once statuses form a quorum.
    for resp in pile:
        if not resp:
            continue
        response.append(resp)
        statuses.append(resp[0])
        if self.have_quorum(statuses, len(start_nodes)):
            break
    # give any pending requests *some* chance to finish
    # (bounded by post_quorum_timeout -- this does not wait for every
    # request; only those finished in time are returned, presumably)
    finished_quickly = pile.waitall(self.app.post_quorum_timeout)
    for resp in finished_quickly:
        if not resp:
            continue
        response.append(resp)
        statuses.append(resp[0])
    # Pad with 503s so the vote sees one entry per primary node.
    while len(response) < len(start_nodes):
        response.append((HTTP_SERVICE_UNAVAILABLE, '', '', ''))
    statuses, reasons, resp_headers, bodies = zip(*response)
    # Choose the final response by quorum vote.
    return self.best_response(req, statuses, reasons, bodies,
                              '%s %s' % (self.server_type, req.method),
                              overrides=overrides, headers=resp_headers)


def best_response(self, req, statuses, reasons, bodies, server_type,
                  etag=None, headers=None, overrides=None,
                  quorum_size=None):
    """
    Given a list of responses from several servers, choose the best to
    return to the API.

    First tries a plain quorum vote. If that fails and ``overrides`` is
    given, statuses found in ``overrides`` are substituted and the vote
    is retried. If there is still no winner, a 503 is returned.

    :param req: swob.Request object
    :param statuses: list of statuses returned
    :param reasons: list of reasons for each status
    :param bodies: bodies of each response
    :param server_type: type of server the responses came from
    :param etag: etag
    :param headers: headers of each response
    :param overrides: overrides to apply when lacking quorum
    :param quorum_size: quorum size to use
    :returns: swob.Response object with the correct status, body, etc. set
    """
    if quorum_size is None:
        quorum_size = self._quorum_size(len(statuses))

    resp = self._compute_quorum_response(
        req, statuses, reasons, bodies, etag, headers,
        quorum_size=quorum_size)
    if overrides and not resp:
        # Replace overridden statuses with their substitutes (with empty
        # reason/headers/body) and remember which indices were faked,
        # so the second vote can avoid picking them as the
        # representative response.
        # NOTE(review): zip() below requires ``headers``; with the default
        # headers=None this branch raises TypeError. Callers shown here
        # (make_requests) always pass headers.
        faked_up_status_indices = set()
        transformed = []
        for (i, (status, reason, hdrs, body)) in enumerate(zip(
                statuses, reasons, headers, bodies)):
            if status in overrides:
                faked_up_status_indices.add(i)
                transformed.append((overrides[status], '', '', ''))
            else:
                transformed.append((status, reason, hdrs, body))
        statuses, reasons, headers, bodies = zip(*transformed)
        resp = self._compute_quorum_response(
            req, statuses, reasons, bodies, etag, headers,
            indices_to_avoid=faked_up_status_indices,
            quorum_size=quorum_size)

    if not resp:
        resp = Response(request=req)
        self.app.logger.error(_('%(type)s returning 503 for %(statuses)s'),
                              {'type': server_type, 'statuses': statuses})
        # 503's standard reason phrase is "Service Unavailable";
        # "Internal Server Error" is the phrase for 500.
        resp.status = '503 Service Unavailable'

    return resp

/swift-kilo-eol/swift/account/server.py

def PUT(self, req):
    """Handle HTTP PUT request.

    With a container segment in the path, records that container's
    stats (put/delete timestamps, object count, bytes used, storage
    policy index) in the account database. Presumably these requests
    are sent by container servers -- confirm.

    Without a container segment, creates the account database or bumps
    its PUT timestamp, then stores any account system/user metadata
    from the request headers.
    """
    drive, part, account, container = split_and_validate_path(req, 3, 4)
    if self.mount_check and not check_mount(self.root, drive):
        return HTTPInsufficientStorage(drive=drive, request=req)
    if container:   # put account container
        # Use the request's X-Timestamp when given, otherwise "now".
        if 'x-timestamp' not in req.headers:
            timestamp = Timestamp(time.time())
        else:
            timestamp = valid_timestamp(req)
        pending_timeout = None
        container_policy_index = \
            req.headers.get('X-Backend-Storage-Policy-Index', 0)
        # Requests carrying a transaction id get a 3s pending_timeout
        # (presumably how long to wait on the DB's pending updates).
        if 'x-trans-id' in req.headers:
            pending_timeout = 3
        broker = self._get_account_broker(drive, part, account,
                                          pending_timeout=pending_timeout)
        # Only accounts under the auto-create prefix get their DB created
        # on demand here; a concurrent creator winning the race is fine.
        if account.startswith(self.auto_create_account_prefix) and \
                not os.path.exists(broker.db_file):
            try:
                broker.initialize(timestamp.internal)
            except DatabaseAlreadyExists:
                pass
        # A deleted account rejects container updates unless the caller
        # explicitly sets X-Account-Override-Deleted: yes.
        if req.headers.get('x-account-override-deleted', 'no').lower() != \
                'yes' and broker.is_deleted():
            return HTTPNotFound(request=req)
        # Record the container row in the account DB.
        broker.put_container(container, req.headers['x-put-timestamp'],
                             req.headers['x-delete-timestamp'],
                             req.headers['x-object-count'],
                             req.headers['x-bytes-used'],
                             container_policy_index)
        # A delete newer than the put means the container is gone: 204.
        # NOTE(review): compares raw header strings; assumes a fixed-width
        # timestamp format so string order equals numeric order -- confirm.
        if req.headers['x-delete-timestamp'] > \
                req.headers['x-put-timestamp']:
            return HTTPNoContent(request=req)
        else:
            return HTTPCreated(request=req)
    else:   # put account
        timestamp = valid_timestamp(req)
        broker = self._get_account_broker(drive, part, account)
        if not os.path.exists(broker.db_file):
            # New account DB; losing a creation race means "not created".
            try:
                broker.initialize(timestamp.internal)
                created = True
            except DatabaseAlreadyExists:
                created = False
        elif broker.is_status_deleted():
            # The account's status marks it deleted (a check distinct from
            # is_deleted()); refuse to recreate it yet with 403.
            return self._deleted_response(broker, req, HTTPForbidden,
                                          body='Recently deleted')
        else:
            # Existing DB: reviving a deleted account counts as "created".
            created = broker.is_deleted()
            broker.update_put_timestamp(timestamp.internal)
            # Still deleted after the bump (presumably the delete is
            # newer than this PUT) -> conflict.
            if broker.is_deleted():
                return HTTPConflict(request=req)
        # Store account system/user metadata, each value stamped with
        # this request's timestamp.
        metadata = {}
        metadata.update((key, (value, timestamp.internal))
                        for key, value in req.headers.iteritems()
                        if is_sys_or_user_meta('account', key))
        if metadata:
            broker.update_metadata(metadata, validate_metadata=True)
        if created:
            return HTTPCreated(request=req)
        else:
            return HTTPAccepted(request=req)

/swift-kilo-eol/swift/proxy/controllers/account.py

def POST(self, req):
    """HTTP POST request handler.

    Validates the account name length and metadata, then sends the POST
    to every node of the account's partition. If the account is not
    found and auto-creation is enabled, the account is created and the
    POST is sent once more.
    """
    max_name_len = constraints.MAX_ACCOUNT_NAME_LENGTH
    if len(self.account_name) > max_name_len:
        bad_request = HTTPBadRequest(request=req)
        bad_request.body = 'Account name length of %d longer than %d' % (
            len(self.account_name), max_name_len)
        return bad_request
    metadata_error = check_metadata(req, 'account')
    if metadata_error:
        return metadata_error

    partition, nodes = self.app.account_ring.get_nodes(self.account_name)
    backend_headers = self.generate_request_headers(req, transfer=True)
    clear_info_cache(self.app, req.environ, self.account_name)

    def send_post():
        # One backend request per node; replies are reduced to a single
        # best response by make_requests.
        return self.make_requests(
            req, self.app.account_ring, partition, 'POST',
            req.swift_entity_path, [backend_headers] * len(nodes))

    result = send_post()
    # Missing account with autocreate enabled: create it, then retry once.
    if result.status_int == HTTP_NOT_FOUND and self.app.account_autocreate:
        self.autocreate_account(req, self.account_name)
        result = send_post()
    self.add_acls_from_sys_metadata(result)
    return result

/swift-kilo-eol/swift/account/server.py

def POST(self, req):
    """Handle HTTP POST request.

    Replaces account system/user metadata with the values found in the
    request headers, each stamped with the request's timestamp. A
    deleted account yields 404; success yields 204.
    """
    drive, part, account = split_and_validate_path(req, 3)
    post_ts = valid_timestamp(req)
    drive_ok = not self.mount_check or check_mount(self.root, drive)
    if not drive_ok:
        return HTTPInsufficientStorage(drive=drive, request=req)
    broker = self._get_account_broker(drive, part, account)
    if broker.is_deleted():
        return self._deleted_response(broker, req, HTTPNotFound)
    new_meta = {
        header: (val, post_ts.internal)
        for header, val in req.headers.iteritems()
        if is_sys_or_user_meta('account', header)
    }
    if new_meta:
        broker.update_metadata(new_meta, validate_metadata=True)
    return HTTPNoContent(request=req)


/swift-kilo-eol/swift/proxy/controllers/account.py

def DELETE(self, req):
    """HTTP DELETE request handler.

    Refuses the request when it carries a query string, or when account
    management is disabled. Otherwise sends the DELETE to every node of
    the account's partition and returns the aggregated response.
    """
    # Extra safety in case someone typos a query string for an
    # account-level DELETE request that was really meant to be caught by
    # some middleware.
    if req.query_string:
        return HTTPBadRequest(request=req)
    if not self.app.allow_account_management:
        allow_value = ', '.join(self.allowed_methods)
        return HTTPMethodNotAllowed(request=req,
                                    headers={'Allow': allow_value})
    ring = self.app.account_ring
    partition, nodes = ring.get_nodes(self.account_name)
    backend_headers = self.generate_request_headers(req)
    clear_info_cache(self.app, req.environ, self.account_name)
    return self.make_requests(
        req, ring, partition, 'DELETE', req.swift_entity_path,
        [backend_headers] * len(nodes))

/swift-kilo-eol/swift/account/server.py

def DELETE(self, req):
    """Handle HTTP DELETE request.

    Marks the account database as deleted at the request's timestamp.
    An account that is already deleted yields 404; success yields 204.
    """
    drive, part, account = split_and_validate_path(req, 3)
    drive_ok = not self.mount_check or check_mount(self.root, drive)
    if not drive_ok:
        return HTTPInsufficientStorage(drive=drive, request=req)
    delete_ts = valid_timestamp(req)
    broker = self._get_account_broker(drive, part, account)
    if broker.is_deleted():
        return self._deleted_response(broker, req, HTTPNotFound)
    # This only flags the DB as deleted. Per the original article, the
    # actual cleanup is done later by the account reaper.
    broker.delete_db(delete_ts.internal)
    return self._deleted_response(broker, req, HTTPNoContent)


  ContainerController 和 ObjectController 的调用过程与此类似,详情有空再分析。本文还有许多可改进的地方,比如有些具体方法的调用过程分析得不够详细。


                                            
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Swift 源码 proxy