Kafka系列(22)消费者组消费进度监控都怎么实现?
消费者组消费进度监控如何实现
监控消费的滞后程度,消费Lag 或者Consumer Lag
滞后程度就是指 消费者当前落后于生产者的程度。比方说,Kafka生产者向某个主题成功生产了100万条消息,你的消费者当前消费了80万条消息,那么我们就说你的消费者滞后了20万条消息,即Lag等于20万条。
通常来说,Lag的单位是消息数,而且我们一般是在主题这个级别上讨论Lag的,但实际上,Kafka监控Lag的层次是在分区上,如果要计算主题级别的,你需要手动汇总所有主题分区的Lag,将它们累加起来,合并成最终的Lag值。
由于消费者的速度无法匹配生产者的速度,极有可能导致它消费的数据已经不在操作系统的页缓存中,那么这些数据就会失去Zero Copy 技术的资格,这样的话,消费者不得不从磁盘上读取它们,这就进一步拉大了生产者的差距,从而出现马太效应,即那些Lag原本很大的消费者会越来越慢,Lag会越来越大。
3种方式监控
1使用Kafka自带的命令行工具Kafka-consumer-groups脚本
2使用Kafka Java Consumer Api 编程
3使用Kafka 自带的 jmx 监控指标
Kafka 自带命令
独立消费者是没有使用消费者组机制的消费者程序。和消费者组相同的是,他们也要配置Group.id参数值,但和消费者组调用KafkaConsumer.subscribe不同的是,独立消费者调用KafkaConsumer.assign方法直接指定分区
使用以下命令
bin/kafka-consumer-groups.sh --bootstrap-server <Kafka broker 连接信息 > --describe --group <group 名称 >
输出信息很丰富,按照消费者组订阅主题的分区进行展示,每一个分区一行数据;其次,除了主题、分区等信息外,它会汇报每个分区当前最新生产的消息的位移值(Log-end-offset),该消费者组当前最新的消费消息的位移值(current-offset),lag值,消费者实例,消费者连接Broker的主机名以及消费者的client-id的信息。
出现以上特殊情况,说明消费者程序未启动
还有不返回任何结果,说明Kafka版本太老旧
Kafka Java Consumer API
提供了查询当前分区最新消费位移和消费者组最新消费消息位移两组方法。
public static Map<TopicPartition, Long> lagOf(String groupID, String bootstrapServers) throws TimeoutException {
Properties props = new Properties();
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
try (AdminClient client = AdminClient.create(props)) {
ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID);//(1)
try {
Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁止自动提交位移
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumedOffsets.keySet());//(2)
return endOffsets.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(),
entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset()));//(3)
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// 处理中断异常
// ...
return Collections.emptyMap();
} catch (ExecutionException e) {
// 处理 ExecutionException
// ...
return Collections.emptyMap();
} catch (TimeoutException e) {
throw new TimeoutException("Timed out when getting lag for consumer group " + groupID);
}
}
}
第一处 是调用AdminClietnConsumerGroupOffsets方法获取给定消费者组的最新消费消息的位移
第二处是获取订阅分区的最新消费位移
第三处 做相应的减法操作,获取Lag值并封装进一个Map对象
这段代码 只适用于2.0.0版本以上
Kafka JMX 监控指标
Zabbix/Grafana
相关的属性
records-lag-max records-lead-min
分别表示次消费者在测试窗口时间内曾经达到的最大的Lag值和最小的Lead值
Lead值是指消费者最新消费消息的位移与分区当前第一条消息位移的差值。 Lag越大的话,Lead就越小;
Kafka的消息是有留存时间设置的,默认是一周,也就是说Kafka默认删除1周前的数据,倘若你的消费者程序足够慢,漫道它要消费的数据快被Kafka删除了。这时你就必须立即处理,否则一定会出现消息被删除,从而导致消费者程序重新调整位移值的情形。这可能产生两个后果:一个是消费者从头消费一遍数据,另一个是消费者从最新的消费位移开始消费。之前没来得及消费的消息全部被跳过,从而造成消息丢失
从使用便捷性上来看,第一种方法最简单,直接运行Kafka自带的命令行工具‘
第二种使用Consumer Api组合计算Lag 也是一种有效的方法 重要的是它能集成很多企业自动化的监控工具
第三种 继承性最好,直接将jmx 监控指标配置到主流的监控框架即可
- flume实现监控文件,并将文件内容传入kafka的,kafka在控制台实现消费
- Zabbix监控之从zookeeper中获取Kafka消费进度和lag
- Zabbix监控之从Kafka中获取消费进度和lag
- apache kafka监控系列-KafkaOffsetMonitor
- Kafka学习之怎么保证不丢,不重复消费数据
- Linux qtcreator下kafka之librdkafka库的C++语言封装,实现生产和消费(★firecat推荐★)
- Kafka集群监控软件系列文章
- Spark Streaming消费Kafka Direct保存offset到Redis,实现数据零丢失和exactly once
- apache kafka监控系列-kafka-web-console
- kafka学习之监听方式实现消费者
- ASP.NET MVC+EF框架+EasyUI实现权限管理系列(22)-为用户设置角色
- apache kafka监控系列-KafkaOffsetMonitor
- java 实现kafka消息生产者和消费者
- 【实时计算架构系列1】WePay如何基于谷歌云平台(GCP)和kafka实现实时流式欺诈检测
- Spark kafka实时消费实现
- java 多线程并发系列之 生产者消费者模式的两种实现
- js写圆形动态进度条圆形指示器,动画怎么实现,进度条变色,radialIndicator.js的使用说明用法
- Kafka详解六:Kafka如何通过源码实现监控
- WPF技术触屏上的应用系列(三): 视频播放器的使用及视频播放、播放、暂停、可拖动播放进度效果实现
- 安卓开发-进度条上方显示各个进度的视频片段,如何实现 安卓开发问题,请问这种进度条上弹框显示各个时间段视频片段是怎么实现的?![图片](http://img.ask.csdn.net/upload/2