生产者分区器

分区器 Partitioner

kafka 里,消息经过序列化之后就需要发往分区器Partitioner确定发送的分区,当然这是指在ProducerRecord未指定partition的情况下。如果没有指定分区,就需要按照其中的key,通过Partitioner来进行分区。

此时,如果 keynull,那么将轮询发往各个分区;如果 key 不为 null ,则按照Partitioner 规则进行分区。

Kafka 中常用的默认分区器 Partitionerorg.apache.kafka.clients.producer.internals.DefaultPartitioner ,其实现的是 org.apache.kafka.clients.producer.Partitioner 接口。

Partitioner 接口

Partitioner 接口主要有两个方法,partition() 方法用来计算分区,参数主要是 topickey、序列化后的keyvalue、序列化后的value、集群元数据;close() 关闭分区器,同时回收一些资源。此外,该接口也继承了 Configurable 接口,其中主要的方法是 configure(),主要用来配置参数

1
2
3
4
5
6
public interface Partitioner extends Configurable, Closeable {

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

public void close();
}
默认分区器 DefaultPartitioner

直接进代码看具体实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class DefaultPartitioner implements Partitioner {

private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

public void configure(Map<String, ?> configs) {}
// partition 方法。参数主要是topic、key、序列化后的key、value、序列化后的value、集群元数据信息
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 从集群元数据中获取 topic对应的分区信息。其中 PartitionInfo 属性包含 topic、partition、leader、replicas、ISR列表等待。
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
// 分区数
int numPartitions = partitions.size();
// 根据 key 来做分区。key为 null 和 不为 null 分开处理。
// 如果 key 为空
if (keyBytes == null) {
// 获取此条数据需要对应的随机数
int nextValue = nextValue(topic);
// 获取集群可用分区信息列表
// 可用分区,即 分区 leader 不为 null 的分区。
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
// 如果集群可用分区数大于0,取 正值,然后除可用分区数取余,即 hash。余数只会是 0 到 分区数-1 之间的数
int part = Utils.toPositive(nextValue) % availablePartitions.size();
// 拿计算得到的余数去可用分区信息列表获取到对应的分区号
return availablePartitions.get(part).partition();
} else {
// 如果可用分区数不大于0,则直接取绝对值 对总分区数取余 作为分区号 返回
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// 如果key不为null,则先hash一个值,然后对 总分区数取余 作为分区号 返回
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}

private int nextValue(String topic) {
// 从Map获取topic对应的 AtomicInteger,第一次必为 null
AtomicInteger counter = topicCounterMap.get(topic);
// 如果是第一次,counter 必为 null
if (null == counter) {
// 随机一个数,初始化 AtomicInteger
counter = new AtomicInteger(new Random().nextInt());
// 往Map里插入初始化好的 topic AtomicInteger ,然后 get 出来
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {
// 当 上面 get 到的值不为空时,替换开头的 counter
counter = currentCounter;
}
}
// 返回 counter(topic对应的AtomicInteger)+1 的值
return counter.getAndIncrement();
}

public void close() {}
}

也就是说,在默认 Partitioner 规则下,当 key 为空时,只会选择可用分区来做轮询(当可用分区为0时才会对总分区取余获取分区号,可用都为0,也就没意义)。当 key 不为空,才会选择所有分区来 hash,然后取分区号,相同的 key 会得到相同的分区号。

自定义分区器 MyPartitioner

既然如此,先实现一个简单的分区器 Partitioner:当 keynull 时,选择所有分区来做轮询,不为 null 时,hash 值然后获取分区号。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package kafka.partitioner;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class MyPartitioner implements Partitioner {

private final ConcurrentHashMap<String,AtomicInteger> tpCounterMap = new ConcurrentHashMap<>();

@Override
public void configure(Map<String, ?> configs) {
// 配置信息
}

@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

List<PartitionInfo> partitionInfos = cluster.availablePartitionsForTopic(topic);

int numPartitions = partitionInfos.size();
int nextvalue = nextvalue(topic);

// 如果 key 为空,则轮询所有分区。不为空,hash获取到所有分区中的对应的分区号
if (keyBytes == null)
return Utils.toPositive(nextvalue) % numPartitions;
else
return Utils.toPositive(Utils.murmur2(keyBytes)) & numPartitions;

}

private int nextvalue(String topic){

AtomicInteger atomicInteger = tpCounterMap.get(topic);

if (atomicInteger == null){
atomicInteger = new AtomicInteger(new Random().nextInt());
AtomicInteger currentCount = tpCounterMap.putIfAbsent(topic, atomicInteger);
if (currentCount != null){
atomicInteger = currentCount;
}
}
return atomicInteger.getAndIncrement();
}

@Override
public void close() {
// 空方法
}
}
是用自定义分区器

如果需要只用到自定义的分区器,只需要在 Producer 的参数配置中加上对应的分区器配置就OK了。如下:

1
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());