Ceilometer项目源码分析----ceilometer-alarm-evaluator服务的初始化和启动
2014-12-09 18:37
916 查看
感谢朋友支持本博客,欢迎共同探讨交流,由于能力和时间有限,错误之处在所难免,欢迎指正!如果转载,请保留作者信息。
博客地址:http://blog.csdn.net/gaoxingnengjisuan
邮箱地址:dong.liu@siat.ac.cnPS:最近没有登录博客,很多朋友的留言没有看见,这里道歉!还有就是本人较少上QQ,可以邮件交流。
ceilometer-alarm-evaluator服务的初始化和启动
本篇博客将解析服务组件ceilometer-alarm-evaluator的初始化和启动操作,这个服务组件即ceilometer报警服务;来看方法/ceilometer/cli.py----def alarm_evaluator,这个方法即实现了ceilometer-alarm-evaluator服务的初始化和启动操作。
1.报警服务系统:
SingletonAlarmService和PartitionedAlarmService;
2.默认报警服务:
ceilometer.alarm.service.SingletonAlarmService;
3.如果要应用分布式报警系统,则需要在这里修改配置文件中的参数;
1 单例报警服务SingletonAlarmService的初始化和启动操作1.1 SingletonAlarmService类初始化操作 类SingletonAlarmService的初始化操作主要完成了两部分内容: * 加载命名空间ceilometer.alarm.evaluator中的所有插件; ceilometer.alarm.evaluator = threshold = ceilometer.alarm.evaluator.threshold:ThresholdEvaluator combination = ceilometer.alarm.evaluator.combination:CombinationEvaluator 即描述了报警器状态的评估判定的两种模式:联合报警器状态评估和单一报警器状态评估; * 建立线程池,用于后续报警器服务中若干操作的运行;
class SingletonAlarmService----def __init__
class Service----def __init__
class AlarmService----def _load_evaluators
1.2 SingletonAlarmService类启动操作
类的启动操作实现了单例报警器服务SingletonAlarmService的启动操作;
按照一定时间间隔实现循环执行方法self._evaluate_assigned_alarms,方法self._evaluate_assigned_alarms实现获取alarm集合,针对每一个报警器,实现根据报警器模式的类型(threshold和combination),来实现单一报警器模式或者联合报警器模式的评估判定;
class SingletonAlarmService----def start
class AlarmService----def _evaluate_assigned_alarms 这个方法实现获取当前部分的所有报警器,对每一个报警器进行报警触发的评估判定;
1.获取当前部分的所有报警器;
2.针对每一个报警器,实现根据报警器模式的类型(threshold和combination),来实现单一报警器模式或者联合报警器模式的评估判定;
2 分布式报警服务PartitionedAlarmService的初始化和启动操作[b]2.1 [b]PartitionedAlarmService类初始化操作[/b][/b] PartitionedAlarmService类初始化操作和SingletonAlarmService类初始化操作内容大致是相同的,同样主要完成了两部分内容: * 加载命名空间ceilometer.alarm.evaluator中的所有插件; ceilometer.alarm.evaluator = threshold = ceilometer.alarm.evaluator.threshold:ThresholdEvaluator combination = ceilometer.alarm.evaluator.combination:CombinationEvaluator 即描述了报警器状态的评估判定的两种模式:联合报警器状态评估和单一报警器状态评估; * 建立线程池,用于后续报警器服务中若干操作的运行; * 初始化分布式报警协议实现类PartitionCoordinator;
class PartitionedAlarmService----def __init__
class Service----def __init__
class AlarmService----def _load_evaluators
[b]2.2 [b]PartitionedAlarmService类启动操作[/b][/b]分布式报警器系统服务分布式报警器系统服务的启动和运行,按照一定的时间间隔周期性的执行以下操作:
1.实现广播当前partition的存在性的存在性到所有的partition(包括uuid和优先级信息);
2.实现定期检测主控权角色;确定当前的partition是否是主控角色;
如果为拥有主控权的partition,则根据不同的情况实现不同形式的报警器分配操作;
情况1:所有报警器都要实现重新分配操作;
情况2:只有新建立的报警器需要实现分配操作;
3.获取alarm集合,对每一个alarm,调用_evaluate_alarm方法;
针对每一个报警器,实现根据报警器的类型(threshold和combination),来实现:
单一报警器模式或者联合报警器模式的评估判定;class PartitionedAlarmService----def start
class Service(service.Service)----def start
因为这里实现的是分布式报警系统,涉及到节点服务间的消息通信,所以这里应用RPC消息队列实现服务间的通信,进行相关的初始化操作;
class PartitionedAlarmService----def initialize_service_hook
附录:python的多重父类继承 在上面的分析中,我们看到在服务初始化和启动的过程中,若干方法都是多重父类继承,这里需要注意的是父类方法的搜索顺序;实际上python经典类的父类方法搜索顺序是深度优先,而python新式类的父类方法搜索顺序是广度优先; 我写了一个小小的测试程序,来看新式类的父类方法搜索顺序:
博客地址:http://blog.csdn.net/gaoxingnengjisuan
邮箱地址:dong.liu@siat.ac.cnPS:最近没有登录博客,很多朋友的留言没有看见,这里道歉!还有就是本人较少上QQ,可以邮件交流。
ceilometer-alarm-evaluator服务的初始化和启动
本篇博客将解析服务组件ceilometer-alarm-evaluator的初始化和启动操作,这个服务组件即ceilometer报警服务;来看方法/ceilometer/cli.py----def alarm_evaluator,这个方法即实现了ceilometer-alarm-evaluator服务的初始化和启动操作。
def alarm_evaluator(): """ 加载并启动SingletonAlarmService服务(单例报警服务); 报警服务系统:SingletonAlarmService和PartitionedAlarmService; 默认报警服务:ceilometer.alarm.service.SingletonAlarmService; 如果要应用分布式报警系统,则需要在这里修改配置文件中的参数; """ service.prepare_service() """ launch:加载并启动SingletonAlarmService服务,最终调用了服务的start方法实现服务的启动; 配置文件定义了默认报警服务:ceilometer.alarm.service.SingletonAlarmService SingletonAlarmService:单例的报警服务; """ eval_service = importutils.import_object(cfg.CONF.alarm.evaluation_service) os_service.launch(eval_service).wait()方法小结:
1.报警服务系统:
SingletonAlarmService和PartitionedAlarmService;
2.默认报警服务:
ceilometer.alarm.service.SingletonAlarmService;
3.如果要应用分布式报警系统,则需要在这里修改配置文件中的参数;
1 单例报警服务SingletonAlarmService的初始化和启动操作1.1 SingletonAlarmService类初始化操作 类SingletonAlarmService的初始化操作主要完成了两部分内容: * 加载命名空间ceilometer.alarm.evaluator中的所有插件; ceilometer.alarm.evaluator = threshold = ceilometer.alarm.evaluator.threshold:ThresholdEvaluator combination = ceilometer.alarm.evaluator.combination:CombinationEvaluator 即描述了报警器状态的评估判定的两种模式:联合报警器状态评估和单一报警器状态评估; * 建立线程池,用于后续报警器服务中若干操作的运行;
class SingletonAlarmService----def __init__
class SingletonAlarmService(AlarmService, os_service.Service): def __init__(self): super(SingletonAlarmService, self).__init__() """ _load_evaluators: 这里得到的就是加载命名空间ceilometer.alarm.evaluator中的所有插件; namespace = ceilometer.alarm.evaluator ceilometer.alarm.evaluator = threshold = ceilometer.alarm.evaluator.threshold:ThresholdEvaluator combination = ceilometer.alarm.evaluator.combination:CombinationEvaluator """ self._load_evaluators() self.api_client = None
class Service----def __init__
class Service(object): def __init__(self, threads=1000): self.tg = threadgroup.ThreadGroup(threads) # signal that the service is done shutting itself down: self._done = event.Event()
class AlarmService----def _load_evaluators
class AlarmService(object): EXTENSIONS_NAMESPACE = "ceilometer.alarm.evaluator" def _load_evaluators(self): """ 这里得到的就是加载命名空间ceilometer.alarm.evaluator中的所有插件; namespace = ceilometer.alarm.evaluator ceilometer.alarm.evaluator = threshold = ceilometer.alarm.evaluator.threshold:ThresholdEvaluator combination = ceilometer.alarm.evaluator.combination:CombinationEvaluator """ self.evaluators = extension.ExtensionManager( namespace=self.EXTENSIONS_NAMESPACE, invoke_on_load=True, invoke_args=(rpc_alarm.RPCAlarmNotifier(),) ) # 这里得到的就是上面所加载的命名空间ceilometer.alarm.evaluator中的所有插件; self.supported_evaluators = [ext.name for ext in self.evaluators.extensions]
1.2 SingletonAlarmService类启动操作
类的启动操作实现了单例报警器服务SingletonAlarmService的启动操作;
按照一定时间间隔实现循环执行方法self._evaluate_assigned_alarms,方法self._evaluate_assigned_alarms实现获取alarm集合,针对每一个报警器,实现根据报警器模式的类型(threshold和combination),来实现单一报警器模式或者联合报警器模式的评估判定;
class SingletonAlarmService----def start
def start(self): """ 单例报警器服务SingletonAlarmService的启动操作; 按照一定时间间隔实现循环执行方法self._evaluate_assigned_alarms; 获取alarm集合,针对每一个报警器,实现根据报警器模式的类型(threshold和combination),来实现单一报警器模式或者联合报警器模式的评估判定; """ super(SingletonAlarmService, self).start() if self.evaluators: """ 评估周期60s; """ interval = cfg.CONF.alarm.evaluation_interval # add_timer:按照一定时间间隔实现循环执行方法self._evaluate_assigned_alarms; # _evaluate_assigned_alarms: # 先获取alarm集合,对每一个alarm,调用_evaluate_alarm方法; # 针对每一个报警器,实现根据报警器的类型(threshold和combination),来实现: # 单一报警器状态的评估判定; # 联合报警器状态的评估判定; self.tg.add_timer( interval, self._evaluate_assigned_alarms, 0) # Add a dummy thread to have wait() working self.tg.add_timer(604800, lambda: None)
class AlarmService----def _evaluate_assigned_alarms 这个方法实现获取当前部分的所有报警器,对每一个报警器进行报警触发的评估判定;
1.获取当前部分的所有报警器;
2.针对每一个报警器,实现根据报警器模式的类型(threshold和combination),来实现单一报警器模式或者联合报警器模式的评估判定;
def _evaluate_assigned_alarms(self): """ 获取当前部分的所有报警器,对每一个报警器进行报警触发的评估判定; 1.获取当前部分的所有报警器; 2.针对每一个报警器,实现根据报警器模式的类型(threshold和combination), 来实现单一报警器模式或者联合报警器模式的评估判定; """ try: # 获取所有报警器列表; # 对于单例报警器:通过指定客户端通过http协议发送GET请求获取分配给当前部分的报警器; alarms = self._assigned_alarms() LOG.info(_('initiating evaluation cycle on %d alarms') % len(alarms)) # 遍历所有的报警器; # 针对每一个报警器,实现根据报警器的类型(threshold和combination),来实现: # 单一报警器状态的评估判定; # 联合报警器状态的评估判定; for alarm in alarms: self._evaluate_alarm(alarm) except Exception: LOG.exception(_('alarm evaluation cycle failed'))
2 分布式报警服务PartitionedAlarmService的初始化和启动操作[b]2.1 [b]PartitionedAlarmService类初始化操作[/b][/b] PartitionedAlarmService类初始化操作和SingletonAlarmService类初始化操作内容大致是相同的,同样主要完成了两部分内容: * 加载命名空间ceilometer.alarm.evaluator中的所有插件; ceilometer.alarm.evaluator = threshold = ceilometer.alarm.evaluator.threshold:ThresholdEvaluator combination = ceilometer.alarm.evaluator.combination:CombinationEvaluator 即描述了报警器状态的评估判定的两种模式:联合报警器状态评估和单一报警器状态评估; * 建立线程池,用于后续报警器服务中若干操作的运行; * 初始化分布式报警协议实现类PartitionCoordinator;
class PartitionedAlarmService----def __init__
class PartitionedAlarmService(AlarmService, rpc_service.Service): """ 分布式报警器系统服务; """ def __init__(self): super(PartitionedAlarmService, self).__init__( cfg.CONF.host, cfg.CONF.alarm.partition_rpc_topic, self ) """ 加载命名空间ceilometer.alarm.evaluator中的所有插件; namespace = ceilometer.alarm.evaluator ceilometer.alarm.evaluator = threshold = ceilometer.alarm.evaluator.threshold:ThresholdEvaluator combination = ceilometer.alarm.evaluator.combination:CombinationEvaluator """ self._load_evaluators() self.api_client = None self.partition_coordinator = coordination.PartitionCoordinator()
class Service----def __init__
class Service(object): def __init__(self, threads=1000): self.tg = threadgroup.ThreadGroup(threads) # signal that the service is done shutting itself down: self._done = event.Event()
class AlarmService----def _load_evaluators
class AlarmService(object): EXTENSIONS_NAMESPACE = "ceilometer.alarm.evaluator" def _load_evaluators(self): """ 这里得到的就是加载命名空间ceilometer.alarm.evaluator中的所有插件; namespace = ceilometer.alarm.evaluator ceilometer.alarm.evaluator = threshold = ceilometer.alarm.evaluator.threshold:ThresholdEvaluator combination = ceilometer.alarm.evaluator.combination:CombinationEvaluator """ self.evaluators = extension.ExtensionManager( namespace=self.EXTENSIONS_NAMESPACE, invoke_on_load=True, invoke_args=(rpc_alarm.RPCAlarmNotifier(),) ) # 这里得到的就是上面所加载的命名空间ceilometer.alarm.evaluator中的所有插件; self.supported_evaluators = [ext.name for ext in self.evaluators.extensions]
[b]2.2 [b]PartitionedAlarmService类启动操作[/b][/b]分布式报警器系统服务分布式报警器系统服务的启动和运行,按照一定的时间间隔周期性的执行以下操作:
1.实现广播当前partition的存在性的存在性到所有的partition(包括uuid和优先级信息);
2.实现定期检测主控权角色;确定当前的partition是否是主控角色;
如果为拥有主控权的partition,则根据不同的情况实现不同形式的报警器分配操作;
情况1:所有报警器都要实现重新分配操作;
情况2:只有新建立的报警器需要实现分配操作;
3.获取alarm集合,对每一个alarm,调用_evaluate_alarm方法;
针对每一个报警器,实现根据报警器的类型(threshold和combination),来实现:
单一报警器模式或者联合报警器模式的评估判定;class PartitionedAlarmService----def start
def start(self): """ 分布式报警器系统服务分布式报警器系统服务的启动和运行; 按照一定的时间间隔周期性的执行以下操作: 1.实现广播当前partition的存在性的存在性到所有的partition (包括uuid和优先级信息); 2.实现定期检测主控权角色;确定当前的partition是否是主控角色; 如果为拥有主控权的partition,则根据不同的情况实现不同形式的报警器分配操作; 情况1:所有报警器都要实现重新分配操作; 情况2:只有新建立的报警器需要实现分配操作; 3.获取alarm集合,对每一个alarm,调用_evaluate_alarm方法; 针对每一个报警器,实现根据报警器的类型(threshold和combination),来实现: 单一报警器模式或者联合报警器模式的评估判定; """ # 启动PartitionedAlarmService服务; super(PartitionedAlarmService, self).start() if self.evaluators: """ 报警评估周期60s; """ eval_interval = cfg.CONF.alarm.evaluation_interval """ self.tg = threadgroup.ThreadGroup(1000) 按照一定时间间隔实现循环执行方法self.partition_coordinator.report_presence; 通过方法fanout_cast实现广播当前partition的存在性的存在性到所有的partition(包括uuid和优先级信息); """ self.tg.add_timer( eval_interval / 4, # 15s self.partition_coordinator.report_presence, 0) """ 按照一定时间间隔实现循环执行方法self.partition_coordinator.check_mastership; self.partition_coordinator.check_mastership: 实现定期检测主控权角色; 确定当前的partition是否是主控角色; 如果为拥有主控权的partition,则根据不同的情况实现不同形式的报警器分配操作; """ self.tg.add_timer( eval_interval / 2, # 30s self.partition_coordinator.check_mastership, eval_interval, # 60s # _client:构建或重新使用一个经过验证的API客户端; *[eval_interval, self._client]) """ add_timer:按照一定时间间隔实现循环执行方法self._evaluate_assigned_alarms; self._evaluate_assigned_alarms: 先获取alarm集合,对每一个alarm,调用_evaluate_alarm方法; 针对每一个报警器,实现根据报警器的类型(threshold和combination),来实现: 单一报警器状态的评估判定; 联合报警器状态的评估判定; """ self.tg.add_timer( eval_interval, # 60s # 执行报警器的评估操作; self._evaluate_assigned_alarms, eval_interval) # Add a dummy thread to have wait() working self.tg.add_timer(604800, lambda: None)
class Service(service.Service)----def start
因为这里实现的是分布式报警系统,涉及到节点服务间的消息通信,所以这里应用RPC消息队列实现服务间的通信,进行相关的初始化操作;
class Service(service.Service): def start(self): """ 为RPC通信建立到信息总线的连接; 1.建立指定类型的消息消费者; 2.执行方法initialize_service_hook; 3.启动协程实现等待并消费处理队列中的消息; """ super(Service, self).start() """ 为RPC通信建立到信息总线的连接; 建立一个新的连接,或者从连接池中获取一个; """ self.conn = rpc.create_connection(new=True) LOG.debug(_("Creating Consumer connection for Service %s") % self.topic) """ RpcDispatcher:RPC消息调度类; """ dispatcher = rpc_dispatcher.RpcDispatcher([self.manager], self.serializer) # Share this same connection for these Consumers """ create_consumer:建立指定类型的消息消费者(fanout or topic); 1.创建以服务的topic为路由键的消费者; 2.创建以服务的topic和本机名为路由键的消费者 (基于topic&host,可用来接收定向消息); 3.fanout直接投递消息,不进行匹配,速度最快 (fanout类型,可用于接收广播消息); """ self.conn.create_consumer(self.topic, dispatcher, fanout=False) node_topic = '%s.%s' % (self.topic, self.host) self.conn.create_consumer(node_topic, dispatcher, fanout=False) self.conn.create_consumer(self.topic, dispatcher, fanout=True) # Hook to allow the manager to do other initializations after # the rpc connection is created. """ 在消息消费进程启动前,必须先声明消费者; 建立一个'topic'类型的消息消费者; 根据消费者类(TopicConsumer)和消息队列名称 (pool_name: ceilometer.collector.metering) 以及指定主题topic(metering)建立消息消费者,并加入消费者列表; """ if callable(getattr(self.manager, 'initialize_service_hook', None)): self.manager.initialize_service_hook(self) """ 启动消费者线程; consume_in_thread用evelent.spawn创建一个协程一直运行; 等待消息,在有消费到来时会创建新的协程运行远程调用的函数; 启动协程实现等待并消费处理队列中的消息; """ self.conn.consume_in_thread()
class PartitionedAlarmService----def initialize_service_hook
class PartitionedAlarmService(AlarmService, rpc_service.Service): def initialize_service_hook(self, service): LOG.debug(_('initialize_service_hooks')) # create_worker:建立一个'topic'类型的消息消费者; # 指定主题topic(alarm_partition_coordination)和队列名称pool_name(ceilometer.alarm.alarm_partition_coordination); self.conn.create_worker( # alarm_partition_coordination cfg.CONF.alarm.partition_rpc_topic, rpc_dispatcher.RpcDispatcher([self]), # ceilometer.alarm.alarm_partition_coordination 'ceilometer.alarm.' + cfg.CONF.alarm.partition_rpc_topic, )至此,ceilometer-alarm-evaluator服务的初始化和启动操作分析完成。
附录:python的多重父类继承 在上面的分析中,我们看到在服务初始化和启动的过程中,若干方法都是多重父类继承,这里需要注意的是父类方法的搜索顺序;实际上python经典类的父类方法搜索顺序是深度优先,而python新式类的父类方法搜索顺序是广度优先; 我写了一个小小的测试程序,来看新式类的父类方法搜索顺序:
class A(object): def start(self): print 'A-start' class B0(A): def start(self): super(B0,self).start() print 'B0-start' class B1(A): def start(self): super(B1,self).start() print 'B1-start' class C(B0,B1): def start(self): super(C,self).start() print 'C-start' TEST = C() TEST.start() 输出为: A-start B1-start B0-start C-start
相关文章推荐
- Ceilometer项目源码分析----ceilometer-alarm-notifier服务的初始化和启动
- Ceilometer项目源码分析----ceilometer-agent-notification服务的初始化和启动
- Ceilometer项目源码分析----ceilometer-agent-central服务的初始化和启动
- Ceilometer项目源码分析----ceilometer-collector服务的初始化和启动
- Ceilometer项目源码分析----ceilometer-agent-compute服务的初始化和启动
- ceilometer-alarm-notifier/evaluator服务的初始化和启动
- Ceilometer项目源码分析----ceilometer报警器服务的实现概览
- Nginx源码分析-启动初始化过程(一)
- linux-3.2.36内核启动4-setup_arch中的内存初始化3(arm平台 bootmem_init源码分析)
- OpenStack Swift源码分析(3)----swift服务启动源码分析之三
- Hadoop源码分析之IPC中Server端的初始化与启动
- [android源码分析]bluez启动过程中的各种plugin的初始化(一)--__bluetooth_builtin数组所耍的花样
- cinder服务启动源码分析
- linux-3.2.36内核启动4-setup_arch中的内存初始化3(arm平台 bootmem_init源码分析)
- OpenStack Swift源码分析(2)----swift服务启动源码分析之二
- Nginx源码分析-启动初始化过程(二)
- ActivityManagerService服务线程启动源码分析
- Android输入管理服务启动过程源码分析
- 【原创】OpenStack Swift源码分析(三)proxy服务启动