日志存储

日志目录布局

offset:分区中每条消息都会分配一个唯一的序列号

Log:日志,不考虑副本的情况下,一个分区对应一个Log。N副本的情况下,一个分区有N个相同的Log分布在不同broker上

LogSegment:日志分段,防止Log过大,也是便于消息的维护和清理,Log会被切分为多个相对较小的文件,即LogSegment

实际上,Log是以文件夹 ( - ) 的形式存在,而LogSegment则对应该目录下的一个日志文件和两个索引文件,以及其他可能的文件,但是其文件名均是日志文件中的第一条消息的 offset。

Topic、Partition、Replication、Log、LogSegment之间的关系,如下

1
2
3
4
5
6
7
8
9
10
                     --> Replica 
--> LogSegment
--> .log 日志文件
--> Partition1 --> Replica --> Log --> LogSegment --> .index 索引文件
--> .timeIndex 时间戳索引文件
--> LogSegment
--> Log
--> Replica
Topic --> Partition2
--> Partition3

此外,kafka broker 第一次启动的时候,默认会在kafka-log的每个根目录下创建几个特殊文件:

1
2
3
4
5
6
7
8
9
10
# 0.10版本下

// 存了每个log的最后清理offset
cleaner-offset-checkpoint
// broker.id 信息
meta.properties
// 表示已经刷写到磁盘的记录,point以下的数据都是已经刷写的
recovery-point-offset-checkpoint
// 每个replica的HighWatermark(HW),已经commit的数据,HW以下的数据所有replica同步
replication-offset-checkpoint

值得注意的是,对于 __consumer_offsets 这个topic,初始化是不会创建的,只有第一次有consumer消费message时才会自动创建这个Topic。此Topic默认50个Partition,Replica 为配置中的 Replica 数。

最后,关于Topic的Partition的落盘问题,创建Topic时,如果broker配置了多个日志根目录,那么会挑选分区数最少的那个根目录来完成Partition的创建,即落盘。

日志索引

上面有提到,每个LogSegment日志文件都有两个对应的索引文件,offset索引文件和时间戳索引文件。offset索引文件用来建立 offset 到物理地址之间的映射,快速定位;时间戳索引文件根据指定的时间戳查找对应offset信息。

两类索引均为稀疏索引 ( sparse index ),不能保证每条message在索引文件中都有对应的索引信息,只有当写入一定量 ( broker 端参数 log.index.interval.bytes 指定,默认值为4096,即4KB ) 消息时 ,索引文件会对应增加offset索引项和时间戳索引项,同时增大或减少这个参数的值,对应可以改变索引项的密度。

LogSegment 达到一定大小时会进行切分,切分条件如下:

  • 当前 LogSegment 大小超过了 broker 端参数 log.segment.bytes 配置的值时;默认值为 1073741824,即1GB;
  • 当前 LogSegment 中消息的最大时间戳与系统当前时间戳差值超过 log.roll.ms 或者 log.roll.hours 时;log.roll.ms 优先级更高。默认只配置了 log.rool.hours 值为 168 即 7 天;
  • offset索引文件或者时间戳索引文件大小达到 broker 端参数 log.index.size.max.bytes 的值时,log.index.size.max.bytes 默认值为 10485760,即10MB;
  • 新 append 的 message 的 offset 与当前 LogSegment 的最大 offset 差值大于 Integer.MAX_VALUE 时,即 append 的 message 的 offset 不能转变为相对偏移量(offset - baseOffset > Integer.MAX_VALUE);

对于索引文件大小问题,非当前活跃的Logsegment而言,因为不会再写入新 message 了,所以其对应的索引文件会被设置为只读,其大小也就是实际占用的空间。对于当前活跃的 LogSegment 而言,在 LogSegment 切分时会以可读写的模式创建新的索引文件,同时预分配 log.index.size.max.bytes 大小的空间,直到索引文件进行切分时才会把该索引文件裁剪到实际数据大小。

偏移量 (offset) 索引

offset索引文件中每行索引项格式都是 relativeOffset + position 相对偏移量+物理地址 (每个占4B)

根据 offset 查找日志消息(分区内):

  1. 根据 offset 定位到 Logsegment :每个 LogSegment 的baseOffset都会作为 key ,保存在对应的跳跃表(ConcurrentSkipListMap)里。根据 offset 就可以在跳跃表里通过二分法定位基础的 baseOffset对应的 LogSegment

  2. 计算相对偏移量:相对偏移量 = offset - baseOffset。

  3. 在索引文件中查找物理地址:根基相对偏移量,在索引文件中找到不大于相对偏移量的最大索引项,根据其对应的物理地址 position 开始顺序查找目标消息

    1
    offset --> LogSegment、baseOffset --> index --> offset - baseOffset = relativeOffset  --> position --> LogSegment顺序查找目标消息

另外,kafka要求索引文件大小必须是索引项大小的整数倍。对偏移量索引文件来说,其索引项大小为8,则必须为8的整数倍,如果不是,则为最接近配置的8的整数倍的最大值。例如配置为 75,则会置为 72 。时间戳索引同理,不过时间戳索引项大小是12B。

时间戳索引

和偏移量索引一样,时间戳索引中每行索引项是 timestamp+relativeOffset 当前LogSegment中的最大时间戳+对应消息的物理地址(时间戳占8B,物理地址占4B)

对应每个新append的索引项来说,要求其 timestamp 必须大于之前append的索引项的 timestamp,否则不会 append。决定索引文件中 timestamp 是否单调递增的是broker端参数 log.message.timestamp.type ,设置为 LogAppendTime 时,能够保证;反之,如果是CreateTime类型则无法保证。

当为CreateTIme时,虽然Producer 可以指定 message 的 timestamp ,但是如果多机发往同一个分区,并且每个机器的时间不一样,那么 timestamp 铁定会混乱。

根据时间戳targetTimeStamp查找日志消息 (分区内):

  1. targetTimeStamp与每个 LogSegment 中的 largestTimeStamp对比,直到找到不小于targetTimeStamp的largestTimeStamp所对应的LogSegment。

    LogSegment 的 largestTimeStamp计算:查询对应的时间戳索引文件,找到最后一条索引项,如果其timestamp > 0 ,则取值;否则取 LogSegment 的最近修改时间

  2. 在对应的时间戳索引文件中通过二分法找到不大于 targetTimeStamp 的最大索引项,取其相对偏移量

  3. 在偏移量索引文件中二分法查找不大于 该相对偏移量 的最大索引项,取到对应的物理地址 position

  4. 从LogSegment的position位置开始顺序查找不小于 targetTimeStamp 的 message。