Kafka-Consumer Source Code Analysis: Data Fetching and Offset Commit
2020-07-16 06:03
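Preface
The previous article, "Kafka-Consumer Source Code Analysis: the rebalance process and partition assignment", covered how each consumer's partitions are determined. Once the partitions are assigned, data fetching proceeds normally.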
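1. Data fetching
The data fetching steps:
- When auto commit is off and the ack mode is not RECORD, commit the pending offsets
- Look up the latest partition offsets to determine where fetching starts
- Call KafkaConsumer poll to fetch data
- poll calls pollForFetches to perform the fetch
- pollForFetches first checks whether already-fetched records exist. The previous fetch cycle sends the next request ahead of time, so records may already be waiting to be processed. If nothing has been fetched yet, a new fetch request is sent.
- Once records arrive they are not returned immediately. Another fetch request is sent first, over non-blocking network I/O, so that consuming the current records and fetching the next batch proceed in parallel, which improves throughput.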
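The prefetch behaviour in the last two steps can be sketched without a broker. This is a toy model under stated assumptions, not Kafka's implementation: `PrefetchSketch`, `sendFetch`, `completeNetworkIo` and `poll` are hypothetical names, and the "network" is simulated by a method that completes the in-flight request with two integers.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;

/** Toy model of fetch pipelining; all names are hypothetical, not Kafka APIs. */
final class PrefetchSketch {
    private final Queue<List<Integer>> completedFetches = new ArrayDeque<>();
    private boolean fetchInFlight = false;
    private int nextOffset = 0;
    int fetchRequestsSent = 0;

    /** Sends a fetch only if none is pending, so pending fetches are never resent. */
    void sendFetch() {
        if (!fetchInFlight) {
            fetchInFlight = true;
            fetchRequestsSent++;
        }
    }

    /** Simulates the network layer completing the in-flight fetch with two records. */
    void completeNetworkIo() {
        if (fetchInFlight) {
            completedFetches.add(Arrays.asList(nextOffset, nextOffset + 1));
            nextOffset += 2;
            fetchInFlight = false;
        }
    }

    /** Returns buffered records if present, otherwise fetches; then requests the next batch. */
    List<Integer> poll() {
        if (completedFetches.isEmpty()) {
            sendFetch();
            completeNetworkIo(); // a real client would wait on the network here
        }
        List<Integer> records = completedFetches.isEmpty()
                ? Collections.<Integer>emptyList()
                : completedFetches.poll();
        if (!records.isEmpty()) {
            sendFetch(); // pipelining: ask for the next batch before handing records back
        }
        return records;
    }

    public static void main(String[] args) {
        PrefetchSketch consumer = new PrefetchSketch();
        System.out.println(consumer.poll());
        consumer.completeNetworkIo(); // the next batch arrives while the caller is busy
        System.out.println(consumer.poll());
    }
}
```

In this model the second `poll` finds its records already buffered, which is the situation the first check in pollForFetches is there to catch.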
The pollAndInvoke method of the ListenerConsumer class carries out the data fetch, the offset commit and the record consumption.
The pollAndInvoke implementation:

    protected void pollAndInvoke() {
        // If the commit mode is neither auto commit nor per-record ack, commit now
        if (!this.autoCommit && !this.isRecordAck) {
            processCommits();
        }
        // If seeks holds TopicPartitionOffset instances, resolve the offsets first
        if (this.seeks.size() > 0) {
            processSeeks();
        }
        // Check whether consumption is currently paused
        checkPaused();
        this.lastPoll = System.currentTimeMillis();
        // Mark that a poll is in progress
        this.polling.set(true);
        // Fetch records
        ConsumerRecords<K, V> records = this.consumer.poll(this.pollTimeout);
        // Reset the polling flag with a CAS
        if (!this.polling.compareAndSet(true, false)) {
            /*
             * There is a small race condition where wakeIfNecessary was called between
             * exiting the poll and before we reset the boolean.
             */
            if (records.count() > 0) {
                this.logger.debug(() -> "Discarding polled records, container stopped: " + records.count());
            }
            return;
        }
        checkResumed();
        debugRecords(records);
        // If any records were fetched, hand them to the listener
        if (records != null && records.count() > 0) {
            if (this.containerProperties.getIdleEventInterval() != null) {
                this.lastReceive = System.currentTimeMillis();
            }
            invokeListener(records);
        }
        else {
            checkIdle();
        }
    }
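The compareAndSet on the polling flag is the least obvious part of pollAndInvoke, so here is a single-threaded sketch of the handshake. It is a simplified model, not the Spring Kafka source: `PollingFlagSketch`, `pollOnce` and the `wakeupRequested` field are hypothetical, and the body of `wakeIfNecessary` is an assumption about how the stopping side clears the flag.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Simplified model of the polling-flag handshake; names and bodies are assumptions. */
final class PollingFlagSketch {
    private final AtomicBoolean polling = new AtomicBoolean(false);
    boolean wakeupRequested = false;

    /** Stopping side: clears the flag and, if a poll was active, asks for a wakeup. */
    void wakeIfNecessary() {
        if (polling.getAndSet(false)) {
            wakeupRequested = true; // stands in for consumer.wakeup()
        }
    }

    /**
     * Runs one poll cycle. stopAfterPoll simulates a stop request that lands after
     * poll() has returned but before the flag is reset.
     * Returns true if the polled records should be processed.
     */
    boolean pollOnce(boolean stopAfterPoll) {
        polling.set(true);
        // ... consumer.poll(timeout) would run here ...
        if (stopAfterPoll) {
            wakeIfNecessary();
        }
        // If the other side already flipped the flag, the CAS fails and the records are dropped.
        return polling.compareAndSet(true, false);
    }

    public static void main(String[] args) {
        System.out.println("normal cycle processes records: " + new PollingFlagSketch().pollOnce(false));
        System.out.println("stopped cycle processes records: " + new PollingFlagSketch().pollOnce(true));
    }
}
```

Using a CAS rather than a plain `set(false)` lets the polling thread learn that a stop request won the race, instead of silently overwriting it.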
Looking at the poll method of KafkaConsumer, the actual fetching is done in pollForFetches:

    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {
        long pollTimeout = coordinator == null ? timer.remainingMs() :
                Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());

        // if data is available already, return it immediately
        // Check whether records are already buffered. Before poll returns a batch it sends
        // the next fetch without waiting for the response, so the following batch may
        // arrive while the caller is still processing the current one.
        final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
        if (!records.isEmpty()) {
            return records;
        }

        // send any new fetches (won't resend pending fetches)
        fetcher.sendFetches();

        // We do not want to be stuck blocking in poll if we are missing some positions
        // since the offset lookup may be backing off after a failure

        // NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call
        // updateAssignmentMetadataIfNeeded before this method.
        if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) {
            pollTimeout = retryBackoffMs;
        }

        Timer pollTimer = time.timer(pollTimeout);
        // Drive the network I/O until a fetch completes or pollTimer expires
        client.poll(pollTimer, () -> {
            // since a fetch might be completed by the background thread, we need this poll condition
            // to ensure that we do not block unnecessarily in poll()
            return !fetcher.hasCompletedFetches();
        });
        timer.update(pollTimer.currentTimeMs());

        // after the long poll, we should check whether the group needs to rebalance
        // prior to returning data so that the group can stabilize faster
        if (coordinator != null && coordinator.rejoinNeededOrPending()) {
            return Collections.emptyMap();
        }

        return fetcher.fetchedRecords();
    }
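The timeout arithmetic at the top of pollForFetches is easier to follow when pulled out into a pure function. The sketch below only restates the two rules visible in the listing; `PollTimeoutSketch` and its parameter names are hypothetical and stand in for the coordinator, timer and fetch-position state.

```java
/** Restates the pollForFetches timeout rules as a pure function; names are hypothetical. */
final class PollTimeoutSketch {

    /**
     * @param hasCoordinator       false for a consumer without group management
     * @param timeToNextPollMs     how soon the coordinator needs poll() to be called again
     * @param remainingMs          time left on the caller's poll timeout
     * @param hasAllFetchPositions false while some partitions still lack a fetch position
     * @param retryBackoffMs       backoff applied to the pending offset lookup
     */
    static long pollTimeoutMs(boolean hasCoordinator, long timeToNextPollMs, long remainingMs,
                              boolean hasAllFetchPositions, long retryBackoffMs) {
        // Rule 1: never wait longer than the coordinator allows or than the caller asked for.
        long timeout = hasCoordinator ? Math.min(timeToNextPollMs, remainingMs) : remainingMs;
        // Rule 2: if positions are missing, wake up after the backoff to retry the lookup.
        if (!hasAllFetchPositions && timeout > retryBackoffMs) {
            timeout = retryBackoffMs;
        }
        return timeout;
    }

    public static void main(String[] args) {
        System.out.println(pollTimeoutMs(true, 3000L, 5000L, true, 100L));
        System.out.println(pollTimeoutMs(true, 3000L, 5000L, false, 100L));
    }
}
```

The second rule keeps a consumer with unresolved positions from sitting in a long network wait when nothing can be fetched for those partitions yet.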
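2. Offset commit
The manual offset commit modes (the container's AckMode values):
- RECORD: commit after every record that is processed
- BATCH (default): commit once per poll, for the whole batch; the frequency depends on how often poll is called
- TIME: commit each time ackTime has elapsed (how does this differ from the auto commit interval?)
- COUNT: commit once ackCount acks have accumulated
- COUNT_TIME: commit as soon as either ackTime or ackCount is reached, whichever comes first
- MANUAL: the listener is responsible for the ack, but the commits are still sent in batches behind the scenes
- MANUAL_IMMEDIATE: the listener is responsible for the ack, and every call commits immediately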
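To make the difference between the container-driven modes concrete, the sketch below expresses each one as a commit condition. It is an illustration only, not Spring Kafka's code: `AckModeSketch` and `shouldCommit` are hypothetical, the thresholds are assumed to be inclusive, and the two MANUAL modes are left out because there the listener decides when to ack.

```java
/** Commit conditions for the container-driven ack modes; an illustrative sketch only. */
final class AckModeSketch {

    enum AckMode { RECORD, BATCH, TIME, COUNT, COUNT_TIME }

    /**
     * @param batchDone   true once every record from the last poll has been processed
     * @param pendingAcks records processed since the last commit
     * @param elapsedMs   time since the last commit
     * @param ackCount    threshold for COUNT and COUNT_TIME
     * @param ackTimeMs   threshold for TIME and COUNT_TIME
     */
    static boolean shouldCommit(AckMode mode, boolean batchDone, int pendingAcks,
                                long elapsedMs, int ackCount, long ackTimeMs) {
        switch (mode) {
            case RECORD:
                return pendingAcks > 0;                       // after every record
            case BATCH:
                return batchDone && pendingAcks > 0;          // once per polled batch
            case TIME:
                return elapsedMs >= ackTimeMs;                // ackTime elapsed
            case COUNT:
                return pendingAcks >= ackCount;               // ackCount accumulated
            case COUNT_TIME:
                return pendingAcks >= ackCount || elapsedMs >= ackTimeMs; // whichever first
            default:
                throw new IllegalArgumentException("Unhandled mode: " + mode);
        }
    }

    public static void main(String[] args) {
        // 3 records processed, 2 seconds since the last commit, ackCount=10, ackTime=1s
        for (AckMode mode : AckMode.values()) {
            System.out.println(mode + " -> " + shouldCommit(mode, false, 3, 2000L, 10, 1000L));
        }
    }
}
```

On the question raised for TIME: in this model the check only runs when the container evaluates it after processing records, so it is tied to listener progress rather than to the consumer client's own auto commit.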
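Manual commit is implemented in the invokeOnMessage method of the ListenerConsumer class, which calls ackCurrent.
For auto commit, the interval between commits is set with the auto.commit.interval.ms configuration property. Auto commit is not driven by a periodic scheduled task. Instead, when certain events occur, the consumer checks whether the current time has reached nextAutoCommitDeadline, the deadline for the next commit computed from ${auto.commit.interval.ms}. If the deadline has passed, an offset commit is requested and nextAutoCommitDeadline is moved forward to the next deadline.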
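Events that trigger the auto commit check:
- Assigning partitions through KafkaConsumer.assign()
- Processing in ConsumerCoordinator.poll() (the maybeAutoCommitOffsetsAsync method)
- Before the consumer performs a rebalance
- Closing the ConsumerCoordinator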
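The deadline check described above can be sketched in a few lines. This is a minimal model under stated assumptions rather than the ConsumerCoordinator code: `AutoCommitSketch`, `maybeAutoCommit` and `commitsSent` are hypothetical names, time is passed in explicitly, and the commit itself is reduced to a counter.

```java
/** Deadline-driven auto commit; a minimal model, not the ConsumerCoordinator code. */
final class AutoCommitSketch {
    private final long autoCommitIntervalMs;
    private long nextAutoCommitDeadline;
    int commitsSent = 0;

    AutoCommitSketch(long nowMs, long autoCommitIntervalMs) {
        this.autoCommitIntervalMs = autoCommitIntervalMs;
        this.nextAutoCommitDeadline = nowMs + autoCommitIntervalMs;
    }

    /**
     * Called from event hooks such as poll(); there is no timer thread.
     * Commits only if the deadline has been reached, then moves the deadline forward.
     */
    boolean maybeAutoCommit(long nowMs) {
        if (nowMs < nextAutoCommitDeadline) {
            return false;
        }
        nextAutoCommitDeadline = nowMs + autoCommitIntervalMs;
        commitsSent++; // stands in for sending an asynchronous offset commit
        return true;
    }

    public static void main(String[] args) {
        AutoCommitSketch autoCommit = new AutoCommitSketch(0L, 5000L);
        long[] pollTimes = {1000L, 4999L, 5000L, 9999L, 12000L};
        for (long now : pollTimes) {
            System.out.println("poll at " + now + " ms, committed: " + autoCommit.maybeAutoCommit(now));
        }
    }
}
```

One consequence of this design is that the real gap between commits can exceed the configured interval: if the application does not call poll for a while, nothing checks the deadline in the meantime.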
The KafkaConsumer.poll() method, the entry point from which ConsumerCoordinator.poll() is reached:

    @Override
    public ConsumerRecords<K, V> poll(final Duration timeout) {
        return poll(time.timer(timeout), true);
    }

    private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
        acquireAndEnsureOpen();
        try {
            if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
                throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
            }

            // poll for new data until the timeout expires
            do {
                client.maybeTriggerWakeup();

                if (includeMetadataInTimeout) {
                    if (!updateAssignmentMetadataIfNeeded(timer)) {
                        return ConsumerRecords.empty();
                    }
                } else {
                    while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE))) {
                        log.warn("Still waiting for metadata");
                    }
                }

                final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
                if (!records.isEmpty()) {
                    // before returning the fetched records, we can send off the next round of fetches
                    // and avoid block waiting for their responses to enable pipelining while the user
                    // is handling the fetched records.
                    //
                    // NOTE: since the consumed position has already been updated, we must not allow
                    // wakeups or any other errors to be triggered prior to returning the fetched records.
                    if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
                        client.pollNoWakeup();
                    }

                    return this.interceptors.onConsume(new ConsumerRecords<>(records));
                }
            } while (timer.notExpired());

            return ConsumerRecords.empty();
        } finally {
            release();
        }
    }
The updateAssignmentMetadataIfNeeded method called there, together with ConsumerCoordinator.poll():

    boolean updateAssignmentMetadataIfNeeded(final Timer timer) {
        if (coordinator != null && !coordinator.poll(timer)) {
            return false;
        }

        return updateFetchPositions(timer);
    }

    // In the ConsumerCoordinator class
    public boolean poll(Timer timer) {
        maybeUpdateSubscriptionMetadata();

        invokeCompletedOffsetCommitCallbacks();

        if (subscriptions.partitionsAutoAssigned()) {
            // Always update the heartbeat last poll time so that the heartbeat thread does not leave the
            // group proactively due to application inactivity even if (say) the coordinator cannot be found.
            pollHeartbeat(timer.currentTimeMs());
            if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {
                return false;
            }

            if (rejoinNeededOrPending()) {
                // due to a race condition between the initial metadata fetch and the initial rebalance,
                // we need to ensure that the metadata is fresh before joining initially. This ensures
                // that we have matched the pattern against the cluster's topics at least once before joining.
                if (subscriptions.hasPatternSubscription()) {
                    if (this.metadata.timeToAllowUpdate(timer.currentTimeMs()) == 0) {
                        this.metadata.requestUpdate();
                    }

                    if (!client.ensureFreshMetadata(timer)) {
                        return false;
                    }

                    maybeUpdateSubscriptionMetadata();
                }

                if (!ensureActiveGroup(timer)) {
                    return false;
                }
            }
        } else {
            if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) {
                client.awaitMetadataUpdate(timer);
            }
        }

        maybeAutoCommitOffsetsAsync(timer.currentTimeMs());
        return true;
    }
maybeAutoCommitOffsetsAsync is where the offsets are committed asynchronously.