您的位置:首页 > 其它

Ceilometer项目源码分析----ceilometer报警器服务的实现概览

2014-11-30 23:44 661 查看
感谢朋友支持本博客,欢迎共同探讨交流,由于能力和时间有限,错误之处在所难免,欢迎指正!如果转载,请保留作者信息。
博客地址:http://blog.csdn.net/gaoxingnengjisuan
邮箱地址:dong.liu@siat.ac.cn
PS:最近没有登录博客,很多朋友的留言没有看见,这里道歉!还有就是本人较少上QQ,可以邮件交流。ceilometer报警器服务的实现概览 ceilometer的报警系统包括比较多的内容,比如报警器状态的评估判定方式有两种,即联合报警器状态评估和单一报警器状态评估,而报警器系统的实现也有两种,即单例报警系统和分布式报警系统。而ceilometer的报警器实现都包含在/ceilometer/alarm中,在这篇博客中,我们先来分析/ceilometer/alarm/service.py,来从整体的高度把握报警系统的内容。在/ceilometer/alarm/service.py中实现了四个类:类AlarmService:报警服务实现的基类;
类SingletonAlarmService:单例报警系统服务;
类PartitionedAlarmService:分布式报警器系统服务;
类AlarmNotifierService:报警器被触发的通知服务实现;
1 类AlarmService:报警服务实现的基类这个类是报警系统实现的基类,来看看其中实现的比较重要的方法;1.1 def_evaluate_assigned_alarms(self)
def _evaluate_assigned_alarms(self):
    """
    获取当前部分的所有报警器,对每一个报警器进行报警触发的评估判定;
    1.获取当前部分的所有报警器;
    2.针对每一个报警器,实现根据报警器模式的类型(threshold和combination),
      来实现单一报警器模式或者联合报警器模式的评估判定;
    """
    try:
        # 获取所有报警器列表;
        # 对于单例报警器:通过指定客户端通过http协议发送GET请求获取分配给当前部分的报警器;
        alarms = self._assigned_alarms()
        LOG.info(_('initiating evaluation cycle on %d alarms') %
                 len(alarms))
            
        # 遍历所有的报警器;
        # 针对每一个报警器,实现根据报警器的类型(threshold和combination),来实现单一报警器模式或者联合报警器模式的评估判定;
        for alarm in alarms:
            self._evaluate_alarm(alarm)
    except Exception:
        LOG.exception(_('alarm evaluation cycle failed'))
方法小结:
获取当前部分的所有报警器,对每一个报警器进行报警触发的评估判定;
1.获取当前部分的所有报警器;
2.针对每一个报警器,实现根据报警器模式的类型(threshold和combination),来实现单一报警器模式或者联合报警器模式的评估判定;
1.2 def _evaluate_alarm(self, alarm)

def _evaluate_alarm(self, alarm):
    """
    根据alarm的type,将alarm分配给对应的evaluator进行处理;
    单一报警器状态的评估判定;
    联合报警器状态的评估判定;
    """
    # self.supported_evaluators = [threshold,combination]
    if alarm.type not in self.supported_evaluators:
        LOG.debug(_('skipping alarm %s: type unsupported') %
                  alarm.alarm_id)
        return

    LOG.debug(_('evaluating alarm %s') % alarm.alarm_id)
        
    # 两种类型报警器ThresholdEvaluator和CombinationEvaluator的evaluate实现;
    # 单一报警器状态的评估判定;
    # 联合报警器状态的评估判定;
    self.evaluators[alarm.type].obj.evaluate(alarm)


2 类SingletonAlarmService:单例的报警服务
这个类是单例报警系统服务的启动实现,主要实现了单例报警系统服务的启动操作;
2.1 def start(self)

def start(self):
    """
    单例报警器服务SingletonAlarmService的启动操作;
    按照一定时间间隔实现循环执行方法self._evaluate_assigned_alarms;
    获取alarm集合,针对每一个报警器,实现根据报警器模式的类型(threshold和combination),来实现单一报警器模式或者联合报警器模式的评估判定;
    """
    super(SingletonAlarmService, self).start()
    if self.evaluators:
        """
        评估周期60s;
        """
        interval = cfg.CONF.alarm.evaluation_interval
            
        # 按照一定时间间隔实现循环执行方法self._evaluate_assigned_alarms;
        # _evaluate_assigned_alarms:
        # 先获取alarm集合,对每一个alarm,调用_evaluate_alarm方法;
        # 针对每一个报警器,实现根据报警器模式的类型(threshold和combination),来实现单一报警器模式或者联合报警器模式的评估判定;
        self.tg.add_timer(
            interval,
            self._evaluate_assigned_alarms,
            0)
    # Add a dummy thread to have wait() working
    self.tg.add_timer(604800, lambda: None)
方法小结:单例报警器服务SingletonAlarmService的启动操作;
按照一定时间间隔实现循环执行方法self._evaluate_assigned_alarms;
获取alarm集合,针对每一个报警器,实现根据报警器模式的类型(threshold和combination),来实现单一报警器模式或者联合报警器模式的评估判定;

2.2 def _assigned_alarms(self)

def _assigned_alarms(self):
    """
    通过指定客户端通过http协议发送GET请求获取分配给当前部分的报警器;
    """
    return self._client.alarms.list(q=[{'field': 'enabled',
                                        'value': True}])

3 类PartitionedAlarmService:分布式报警器系统服务这个类是分布式报警系统服务的实现,主要实现了分布式报警系统服务的启动和其他操作;
3.1 def start(self)

def start(self):
    """
    分布式报警器系统服务分布式报警器系统服务的启动和运行;
    按照一定的时间间隔周期性的执行以下操作:
    1.实现广播当前partition的存在性的存在性到所有的partition
    (包括uuid和优先级信息);
    2.实现定期检测主控权角色;确定当前的partition是否是主控角色;
      如果为拥有主控权的partition,则根据不同的情况实现不同形式的报警器分配操作;
      情况1:所有报警器都要实现重新分配操作;
      情况2:只有新建立的报警器需要实现分配操作;
    3.获取alarm集合,对每一个alarm,调用_evaluate_alarm方法;
      针对每一个报警器,实现根据报警器的类型(threshold和combination),来实现:
      单一报警器模式或者联合报警器模式的评估判定;
    """
    # 启动PartitionedAlarmService服务;
    super(PartitionedAlarmService, self).start()
    if self.evaluators:
        """
        报警评估周期60s;
        """
        eval_interval = cfg.CONF.alarm.evaluation_interval
            
        """
        self.tg = threadgroup.ThreadGroup(1000)
        按照一定时间间隔实现循环执行方法self.partition_coordinator.report_presence;
        通过方法fanout_cast实现广播当前partition的存在性的存在性到所有的partition(包括uuid和优先级信息);
        """
        self.tg.add_timer(
            eval_interval / 4, # 15s
            self.partition_coordinator.report_presence,
            0)
            
        """
        按照一定时间间隔实现循环执行方法self.partition_coordinator.check_mastership;
        self.partition_coordinator.check_mastership:
        实现定期检测主控权角色;
        确定当前的partition是否是主控角色;
        如果为拥有主控权的partition,则根据不同情况实现不同形式的报警器分配操作;
        """
        self.tg.add_timer(
            eval_interval / 2, # 30s
            self.partition_coordinator.check_mastership,
            eval_interval,     # 60s
            # _client:构建或重新使用一个经过验证的API客户端;
            *[eval_interval, self._client])
            
        """
        add_timer:按照一定时间间隔实现循环执行方法self._evaluate_assigned_alarms;
        self._evaluate_assigned_alarms:
        先获取alarm集合,对每一个alarm,调用_evaluate_alarm方法;
        针对每一个报警器,实现根据报警器类型(threshold和combination),来实现:
        单一报警器状态的评估判定;
        联合报警器状态的评估判定;
        """
        self.tg.add_timer(
            eval_interval,     # 60s
            # 执行报警器的评估操作;
            self._evaluate_assigned_alarms,
            eval_interval)
        
    # Add a dummy thread to have wait() working
    self.tg.add_timer(604800, lambda: None)
方法小结:分布式报警器系统服务分布式报警器系统服务的启动和运行;
按照一定的时间间隔周期性的执行以下操作:
1.实现广播当前partition的存在性的存在性到所有的partition
(包括uuid和优先级信息);
2.实现定期检测主控权角色;确定当前的partition是否是主控角色;
如果为拥有主控权的partition,则根据不同的情况实现不同形式的报警器分配操作;
情况1:所有报警器都要实现重新分配操作;
情况2:只有新建立的报警器需要实现分配操作;
3.获取alarm集合,对每一个alarm,调用_evaluate_alarm方法;
针对每一个报警器,实现根据报警器的类型(threshold和combination),来实现:
单一报警器模式或者联合报警器模式的评估判定;
3.2 def _assigned_alarms(self)

def _assigned_alarms(self):
    """
    通过指定客户端获取分配给当前部分的报警器;
    """
    return self.partition_coordinator.assigned_alarms(self._client)
3.3 def assign(self, context, data)
def assign(self, context, data):
    """
    接收所收到的报警器ID值,如果uuid和本Partition的uuid相匹配,
    则接收报警器ID值,则赋值给assignment;
    用于全部报警器的分配操作;
    """
    self.partition_coordinator.assign(data.get('uuid'),
                                      data.get('alarms'))
3.4 def allocate(self, context, data)
def allocate(self, context, data):
    """
    接收所收到的报警器ID值,如果uuid和本Partition的uuid相匹配,
    则接收报警器ID值,则存储到assignment中;
    用于部分报警器的分配操作;
    """
    self.partition_coordinator.allocate(data.get('uuid'),
                                        data.get('alarms'))
4 PRE { direction: ltr; color: rgb(0, 0, 0); }PRE.western { }PRE.cjk { }PRE.ctl { font-family: "Lohit Devanagari",monospace; }P { margin-bottom: 0.21cm; direction: ltr; color: rgb(0, 0, 0); }P.western { font-family: "Liberation Serif","Times New Roman",serif; font-size: 12pt; }P.cjk { font-family: "AR PL UMing HK"; font-size: 12pt; }P.ctl { font-family: "Lohit Devanagari"; font-size: 12pt; }类AlarmNotifierService:报警器被触发的通知服务实现
这个类所描述的是当报警器被触之后,进行一个报警器报警的通知操作的实现,来看看具体的方法;
4.1 def start(self)

def start(self):
    """
    服务的启动; 
    为RPC通信建立到信息总线的连接;
    建立指定类型的消息消费者;  
    启动协程实现等待并消费处理队列中的消息;
    """
    super(AlarmNotifierService, self).start()
    # Add a dummy thread to have wait() working
    self.tg.add_timer(604800, lambda: None)

4.2 def _handle_action(self, action, alarm_id, previous, current, reason, reason_data)

def _handle_action(self, action, alarm_id, previous,
                       current, reason, reason_data):
    """
    获取系统所采用的消息通信方式;
    通过HTTPS协议POST方法实现发送相关报警器被触发的通知(or)通过日志记录相关报警器被触发的通知;
        
        
    采用两种方式之一实现相关报警器的通知:
    通过HTTPS协议POST方法实现发送相关报警器的通知;
    通过日志记录相关报警器的通知;
    
    # action:要通知的action的URL;
    # alarm_id:被触发的报警器的ID;
    # previous:报警器(触发)之前的状态;
    # current:报警器被触发后转换到的新状态;
    # reason:改变报警器状态的原因;
    """
    try:
        action = network_utils.urlsplit(action)
    except Exception:
        LOG.error(
            _("Unable to parse action %(action)s for alarm %(alarm_id)s"),
            {'action': action, 'alarm_id': alarm_id})
        return

    """
    获取系统所采用的消息通信方式;
    ceilometer.alarm.notifier =
    log = ceilometer.alarm.notifier.log:LogAlarmNotifier
    test = ceilometer.alarm.notifier.test:TestAlarmNotifier
    http = ceilometer.alarm.notifier.rest:RestAlarmNotifier
    https = ceilometer.alarm.notifier.rest:RestAlarmNotifier
    """
    try:
        notifier = self.notifiers[action.scheme].obj
    except KeyError:
        scheme = action.scheme
        LOG.error(
            _("Action %(scheme)s for alarm %(alarm_id)s is unknown, "
              "cannot notify"),
            {'scheme': scheme, 'alarm_id': alarm_id})
        return

    """
    通过HTTPS协议POST方法实现发送相关报警器被触发的通知(到action指定的URL),或者是通过日志记录相关报警器的通知;
    """
    try:
        LOG.debug(_("Notifying alarm %(id)s with action %(act)s") % (
                  {'id': alarm_id, 'act': action}))
        notifier.notify(action, alarm_id, previous,
                        current, reason, reason_data)
    except Exception:
        LOG.exception(_("Unable to notify alarm %s"), alarm_id)
        return
方法小结:
获取系统所采用的消息通信方式;
通过HTTPS协议POST方法实现发送相关报警器被触发的通知(or)通过日志记录相关报警器被触发的通知;
4.3 def notify_alarm(self, context, data)

def notify_alarm(self, context, data):
    """
    遍历所有要通知报警器被触发的URL,针对每个要通知的URL地址,实现:
    获取系统所采用的消息通信方式;
    通过HTTPS协议POST方法实现发送相关报警器被触发的通知(到action指定的URL),(or)通过日志记录相关报警器被触发的通知;

    通知内容中包括:
    alarm_id:被触发的报警器的ID;
    previous:报警器(触发)之前的状态;
    current:报警器被触发后转换到的新状态;
    reason:改变报警器状态的原因;
    """
    actions = data.get('actions')
    if not actions:def notify_alarm(self, context, data):
    """
    遍历所有要通知报警器被触发的URL,针对每个要通知的URL地址,实现:
    获取系统所采用的消息通信方式;
    通过HTTPS协议POST方法实现发送相关报警器被触发的通知(到action指定的URL),(or)通过日志记录相关报警器被触发的通知;

    通知内容中包括:
    alarm_id:被触发的报警器的ID;
    previous:报警器(触发)之前的状态;
    current:报警器被触发后转换到的新状态;
    reason:改变报警器状态的原因;
    """
    actions = data.get('actions')
    if not actions:
        LOG.error(_("Unable to notify for an alarm with no action"))
        return

    for action in actions:
        """
        获取系统所采用的消息通信方式;
        通过HTTPS协议POST方法实现发送相关报警器被触发的通知(到action指定的URL),(or)通过日志记录相关报警器被触发的通知;
        """
        self._handle_action(action,
                            data.get('alarm_id'),
                            data.get('previous'),
                            data.get('current'),
                            data.get('reason'),
                            data.get('reason_data'))
方法小结:遍历所有要通知报警器被触发的URL,针对每个要通知的URL地址,实现:
获取系统所采用的消息通信方式;
通过HTTPS协议POST方法实现发送相关报警器被触发的通知(到action指定的URL),(or)通过日志记录相关报警器被触发的通知;
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐