一次KAFKA消费者异常引起的思考
问题描述:
线上出现一台服务器特别慢,于是关闭了服务器上的kafka broker. 关闭后发现一些kafka consumer无法正常消费数据了, 日志错误:
o.a.kakfa.clients.consumer.internals.AbstractCordinator Marking the coordinator (39.0.2.100) as dead.
原因:
经过一番排查,发现consumer group信息:
(kafka.coordinator.GroupMetadataMessageFormatter类型):
groupId::[groupId,Some(consumer),groupState,Map(memberId -> [memberId,clientId,clientHost,sessionTimeoutMs], ...->[]...)],
存到了KAFKA内部topic:
__consumer_offsets里, , 它的key是 groupId.
同时发现broker 参数 offsets.topic.replication.factor 错误地被设置为1. 这个参数表示TOPIC:
__Consumer_offsets的副本数. 这样一旦某个broker被关闭, 如果被关闭的Broker 是
__Consumer_offsets的某些partition的Leader. 则导致某些consumer group 不可用. 如果一旦broker已经启动, 需要手工通过命令行来扩展副本数.
reassignment.json: {"version":1, "partitions": [{"topic": "xxx", "partition": 0, "replicas": {brokerId1, brokerId2}}] } kafka-reassign-partitions --zookeeper localhost:2818 --reassignment-json-file reassignment.json --execute
客户端寻找Consumer Coordinator的过程:
客户端 org.apache.kafka.clients.consumer.internals.AbstractCoordinator
如果Coordinator 未知 (AbstractCoordinator.coordinatorUnknown()), 发起请求 lookupCoordinator,向负载最低的节点发送FindCoordinatorRequest
服务端 KafkaApis.handleFindCoordinatorRequest 接收请求:
首先调用 GroupMetaManager.partitionFor(consumerGroupId) consunerGroupId 的 hashCode 对
__consumer_offsets总的分片数取模获取partition id 再从
__consumer_offset这个Topic 中找到partition对应的 Partition Metadata, 并且获取对应的Partition leader 返回给客户端
引伸思考
KAFKA 的failover机制究竟是怎么样的?假使
__consumer_offset设置了正确的副本数,重选举的过程是怎样的. 如果broker宕机后导致某些副本不可用, 副本会自动迁移到其他节点吗?带着这些问题稍微阅读了一下KAFKA的相关代码:
当一个Broker 被关掉时, 会有两步操作:
KafkaController.onBrokerFailure ->KafkaController.onReplicasBecomeOffline
主要是通过 PartitionStateMachine.handleStateChanges 方法通知Partition状态机将状态置为offline. ReplicaStateMachine.handleStateChanges方法会将Replica 状态修改为OfflineReplica, 同时修改partition ISR. 如果被关闭broker 是partition leader 那么需要重新触发partition leader 选举,最后发送LeaderAndIsrRequest获取最新的Leader ISR 信息.
KafkaController.unregisterBrokerModificationsHandler 取消注册的BrokerModificationsHandler 并取消zookeeper 中broker 事件的监听.
当ISR请求被发出,KafkaApis.handleLeaderAndIsrRequest() 会被调用. 这里如果需要变更leader的partition是属于
__consumer_offset这个特殊的topic,取决于当前的broker节点是不是partition leader. 会分别调用GroupCoordinator.handleGroupImmigration 和 GroupCoordinator.handleGroupEmmigration. 如果是partition leader, GroupCoordinator.handleGroupImmigration -> GroupMetadataManager.loadGroupsForPartition 会重新从
__consumer_offset读取group数据到本地metadata cache, 如果是partition follower, GroupCoordniator.handleGroupImigration -> GroupMetadataManager.removeGroupsForPartition 会从metadata cache中移除group信息. 并在onGroupUnloaded回调函数中将group的状态变更为dead. 同时通知所有等待join或者sync的组成员.
KAFKA在Broker关闭时不会自动做partition 副本的迁移. 这时被关闭的Broker上的副本变为under replicated 状态. 这种状态将持续直到Broker被重新拉起并且追上新的数据, 或者用户通过命令行 手动复制副本到其他节点.
官方建议设置两个参数来保证graceful shutdown. controlled.shutdown.enable=true auto.leader.rebalance.enable=true前者保证关机之前将日志数据同步到磁盘,并进行重选举. 后者保证在broker重新恢复后再次获得宕机前leader状态. 避免leader分配不均匀导致读写热点.
Reference
https://www.geek-share.com/detail/2707308259.html
https://blog.csdn.net/huochen1994/article/details/80511038
https://www.jianshu.com/p/1aba6e226763
- 最近一次需求开发过程中引起我对规范的思考
- Kafka异常处理(消费者不消费数据)
- 记一次混淆后引起的异常,被覆盖的方法未抛出Exception
- SDI应用程序调用DestroyWindow()退出程序出现异常引起的一些思考
- 记一次Kafka消费者拉取数据不均匀问题
- ZooKeeper配置引起Hbase和Kafka启动异常
- C语言版kafka消费者代码运行时异常kafka receive failed disconnected
- 一次vector析构异常的思考
- 记一次处理IE引起的上网异常处理
- 记录一次关于Activity与Fragment生命周期引起的异常
- 记录一次kafka消费者在kafkaConsumer.poll(2000L)阻塞的问题
- 一次按操作规程操作失败引起的思考
- 一次按操作规程操作失败引起的思考
- 一次由于for update锁表,引起应用异常的经历
- 由一次java作业 引起的思考
- 一次老板发话引起的思考
- Kafka异常处理(消费者不消费数据)
- 关于SpringKafka消费者的几个监听器:[一次处理单条消息和一次处理一批消息]以及[自动提交offset和手动提交offset]
- 一次用户进程异常引起的kernel panic事后总结——高通平台,基于内存dump信息
- kafka消费者脚本无法启动问题