java 获取kafka offsets(偏移量)
2017-08-27 14:08
302 查看
public KafkaInfoClient(){ init(); } public Map<Integer,Long> getEarliestOffset(String topic) { //kafka.api.OffsetRequest.EarliestTime() = -2 return getTopicOffset(topic,kafka.api.OffsetRequest.EarliestTime()); } /*** * 获取指定 topic 的所有分区 offset * @param topic * @param whichTime 要获取offset的时间,-1 最新,-2 最早 * @return */ public Map<Integer,Long> getTopicOffset(String topic, long whichTime) { HashMap<Integer, Long> offsets = new HashMap<Integer, Long>(); TreeMap<Integer, PartitionMetadata> leaders = this.findLeader(hosts, topic); for( int part:leaders.keySet()){ PartitionMetadata metadata = leaders.get(part); String leadBroker = metadata.leader().host(); int leadPort = metadata.leader().port(); SimpleConsumer consumer = new SimpleConsumer(leadBroker, leadPort, timeOut, bufferSize, clientID); long partitionOffset = this.getPartitionOffset(consumer, topic, part, whichTime); offsets.put(part,partitionOffset); } return offsets; } /*** * 获取 offset * @param consumer SimpleConsumer * @param topic topic * @param partition partition * @param whichTime 要获取offset的时间,-1 最新,-2 最早 * @return */ private long getPartitionOffset(SimpleConsumer consumer, String topic, int partition, long whichTime) { TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition); Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>(); requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1)); //PartitionOffsetRequestInfo(long time, int maxNumOffsets) 中的第二个参数maxNumOffsets,没弄明白是什么意思,但是测试后发现传入1 时返回whichTime 对应的offset,传入2 返回一个包含最大和最小offset的元组 kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, OffsetRequest.CurrentVersion(), consumer.clientId()); OffsetResponse response = consumer.getOffsetsBefore(request); if (response.hasError()) { logger.error("Error fetching data Offset Data the Broker. 
Reason:{}", response.errorCode(topic, partition)); return 0; } long[] offsets = response.offsets(topic, partition); return offsets[0]; } /*** * 获取每个 partition 元数据信息 * @param bootstraps (host,port) * @param topic topic * @return */ private TreeMap<Integer, PartitionMetadata> findLeader(Map<String, Integer> bootstraps, String topic) { TreeMap<Integer, PartitionMetadata> map = new TreeMap<Integer, PartitionMetadata>(); loop: for (Map.Entry<String, Integer> bootstrap : bootstraps.entrySet()) { SimpleConsumer consumer = null; try { consumer = new SimpleConsumer(bootstrap.getKey(), bootstrap.getValue(), timeOut, bufferSize, clientID); List<String> topics = Collections.singletonList(topic); TopicMetadataRequest req = new TopicMetadataRequest(topics); kafka.javaapi.TopicMetadataResponse resp = consumer.send(req); List<TopicMetadata> metaData = resp.topicsMetadata(); for (TopicMetadata item : metaData) { for (PartitionMetadata part : item.partitionsMetadata()) { map.put(part.partitionId(), part); } } } catch (Exception e) { logger.error("Error communicating with Broker [{}] to find Leader for [{}] Reason: ",bootstrap,topic,e); } finally { if (consumer != null) ``` consumer.close(); } } return map; }
相关文章推荐
- Java curator操作zookeeper获取kafka
- springboot框架中使用java操作kafka获取数据
- java api如何获取kafka所有Topic列表,并放置为一个list
- kafka使用getOffsetsBefore()获取offset异常分析
- kafka java消费者获取不到数据
- 用java代码手动控制kafkaconsumer偏移量
- java获取kafka中的消息数据
- java获取kafka consumer lag
- Kafka Java API获取非compacted topic总消息数
- 获取准确的操作系统名称的Java工具类
- 获取java byte的无符号数值
- 在java中快速获取环境信息
- java 获取进程的processId
- java Java Calendar 获取 上周日 上周六 时间运算
- java获取日期时间
- java.lang.IllegalArgumentException: cannot index term vector offsets
- java 获取各种常用时间的方法
- java调用shell命令并获取执行结果
- java获取 ip mac 地址
- java远程获取图片生成base64串