日志目录布局
offset:分区中每条消息都会分配一个唯一的序列号
Log:日志,不考虑副本的情况下,一个分区对应一个Log。N副本的情况下,一个分区有N个相同的Log分布在不同broker上
LogSegment:日志分段,防止Log过大,也是便于消息的维护和清理,Log会被切分为多个相对较小的文件,即LogSegment
实际上,Log是以文件夹 (
Topic、Partition、Replication、Log、LogSegment之间的关系,如下
1 | --> Replica |
此外,kafka broker 第一次启动的时候,默认会在kafka-log的每个根目录下创建几个特殊文件:
1 | # 0.10版本下 |
值得注意的是,对于 __consumer_offsets 这个topic,初始化是不会创建的,只有第一次有consumer消费message时才会自动创建这个Topic。此Topic默认50个Partition,Replica 为配置中的 Replica 数。
最后,关于Topic的Partition的落盘问题,创建Topic时,如果broker配置了多个日志根目录,那么会挑选分区数最少的那个根目录来完成Partition的创建,即落盘。
日志索引
上面有提到,每个LogSegment日志文件都有两个对应的索引文件,offset索引文件和时间戳索引文件。offset索引文件用来建立 offset 到物理地址之间的映射,快速定位;时间戳索引文件根据指定的时间戳查找对应offset信息。
两类索引均为稀疏索引 ( sparse index ),不能保证每条message在索引文件中都有对应的索引信息,只有当写入一定量 ( broker 端参数 log.index.interval.bytes
指定,默认值为4096,即4KB ) 消息时 ,索引文件会对应增加offset索引项和时间戳索引项,同时增大或减少这个参数的值,对应可以改变索引项的密度。
LogSegment 达到一定大小时会进行切分,切分条件如下:
- 当前 LogSegment 大小超过了 broker 端参数
log.segment.bytes
配置的值时;默认值为 1073741824,即1GB; - 当前 LogSegment 中消息的最大时间戳与系统当前时间戳差值超过
log.roll.ms
或者log.roll.hours
时;log.roll.ms
优先级更高。默认只配置了log.rool.hours
值为 168 即 7 天; - offset索引文件或者时间戳索引文件大小达到 broker 端参数
log.index.size.max.bytes
的值时,log.index.size.max.bytes
默认值为 10485760,即10MB; - 新 append 的 message 的 offset 与当前 LogSegment 的最大 offset 差值大于 Integer.MAX_VALUE 时,即 append 的 message 的 offset 不能转变为相对偏移量(offset - baseOffset > Integer.MAX_VALUE);
对于索引文件大小问题,非当前活跃的Logsegment而言,因为不会再写入新 message 了,所以其对应的索引文件会被设置为只读,其大小也就是实际占用的空间。对于当前活跃的 LogSegment 而言,在 LogSegment 切分时会以可读写的模式创建新的索引文件,同时预分配 log.index.size.max.bytes
大小的空间,直到索引文件进行切分时才会把该索引文件裁剪到实际数据大小。
偏移量 (offset) 索引
offset索引文件中每行索引项格式都是 relativeOffset + position 相对偏移量
+物理地址
(每个占4B)
根据 offset 查找日志消息(分区内):
根据 offset 定位到 Logsegment :每个 LogSegment 的baseOffset都会作为 key ,保存在对应的跳跃表(
ConcurrentSkipListMap
)里。根据 offset 就可以在跳跃表里通过二分法定位基础的 baseOffset对应的 LogSegment计算相对偏移量:相对偏移量 = offset - baseOffset。
在索引文件中查找物理地址:根基相对偏移量,在索引文件中找到不大于相对偏移量的最大索引项,根据其对应的物理地址 position 开始顺序查找目标消息
1
offset --> LogSegment、baseOffset --> index --> offset - baseOffset = relativeOffset --> position --> LogSegment顺序查找目标消息
另外,kafka要求索引文件大小必须是索引项大小的整数倍。对偏移量索引文件来说,其索引项大小为8,则必须为8的整数倍,如果不是,则为最接近配置的8的整数倍的最大值。例如配置为 75,则会置为 72 。时间戳索引同理,不过时间戳索引项大小是12B。
时间戳索引
和偏移量索引一样,时间戳索引中每行索引项是 timestamp+relativeOffset 当前LogSegment中的最大时间戳+对应消息的物理地址(时间戳占8B,物理地址占4B)
对应每个新append的索引项来说,要求其 timestamp 必须大于之前append的索引项的 timestamp,否则不会 append。决定索引文件中 timestamp 是否单调递增的是broker端参数 log.message.timestamp.type
,设置为 LogAppendTime 时,能够保证;反之,如果是CreateTime类型则无法保证。
当为CreateTIme时,虽然Producer 可以指定 message 的 timestamp ,但是如果多机发往同一个分区,并且每个机器的时间不一样,那么 timestamp 铁定会混乱。
根据时间戳targetTimeStamp查找日志消息 (分区内):
targetTimeStamp与每个 LogSegment 中的 largestTimeStamp对比,直到找到不小于targetTimeStamp的largestTimeStamp所对应的LogSegment。
LogSegment 的 largestTimeStamp计算:查询对应的时间戳索引文件,找到最后一条索引项,如果其timestamp > 0 ,则取值;否则取 LogSegment 的最近修改时间
在对应的时间戳索引文件中通过二分法找到不大于 targetTimeStamp 的最大索引项,取其相对偏移量
在偏移量索引文件中二分法查找不大于 该相对偏移量 的最大索引项,取到对应的物理地址 position
从LogSegment的position位置开始顺序查找不小于 targetTimeStamp 的 message。