您的位置:首页 > 运维架构

OpenStack Nova-cell服务的源码解析(1)

2013-11-06 13:05 405 查看
感谢朋友支持本博客,欢迎共同探讨交流,由于能力和时间有限,错误之处在所难免,欢迎指正!

如果转载,请保留作者信息。

博客地址:http://blog.csdn.net/gaoxingnengjisuan

邮箱地址:dong.liu@siat.ac.cn

上篇博客中对nova-cell服务的架构和配置进行了介绍,这篇博客我将对nova-cell服务的源码进行解析。需要说明的是,这里我都是以OpenStack的Grizzly版本为例进行解析,在这个版本中,默认是不启动这个服务的,而且在具体的cell虚拟机建立应用中,调度器还只是以随机的方式来选择建立虚拟机实例的cell,在后续的版本中,具体的调度器算法将会改进。

1.nova-cell服务的源码结构图




2.nova-cell服务的源码结构解析

/nova/cells/driver.py

class BaseCellsDriver(object):

cells通讯驱动基类,这个类主要实现了消息消费者处理的相关方法;

def start_consumers(self, msg_runner):

启动处理消息的消费者服务;

def stop_consumers(self):

关闭处理消息的消费者服务;

def send_message_to_cell(self, cell_state, message):

发送消息到一个cell;

/nova/cells/manager.py

注:在这个文件中,只有一个类CellsManager,定义和实现了cell管理的API方法;在类CellsManager所定义的方法中主要分为两种类型,即针对特定cell的处理方法和针对所有cell的处理方法;

class CellsManager(manager.Manager):

这个类主要定义和实现了用于管理cell的API;

大多数的方法属于“有针对性”或者“广播”的消息处理方式,分别对应于路由信息到特定的cell和路由信息到多个cell之上;

def post_start_hook(self):

为通过RPC来处理cell间的通讯,启动了两个独立的消费者;

如果本cell有子cell;

通知子cell发送它们的capabilities值和capacities值;

并对子cell的所有父cell的capabilities值和capacities值相关信息进行更新操作;

如果本cell在cell树的底层(即没有子cell);

发送本cell的capabilities值和capacities值到所有父cell,并对所有父cell的相关信息进行更新操作;

def _update_our_parents(self, ctxt):

如果本cell在cell树的底层(即没有子cell),则更新本cell的所有父cell,应用本cell的capabilities值和capacity值;

发送本cell的capabilities值到父cell,并对父cell的相关信息进行更新操作;

发送本cell的capacities值到父cell,并对父cell的相关信息进行更新操作;

def _heal_instances(self, ctxt):

为一些实例周期性的发送更新任务到父cell;

def _sync_instance(self, ctxt, instance):

广播instance_update或instance_destroy操作信息给父cell来执行;

def schedule_run_instance(self, ctxt, host_sched_kwargs):

选择一个合适的cell来建立新的实例,并转发相应的请求;

def get_cell_info_for_neighbors(self, _ctxt):

返回所有相邻cell的信息(本cell的所有父cell和所有子cell);

def run_compute_api_method(self, ctxt, cell_name, method_info, call):

在特定的cell中调用一个compute API方法;

运行变量compute_api指定的类中一个指定的方法;

def instance_update_at_top(self, ctxt, instance):

在top等级的cell上,更新实例;

def instance_destroy_at_top(self, ctxt, instance):

在cell树的顶层删除一个实例;

这里实际上执行的是/nova/cell/messaging.py中的类_BroadcastMessageMethods下的方法instance_destroy_at_top;

def instance_delete_everywhere(self, ctxt, instance, delete_type):

在每一个cell中调用compute API的delete()或者soft_delete()方法;

这个方法应用于当我们不知道一个实例属于哪个cell,但是还是需要删除或软删除这个实例的情况;

所以,我们需要在所有的cell上运行这个方法;

def instance_fault_create_at_top(self, ctxt, instance_fault):

在顶层cell上建立一个实例的断点(???);

def bw_usage_update_at_top(self, ctxt, bw_update_info):

更新DB中的带宽使用率信息,如果本cell是一个顶层的cell;

def sync_instances(self, ctxt, project_id, updated_since, deleted):

强制对所有实例实行同步操作;

def service_get_all(self, ctxt, filters):

返回在本cell中和所有子cell中的服务;

并把每个服务以及相关的计算节点都和cell相关联起来;

def service_get_by_compute_host(self, ctxt, host_name):

在当前的cell中为计算主机返回获取的服务入口,主要执行了以下的步骤:

通过host_name获取cell_name的信息;

根据compute host信息获取相关服务;

把获取的服务和计算节点和cell关联起来;

def proxy_rpc_to_manager(self, ctxt, topic, rpc_message, call, timeout):

为给定的compute topic代理PRC;

def task_log_get_all(self, ctxt, task_name, period_beginning, period_ending, host=None, state=None):

从所有的cell或者特定的一个cell的数据库中获取任务日志;

def compute_node_get(self, ctxt, compute_id):

在一个特定的cell上,通过ID值获取计算节点;

并把计算节点和这个cell相关联起来;

def compute_node_get_all(self, ctxt, hypervisor_match=None):

返回所有cell中的计算节点列表;

def compute_node_stats(self, ctxt):

通过各个cell的各自计算操作,实现获取所有cell上的各个资源参数的总和;

def actions_get(self, ctxt, cell_name, instance_uuid):

为给定的实例获取所有的实例操作;

def action_get_by_request_id(self, ctxt, cell_name, instance_uuid, request_id):

通过request_id和instance_uuid为给定的实例获取操作信息;

def action_events_get(self, ctxt, cell_name, action_id):

在cell_name指定的cell上,通过action id获取相关事件信息;

def consoleauth_delete_tokens(self, ctxt, instance_uuid):

在API cell中为指定的实例删除consoleauth令牌;

def validate_console_port(self, ctxt, instance_uuid, console_port, console_type):

验证子cell中计算节点的控制台端口;

/nova/cells/messaging.py

在这个文件中,有如下些类:

class _BaseMessage(object):

cell通信模块的基类,主要定义和实现了消息队列和消息响应相关的处理方法;

下面的三个实现通信模块的类,分别对应三种cell处理的消息类型:

class _TargetedMessage(_BaseMessage):

这个类继承自类_BaseMessage,实现通信模块的若干方法,针对于消息处理目标是特定的cell的情况;

class _BroadcastMessage(_BaseMessage):

这个类继承自类_BaseMessage,实现通信模块的若干方法,针对于消息处理目标是所有的cell的情况;

class _ResponseMessage(_TargetedMessage):

这个类继承自类_TargetedMessage,实现通信模块的若干方法,针对于执行操作后返回的响应信息的处理;

class _BaseMessageMethods(base.Base):

实现所有处理cell方法的基类;

下面三个实现处理消息方法的类,分别对应三种cell处理的消息类型:

class _ResponseMessageMethods(_BaseMessageMethods):

这个类继承自类_BaseMessageMethods,实现了针对处理响应信息方法;

class _TargetedMessageMethods(_BaseMessageMethods):

这个类继承自类_BaseMessageMethods,实现若干处理cell消息的方法,针对于消息处理目标是特定的cell情况;

class _BroadcastMessageMethods(_BaseMessageMethods):

这个类继承自类_BaseMessageMethods,实现若干处理cell消息的方法,针对于消息处理目标是所有的cell情况;

class MessageRunner(object):

这个类实现了若干方法,分别对应CellsManager类(管理cell方法API)中的方法,根据不同的消息类型(特定cell、所有cell和响应消息)进行封装,用于从上述的类_ResponseMessageMethods、_TargetedMessageMethods和_BroadcastMessageMethods中调用对应的方法,实现对不同的消息类型的处理。

来看类中具体的方法:

class _BaseMessage(object):

def _append_hop(self):

加入跳信息到routing_path;

def _at_max_hop_count(self,
do_raise=True):


检测本cell是否处于最大的跳数;

def _process_locally(self):

方法确定了我们应该在本cell中执行处理这个消息的方法;

通过类MessageRunner调用适当的方法用来处理消息;

捕获响应或者异常,并把捕获的响应或者异常编码成类Response的实例对象,并返回;

def _setup_response_queue(self):

通过类MessageRunner中的方法_setup_response_queue,建立一个响应队列;

def _cleanup_response_queue(self):

调用类MessageRunner中的方法_cleanup_response_queue来删除一个响应队列;

def _wait_for_json_responses(self,
num_responses=1):


在允许等待的时间内,获取响应列表responses,并返回;

def _send_json_responses(self,
json_responses, neighbor_only=False, fanout=False):


执行发送响应列表到目标cell;

Targeted型的消息只有一个响应,而Broadcast型的消息可能有多个响应;

如果本cell是消息的源,则响应将会从self.process()被返回;

def _send_response(self,
response, neighbor_only=False):


发送执行消息处理方法的结果的响应对象到源cell;

如果本cell就是源cell,则直接获取这个响应对象;

def _send_response_from_exception(self,
exc_info):


从sys.exc_info()返回一个异常,编码成Response类型响应,并发送它;

def _to_dict(self):

转换消息到字典格式;

def to_json(self):

转换消息到JSON格式,用于发送到相邻的cell;

def source_is_us(self):

判断本cell是否建立了这个消息,即是否是这个消息的源;

def process(self):

执行处理消息的方法;

class _TargetedMessage(_BaseMessage):

def _get_next_hop(self):

返回本cell的下一跳(hop),如果下一跳(hop)就是当前的cell,则返回none;

def process(self):

执行处理针对性的消息的方法,并返回执行方法所获取的结果响应到源cell;

如果本cell就是源cell,则还要实现在允许等待的时间内,获取远程执行消息处理方法返回的响应列表;

根据所处理消息的类型不同,这个响应列表中的元素可以是一个也可以是多个;

class _BroadcastMessage(_BaseMessage):

def _get_next_hops(self):

设置下一层次的跳(hops),并返回跳(hops)的数目;

def _send_to_cells(self,
target_cells):


发送信息到多个cell;

def _send_json_responses(self,
json_responses):


发送信息的响应列表;

def process(self):

运行广播消息程序;

class _ResponseMessage(_TargetedMessage):

def process(self):

执行一个响应消息;

class _BaseMessageMethods(base.Base):

def task_log_get_all(self,
message, task_name, period_beginning, period_ending, host, state):


从数据库获取任务日志;

class _ResponseMessageMethods(_BaseMessageMethods):

def parse_responses(self,
message, orig_message, responses):


添加响应到响应队列;

class _TargetedMessageMethods(_BaseMessageMethods):

def schedule_run_instance(self,
message, host_sched_kwargs):


父cell通知本cell来调度新的实例用于建立;

def run_compute_api_method(self,
message, method_info):


运行变量compute_api指定的类中一个指定的方法;

def update_capabilities(self,
message, cell_name, capabilities):


一个子cell通知我们关于它的capabilities值;

def update_capacities(self,
message, cell_name, capacities):


一个子cell通知我们关于它的capacity值;

def announce_capabilities(self,
message):


一个父cell通知本cell发送我们的capabilities值;

所以执行发送capabilities值到父cell的操作,并更新父cell的capabilities值;

def announce_capacities(self,
message):


一个父cell通知本cell发送我们的capacity值;

所以执行发送capabilities值到父cell的操作,并更新父cell的capacity值;

def service_get_by_compute_host(self,
message, host_name):


为计算主机返回服务入口;

根据compute host信息获取相关服务;

def proxy_rpc_to_manager(self,
message, host_name, rpc_message, topic, timeout):


为给定的compute topic代理PRC;

def compute_node_get(self,
message, compute_id):


通过ID值获取相应的计算节点;

def actions_get(self, message,
instance_uuid):


为给定的实例获取所有的实例操作;

def action_get_by_request_id(self,
message, instance_uuid, request_id):


通过request_id和instance_uuid为给定的实例获取操作信息;

def action_events_get(self,
message, action_id):


通过action_id获取事件信息;

def validate_console_port(self,
message, instance_uuid, console_port, console_type):


验证子cell中计算节点的控制台端口;

class _BroadcastMessageMethods(_BaseMessageMethods):

def _at_the_top(self):

确定是否是API级别的cell;

def instance_update_at_top(self,
message, instance, **kwargs):


如果是top级别的cell,则更新数据库中的实例;

def instance_destroy_at_top(self,
message, instance, **kwargs):


如果本cell是一个顶层的cell,则从DB中删除指定的实例;

def instance_delete_everywhere(self,
message, instance, delete_type, **kwargs):


在每一个cell中调用compute API的delete()或者soft_delete()方法;

这个方法应用于当我们不知道一个实例属于哪个cell,但是还是需要删除或软删除这个实例的情况;

所以,我们需要在所有的cell上运行这个方法;

def instance_fault_create_at_top(self,
message, instance_fault, **kwargs):


如果我们是顶层cell,则执行从DB删除一个实例的操作;

def bw_usage_update_at_top(self,
message, bw_update_info, **kwargs):


更新DB中的带宽使用率信息,如果本cell是一个顶层的cell;

def _sync_instance(self,
ctxt, instance):


实例数据的同步;

def sync_instances(self,
message, project_id, updated_since, deleted, **kwargs):


实例数据的同步实现;

def service_get_all(self,
message, filters):


获取message和filters所限制的所有的服务;

def compute_node_get_all(self,
message, hypervisor_match):


返回本cell中的所有计算节点;

def compute_node_stats(self,
message):


从本cell上所有的计算节点获取各个资源参数信息,并求和,即获取本cell上所有计算节点的资源参数分别的总和;

def consoleauth_delete_tokens(self,
message, instance_uuid):


在API cell中为指定的实例删除consoleauth令牌;

class MessageRunner(object):

def _process_message_locally(self,
message):


消息处理进程会调用这个方法,当确定了应该在本cell上处理的时候;(也就是确定了本cell是目标cell)

寻找到基于消息类型的合适方法,并调用它;

如果需要的话,调用者捕获异常,并返回结果到cell;

def _put_response(self,
response_uuid, response):


添加响应到响应队列;

def _setup_response_queue(self,
message):


设置一个eventlet队列,用于存储获取的响应;

响应是被目标cell以_ResponseMessage的形式发送回源cell的;

def _cleanup_response_queue(self,
message):


当正在接受响应或者已经时间超时的时候,停止跟踪响应队列;

def _create_response_message(self,
ctxt, direction, target_cell, response_uuid, response_kwargs, **kwargs):


建立一个ResponseMessage类的对象;

def message_from_json(self,
json_message):


转换一个JSON格式的消息到一个适当的消息实例;

def ask_children_for_capabilities(self,
ctxt):


通知子cell发送它们的capabilities值;

并对父cell的capabilities值相关信息进行更新操作;

这个方法将会在nova-cell服务启动时调用;

def ask_children_for_capacities(self,
ctxt):


通知子cell发送它们的capacities值;

并对父cell的capacities值相关信息进行更新操作;

这个方法将会在nova-cell服务启动时调用;

def tell_parents_our_capabilities(self,
ctxt):


发送本cell的capabilities值到父cell,并对父cell的相关信息进行更新操作;

def tell_parents_our_capacities(self,
ctxt):


发送本cell的capacities值到父cell,并对父cell的相关信息进行更新操作;

def schedule_run_instance(self,
ctxt, target_cell, host_sched_kwargs):


这个方法被调度器所调用,通知子cell来调度一个新的实例用于建立;

def run_compute_api_method(self,
ctxt, cell_name, method_info, call):


在特定的cell中调用一个compute API方法;

运行变量compute_api指定的类中一个指定的方法;

def instance_update_at_top(self,
ctxt, instance):


在top等级的cell上,更新实例;

这里实际上执行的是/nova/cell/messaging.py中的类_BroadcastMessageMethods下的方法instance_update_at_top;

def instance_destroy_at_top(self,
ctxt, instance):


在cell树的顶层删除一个实例,这里实际上执行的是/nova/cell/messaging.py中的类_BroadcastMessageMethods下的方法instance_destroy_at_top;

def instance_delete_everywhere(self,
ctxt, instance, delete_type):


在每一个cell中调用compute API的delete()或者soft_delete()方法;

这个方法应用于当我们不知道一个实例属于哪个cell,但是还是需要删除或软删除这个实例的情况;

所以,我们需要在所有的cell上运行这个方法;

def instance_fault_create_at_top(self,
ctxt, instance_fault):


在顶层cell上建立一个实例的断点(???);

def bw_usage_update_at_top(self,
ctxt, bw_update_info):


更新DB中的带宽使用率信息,如果本cell是一个顶层的cell;

def sync_instances(self,
ctxt, project_id, updated_since, deleted):


强制对所有实例实行同步操作;

def service_get_all(self,
ctxt, filters=None):


获取所有的服务;

def service_get_by_compute_host(self,
ctxt, cell_name, host_name):


在当前的cell中为计算主机返回获取的服务入口,主要执行了以下的步骤:

通过host_name获取cell_name的信息;

根据compute host信息获取相关服务;

把获取的服务和计算节点和cell关联起来;

def proxy_rpc_to_manager(self,
ctxt, cell_name, host_name, topic, rpc_message, call, timeout):


为给定的compute topic代理PRC,并获取方法的返回响应;

def task_log_get_all(self,
ctxt, cell_name, task_name, period_beginning, period_ending, host=None, state=None):


从所有的cell或者特定的一个cell的数据库中获取任务日志;

返回响应对象的列表;

def compute_node_get_all(self,
ctxt, hypervisor_match=None):


返回本cell的所有子cell的计算节点列表;

这里是采用广播的方式,获取每一个cell下的计算节点;

而且这里获取的方向是'down',所以实现的就是获取所有子cell的相关计算节点;

def compute_node_stats(self,
ctxt):


采用广播的方式实现每一个cell计算自己的各个资源参数的总和;

而且这里获取的方向是'down',所以实现的就是获取所有子cell的各个资源参数的总和;

def compute_node_get(self,
ctxt, cell_name, compute_id):


在一个特定的cell上,通过ID值获取一个计算节点;

def actions_get(self, ctxt, cell_name,
instance_uuid):

为给定的实例获取所有的实例操作;

def action_get_by_request_id(self,
ctxt, cell_name, instance_uuid, request_id):


通过request_id和instance_uuid为给定的实例获取操作信息;

def action_events_get(self,
ctxt, cell_name, action_id):


在cell_name指定的cell上,通过action id获取相关事件信息;

def consoleauth_delete_tokens(self,
ctxt, instance_uuid):


在API cell中为指定的实例删除consoleauth令牌;

方法在cell树中的执行方向是'up';

def validate_console_port(self,
ctxt, cell_name, instance_uuid, console_port, console_type):


验证子cell中计算节点的控制台端口;

def get_message_types():

获取处理消息的类型,为指定类型消息、广播类型消息或者响应类型消息;

class Response(object):

处理cell返回的响应方法类;

def to_json(self):

def from_json(cls, json_message):

def value_or_raise(self):

/nova/cells/rpc_driver.py

cell RPC通信驱动,通过RPC实现cell的通信;

class CellsRPCDriver(driver.BaseCellsDriver):

这个类继承自类BaseCellsDriver,是通过RPC实现cell间通信的驱动类;

主要用于处理通信中的消费者相关;

def _start_consumer(self,
dispatcher, topic):


启动两个RPC消费者,topic类型和fanout类型;

建立绿色线程,实现处理所有的队列/消费者信息;

def start_consumers(self,
msg_runner):


为通过RPC来处理cell间的通讯,启动两个RPC消费者,topic类型消费者和fanout类型消费者;

def stop_consumers(self):

关闭RPC消费者;

def send_message_to_cell(self,
cell_state, message):


调用类IntercellRPCAPI下的方法send_message_to_cell,来实现发送消息到cell;

class InterCellRPCAPI(rpc_proxy.RpcProxy):

这个类继承自类RpcProxy,主要实现cell间通过RPC实现通信的客户端方法;

def _get_server_params_for_cell(next_hop):

为rpc的调用获取服务的相关参数;

def send_message_to_cell(self,
cell_state, message):


实现发送消息到cell;

class InterCellRPCDispatcher(object):

这个类主要实现了RPC的分发程序类,用来处理从其它cell接收的信息;

def process_message(self,
_ctxt, message):


实现从其它的cell接收消息;

/nova/cells/rpcapi.py

文件中定义了一个类CellsAPI,即Cell RPC API客户端类,用来通过RPC实现对远程cell处理方法的调用;

class CellsAPI(rpc_proxy.RpcProxy):

即Cell RPC API客户端类;

def cast_compute_api_method(self,
ctxt, cell_name, method, *args, **kwargs):


在一个特定的cell中,以cast的方式发送消息,实现远程节点执行指定的compute
API方法;

def call_compute_api_method(self,
ctxt, cell_name, method, *args, **kwargs):


在一个特定的cell中,以call的方式发送消息,实现远程节点执行指定的compute
API方法;

def schedule_run_instance(self,
ctxt, **kwargs):


调度一个新的实例用于建立;

def nstance_update_at_top(self,
ctxt, instance):


在API的级别更新实例信息;

def instance_destroy_at_top(self,
ctxt, instance):


在API这个级别销毁要建立的实例;

def instance_delete_everywhere(self,
ctxt, instance, delete_type):


在每个cell上都运行删除一个指定实例的操作(因为不知道实例位于哪个cell上);

def instance_fault_create_at_top(self,
ctxt, instance_fault):


在cell树的顶层建立一个实例故障;

def bw_usage_update_at_top(self,
ctxt, uuid, mac, start_period, bw_in, bw_out, last_ctr_in, last_ctr_out, last_refreshed=None):


广播信息实现节点带宽使用率的更新;

def instance_info_cache_update_at_top(self,
ctxt, instance_info_cache):


广播通知实例缓存信息的改变;

def get_cell_info_for_neighbors(self,
ctxt):


获取本cell相邻的所有cell的信息;

def sync_instances(self,
ctxt, project_id=None, updated_since=None, deleted=False):


要求所有cell实现实例数据的同步;

def service_get_all(self,
ctxt, filters=None):


def service_get_by_compute_host(self,
ctxt, host_name):


def proxy_rpc_to_manager(self,
ctxt, rpc_message, topic, call=False, timeout=None):


def task_log_get_all(self,
ctxt, task_name, period_beginning, period_ending, host=None, state=None):


def compute_node_get(self,
ctxt, compute_id):


在指定的cell中获取计算节点;

def compute_node_get_all(self,
ctxt, hypervisor_match=None):


获取所有cell中的由hypervisor过滤的计算节点列表;

def compute_node_stats(self,
ctxt):


返回所有cell中计算节点的统计信息;

def actions_get(self, ctxt,
instance):


def action_get_by_request_id(self,
ctxt, instance, request_id):


def action_events_get(self,
ctxt, instance, action_id):


def consoleauth_delete_tokens(self,
ctxt, instance_uuid):


def validate_console_port(self,
ctxt, instance_uuid, console_port, console_type):




/nova/cells/scheduler.py

这个文件中主要实现了类CellsScheduler,即cell调度器实现类;

主要针对的就是在cell中建立虚拟机实例方面的应用;

class CellsScheduler(base.Base):

这个类继承自Base类,即cell调度器实现类;

def _create_instances_here(self,
ctxt, request_spec):


在本cell上建立虚拟机实例;

def _create_action_here(self,
ctxt, instance_uuids):


获取instance_uuids中各个实例的启动信息;

并把这些信息加入到要启动的实例属性中;

def _get_possible_cells(self):

获取所有合适的cell;

def _run_instance(self,
message, host_sched_kwargs):


尝试调度实例;

如果没有合适的cell使用,则引发异常;

def run_instance(self,
message, host_sched_kwargs):


选择一个cell,在它上面我们要建立一个新的实例;

/nova/cells/state.py

这个文件中主要定义了两个类CellState和CellStateManager,主要实现的时对cell一些状态信息进行处理的操作,方法都比较简单,这里就不进行一一解析了;

/nova/cells/utils.py

这个文件中主要定义了一些cell实用的操作方法,方法都比较简单,这里就不进行一一解析了;

到目前为止,nova-cell服务的源码架构进行了简单的分析,下一篇博客中我将以几个例子对nova-cell服务的源码架构和nova-cell服务的具体实现流程进行解析。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: