Swift源码分析----swift-account-audit(2)
2014-07-24 16:40
429 查看
感谢朋友支持本博客,欢迎共同探讨交流,由于能力和时间有限,错误之处在所难免,欢迎指正!
如果转载,请保留作者信息。
博客地址:http://blog.csdn.net/gaoxingnengjisuan
邮箱地址:dong.liu@siat.ac.cn
PS:最近没有登录博客,很多朋友的留言没有看见,这里道歉!还有就是本人较少上QQ,可以邮件交流。
接续上篇:
转到3.2,来看方法audit_container的实现:
3.2.2 调用方法audit_account实现获取指定account下的容器列表,验证当前指定容器是否包含其中;
3.2.3 获取指定name容器的所有副本的相关节点和分区号;
3.2.4 针对容器的所有副本相关节点,进行遍历,对于每个节点执行以下操作:
通过HTTP应用GET方法远程获取节点的验证响应信息,通过响应信息的状态值,判断远程节点副本容器是否存在;
3.2.5 递归调用方法audit_object实现审计验证容器下每个object;
转到3.3,来看方法audit_account的实现:
3.3.2 获取指定name账户的所有副本的相关节点和分区号;
3.3.4 针对账户的所有副本相关节点,进行遍历,对于每个节点执行以下操作:
通过HTTP应用GET方法远程获取节点的验证响应信息,通过响应信息的状态值,判断远程副本账户是否存在;
3.3.5 递归调用方法audit_container实现审计验证账户下每个container;
至此,这个脚本的账户/容器/脚本的审计验证流程分析完成,当然在具体审计验证过程中还有很多其他细节,这里不再赘述。
如果转载,请保留作者信息。
博客地址:http://blog.csdn.net/gaoxingnengjisuan
邮箱地址:dong.liu@siat.ac.cn
PS:最近没有登录博客,很多朋友的留言没有看见,这里道歉!还有就是本人较少上QQ,可以邮件交流。
接续上篇:
转到3.2,来看方法audit_container的实现:
def audit_container(self, account, name, recurse=False): """ 指定container的审计验证,并实现递归验证container下每个object; """ if (account, name) in self.in_progress: self.in_progress[(account, name)].wait() if (account, name) in self.list_cache: return self.list_cache[(account, name)] self.in_progress[(account, name)] = Event() print 'Auditing container "%s"' % name # # 指定指定account下的容器具体路径; path = '/%s/%s' % (account, name) # 获取指定account下的容器列表; account_listing = self.audit_account(account) consistent = True if name not in account_listing: consistent = False print " Container %s not in account listing!" % path # 获取指定name容器的所有副本的相关节点和分区号; # 获取account/container/object所对应的分区号和节点(可能是多个,因为分区副本有多个,可能位于不同的节点上); # 返回元组(分区,节点信息列表); # 在节点信息列表中至少包含id、weight、zone、ip、port、device、meta; part, nodes = self.container_ring.get_nodes(account, name.encode('utf-8')) rec_d = {} responses = {} for node in nodes: marker = '' results = True while results: try: conn = http_connect(node['ip'], node['port'], node['device'], part, 'GET', path.encode('utf-8'), {}, 'format=json&marker=%s' % quote(marker.encode('utf-8'))) # 获取来自服务器的响应; resp = conn.getresponse() if resp.status // 100 != 2: self.container_not_found += 1 consistent = False print(' Bad status GETting container "%s" on %s/%s' % (path, node['ip'], node['device'])) break if node['id'] not in responses: responses[node['id']] = dict(resp.getheaders()) results = simplejson.loads(resp.read()) except Exception: self.container_exceptions += 1 consistent = False print ' Exception GETting container "%s" on %s/%s' % \ (path, node['ip'], node['device']) break if results: marker = results[-1]['name'] for obj in results: obj_name = obj['name'] if obj_name not in rec_d: rec_d[obj_name] = obj if (obj['last_modified'] != rec_d[obj_name]['last_modified']): self.container_obj_mismatch += 1 consistent = False print(" Different versions of %s/%s " "in container dbs." % (name, obj['name'])) if (obj['last_modified'] > rec_d[obj_name]['last_modified']): rec_d[obj_name] = obj obj_counts = [int(header['x-container-object-count']) for header in responses.values()] if not obj_counts: consistent = False print " Failed to fetch container %s at all!" % path else: if len(set(obj_counts)) != 1: self.container_count_mismatch += 1 consistent = False print " Container databases don't agree on number of objects." print " Max: %s, Min: %s" % (max(obj_counts), min(obj_counts)) self.containers_checked += 1 self.list_cache[(account, name)] = rec_d self.in_progress[(account, name)].send(True) del self.in_progress[(account, name)] # 递归验证container下每个object; if recurse: for obj in rec_d.keys(): self.pool.spawn_n(self.audit_object, account, name, obj) if not consistent and self.error_file: print >>open(self.error_file, 'a'), path return rec_d3.2.1 获取指定account下的容器具体路径;
3.2.2 调用方法audit_account实现获取指定account下的容器列表,验证当前指定容器是否包含其中;
3.2.3 获取指定name容器的所有副本的相关节点和分区号;
3.2.4 针对容器的所有副本相关节点,进行遍历,对于每个节点执行以下操作:
通过HTTP应用GET方法远程获取节点的验证响应信息,通过响应信息的状态值,判断远程节点副本容器是否存在;
3.2.5 递归调用方法audit_object实现审计验证容器下每个object;
转到3.3,来看方法audit_account的实现:
def audit_account(self, account, recurse=False): """ 指定account的审计验证,并实现递归验证account下每个container,并且进一步实现递归验证container下每个object; """ if account in self.in_progress: self.in_progress[account].wait() if account in self.list_cache: return self.list_cache[account] self.in_progress[account] = Event() print 'Auditing account "%s"' % account consistent = True path = '/%s' % account # 获取指定name账户的所有副本的相关节点和分区号; # 获取account所对应的分区号和节点(可能是多个,因为分区副本有多个,可能位于不同的节点上); # 返回元组(分区,节点信息列表); # 在节点信息列表中至少包含id、weight、zone、ip、port、device、meta; part, nodes = self.account_ring.get_nodes(account) responses = {} for node in nodes: marker = '' results = True while results: node_id = node['id'] try: # 建立一个HTTPConnection类的对象; # 并获取来自服务器的响应信息; conn = http_connect(node['ip'], node['port'], node['device'], part, 'GET', path, {}, 'format=json&marker=%s' % quote(marker.encode('utf-8'))) resp = conn.getresponse() if resp.status // 100 != 2: self.account_not_found += 1 consistent = False print(" Bad status GETting account '%s' " " from %ss:%ss" % (account, node['ip'], node['device'])) break results = simplejson.loads(resp.read()) except Exception: self.account_exceptions += 1 consistent = False print(" Exception GETting account '%s' on %ss:%ss" % (account, node['ip'], node['device'])) break if node_id not in responses: responses[node_id] = [dict(resp.getheaders()), []] responses[node_id][1].extend(results) if results: marker = results[-1]['name'] headers = [resp[0] for resp in responses.values()] cont_counts = [int(header['x-account-container-count']) for header in headers] if len(set(cont_counts)) != 1: self.account_container_mismatch += 1 consistent = False print(" Account databases for '%s' don't agree on" " number of containers." % account) if cont_counts: print " Max: %s, Min: %s" % (max(cont_counts), min(cont_counts)) obj_counts = [int(header['x-account-object-count']) for header in headers] if len(set(obj_counts)) != 1: self.account_object_mismatch += 1 consistent = False print(" Account databases for '%s' don't agree on" " number of objects." % account) if obj_counts: print " Max: %s, Min: %s" % (max(obj_counts), min(obj_counts)) containers = set() for resp in responses.values(): containers.update(container['name'] for container in resp[1]) self.list_cache[account] = containers self.in_progress[account].send(True) del self.in_progress[account] self.accounts_checked += 1 if recurse: for container in containers: self.pool.spawn_n(self.audit_container, account, container, True) if not consistent and self.error_file: print >>open(self.error_file, 'a'), path return containers3.3.1 获取指定账户的具体路径;
3.3.2 获取指定name账户的所有副本的相关节点和分区号;
3.3.4 针对账户的所有副本相关节点,进行遍历,对于每个节点执行以下操作:
通过HTTP应用GET方法远程获取节点的验证响应信息,通过响应信息的状态值,判断远程副本账户是否存在;
3.3.5 递归调用方法audit_container实现审计验证账户下每个container;
至此,这个脚本的账户/容器/脚本的审计验证流程分析完成,当然在具体审计验证过程中还有很多其他细节,这里不再赘述。
相关文章推荐
- Swift源码分析----swift-account-audit(1)
- OpenStack之Swift:账户服务器(Account Server)源码分析
- Swift源码分析----swift-account-reaper(1)
- Swift源码分析----swift-account-replicator(2)
- Swift源码分析----swift-account-auditor
- Swift源码分析----swift-proxy与swift-account(1)
- Swift源码分析----swift-proxy与swift-account(2)
- Swift源码分析----swift-account-reaper(2)
- Swift源码分析----swift-account-info
- OpenStack_Swift源码分析——Object-auditor源码分析(2)
- 【原创】OpenStack Swift源码分析(六)object服务
- OpenStack_Swift源码分析——ObjectReplicator源码分析(2)
- OpenStack_Swift源码分析——创建Ring及添加设备源码算法详细分析
- openstack_swift源码分析——Swift单机部署
- OpenStack_Swift源码分析——Ring的rebalance算法源代码详细分析
- OpenStack_Swift源码分析——Ring代码的组织架构
- OpenStack Swift源码分析(1)----swift服务启动源码分析之一
- OpenStack之Swift:容器服务器(Container Server)源码分析
- OpenStack之Swift:环(Ring)源码分析
- OpenStack_Swift源码分析——Ring基本原理及一致性Hash算法