分区器 Partitioner
在 kafka 里,消息经过序列化之后就需要发往分区器Partitioner确定发送的分区,当然这是指在ProducerRecord未指定partition的情况下。如果没有指定分区,就需要按照其中的key,通过Partitioner来进行分区。
此时,如果 key 为 null,那么将轮询发往各个分区;如果 key 不为 null ,则按照Partitioner 规则进行分区。
Kafka 中常用的默认分区器 Partitioner 是 org.apache.kafka.clients.producer.internals.DefaultPartitioner ,其实现的是 org.apache.kafka.clients.producer.Partitioner 接口。
Partitioner 接口
Partitioner 接口主要有两个方法,partition() 方法用来计算分区,参数主要是 topic、key、序列化后的key、value、序列化后的value、集群元数据;close() 关闭分区器,同时回收一些资源。此外,该接口也继承了 Configurable 接口,其中主要的方法是 configure(),主要用来配置参数
1 | public interface Partitioner extends Configurable, Closeable { |
默认分区器 DefaultPartitioner
直接进代码看具体实现
1 | public class DefaultPartitioner implements Partitioner { |
也就是说,在默认 Partitioner 规则下,当 key 为空时,只会选择可用分区来做轮询(当可用分区为0时才会对总分区取余获取分区号,可用都为0,也就没意义)。当 key 不为空,才会选择所有分区来 hash,然后取分区号,相同的 key 会得到相同的分区号。
自定义分区器 MyPartitioner
既然如此,先实现一个简单的分区器 Partitioner:当 key 为 null 时,选择所有分区来做轮询,不为 null 时,hash 值然后获取分区号。
1 | package kafka.partitioner; |
是用自定义分区器
如果需要只用到自定义的分区器,只需要在 Producer 的参数配置中加上对应的分区器配置就OK了。如下:
1 | properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName()); |