Swift源码分析----swift-proxy与swift-account(2)
2014-07-25 00:18
411 查看
感谢朋友支持本博客,欢迎共同探讨交流,由于能力和时间有限,错误之处在所难免,欢迎指正!
如果转载,请保留作者信息。
博客地址:http://blog.csdn.net/gaoxingnengjisuan
邮箱地址:dong.liu@siat.ac.cn
PS:最近没有登录博客,很多朋友的留言没有看见,这里道歉!还有就是本人较少上QQ,可以邮件交流。
接续上一篇博客:
PUT
/swift/proxy/controllers/account.py----class AccountController(Controller)----def PUT
/swift/account/server.py----class AccountController(object)----def PUT
当url中有<container>时,会新建/更新container的metadata信息;
当url中没有<container>时,会更新account的metadata信息;
POST
/swift/proxy/controllers/account.py----class AccountController(Controller)----def POST
/swift/account/server.py----class AccountController(object)----def POST
DELETE
/swift/proxy/controllers/account.py----class AccountController(Controller)----def DELETE
/swift/account/server.py----class AccountController(object)----def DELETE
如果转载,请保留作者信息。
博客地址:http://blog.csdn.net/gaoxingnengjisuan
邮箱地址:dong.liu@siat.ac.cn
PS:最近没有登录博客,很多朋友的留言没有看见,这里道歉!还有就是本人较少上QQ,可以邮件交流。
接续上一篇博客:
PUT
/swift/proxy/controllers/account.py----class AccountController(Controller)----def PUT
def PUT(self, req): """HTTP PUT request handler.""" if not self.app.allow_account_management: return HTTPMethodNotAllowed( request=req, headers={'Allow': ', '.join(self.allowed_methods)}) error_response = check_metadata(req, 'account') if error_response: return error_response if len(self.account_name) > MAX_ACCOUNT_NAME_LENGTH: resp = HTTPBadRequest(request=req) resp.body = 'Account name length of %d longer than %d' % \ (len(self.account_name), MAX_ACCOUNT_NAME_LENGTH) return resp # 获取指定account的分区号和所有副本节点; account_partition, accounts = \ self.app.account_ring.get_nodes(self.account_name) # 根据原始请求和额外的头信息,为后端的请求生成头信息字典; headers = self.generate_request_headers(req, transfer=True) clear_info_cache(self.app, req.environ, self.account_name) resp = self.make_requests( req, self.app.account_ring, account_partition, 'PUT', req.swift_entity_path, [headers] * len(accounts)) self.add_acls_from_sys_metadata(resp) return resp这里代码比较好理解,这里来看以下方法make_requests的实现,这个方法是比较重要的:
def make_requests(self, req, ring, part, method, path, headers, query_string=''): """ 发送一个HTTP请求到多个节点,并汇聚所有返回的响应信息; 根据投票机制,根据现实所有响应信息,返回通过投票机制的响应信息(因为是获取多个节点的响应信息); 调用示例: resp = self.make_requests( req, self.app.account_ring, account_partition, 'PUT', req.path_info, [headers] * len(accounts)) """ # get_part_nodes:获取一个分区所有副本相关的节点信息; start_nodes = ring.get_part_nodes(part) nodes = GreenthreadSafeIterator(self.app.iter_nodes(ring, part)) pile = GreenAsyncPile(len(start_nodes)) for head in headers: # _make_request:发送请求迭代器,实现一次发送请求到一个远程节点; pile.spawn(self._make_request, nodes, part, method, path, head, query_string, self.app.logger.thread_locals) response = [] statuses = [] for resp in pile: if not resp: continue response.append(resp) statuses.append(resp[0]) if self.have_quorum(statuses, len(start_nodes)): break # give any pending requests *some* chance to finish pile.waitall(self.app.post_quorum_timeout) while len(response) < len(start_nodes): response.append((HTTP_SERVICE_UN***AILABLE, '', '', '')) statuses, reasons, resp_headers, bodies = zip(*response) # 根据投票机制,根据现实所有响应信息,实现返回通过投票机制的响应信息(因为是获取多个节点的响应信息); return self.best_response(req, statuses, reasons, bodies, '%s %s' % (self.server_type, req.method), headers=resp_headers)具体的解释代码注释中已经标注了,也是比较好理解的,再来看一下这里方法best_response的具体实现:
def best_response(self, req, statuses, reasons, bodies, server_type, etag=None, headers=None): """ 从一些服务器给定响应的列表,根据投票机制,根据现实所有响应信息,实现返回通过投票机制的响应信息; """ # Response:WSGI相应对象类; resp = Response(request=req) if len(statuses): for hundred in (HTTP_OK, HTTP_MULTIPLE_CHOICES, HTTP_BAD_REQUEST): hstatuses = [s for s in statuses if hundred <= s < hundred + 100] if len(hstatuses) >= quorum_size(len(statuses)): status = max(hstatuses) status_index = statuses.index(status) resp.status = '%s %s' % (status, reasons[status_index]) resp.body = bodies[status_index] if headers: update_headers(resp, headers[status_index]) if etag: resp.headers['etag'] = etag.strip('"') return resp self.app.logger.error(_('%(type)s returning 503 for %(statuses)s'), {'type': server_type, 'statuses': statuses}) resp.status = '503 Internal Server Error' return resp
/swift/account/server.py----class AccountController(object)----def PUT
def PUT(self, req): """ 处理HTTP协议PUT请求; PUT请求会handle两种类型的HTTP请求: 当url中有<container>时,会新建/更新container的metadata信息; 当url中没有<container>时,会更新account的metadata信息; """ # 分割和验证给定的请求路径,获取drive, part, account, container; drive, part, account, container = split_and_validate_path(req, 3, 4) # mount_check是是否进行mount检查; # 如果进行mount检查,并且检查结果没有挂载,则引发http 507错误,提示磁盘没有足够存储空间; if self.mount_check and not check_mount(self.root, drive): return HTTPInsufficientStorage(drive=drive, request=req) # 当url中有<container>时,会新建/更新container的信息; if container: # put account container pending_timeout = None if 'x-trans-id' in req.headers: pending_timeout = 3 # _get_account_broker是一个内部方法,功能是返回一个AccountBroker的实例,用于代理对sqlite数据库的操作; broker = self._get_account_broker(drive, part, account, pending_timeout=pending_timeout) # 如果相关的数据库不存在,则进行数据库的初始化操作; if account.startswith(self.auto_create_account_prefix) and not os.path.exists(broker.db_file): # normalize_timestamp:把timestamp(时间戳)转换为标准格式; # initialize:数据库初始化; try: broker.initialize(normalize_timestamp(req.headers.get('x-timestamp') or time.time())) except DatabaseAlreadyExists: pass if req.headers.get('x-account-override-deleted', 'no').lower() != 'yes' and broker.is_deleted(): return HTTPNotFound(request=req) # 应用给定的属性建立container(在数据库中); broker.put_container(container, req.headers['x-put-timestamp'], req.headers['x-delete-timestamp'], req.headers['x-object-count'], req.headers['x-bytes-used']) if req.headers['x-delete-timestamp'] > req.headers['x-put-timestamp']: return HTTPNoContent(request=req) else: return HTTPCreated(request=req) # 当url中没有<container>时,会更新account的metadata信息 else: # put account # _get_account_broker是一个内部方法,功能是返回一个AccountBroker的实例,用于代理对sqlite数据库的操作; broker = self._get_account_broker(drive, part, account) # normalize_timestamp:把timestamp(时间戳)转换为标准格式; timestamp = normalize_timestamp(req.headers['x-timestamp']) # 如果对应的数据库对象不存在,则进行数据库的初始化操作; if not os.path.exists(broker.db_file): try: broker.initialize(timestamp) created = True except DatabaseAlreadyExists: created = False # is_status_deleted:如果状态标志为DELETED,则返回true; elif broker.is_status_deleted(): return self._deleted_response(broker, req, HTTPForbidden, body='Recently deleted') else: # 检测帐号的数据库是否被删除; created = broker.is_deleted() # 如果put_timestamp小于当前的时间戳timestamp,则更新put_timestamp; broker.update_put_timestamp(timestamp) if broker.is_deleted(): return HTTPConflict(request=req) metadata = {} # 根据requesthead中的以'x-account-meta-'开始的key的值更新到metadata; metadata.update((key, (value, timestamp)) for key, value in req.headers.iteritems() if is_sys_or_user_meta('account', key)) # 更新数据库的metadata字段(AccountBroker#update_metadata); # 更新数据库的元数据字典; if metadata: broker.update_metadata(metadata) # 如果created==True,返回201Created,否则返回202Accepted if created: return HTTPCreated(request=req) else: return HTTPAccepted(request=req)PUT请求会handle两种类型的HTTP请求:
当url中有<container>时,会新建/更新container的metadata信息;
当url中没有<container>时,会更新account的metadata信息;
POST
/swift/proxy/controllers/account.py----class AccountController(Controller)----def POST
def POST(self, req): """HTTP POST request handler.""" if len(self.account_name) > MAX_ACCOUNT_NAME_LENGTH: resp = HTTPBadRequest(request=req) resp.body = 'Account name length of %d longer than %d' % (len(self.account_name), MAX_ACCOUNT_NAME_LENGTH) return resp error_response = check_metadata(req, 'account') if error_response: return error_response # 获取指定account的分区号和所有副本节点; account_partition, accounts = self.app.account_ring.get_nodes(self.account_name) # 根据原始请求和额外的头信息,为后端的请求生成头信息字典; headers = self.generate_request_headers(req, transfer=True) clear_info_cache(self.app, req.environ, self.account_name) resp = self.make_requests( req, self.app.account_ring, account_partition, 'POST', req.swift_entity_path, [headers] * len(accounts)) # 如果没有找到指定account,则先建立这个account; # 再实现对其的POST操作; if resp.status_int == HTTP_NOT_FOUND and self.app.account_autocreate: self.autocreate_account(req.environ, self.account_name) resp = self.make_requests( req, self.app.account_ring, account_partition, 'POST', req.swift_entity_path, [headers] * len(accounts)) self.add_acls_from_sys_metadata(resp) return resp
/swift/account/server.py----class AccountController(object)----def POST
def POST(self, req): """ 处理HTTP协议的POST请求; 实现更新account的元数据信息,从head中取出特定要求的metadata更新至指定account的数据库(AccountBroker#update_metadata); """ drive, part, account = split_and_validate_path(req, 3) if 'x-timestamp' not in req.headers or not check_float(req.headers['x-timestamp']): return HTTPBadRequest(body='Missing or bad timestamp', request=req, content_type='text/plain') if self.mount_check and not check_mount(self.root, drive): return HTTPInsufficientStorage(drive=drive, request=req) # _get_account_broker是一个内部方法,功能是返回一个AccountBroker的实例,用于代理对sqlite数据库的操作; broker = self._get_account_broker(drive, part, account) if broker.is_deleted(): return self._deleted_response(broker, req, HTTPNotFound) # 把timestamp(时间戳)转换为标准格式; timestamp = normalize_timestamp(req.headers['x-timestamp']) metadata = {} metadata.update((key, (value, timestamp)) for key, value in req.headers.iteritems() if is_sys_or_user_meta('account', key)) # 然后从head中取出特定要求的metadata更新至数据库; if metadata: broker.update_metadata(metadata) return HTTPNoContent(request=req)
DELETE
/swift/proxy/controllers/account.py----class AccountController(Controller)----def DELETE
def DELETE(self, req): """HTTP DELETE request handler.""" if req.query_string: return HTTPBadRequest(request=req) if not self.app.allow_account_management: return HTTPMethodNotAllowed( request=req, headers={'Allow': ', '.join(self.allowed_methods)}) # 获取指定account的分区号和所有副本节点; account_partition, accounts = self.app.account_ring.get_nodes(self.account_name) # 根据原始请求和额外的头信息,为后端的请求生成头信息字典; headers = self.generate_request_headers(req) clear_info_cache(self.app, req.environ, self.account_name) resp = self.make_requests( req, self.app.account_ring, account_partition, 'DELETE', req.swift_entity_path, [headers] * len(accounts))
/swift/account/server.py----class AccountController(object)----def DELETE
def DELETE(self, req): """ 处理HTTP协议DELETE请求; DELETE请求会删除当前account,但是这里的删除是逻辑删除,只是标记account为删除状态,并不会真正删除account和相关资源; """ # 分割和验证给定的请求路径,获取drive, part, account; drive, part, account = split_and_validate_path(req, 3) # mount_check是是否进行mount检查; # 如果进行mount检查,并且检查结果没有挂载,则引发http 507错误,提示磁盘没有足够存储空间; # root是devices所在根目录; # self.root = conf.get('devices', '/srv/node') if self.mount_check and not check_mount(self.root, drive): return HTTPInsufficientStorage(drive=drive, request=req) # 检查head是否包含特定信息'x-timestamp'; if 'x-timestamp' not in req.headers or \ not check_float(req.headers['x-timestamp']): return HTTPBadRequest(body='Missing timestamp', request=req, content_type='text/plain') # _get_account_broker是一个内部方法,功能是返回一个AccountBroker的实例,用于代理对sqlite数据库的操作; broker = self._get_account_broker(drive, part, account) # 如果对应的数据库中的对象已经删除,则引发http错误提示; if broker.is_deleted(): return self._deleted_response(broker, req, HTTPNotFound) # 对数据库中的对象进行删除状态的标记工作,并不会执行文件的删除工作; broker.delete_db(req.headers['x-timestamp']) return self._deleted_response(broker, req, HTTPNoContent)
相关文章推荐
- Swift源码分析----swift-proxy与swift-account(1)
- Swift源码分析----swift-account-auditor
- Swift源码分析----swift-account-replicator(2)
- Swift源码分析----swift-account-info
- openstack 源码分析之swift proxy 服务启动 2
- 【原创】OpenStack Swift源码分析(三)proxy服务启动
- Swift源码分析----swift-proxy实现请求req的转发
- Swift源码分析----swift-account-audit(2)
- Swift源码分析----swift-account-reaper(2)
- Swift源码分析----swift-proxy与swift-object(2)
- openstack 源码分析之swift proxy 服务启动 1
- Swift源码分析----swift-proxy与swift-object(1)
- Swift源码分析----swift-proxy实现请求req的转发
- Swift源码分析----swift-account-reaper(1)
- 【原创】OpenStack Swift源码分析(四)proxy服务响应
- Swift源码分析----swift-account-audit(1)
- OpenStack之Swift:账户服务器(Account Server)源码分析
- OpenStack Swift源码分析(4)----swift-ring-builder源代码解析之一
- OpenStack Swift源码分析(1)----swift服务启动源码分析之一
- OpenStack Swift源码分析(3)----swift服务启动源码分析之三