您的位置:首页 > 编程语言

Kafka代码走读-LogManager

2016-07-21 21:18 651 查看
https://github.com/haogrgr/haogrgr-test/blob/master/logs/kafka_source.txt

源码阅读(0.8.2.2):

(一)概览

1.调用kafka.Kafka中的main方法启动

2.通过启动参数获取配置文件的路径

3.通过System.getProperty(log4j.configuration)来获取日志配置

4.加载配置文件, 校验配置

5.根据配置启动指标导出任务
KafkaMetricsReporter.startReporters(serverConfig.props)
根据配置kafka.metrics.reporters, kafka.metrics.polling.interval.secs, 来初始化指标报告类,
内部是使用的com.yammer.metrics来做指标收集(最新的版本改名为了io.dropwizard.metrics),
Kafka提供一个内部实现kafka.metrics.KafkaCSVMetricsReporter, 内部又实用CsvReporter(com.yammer.metrics提供), 将指标信息写出到指定目录下的csv文件, 顺便注册个MBean.

6.调用内部的启动类
val kafkaServerStartable = new KafkaServerStartable(serverConfig)
kafkaServerStartable.startup

7.注册shutdown hook, 等待shutdown信号(阻塞), 调用shutdown方法关闭.

(二)KafkaServerStartable.startup启动逻辑

1.设置broker状态为Starting, 初始话shutdown信号(CountDownLatch)和状态(isShuttingDown)

2.kafkaScheduler.startup()
启动kafka调度任务线程池, 方法内部为初始化ScheduledThreadPoolExecutor, 其实就是ScheduledThreadPoolExecutor的一个包装, 注意这里的线程是使用daemon类型

3.zkClient = initZk()初始化zkClient链接
这里可以设置 `zookeeper.connect=localhost:2181/kafka` 这样的url, kafka会将kafka相关的zk路径建立在/kafka下, 方便一个zk注册多个kafka集群.

4.初始化LogManager, logManager = KafkaServer.createLogManager(zkClient, brokerState){
首先, 根据配置文件中的配置创建LogConfig对象,
然后, 从zk上面获取所有topic的配置, 合并配置(zk上的配置覆盖文件中的配置)
AdminUtils.fetchAllTopicConfigs(zkClient).mapValues(LogConfig.fromProps(defaultProps, _))
从/brokers/topics下获取所有的topic, 然后循环/config/topics/xxx获取xxx的配置(json中config属性)
根据配置创建CleanerConfig对象

然后创建LogManager实例, new LogManager(){
private val logs = new Pool[TopicAndPartition, Log]()
初始化Log池Pool[并发map的包装]

createAndValidateLogDirs(logDirs)
创建日志目录

private val dirLocks = lockLogDirs(logDirs)
初始化文件锁FileLock,内部JDK的文件锁

初始化OffsetCheckpoint对象(saves out a map of topic/partition=>offsets to a file)
recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap
OffsetCheckpoint初始化会删除对应的.tmp文件, 并创建对应的文件(如果不存在).
OffsetCheckpoint文件格式为:
第一行: 文件格式版本号, 目前是只有一个版本(0).
第二行: 文件包含的记录数.
其他行: 具体的快照信息, 格式为, `topic partition offset`, 如 haogrgr 0 100.
OffsetCheckpoint写是先写.tmp文件, 然后再rename操作, 最后刷盘(writer.flush();fileOutputStream.getFD().sync()).

loadLogs(){
具体见下面的2.4.1.加载日志 loadLogs().
}
}
}

4.1.加载日志 loadLogs() {
val threadPools = mutable.ArrayBuffer.empty[ExecutorService]
首先, 为每个日志dir创建一个线程池来异步执行初始化任务

遍历全部的dir列表{
首先, 为每个日志dir, 关联一个线程池(线程数num.recovery.threads.per.data.dir), 用来初始化Log实例, 方法执行完毕即关闭

然后, 通过日志目录下的CleanShutdownFile文件来判断是否为正常关闭, 正常关闭的时候(LogManager.shutdown方法里面), 会创建该文件, 表示正常关闭,
非正常关闭, 将状态设置为RecoveringFromUncleanShutdown
(大概看了下, 后续的Log.loadSegments会检查CleanShutdownFile, 然后初始化完成后进行Log.recoverLog操作, 细节TODO)

val recoveryPoints = this.recoveryPointCheckpoints(dir).read
recoveryPoints是一个map[topic_partition, offset], kafka在正常关闭, 或定时任务, 或者清理日志的时候(细节TODO), 会将当前每个分区的最新的offset写到快照文件中,
这里读取文件, 获取每个分区的快照信息(offset), recoveryPoint在Log对象中, 保存的是已经flush的最大的offset值, 在log.flush中, 刷盘后会更新该值, 即小于等于recoveryPoint的消息都是落盘了的.
主要作用是: 减少恢复时日志的扫描数量; 通过(logEndOffset - recoveryPoint)可以得到未刷盘消息数, 做刷盘控制;

对与日志dir下的每个目录(topic-partition目录)创建初始化Log对象的任务 Utils.runnable {
val topicPartition = Log.parseTopicPartitionName(logDir.getName)
首先通过目录名解析出来topic和partition

然后, 获取topic配置类(根据前面2,4中zk上的配置和默认配置合并), 同时获取 recoveryPoint值

val current = new Log(logDir, config, logRecoveryPoint, scheduler, time) {
创建Log对象实例

private val segments = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
Log对象属性, 用来存放segment对象, LogSegment表示分区下的日志文件及其对应的索引文件.

loadSegments() {
初始化分区下的所有LogSegment对象

首先创建日志目录(如果不存在)

然后遍历日志目录下所有文件 {
删除所有以[.deleted]结尾的文件(log和index), 什么时候会产生该后缀的文件?
a)根据配置, kafka会删除一些旧日志(LogSegment)(retentionMs, retentionSize), 定时任务LogManager.cleanupLogs;
b)日志恢复操作, Log.recoverLog, 当非正常关闭kafka时, 会恢复日志, 一旦发现不正常的日志, 这个offset(含)之后的字节和LogSegment都会被删除;
c)主从同步时, 当从落后太多(从的最大offset小于主的最小offset(可能日志会被清理了)), 则从会logManager.truncateFullyAndStartAt, 来删除老的日志, 从新的offset开始;
d)主从同步时, 由于分区Leader的变化, 之前和旧Leader同步的数据可能不是最新的, 需要删除highWatermark(offset)(TODO)之后的数据, 防止不一致, ReplicaManager.makeFollowers;
删除步骤为:
a)先讲要删除的LogSegment从log.segments中移除;
b)再重命名日志和索引文件名后缀为.deleted;
c)最后提交异步任务, 任务中再删除日志和对应的索引文件.

删除所有以[.cleaned]结尾的文件(log和index), 什么时候会产生该后缀的文件?
a)Cleaner.clean中, 会将多个Segment清理成一个Segment, 然后交换到Log.segments中(清理:同key的消息, 去最后的value), 交换过程中, 先是将多个Segment中的日志(合并同key消息)写入到.cleaned文件中, 写完后, 重命名为.swap文件, 然后删除老Segment文件, 最后去掉.swap后缀;
可以看到, 交换步骤为分为三步, 第一步先写.cleaned文件, 保证文件全部清理完后再操作, 然后重命名为.swap文件, 这时可以删除老的文件了, 删除操作参考上面的.deleted文件操作, 最后重命名, 去掉.swap后缀, 中间任何一步异常, 都不会破坏文件完整性.
一个疑惑: 当重命名为.swap成功, 但是马上carsh了, 导致老的log没有移除, 那么下次启动时, 老的日志依然存在, 如何处理(猜测: 因为是clean, 所以只会clean达到调节的log, 下次启动会继续clean操作, 待验证TODO)

处理.swap文件, 如上面说的, 当swap操作进行到一半而挂掉了, 就可能会有.swap文件, 这里需要完成swap操作, 重命名去掉.swap后缀, 删除索引, 后续会判断相关的log文件是否有对应的index文件, 没有会重建索引文件.
}

然后再次遍历日志目录下所有文件{
首先, 删除没有对应log文件的index文件.

然后, 如果为log文件, 则创建LogSegment对象, 如果没有对应的index文件, 则重建LogSegment.recover, 然后将segment放入到log的segments中去, key为文件名(startOffset).

重建索引文件 LogSegment.recover {
遍历log文件, 每隔指定间隔字节数, 就在索引文件中添加一条索引, 最后设置log和index文件大小为有效的字节数
}

则创建LogSegment对象 segment = new LogSegment {
创建FileMessageSet对象{
这里调用的是def this(file: File)这个构造方法, 内部会调用FileMessageSet(file, new RandomAccessFile(file, "rw").getChannel(), 0, Int.MaxValue, false)
这里通过RandomAccessFile来获取到对应的FileChannel, 提供类似于切片的功能, 通过维护start, end, isSlice来实现, 提供iterator方式来遍历整个日志文件.
消息添加是通过ByteBufferMessageSet.writeTo来从buffer写到文件channel的.
这个类主要提供Log文件的读写等操作
}

创建OffsetIndex对象{
创建startOffset.index文件
创建对应的RandomAccessFile实例:val raf = new RandomAccessFile(file, "rw")
如果老的index文件存在, 即file.createNewFile返回true, 则设置文件长度为小于maxIndexSize(默认1m, 最小为8b), 如果不为8的倍数, 则取最近的8的倍数 :raf.setLength(roundToExactMultiple(maxIndexSize, 8))
然后通过raf.getChannel.map来内存映射文件, 获取MappedByteBuffer
最后, 设置buffer的position指针, 如果新文件, 就是0, 老文件, 则是, 文件大小, 然后关闭流

这个类主要的功能就是维护索引, 先是mmap索引文件, 而索引文件中内容是已8个字节为一个entry, 其中前4个字节为相对offset(原始offset-baseOffset), 后4个字节为日志文件偏移,
查找时采用二分查找, 因为offset在索引文件中是有序的, 同时因为是mmap, 所以查找效率高, 主要用于日志读取时使用(LogSegment.translateOffset)

这里并不是每个消息offset都索引, 而是间隔一定大小索引一次(indexIntervalBytes), 所以查找到文件位置后, 还需要再去log中去查找到精确的位置, 具体的判断是在LogSegment中实现的.
}

LogSegment是log和index的包装, 提供一个统一的api来统一的操作index和log, 屏蔽log和index细节.
包含了append, read, flush, delete等方法.
}

}

好了, 经过前面两次的遍历, 已经创建好了LogSegment并都放到Log.segments中去了

如果目录是空的, 就创建一个startOffset=0的LogSegment, 加入到Log.segments中去.

如果目录不是空的, 就进行Log.recoverLog操作{
首先, 如果是正常关闭的(hasCleanShutdownFile), 则没啥好恢复的, 设置recoveryPoint为下一个offset, 结束方法.

非正常结束, 需要恢复recoveryPoint(前面2.4.1有讲)之后的LogSegment对应的日志, 通过Log.segments的方法, 获取大于recoveryPoint的记录,
遍历需要恢复操作的LogSegment列表, 对每个LogSegment, 遍历日志文件, 重建索引, 遍历的时候校验消息(computeChecksum等), 一旦某条消息出问题了, 这条消息和它后面的数据都会被删除.
同时, 改LogSegment之后的LogSegment也会被删除.

// reset the index size of the currently active log segment to allow more entries
activeSegment.index.resize(config.maxIndexSize)
最后设置当前活动的LogSegment(startOffset最大的segment), 的index文件为config.maxIndexSize, 因为上一步会吧index文件设置为真实大小, 而当前LogSegment还会有add操作, 会导致index写失败.
kafka.log.OffsetIndex.append中会校验index是否满了(require(!isFull)).
}

最后, 一个简单的校验, 校验index文件大小是不是8的倍数.

}loadSegments()结束

}new Log()结束

创建完Log实例后, 加入到LogManager.logs中(key:TopicPartition, value:Log实例), 如果存在TopicPartition对应两个Log实例, 报错

}Utils.runnable结束

最后, 提交上面的任务(Utils.runnable)到线程池中并行执行, 并收集结果.

}遍历全部的dir列表, 结束

对每个log dir, 获取上面的任务的执行结果, 无异常, 则删除目录下面的cleanShutdownFile文件.

最后结束线程池.

}loadLogs 关闭

5.日志管理器启动 logManager.startup(){

}

问题记录:

1.调试过程中, 碰到了个问题, 启动的时候, 报了NPE(kafka.log.OffsetIndex.forceUnmap), 调试发现, 是因为方法内部调用了sun.nio.ch.DirectBuffer.cleaner().clean(), 而cleaner()方法可能会返回Null, 导致空异常.
调试DirectBuffer, 他的cleaner是在构造方法的时候初始化的, 当OffsetIndex.mmap属性初始化的时候, 会将index文件映射为MappedByteBuffer, 通过sun.nio.ch.FileChannelImpl.map方法, 而当文件大小为0的时候, 并不会创建cleaner实例, 所以导致DirectBuffer.cleaner().clean()出现NPE异常, 但是为什么index文件会是空的, 明明已经写入消息了, TODO.
补充一点, sun.misc.Cleaner实现PhantomReference接口, 用来在引用的对象被回收的时, 则就会把对象放到PhantomReference队列中, 应用可以通过队列获取到Reference对象, 以便做些回收的工作, 看Cleaner代码时, 发现, 并没有使用PhantomReference队列, 然后查看到java.lang.ref.Reference对象中对Cleaner会优化处理, 当发现为Cleaner类型时, 直接调用Cleaner.clean方法, 其他类型则enqueue.
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  kafka 源码 LogManager