您的位置:首页 > 运维架构

Kafka系列(22)消费者组消费进度监控都怎么实现?

2019-07-28 11:00 621 查看
版权声明:本文为博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。 本文链接:https://blog.csdn.net/qq_18522601/article/details/97611812

消费者组消费进度监控如何实现

监控消费的滞后程度,消费Lag 或者Consumer Lag

滞后程度就是指 消费者当前落后于生产者的程度。比方说,Kafka生产者向某个主题成功生产了100万条消息,你的消费者当前消费了80万条消息,那么我们就说你的消费者滞后了20万条消息,即Lag等于20万条。

通常来说,Lag的单位是消息数,而且我们一般是在主题这个级别上讨论Lag的,但实际上,Kafka监控Lag的层次是在分区上,如果要计算主题级别的,你需要手动汇总所有主题分区的Lag,将它们累加起来,合并成最终的Lag值。

由于消费者的速度无法匹配生产者的速度,极有可能导致它消费的数据已经不在操作系统的页缓存中,那么这些数据就会失去Zero Copy 技术的资格,这样的话,消费者不得不从磁盘上读取它们,这就进一步拉大了生产者的差距,从而出现马太效应,即那些Lag原本很大的消费者会越来越慢,Lag会越来越大。

3种方式监控

   1使用Kafka自带的命令行工具Kafka-consumer-groups脚本

   2使用Kafka Java Consumer Api 编程

   3使用Kafka 自带的 jmx 监控指标

Kafka 自带命令

     独立消费者是没有使用消费者组机制的消费者程序。和消费者组相同的是,他们也要配置Group.id参数值,但和消费者组调用KafkaConsumer.subscribe不同的是,独立消费者调用KafkaConsumer.assign方法直接指定分区

  使用以下命令

   bin/kafka-consumer-groups.sh --bootstrap-server <Kafka broker 连接信息 > --describe --group <group 名称 >

   输出信息很丰富,按照消费者组订阅主题的分区进行展示,每一个分区一行数据;其次,除了主题、分区等信息外,它会汇报每个分区当前最新生产的消息的位移值(Log-end-offset),该消费者组当前最新的消费消息的位移值(current-offset),lag值,消费者实例,消费者连接Broker的主机名以及消费者的client-id的信息。

出现以上特殊情况,说明消费者程序未启动

还有不返回任何结果,说明Kafka版本太老旧

 

Kafka Java Consumer API 

   提供了查询当前分区最新消费位移和消费者组最新消费消息位移两组方法。

  public static Map<TopicPartition, Long> lagOf(String groupID, String bootstrapServers) throws TimeoutException {
        Properties props = new Properties();
        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        try (AdminClient client = AdminClient.create(props)) {
            ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID);//(1)
            try {
                Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
                props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁止自动提交位移
                props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
                props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                    Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumedOffsets.keySet());//(2)
                    return endOffsets.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(),
                            entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset()));//(3)
                }

            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                // 处理中断异常
                // ...
                return Collections.emptyMap();
            } catch (ExecutionException e) {
                // 处理 ExecutionException
                // ...
                return Collections.emptyMap();
            } catch (TimeoutException e) {
                throw new TimeoutException("Timed out when getting lag for consumer group " + groupID);
            }
        }
    }

第一处 是调用AdminClietnConsumerGroupOffsets方法获取给定消费者组的最新消费消息的位移

第二处是获取订阅分区的最新消费位移

第三处 做相应的减法操作,获取Lag值并封装进一个Map对象

这段代码 只适用于2.0.0版本以上 

 

Kafka JMX 监控指标

 Zabbix/Grafana 

    相关的属性

   records-lag-max  records-lead-min

   分别表示次消费者在测试窗口时间内曾经达到的最大的Lag值和最小的Lead值

  Lead值是指消费者最新消费消息的位移与分区当前第一条消息位移的差值。 Lag越大的话,Lead就越小;

  Kafka的消息是有留存时间设置的,默认是一周,也就是说Kafka默认删除1周前的数据,倘若你的消费者程序足够慢,漫道它要消费的数据快被Kafka删除了。这时你就必须立即处理,否则一定会出现消息被删除,从而导致消费者程序重新调整位移值的情形。这可能产生两个后果:一个是消费者从头消费一遍数据,另一个是消费者从最新的消费位移开始消费。之前没来得及消费的消息全部被跳过,从而造成消息丢失

 

 

从使用便捷性上来看,第一种方法最简单,直接运行Kafka自带的命令行工具‘

第二种使用Consumer Api组合计算Lag 也是一种有效的方法 重要的是它能集成很多企业自动化的监控工具

第三种 继承性最好,直接将jmx 监控指标配置到主流的监控框架即可

 

 

 

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐