Apache Kafka源码剖析:第12篇 延迟操作系列1-DelayedProduce
2017-08-19 00:00
836 查看
下面开始介绍延迟操作组件,本节讲的是TimingWheel.
DelayedOperationPurgatory是一个相对独立的组件,主要功能是管理延迟操作。
底层依赖Kafka提供的时间轮实现。
有的读者会说为啥不用JDK本身的实现,这是因为Kafka这种分布式系统的请求量巨大,性能要求高,
-
在高性能的框架中,为了将定时任务的存取操作以及取消操作的时间复杂度降低为O(1)
一般会使用其他方式实现定时任务组件,例如,使用时间轮的方式。
Kafka的时间轮实现是TimeWheel, 是1个存储定时任务的环形队列。
底层使用数组实现,数组中的每个元素可以存放一个TimerTaskList对象。
TimerTaskList是一个环形双向链表,在其中的链表项TimerTaskEntry中封装了真正的定时任务TimerTask.
TimingWheel提供了层级时间轮的概念
------------------------------------------------------------------
前面提到ProducerRequest,对于它来说,如果其中的acks字段设置为-1,表示这个请求到达Leader副本后,需要ISR集合中所有副本都同步(或者超时)后,才能返回响应给客户端。
ISR集合中的副本分布在不同的broker上,也就是不同的机器上,与Leader副本进行通信就涉及到网络通信,
一般情况下,我们认为网络传输时不可靠的而且比较慢!
所以通常采用异步的方式处理来避免线程长时间等待!
---
当FetchRequest发送给Leader副本后,会积累一定量的消息才返回给消费者或者Follow副本,实现批量发送!
---
我们先看看ProducerRequest的acks为-1时,服务端的处理流程:
处理函数
它会调用下面的函数
之所以会用到replicaManager是因为有多个副本,不是说我把这个消息存本地就行了,还要有其它ISR集合里的broker收到消息才可以!
先看appendRecords的实现
然后生成DelayedProduce对象!
然后判断是否需要同步等待各个ISR的响应
然后生成1个异步响应的容器
---
DelayedOperationPurgatory是一个相对独立的组件,主要功能是管理延迟操作。
底层依赖Kafka提供的时间轮实现。
有的读者会说为啥不用JDK本身的实现,这是因为Kafka这种分布式系统的请求量巨大,性能要求高,
-
在高性能的框架中,为了将定时任务的存取操作以及取消操作的时间复杂度降低为O(1)
一般会使用其他方式实现定时任务组件,例如,使用时间轮的方式。
Kafka的时间轮实现是TimeWheel, 是1个存储定时任务的环形队列。
底层使用数组实现,数组中的每个元素可以存放一个TimerTaskList对象。
TimerTaskList是一个环形双向链表,在其中的链表项TimerTaskEntry中封装了真正的定时任务TimerTask.
TimingWheel提供了层级时间轮的概念
------------------------------------------------------------------
前面提到ProducerRequest,对于它来说,如果其中的acks字段设置为-1,表示这个请求到达Leader副本后,需要ISR集合中所有副本都同步(或者超时)后,才能返回响应给客户端。
ISR集合中的副本分布在不同的broker上,也就是不同的机器上,与Leader副本进行通信就涉及到网络通信,
一般情况下,我们认为网络传输时不可靠的而且比较慢!
所以通常采用异步的方式处理来避免线程长时间等待!
---
当FetchRequest发送给Leader副本后,会积累一定量的消息才返回给消费者或者Follow副本,实现批量发送!
---
我们先看看ProducerRequest的acks为-1时,服务端的处理流程:
处理函数
它会调用下面的函数
// call the replica manager to append messages to the replicas replicaManager.appendRecords( timeout = produceRequest.timeout.toLong, requiredAcks = produceRequest.acks, internalTopicsAllowed = internalTopicsAllowed, isFromClient = true, entriesPerPartition = authorizedRequestInfo, responseCallback = sendResponseCallback)
之所以会用到replicaManager是因为有多个副本,不是说我把这个消息存本地就行了,还要有其它ISR集合里的broker收到消息才可以!
先看appendRecords的实现
然后生成DelayedProduce对象!
val produceStatus = localProduceResults.map { case (topicPartition, result) => topicPartition -> ProducePartitionStatus( result.info.lastOffset + 1, // required offset new PartitionResponse(result.error, result.info.firstOffset, result.info.logAppendTime)) // response status } 生成ISR集合里的响应结果容器
然后判断是否需要同步等待各个ISR的响应
// If all the following conditions are true, we need to put a delayed produce request and wait for replication to complete // // 1. required acks = -1 // 2. there is data to append // 3. at least one partition append was successful (fewer errors than partitions) private def delayedProduceRequestRequired(requiredAcks: Short, entriesPerPartition: Map[TopicPartition, MemoryRecords], localProduceResults: Map[TopicPartition, LogAppendResult]): Boolean = { requiredAcks == -1 && entriesPerPartition.nonEmpty && localProduceResults.values.count(_.exception.isDefined) < entriesPerPartition.size }
然后生成1个异步响应的容器
// create delayed produce operation val produceMetadata = ProduceMetadata(requiredAcks, produceStatus) val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)
---
相关文章推荐
- Apache Kafka源码剖析:第10篇 日志存储系列5-LogSegment & Log
- Apache Kafka源码剖析:第7篇 日志存储系列2-FileMessageSet
- Apache Kafka源码剖析:第9篇 日志存储系列4-OffsetIndex
- Apache Kafka源码剖析:第6篇 日志存储系列1-基本概念
- IronPython 源码剖析系列(1):IronPython 编译器
- LDD3源码分析之时间与延迟操作
- WorldWind源码剖析系列:影像图层类ImageLayer
- jQuery-1.9.1源码分析系列(八) 属性操作
- hsqldb源码分析系列5 查询引擎之查询操作
- apache kafka系列之源码分析走读-kafkaApi详解
- 剖析Elasticsearch集群系列之一:Elasticsearch的存储模型和读写操作
- WorldWind源码剖析系列:WorldWind实时确定、更新、初始化和渲染地形和纹理数据
- 源码剖析sun.misc.Unsafe && Compare And Swap(CAS)操作
- thrift源码剖析系列
- WorldWind源码剖析系列:二维点类Point2d和三维点类Point3d
- apache kafka系列之源码分析走读-server端网络架构分析
- WorldWind源码剖析系列:图层管理器按钮类LayerManagerButton和菜单条类MenuBar
- IronPython 源码剖析系列(2):IronPython 引擎的运作流程
- memcached源码剖析系列之内存存储机制(三)
- WorldWind源码剖析系列:四叉树瓦片集合类QuadTileSet