您的位置:首页 > 其它

KafkaConsumer使用详解

2017-07-13 18:09 633 查看
参考:http://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

自动提交offset

// Consumer configuration for the automatic-offset-commit mode.
Properties config = new Properties();
config.put("bootstrap.servers", "localhost:9092"); // seed brokers used for bootstrapping; the rest of the cluster is discovered from them
config.put("group.id", "test");                    // consumer group id
config.put("enable.auto.commit", "true");          // enable automatic offset commits
config.put("auto.commit.interval.ms", "1000");     // commit offsets once per second
config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");   // key deserializer class
config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // value deserializer class

// Create the consumer client and subscribe to the topics foo and bar.
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config);
consumer.subscribe(Arrays.asList("foo", "bar"));

// Poll loop: fetch records (waiting up to 100 ms per call) and print them.
while (true) {
    for (ConsumerRecord<String, String> record : consumer.poll(100)) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}


手动提交offset

粗粒度提交

// Consumer configuration with automatic offset commits disabled;
// offsets are committed manually after each batch has been persisted.
Properties config = new Properties();
config.put("bootstrap.servers", "localhost:9092");
config.put("group.id", "test");
config.put("enable.auto.commit", "false"); // manual commit mode
config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config);
consumer.subscribe(Arrays.asList("foo", "bar"));

// Accumulate records until the batch is large enough, write the batch to
// the database in one shot, then commit the offsets synchronously.
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> batch = new ArrayList<>();
while (true) {
    for (ConsumerRecord<String, String> record : consumer.poll(100)) {
        batch.add(record);
    }
    if (batch.size() >= minBatchSize) {
        insertIntoDb(batch);
        // commitSync() commits the position of the NEXT record to be
        // fetched, i.e. the offset of the last polled record + 1.
        consumer.commitSync();
        batch.clear();
    }
}


细粒度提交,可以指定offset提交

try {
while(running) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println(record.offset() + ": " + record.value());
}
// 获取最后一个record的offset
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
// 同步提交offset。
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
} finally {
consumer.close();
}


指定分区消费

// Manually assign specific partitions instead of subscribing by topic.
// assign() and subscribe() are mutually exclusive on the same consumer.
String topic = "foo";
consumer.assign(Arrays.asList(
        new TopicPartition(topic, 0),   // partition 0 of foo
        new TopicPartition(topic, 1))); // partition 1 of foo


获取最早offset或者最新offset

// Look up the earliest and latest offsets of partition 0 of topic foo.
TopicPartition partition = new TopicPartition("foo", 0);
List<TopicPartition> single = Collections.singletonList(partition);
// earliest offset still available in the partition
long beginningOffset = consumer.beginningOffsets(single).get(partition);
// latest (end) offset of the partition
long endOffset = consumer.endOffsets(single).get(partition);


从某一个offset开始消费

// Reposition the consumer within partition 0 of topic foo.
TopicPartition partition = new TopicPartition("foo", 0);
// BUG FIX: args[0] is a String and cannot be assigned to a long directly
// (the original `long offset = args[0];` does not compile); parse it
// explicitly. Throws NumberFormatException if args[0] is not a number.
long offset = Long.parseLong(args[0]);

// Move to the given offset; takes effect on the next poll().
consumer.seek(partition, offset);
// Move to the latest offset.
consumer.seekToEnd(Collections.singletonList(partition));
// Move to the earliest offset.
consumer.seekToBeginning(Collections.singletonList(partition));
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: