分区器 Partitioner
在 kafka
里,消息经过序列化之后就需要发往分区器Partitioner
确定发送的分区,当然这是指在ProducerRecord
未指定partition
的情况下。如果没有指定分区,就需要按照其中的key
,通过Partitioner
来进行分区。
此时,如果 key
为 null
,那么将轮询发往各个分区;如果 key
不为 null
,则按照Partitioner
规则进行分区。
Kafka
中常用的默认分区器 Partitioner
是 org.apache.kafka.clients.producer.internals.DefaultPartitioner
,其实现的是 org.apache.kafka.clients.producer.Partitioner
接口。
Partitioner 接口
Partitioner
接口主要有两个方法,partition()
方法用来计算分区,参数主要是 topic
、key
、序列化后的key
、value
、序列化后的value
、集群元数据;close()
关闭分区器,同时回收一些资源。此外,该接口也继承了 Configurable
接口,其中主要的方法是 configure()
,主要用来配置参数
1 | public interface Partitioner extends Configurable, Closeable { |
默认分区器 DefaultPartitioner
直接进代码看具体实现
1 | public class DefaultPartitioner implements Partitioner { |
也就是说,在默认 Partitioner
规则下,当 key
为空时,只会选择可用分区来做轮询(当可用分区为0时才会对总分区取余获取分区号,可用都为0,也就没意义)。当 key
不为空,才会选择所有分区来 hash
,然后取分区号,相同的 key
会得到相同的分区号。
自定义分区器 MyPartitioner
既然如此,先实现一个简单的分区器 Partitioner
:当 key
为 null
时,选择所有分区来做轮询,不为 null
时,hash
值然后获取分区号。
1 | package kafka.partitioner; |
是用自定义分区器
如果需要只用到自定义的分区器,只需要在 Producer
的参数配置中加上对应的分区器配置就OK了。如下:
1 | properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName()); |