Kafka Java API获取非compacted topic总消息数
2018-08-24 15:18
295 查看
目前Kafka并没有提供直接的工具来帮助我们获取某个topic的当前总消息数,需要我们自行写程序来实现。下列代码可以实现这一功能,特此记录一下:
/** * 获取某个topic的当前消息数 * Java 8+ only * * @param topic * @param brokerList * @return */ public static long totalMessageCount(String topic, String brokerList) { Properties props = new Properties(); props.put("bootstrap.servers", brokerList); props.put("group.id", "test-group"); props.put("enable.auto.commit", "false"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) { List<TopicPartition> tps = Optional.ofNullable(consumer.partitionsFor(topic)) .orElse(Collections.emptyList()) .stream() .map(info -> new TopicPartition(info.topic(), info.partition())) .collect(Collectors.toList()); Map<TopicPartition, Long> beginOffsets = consumer.beginningOffsets(tps); Map<TopicPartition, Long> endOffsets = consumer.endOffsets(tps); return tps.stream().mapToLong(tp -> endOffsets.get(tp) - beginOffsets.get(tp)).sum(); } }
相关文章推荐
- java微信开发API解析(二)-获取消息和回复消息
- java获取kafka中的消息数据
- java微信开发API解析(二)-获取消息和回复消息
- java微信开发API解析(二)-获取消息和回复消息
- java微信开发API第二步 获取和回复消息
- java api如何获取kafka所有Topic列表,并放置为一个list
- kafka多线程消费和手动提交offset,新版java api
- JAVA通过API获取天气预报信息(HTTP协议)
- java后端系统架构之消息队列篇:kafka的实验
- springboot框架中使用java操作kafka获取数据
- java根据ip地质调用百度定位api服务获取地理位置示例
- Kafka Java API操作topic
- Kafka Java API 之Producer源码解析
- java服务器端调用JPush极光推送api推送通知消息
- java微信开发API解析(三)-高级功能的前奏----获取以及保存接口调用凭证
- java调用百度定位api服务获取地理位置示例
- 如何获取错误消息说明使用 FormatMessage API
- 微信支付java版本之JSAPI支付+发送模板消息
- 分布式消息队列kafka系列介绍 — 核心API介绍及实例
- Kafka消费者JavaAPI Kafka生产者JavaAPI