生产者数据链路原理分析
在消息真正发往 Kafka
之前,有可能会经历的有拦截器、序列化器和分区器等一系列操作。那么在这之后,发送的时候又会发生什么呢。
整个生产者由两个线程协调运行。第一个是线程,另一个是 Sender
线程。
主线程
主线程中,KafkaProducer
创建消息,通过拦截器、序列化器和分区器的作用之后,会将消息缓存到消息累加器 RecordAccumulator
中。RecordAccumulator
主要用来缓存消息,以便 Sender
线程可以批量发送消息,进而减少网络开销从而提升性能。RecordAccumulator
缓存的大小可以通过配置 buffer.memory
来配置(默认值是 33554432 即 32MB),同时配合的参数有 max.block.ms
即在这个时间内,如果 RecordAccumulator
内的数据没被发送,则会阻塞 KafkaProducer
,超时则会抛异常。
在 RecordAccumulator
缓存中,针对 每个分区 都维护了一个双端队列,队列中的内容就是 RecordBatch
即 Deque<RecordBatch>
。当消息数据 ProducerRecord
进入到 RecordAccumulator
时,会先根据 ProducerRecord
对应分区的双端队列 Deque
是否存在,不存在则新建一个对应的 Deque
,有则在 Deque
尾部获取一个 RecordBatch
(没有则新建),然后查看该 RecordBatch
是否还能写入此条数据,如果能则写入,如果不能则创建一个新的 RecordBatch
(新建时会判断该条消息是否超出 RecordBatch
大小即 batch.size
大小,如果不超过,就以 batch.size
大小来创建 RecordBatch
(此 RecordBatch
会被下一个消息复用),否则就以消息大小来创建 RecordBatch
,此 RecordBatch
不会被下一个消息复用)。 此外,在 RecordAccumulator
中,还有一个 BufferPool
,用来管理特定大小的 ByteBuffer
即 RecordBatch
。
Sender
线程从 RecordAccumulator
缓存中的双端队列头部获取 RecordBatch
。
Sender线程
Sender 线程获取到 RecordBatch
之后,消息会被保存成不同的形式,即 从 RecordAccumulator
缓存中的 <分区,Deque<ProducerBatch>>
=> <Node,List<ProducerBatch>>
。其中 node
就是 broker
节点,因为一般来说,我们分区数都是节点的倍数。所以一个节点会有多个 partition
,但是客户端发送消息只需要负责发到broker,而不关心具体分区。
在这之后,会再次转换成 <Node, Request>
形式,然后将 Request
发送到具体的 Node
。在发送之前,会先将请求放入另一个缓存区 InFlightRequest
,其中的形式为 Map<NodeId,Deque<Request>>
,作用是缓存了已经发送了但是还没收到响应的请求。此缓存还有一些其他配置。