您的位置:首页 > 运维架构 > Apache

Apache Kafka源码剖析:第12篇 延迟操作系列1-DelayedProduce

2017-08-19 00:00 836 查看
下面开始介绍延迟操作组件,本节讲的是TimingWheel.

DelayedOperationPurgatory是一个相对独立的组件,主要功能是管理延迟操作。

底层依赖Kafka提供的时间轮实现。

有的读者会说为啥不用JDK本身的实现,这是因为Kafka这种分布式系统的请求量巨大,性能要求高,

-

在高性能的框架中,为了将定时任务的存取操作以及取消操作的时间复杂度降低为O(1)

一般会使用其他方式实现定时任务组件,例如,使用时间轮的方式。

Kafka的时间轮实现是TimeWheel, 是1个存储定时任务的环形队列。

底层使用数组实现,数组中的每个元素可以存放一个TimerTaskList对象。

TimerTaskList是一个环形双向链表,在其中的链表项TimerTaskEntry中封装了真正的定时任务TimerTask.

TimingWheel提供了层级时间轮的概念

------------------------------------------------------------------

前面提到ProducerRequest,对于它来说,如果其中的acks字段设置为-1,表示这个请求到达Leader副本后,需要ISR集合中所有副本都同步(或者超时)后,才能返回响应给客户端。

ISR集合中的副本分布在不同的broker上,也就是不同的机器上,与Leader副本进行通信就涉及到网络通信,

一般情况下,我们认为网络传输时不可靠的而且比较慢!

所以通常采用异步的方式处理来避免线程长时间等待!

---

当FetchRequest发送给Leader副本后,会积累一定量的消息才返回给消费者或者Follow副本,实现批量发送!

---

我们先看看ProducerRequest的acks为-1时,服务端的处理流程:

处理函数



它会调用下面的函数

// call the replica manager to append messages to the replicas
replicaManager.appendRecords(
timeout = produceRequest.timeout.toLong,
requiredAcks = produceRequest.acks,
internalTopicsAllowed = internalTopicsAllowed,
isFromClient = true,
entriesPerPartition = authorizedRequestInfo,
responseCallback = sendResponseCallback)


之所以会用到replicaManager是因为有多个副本,不是说我把这个消息存本地就行了,还要有其它ISR集合里的broker收到消息才可以!

先看appendRecords的实现



然后生成DelayedProduce对象!



val produceStatus = localProduceResults.map { case (topicPartition, result) =>
topicPartition ->
ProducePartitionStatus(
result.info.lastOffset + 1, // required offset
new PartitionResponse(result.error, result.info.firstOffset, result.info.logAppendTime)) // response status
}

生成ISR集合里的响应结果容器

然后判断是否需要同步等待各个ISR的响应

// If all the following conditions are true, we need to put a delayed produce request and wait for replication to complete
//
// 1. required acks = -1
// 2. there is data to append
// 3. at least one partition append was successful (fewer errors than partitions)
private def delayedProduceRequestRequired(requiredAcks: Short,
entriesPerPartition: Map[TopicPartition, MemoryRecords],
localProduceResults: Map[TopicPartition, LogAppendResult]): Boolean = {
requiredAcks == -1 &&
entriesPerPartition.nonEmpty &&
localProduceResults.values.count(_.exception.isDefined) < entriesPerPartition.size
}

然后生成1个异步响应的容器

// create delayed produce operation
val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)

---

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Kafka