
Fetching Kafka offsets in Java

2017-08-27 14:08
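The snippets below come from a small helper class built on the old low-level SimpleConsumer API (Kafka 0.8.x era). The post only shows the methods, so the skeleton below is an assumption added for context: the imports, field values and init() body are guesses, and only the names hosts, timeOut, bufferSize, clientID, logger and init() are taken from the code itself.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import kafka.api.OffsetRequest;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaInfoClient {
    private static final Logger logger = LoggerFactory.getLogger(KafkaInfoClient.class);

    // broker host -> port
    private Map<String, Integer> hosts;
    private int timeOut = 10000;                 // socket timeout in ms (assumed value)
    private int bufferSize = 64 * 1024;          // fetch buffer size (assumed value)
    private String clientID = "kafkaInfoClient"; // assumed client id

    private void init() {
        hosts = new HashMap<String, Integer>();
        hosts.put("localhost", 9092); // placeholder broker address
    }

    // ... the constructor and methods from the post go here ...
}
```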
public KafkaInfoClient() {
    init();
}

public Map<Integer, Long> getEarliestOffset(String topic) {
    // kafka.api.OffsetRequest.EarliestTime() = -2
    return getTopicOffset(topic, kafka.api.OffsetRequest.EarliestTime());
}
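Symmetrically, kafka.api.OffsetRequest.LatestTime() (-1) yields the log-end offset of each partition. A companion method along the same lines (not part of the original code) could look like this:

```java
// Companion sketch: latest (log-end) offsets, reusing the getTopicOffset helper below.
public Map<Integer, Long> getLatestOffset(String topic) {
    // kafka.api.OffsetRequest.LatestTime() = -1
    return getTopicOffset(topic, kafka.api.OffsetRequest.LatestTime());
}
```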

/***
 * Get the offsets of all partitions of the given topic.
 * @param topic topic name
 * @param whichTime timestamp to fetch offsets for: -1 = latest, -2 = earliest
 * @return map of partition id -> offset
 */
public Map<Integer, Long> getTopicOffset(String topic, long whichTime) {
    HashMap<Integer, Long> offsets = new HashMap<Integer, Long>();
    TreeMap<Integer, PartitionMetadata> leaders = this.findLeader(hosts, topic);
    for (int part : leaders.keySet()) {
        PartitionMetadata metadata = leaders.get(part);
        if (metadata.leader() == null) {
            logger.error("No leader found for topic [{}] partition [{}]", topic, part);
            continue;
        }
        String leadBroker = metadata.leader().host();
        int leadPort = metadata.leader().port();
        SimpleConsumer consumer = new SimpleConsumer(leadBroker, leadPort, timeOut, bufferSize, clientID);
        try {
            long partitionOffset = this.getPartitionOffset(consumer, topic, part, whichTime);
            offsets.put(part, partitionOffset);
        } finally {
            consumer.close(); // release the connection to the partition leader
        }
    }
    return offsets;
}
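A minimal usage sketch for the two public methods above; the topic name is a placeholder:

```java
// Usage sketch; "test-topic" is a placeholder topic name.
KafkaInfoClient client = new KafkaInfoClient();
Map<Integer, Long> earliest = client.getEarliestOffset("test-topic");
Map<Integer, Long> latest = client.getTopicOffset("test-topic", kafka.api.OffsetRequest.LatestTime());
for (Map.Entry<Integer, Long> e : earliest.entrySet()) {
    System.out.println("partition " + e.getKey()
            + " earliest=" + e.getValue()
            + " latest=" + latest.get(e.getKey()));
}
```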
/***
 * Get the offset of a single partition.
 * @param consumer SimpleConsumer connected to the partition leader
 * @param topic topic name
 * @param partition partition id
 * @param whichTime timestamp to fetch the offset for: -1 = latest, -2 = earliest
 * @return the requested offset, or 0 on error
 */
private long getPartitionOffset(SimpleConsumer consumer, String topic, int partition, long whichTime) {
    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
            new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
    // The second argument of PartitionOffsetRequestInfo(long time, int maxNumOffsets) is maxNumOffsets.
    // Its exact meaning is not entirely clear to me, but in testing, passing 1 returns the single offset
    // matching whichTime, while passing 2 returns a pair containing the largest and smallest offsets.
    kafka.javaapi.OffsetRequest request =
            new kafka.javaapi.OffsetRequest(requestInfo, OffsetRequest.CurrentVersion(), consumer.clientId());
    OffsetResponse response = consumer.getOffsetsBefore(request);
    if (response.hasError()) {
        logger.error("Error fetching offset data from the broker. Reason: {}", response.errorCode(topic, partition));
        return 0;
    }
    long[] offsets = response.offsets(topic, partition);
    return offsets[0];
}
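To illustrate the note about maxNumOffsets, here is a variant that asks for up to two offsets per partition in a single request; the method name getPartitionOffsetRange is made up for this sketch, and the comment about the returned values reflects the author's observation above rather than a documented guarantee:

```java
// Sketch only: request up to two offsets per partition in one call.
private long[] getPartitionOffsetRange(SimpleConsumer consumer, String topic, int partition) {
    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
            new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    // maxNumOffsets = 2: the broker answers with up to two offsets in descending order;
    // in the test described above this was the largest and the smallest offset.
    requestInfo.put(topicAndPartition,
            new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 2));
    kafka.javaapi.OffsetRequest request =
            new kafka.javaapi.OffsetRequest(requestInfo, OffsetRequest.CurrentVersion(), consumer.clientId());
    OffsetResponse response = consumer.getOffsetsBefore(request);
    if (response.hasError()) {
        return new long[0];
    }
    return response.offsets(topic, partition);
}
```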

/***
 * Get the metadata of every partition of the topic.
 * @param bootstraps broker addresses as (host, port) pairs
 * @param topic topic name
 * @return map of partition id -> PartitionMetadata
 */
private TreeMap<Integer, PartitionMetadata> findLeader(Map<String, Integer> bootstraps, String topic) {
    TreeMap<Integer, PartitionMetadata> map = new TreeMap<Integer, PartitionMetadata>();
    loop:
    for (Map.Entry<String, Integer> bootstrap : bootstraps.entrySet()) {
        SimpleConsumer consumer = null;
        try {
            consumer = new SimpleConsumer(bootstrap.getKey(), bootstrap.getValue(), timeOut,
                    bufferSize, clientID);
            List<String> topics = Collections.singletonList(topic);
            TopicMetadataRequest req = new TopicMetadataRequest(topics);
            kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

            List<TopicMetadata> metaData = resp.topicsMetadata();
            for (TopicMetadata item : metaData) {
                for (PartitionMetadata part : item.partitionsMetadata()) {
                    map.put(part.partitionId(), part);
                }
            }
            if (!map.isEmpty()) {
                break loop; // one responsive broker is enough to get the full metadata
            }
        } catch (Exception e) {
            logger.error("Error communicating with broker [{}] to find leader for [{}]. Reason: ", bootstrap, topic, e);
        } finally {
            if (consumer != null) {
                consumer.close();
            }
        }
    }
    return map;
}
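As an aside, on newer client libraries (kafka-clients 0.10.1+) the same earliest/latest offsets can be read without SimpleConsumer via KafkaConsumer#beginningOffsets and #endOffsets. A minimal, self-contained sketch; the broker address, topic name and group id are placeholders:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class NewApiOffsetExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "offset-inspector");        // placeholder group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        try {
            // Collect all partitions of the topic, then ask for both ends of each log.
            List<TopicPartition> partitions = new ArrayList<TopicPartition>();
            for (PartitionInfo info : consumer.partitionsFor("test-topic")) { // placeholder topic
                partitions.add(new TopicPartition(info.topic(), info.partition()));
            }
            Map<TopicPartition, Long> earliest = consumer.beginningOffsets(partitions);
            Map<TopicPartition, Long> latest = consumer.endOffsets(partitions);
            for (TopicPartition tp : partitions) {
                System.out.println(tp + " earliest=" + earliest.get(tp) + " latest=" + latest.get(tp));
            }
        } finally {
            consumer.close();
        }
    }
}
```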