使用命令读取kafka的内部topic:__consumer_offsets
2016-11-21 21:57
381 查看
众所周知,由于Zookeeper并不适合大批量的频繁写入操作,新版Kafka已推荐将consumer的位移信息保存在Kafka内部的topic中,即__consumer_offsets topic,并且默认提供了kafka_consumer_groups.sh脚本供用户查看consumer信息。
不过依然有很多用户希望了解__consumer_offsets topic内部到底保存了什么信息,特别是想查询某些consumer group的位移是如何在该topic中保存的。针对这些问题,本文将结合一个实例探讨如何使用kafka-simple-consumer-shell脚本来查询该内部topic。
参数解释:--time -1 表示从最新的时间的offset中得到数据条数
输出结果每个字段分别表示topic、partition、untilOffset
上面的输出结果表明总共生产了7条消息
参数解释:
--from-beginning 如果consumer之前没有建立offset,则从producer最开始的数据读取。
其实在第4步中在配置文件consumer.properties中配置了group.id为wv,所以这儿得到的是wv,如果没有配置,会随机产生一个console-consumer-***的group.ig
参数解释:
--delete-consumer-offsets由于之前wv这个group.id
注意:运行下面命令前先要在consumer.properties中设置exclude.internal.topics=false
在本例中,对应的分区=Math.abs("wv".hashCode()) % 50 = 7,即__consumer_offsets的分区7保存了这个consumer group的位移信息,下面让我们验证一下。
输出结果如下:
对于spark streaming来说,其中DStream中,DStream的本质是RDD序列,读取kafka时,也就是KafkaRDD,通过读取KafkaRDD的getPartitions方法,可以发现,KafkaRDD 的 partition 数据与 Kafka topic 的某个 partition 的 o.fromOffset 至 o.untilOffset 数据是相对应的,也就是说 KafkaRDD 的 partition 与 Kafka partition 是一一对应的。
参考:http://www.cnblogs.com/huxi2b/p/6061110.html
不过依然有很多用户希望了解__consumer_offsets topic内部到底保存了什么信息,特别是想查询某些consumer group的位移是如何在该topic中保存的。针对这些问题,本文将结合一个实例探讨如何使用kafka-simple-consumer-shell脚本来查询该内部topic。
1. 创建 topic “test”
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --replication-factor 1 --partitions 3 #创建一个有3个partition、1个副本的 test topic
2. 使用kafka-console-producer.sh脚本生产消息
bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --topic test
3. 验证生产消息成功
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092,localhost:9093,localhost:9094 --topic test --time -1 test:2:2 test:1:2 test:0:3
参数解释:--time -1 表示从最新的时间的offset中得到数据条数
输出结果每个字段分别表示topic、partition、untilOffset
上面的输出结果表明总共生产了7条消息
4. 创建一个console consumer group
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --consumer.config config/consumer.properties --topic test --from-beginning --new-consumer
参数解释:
--from-beginning 如果consumer之前没有建立offset,则从producer最开始的数据读取。
5. 获取该consumer group的group id(后面需要根据该id查询它的位移信息)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --list --new-consumer wv
其实在第4步中在配置文件consumer.properties中配置了group.id为wv,所以这儿得到的是wv,如果没有配置,会随机产生一个console-consumer-***的group.ig
6. 查询__consumer_offsets topic所有内容
bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning --delete-consumer-offsets
参数解释:
--delete-consumer-offsets由于之前wv这个group.id
注意:运行下面命令前先要在consumer.properties中设置exclude.internal.topics=false
7. 计算指定consumer group在__consumer_offsets topic中分区信息
这时候就用到了第5步获取的group.id(本例中是console-consumer-46965)。Kafka会使用下面公式计算该group位移保存在__consumer_offsets的哪个分区上:Math.abs(groupID.hashCode()) % numPartitions
在本例中,对应的分区=Math.abs("wv".hashCode()) % 50 = 7,即__consumer_offsets的分区7保存了这个consumer group的位移信息,下面让我们验证一下。
8. 获取指定consumer group的位移信息
bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 7 --broker-list localhost:9092,localhost:9093,localhost:9094 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"
输出结果如下:
[wv,test,0]::[OffsetMetadata[3,NO_METADATA],CommitTime 1479735556398,ExpirationTime 1479821956398] [wv,test,1]::[OffsetMetadata[2,NO_METADATA],CommitTime 1479735556398,ExpirationTime 1479821956398] [wv,test,2]::[OffsetMetadata[2,NO_METADATA],CommitTime 1479735556398,ExpirationTime 1479821956398]
上图可见,该consumer group果然保存在分区11上,且位移信息都是对的(这里的位移信息是已消费的位移,严格来说不是第3步中的位移。由于我的consumer已经消费完了所有的消息,所以这里的位移与第3步中的位移相同)。另外,可以看到__consumer_offsets topic的每一日志项的格式都是:[Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime]
对于spark streaming来说,其中DStream中,DStream的本质是RDD序列,读取kafka时,也就是KafkaRDD,通过读取KafkaRDD的getPartitions方法,可以发现,KafkaRDD 的 partition 数据与 Kafka topic 的某个 partition 的 o.fromOffset 至 o.untilOffset 数据是相对应的,也就是说 KafkaRDD 的 partition 与 Kafka partition 是一一对应的。
参考:http://www.cnblogs.com/huxi2b/p/6061110.html
相关文章推荐
- Kafka 如何读取指定topic中的offset -------------用来验证分区是不是均衡!!!(__consumer_offsets)(已验证!)
- Kafka 如何读取offset topic内容 (__consumer_offsets)
- Kafka 如何读取offset topic内容 (__consumer_offsets)
- Kafka 如何读取offset topic内容 (__consumer_offsets)
- Kafka读取__consumer_offsets和Kafka 0.11客户端管理工具AdminClient
- 解决kafka集群由于默认的__consumer_offsets这个topic的默认的副本数为1而存在的单点故障问题
- Kafka 如何读取offset topic内容 (__consumer_offsets)
- 读取用户输入的内容(read命令的使用)
- 'ping' 不是内部或外部命令,也不是可运行的程序[ping命令无法使用]
- 使用while和read命令读取文件内容
- win7里边使用telnet提示'telnet' 不是内部或外部命令,也不是可运行的程序或批处理文件。
- linux使用popen读取shell命令执行结果
- linux gdb 没有符号表被读取。请使用 "file" 命令。
- 如何使用read命令读取文件的每一行
- linux ubuntu系统下,adb不是内部命令 (如何才能让adb命令可以使用)
- shell内部命令使用详解
- 使用硬盘ATA命令读取磁盘
- FTP使用的内部命令
- win7里边使用telnet命令为什么提示telnet不是内部或外部命令,也不是可运行的程序或批处理文件?解决
- ftp使用的内部命令