Apache Kafka源码剖析:第10篇 日志存储系列5-LogSegment & Log
2017-08-18 00:00
525 查看
为了防止一个文件太大,Kafka将Log分成了若干段。每个日志文件和索引文件组合对应了1个LogSegment.
---
在LogSegment中封装了1个FileMessageSet和一个OffsetIndex对象,提供日志文件和索引文件的读写功能以及其它辅助功能!
---
===
在读取日志文件之前,需要将offset转换为实际的文件物理地址才可以,通过之前的知识点,应该怎么做?
1)比如1017的offset,文件名是1000,所以相对offset就是1017-1000=17
2)将17去稀疏索引文件中查找,可以找到1个稀疏索引项.
3)根据这个索引项,从文件的绝对物理位置开始查找绝对offset为1017的消息。
当然有很多细节,比如说压缩消息的存在。导致查询有一些变化的细节,但是总体还是很简单!
---聊完了LogSegment ,我们来聊Log
Log是对多个LogSegment对象的顺序组合,形成1个逻辑的日志。
为了实现快速定位LogSegment,Log使用SkipList对LogSegment进行管理!
跳表是一种比较随机化的数据结构,查找效率和红黑树差不多,但是插入和删除操作比红黑树简单很多。
在 Log中,将每个LogSegment的baseOffset作为key,LogSegment对象作为value,
放入到segments这个跳表中管理。
向Log中追加消息是顺序写入的,那么只有最后1个LogSegment可以写入,之前的只能读。
我们把最后1个segment称之为activeSegment.
随着数据的不断写入,当activeSegment的日志文件大小到了一定的阈值后,就要切换新的segment文件。
写数据的时候,可能需要重新开一个segment
是否切换有几个条件
1)日志大小
2)当前 activeSegment的寿命超过了配置的LogSegment最长存活时间。
3)索引文件满了。
第1个很好理解,就是文件保证不要太大
第2个怎么理解,想象一下,client写了1条消息,然后不写了,这个文件如果一直不切换的话,就无法被读到了。
可见,确实是选择一批segment来持久化,这样就把持久化的任务和写线程隔离开来,尽量不占用写的主线程的任务!
===
Log.append()方法通过加锁进行同步控制,因为涉及到多线程操作,多个线程写。
但是在read()方法中并没有加锁操作,在开始查询消息之前会将nextOffsetMetaData字段保存为方法的局部变量,来避免线程安全问题。
---
在LogSegment中封装了1个FileMessageSet和一个OffsetIndex对象,提供日志文件和索引文件的读写功能以及其它辅助功能!
/tmp/kafka-logs/broker0/my-replicated-topic-0# ls -al total 24 drwxr-xr-x 2 root root 4096 Aug 13 03:33 . drwxr-xr-x 55 root root 4096 Aug 13 04:55 .. -rw-r--r-- 1 root root 0 Aug 13 04:55 00000000000000000000.index -rw-r--r-- 1 root root 577 Aug 10 17:11 00000000000000000000.log -rw-r--r-- 1 root root 12 Aug 13 04:55 00000000000000000000.timeindex -rw-r--r-- 1 root root 10 Aug 13 03:33 00000000000000000010.snapshot -rw-r--r-- 1 root root 8 Aug 10 17:04 leader-epoch-checkpoint
---
/** * A segment of the log. Each segment has two components: a log and an index. The log is a FileMessageSet containing * the actual messages. The index is an OffsetIndex that maps from logical offsets to physical file positions. Each * segment has a base offset which is an offset <= the least offset of any message in this segment and > any offset in * any previous segment. * * A segment with a base offset of [base_offset] would be stored in two files, a [base_offset].index and a [base_offset].log file. * * @param log The message set containing log entries * @param index The offset index * @param timeIndex The timestamp index * @param baseOffset A lower bound on the offsets in this segment * @param indexIntervalBytes The approximate number of bytes between entries in the index * @param time The time instance */ @nonthreadsafe class LogSegment(val log: FileRecords,//用于操作对应日志文件的FileMessageSet对象 val index: OffsetIndex,//用于操作索引文件 val timeIndex: TimeIndex, val txnIndex: TransactionIndex, val baseOffset: Long,//第一条消息的offset值 val indexIntervalBytes: Int,//索引项之间间隔的最小字节数 val rollJitterMs: Long, time: Time) extends Logging { private var created = time.milliseconds//标志LogSegment对象的创建时间 /* the number of bytes since we last added an entry in the offset index */ private var bytesSinceLastIndexEntry = 0//自动上次添加索引项后,日志文件中累计加入的 Message字节数 /* The timestamp we used for time based log rolling */ private var rollingBasedTimestamp: Option[Long] = None /* The maximum timestamp we see so far */ @volatile private var maxTimestampSoFar: Long = timeIndex.lastEntry.timestamp @volatile private var offsetOfMaxTimestamp: Long = timeIndex.lastEntry.offset
===
在读取日志文件之前,需要将offset转换为实际的文件物理地址才可以,通过之前的知识点,应该怎么做?
1)比如1017的offset,文件名是1000,所以相对offset就是1017-1000=17
2)将17去稀疏索引文件中查找,可以找到1个稀疏索引项.
3)根据这个索引项,从文件的绝对物理位置开始查找绝对offset为1017的消息。
当然有很多细节,比如说压缩消息的存在。导致查询有一些变化的细节,但是总体还是很简单!
通过上面的分析,主要是让大家对一些概念和机制,有个了解。 虽然可能达不到源码100%的掌握,但是对于理解Kafka的实现机制 和以后定位问题,可以起到帮助作用 更重要的是,通过这些分析,以后碰到生产上的问题,心里不慌,有底气迎战!
---聊完了LogSegment ,我们来聊Log
Log是对多个LogSegment对象的顺序组合,形成1个逻辑的日志。
为了实现快速定位LogSegment,Log使用SkipList对LogSegment进行管理!
跳表很常见,在redis和leveldb中都有使用! JDK中也有!
跳表是一种比较随机化的数据结构,查找效率和红黑树差不多,但是插入和删除操作比红黑树简单很多。
在 Log中,将每个LogSegment的baseOffset作为key,LogSegment对象作为value,
放入到segments这个跳表中管理。
向Log中追加消息是顺序写入的,那么只有最后1个LogSegment可以写入,之前的只能读。
我们把最后1个segment称之为activeSegment.
随着数据的不断写入,当activeSegment的日志文件大小到了一定的阈值后,就要切换新的segment文件。
写数据的时候,可能需要重新开一个segment
// maybe roll the log if this segment is full val segment = maybeRoll(messagesSize = validRecords.sizeInBytes, maxTimestampInMessages = appendInfo.maxTimestamp, maxOffsetInMessages = appendInfo.lastOffset)
是否切换有几个条件
1)日志大小
2)当前 activeSegment的寿命超过了配置的LogSegment最长存活时间。
3)索引文件满了。
第1个很好理解,就是文件保证不要太大
第2个怎么理解,想象一下,client写了1条消息,然后不写了,这个文件如果一直不切换的话,就无法被读到了。
可见,确实是选择一批segment来持久化,这样就把持久化的任务和写线程隔离开来,尽量不占用写的主线程的任务!
===
Log.append()方法通过加锁进行同步控制,因为涉及到多线程操作,多个线程写。
但是在read()方法中并没有加锁操作,在开始查询消息之前会将nextOffsetMetaData字段保存为方法的局部变量,来避免线程安全问题。
相关文章推荐
- Apache Kafka源码剖析:第7篇 日志存储系列2-FileMessageSet
- Apache Kafka源码剖析:第9篇 日志存储系列4-OffsetIndex
- Apache Kafka源码剖析:第6篇 日志存储系列1-基本概念
- WorldWind源码剖析系列:日志类Log
- Memcached源码剖析系列之内存存储机制(一)
- Zookeeper系列(十八)Zookeeper原理解析之数据存储之TxnLog事务日志
- memcached源码剖析系列之内存存储机制(二)
- memcached源码剖析系列之内存存储机制
- Apache Kafka源码剖析:第12篇 延迟操作系列1-DelayedProduce
- Memcached源码剖析系列之内存存储机制(一)
- memcached源码剖析系列之内存存储机制(二)
- memcached源码剖析系列之内存存储机制(三)
- memcached源码剖析系列之内存存储机制(三)
- Memcached源码剖析系列之内存存储机制(一)
- 【库函数源码剖析系列】(3) strstr
- go-ethereum源码剖析:区块存储
- Apache Kafka源码剖析:第4篇 业务线程池的原理
- WorldWind源码剖析系列:角度类Angle
- IronPython 源码剖析系列(1):IronPython 编译器
- C系列总结2 & 剖析整型、浮点型数据在内存中的存储