kafka 消费者offset记录位置和方式
2017-12-07 19:28
351 查看
4000
我们大家都知道,kafka消费者在会保存其消费的进度,也就是offset,存储的位置根据选用的kafka api不同而不同。
首先来说说消费者如果是根据javaapi来消费,也就是【kafka.javaapi.consumer.ConsumerConnector】,我们会配置参数【zookeeper.connect】来消费。这种情况下,消费者的offset会更新到zookeeper的【consumers/{group}/offsets/{topic}/{partition}】目录下,例如:
如果是根据kafka默认的api来消费,即【org.apache.kafka.clients.consumer.KafkaConsumer】,我们会配置参数【bootstrap.servers】来消费。而其消费者的offset会更新到一个kafka自带的topic【__consumer_offsets】下面,查看当前group的消费进度,则要依靠kafka自带的工具【kafka-consumer-offset-checker】,例如:
offset更新的方式,不区分是用的哪种api,大致分为两类:
自动提交,设置enable.auto.commit=true,更新的频率根据参数【auto.commit.interval.ms】来定。这种方式也被称为【at most once】,fetch到消息后就可以更新offset,无论是否消费成功。
手动提交,设置enable.auto.commit=false,这种方式称为【at least once】。fetch到消息后,等消费完成再调用方法【consumer.commitSync()】,手动更新offset;如果消费失败,则offset也不会更新,此条消息会被重复消费一次。
文章出处:http://www.mamicode.com/info-detail-1969443.html
我们大家都知道,kafka消费者在会保存其消费的进度,也就是offset,存储的位置根据选用的kafka api不同而不同。
首先来说说消费者如果是根据javaapi来消费,也就是【kafka.javaapi.consumer.ConsumerConnector】,我们会配置参数【zookeeper.connect】来消费。这种情况下,消费者的offset会更新到zookeeper的【consumers/{group}/offsets/{topic}/{partition}】目录下,例如:
[zk: localhost(CONNECTED) 0] get /kafka/consumers/zoo-consumer-group/offsets/my-topic/0 5662 cZxid = 0x20006d28a ctime = Wed Apr 12 18:20:51 CST 2017 mZxid = 0x30132b0ed mtime = Tue Aug 22 18:53:22 CST 2017 pZxid = 0x20006d28a cversion = 0 dataVersion = 5758 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 4 numChildren = 0
如果是根据kafka默认的api来消费,即【org.apache.kafka.clients.consumer.KafkaConsumer】,我们会配置参数【bootstrap.servers】来消费。而其消费者的offset会更新到一个kafka自带的topic【__consumer_offsets】下面,查看当前group的消费进度,则要依靠kafka自带的工具【kafka-consumer-offset-checker】,例如:
[root@localhost data]# kafka-consumer-offset-checker --zookeeper localhost :2181/kafka --group test-consumer-group --topic stable-test [2017-08-22 19:24:24,222] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$) Group Topic Pid Offset logSize Lag Owner test-consumer-group stable-test 0 601808 601808 0 none test-consumer-group stable-test 1 602826 602828 2 none test-consumer-group stable-test 2 602136 602136 0 none
offset更新的方式,不区分是用的哪种api,大致分为两类:
自动提交,设置enable.auto.commit=true,更新的频率根据参数【auto.commit.interval.ms】来定。这种方式也被称为【at most once】,fetch到消息后就可以更新offset,无论是否消费成功。
手动提交,设置enable.auto.commit=false,这种方式称为【at least once】。fetch到消息后,等消费完成再调用方法【consumer.commitSync()】,手动更新offset;如果消费失败,则offset也不会更新,此条消息会被重复消费一次。
文章出处:http://www.mamicode.com/info-detail-1969443.html
相关文章推荐
- kafka 消费者offset记录位置和方式
- kafka学习之监听方式实现消费者
- 关于怎么获取kafka指定位置offset消息
- spark streaming读取kafka数据,记录offset
- Kafka offset存储方式与获取消费实现
- 加米谷:Kafka OffsetMonitor:监控消费者和延迟的队列
- 【记录】python中,两种读取txt的方式;并结合jieba找出词频位置分布
- 加米谷:Kafka OffsetMonitor:监控消费者和延迟的队列
- KafkaManager对offset的两种管理方式
- 关于SpringKafka消费者的几个监听器:[一次处理单条消息和一次处理一批消息]以及[自动提交offset和手动提交offset]
- 记录一次kafka消费者在kafkaConsumer.poll(2000L)阻塞的问题
- KafkaOffsetMonitor:监控消费者和延迟的队列
- kafka-offset 存储方式
- spark streaming 实现kafka的createDirectStream方式!!不坑
- Dynamics CRM 通过Odata创建及更新记录各类型字段的赋值方式
- kafka生产者消费者示例代码
- kafka集群搭建和使用Java写kafka生产者消费者
- spark streaming程序因集群kafka版本不一致造成ZkUtils类无法更新offset解决方案
- 编程的真正奥义在于,把人类的思维、设计、语言、逻辑和精神创造以一种计算机可以识别和储存的方式记录下来。
- 生产者/消费者问题的多种Java实现方式