KafkaConsumer使用详解
2017-07-13 18:09
633 查看
参考:http://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
细粒度提交,可以指定offset提交
自动提交offset
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); // 设置连接kafka集群的seed broker,不需要传入所有的broker props.put("group.id", "test"); // 设置group id props.put("enable.auto.commit", "true"); // 打开自动提交offset props.put("auto.commit.interval.ms", "1000"); // 设置自动提交offset的时间间隔,示例为每1秒提交一次 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // key的反序列化类 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // value的反序列化类 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // 创建消费者客户端 consumer.subscribe(Arrays.asList("foo", "bar")); // 订阅topic:foo和bar while (true) { ConsumerRecords<String, String> records = consumer.poll(100); // 拉取数据,等待时长为100毫秒 for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); }
手动提交offset
粗粒度提交Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "false"); // 关闭自动提交offset props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); final int minBatchSize = 200; List<ConsumerRecord<String, String>> buffer = new ArrayList<>(); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { buffer.add(record); } if (buffer.size() >= minBatchSize) { insertIntoDb(buffer); // 同步提交offset。这里提交的offset是下一次poll结果的第一个record的offset,即这次poll结果的最后一个record的offset+1 consumer.commitSync(); buffer.clear(); } }
细粒度提交,可以指定offset提交
try { while(running) { ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE); for (TopicPartition partition : records.partitions()) { List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); for (ConsumerRecord<String, String> record : partitionRecords) { System.out.println(record.offset() + ": " + record.value()); } // 获取最后一个record的offset long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); // 同步提交offset。 consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); } } } finally { consumer.close(); }
指定分区消费
String topic = "foo"; TopicPartition partition0 = new TopicPartition(topic, 0); TopicPartition partition1 = new TopicPartition(topic, 1); consumer.assign(Arrays.asList(partition0, partition1)); // 只消费foo的分区0和分区1,不可以与subscribe方法同用
获取最早offset或者最新offset
TopicPartition partition = new TopicPartition("foo", 0); // 获取foo的分区0的最早offset long beginningOffset = consumer.beginningOffsets(Collections.singletonList(partition)).get(partition); // 获取foo的分区0的最新offset long endOffset = consumer.endOffsets(Collections.singletonList(partition)).get(partition);
从某一个offset开始消费
TopicPartition partition = new TopicPartition("foo", 0); long offset = args[0]; // 移动到指定offset consumer.seek(partition, offset); // 移动到最新offset consumer.seekToEnd(Collections.singletonList(partition)); // 移动到最早offset consumer.seekToBeginning(Collections.singletonList(partition));
相关文章推荐
- 使用kafka consumer api时,中文乱码问题
- 使用kafka consumer high-level API开发的一些问题
- 关于Kafka 的 consumer 消费者手动提交详解
- Kafka的Producer和Consumer的示例(使用java语言)
- kafka Consumer详解
- 本地Consumer和Producer无法使用远程Kafka服务器的处理办法
- kafka HighLevelConsumer API 使用案例
- Kafka详解五:Kafka Consumer的底层API- SimpleConsumer
- Kafka配置文件详解之:consumer.properties
- Kafka Consumer API 的使用
- kafka配置文件详解:consumer.properties
- 本地Consumer和Producer无法使用远程Kafka服务器的处理办法
- 使用nodejs对kafka、zookeeper数据进行消费consumer
- Kafka详解五、Kafka Consumer的底层API- SimpleConsumer
- Kafka详解五、Kafka Consumer的底层API- SimpleConsumer
- Kafka的Producer和Consumer的示例(使用java语言)
- 关于Kafka 的 consumer 消费者手动提交详解
- 详解使用docker搭建kafka环境
- 使用命令读取kafka的内部topic:__consumer_offsets
- Kafka学习笔记——Kafka原理与使用详解