A Study of Kafka Cluster Expansion

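Expansion is based on the official script kafka-reassign-partitions.sh. We will not dig into the source code for now and instead go straight to the script's help output.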
[root@localhost bin]# ./kafka-reassign-partitions.sh
This command moves topic partitions between replicas.
Option                                  Description
------                                  -----------
--broker-list <String: brokerlist>      The list of brokers to which the
                                          partitions need to be reassigned in
                                          the form "0,1,2". This is required
                                          if --topics-to-move-json-file is
                                          used to generate reassignment
                                          configuration
--disable-rack-aware                    Disable rack aware replica assignment
--execute                               Kick off the reassignment as specified
                                          by the --reassignment-json-file
                                          option.
--generate                              Generate a candidate partition
                                          reassignment configuration. Note
                                          that this only generates a candidate
                                          assignment, it does not execute it.
--reassignment-json-file <String:       The JSON file with the partition
  manual assignment json file path>       reassignment configurationThe format
                                          to use is -
                                        {"partitions":
                                          [{"topic": "foo",
                                            "partition": 1,
                                            "replicas": [1,2,3] }],
                                        "version":1
                                        }
--throttle <Long: throttle>             The movement of partitions will be
                                          throttled to this value (bytes/sec).
                                          Rerunning with this option, whilst a
                                          rebalance is in progress, will alter
                                          the throttle value. The throttle
                                          rate should be at least 1 KB/s.
                                          (default: -1)
--topics-to-move-json-file <String:     Generate a reassignment configuration
  topics to reassign json file path>      to move the partitions of the
                                          specified topics to the list of
                                          brokers specified by the --broker-
                                          list option. The format to use is -
                                        {"topics":
                                          [{"topic": "foo"},{"topic": "foo1"}],
                                        "version":1
                                        }
--verify                                Verify if the reassignment completed
                                          as specified by the --reassignment-
                                          json-file option. If there is a
                                          throttle engaged for the replicas
                                          specified, and the rebalance has
                                          completed, the throttle will be
                                          removed
--zookeeper <String: urls>              REQUIRED: The connection string for
                                          the zookeeper connection in the form
                                          host:port. Multiple URLS can be
                                          given to allow fail-over.

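Key parameters

Three modes

--generate

Given the topics to be reassigned, automatically generates a reassignment plan. The plan is not executed.

--execute

Reassigns partitions according to the specified reassignment plan.

--verify

Verifies whether the partition reassignment succeeded.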

Two file parameters

--topics-to-move-json-file

The file that lists the topics to be reassigned, in the following format:

{"topics":
[{"topic": "foo"},{"topic": "foo1"}],
"version":1
}

The file name does not matter, as long as the content is valid JSON.
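Since only the JSON content matters, the topics file can be generated instead of typed by hand. The following is a minimal Python sketch; the helper name `build_topics_to_move` is made up for illustration, and the layout follows the `{"topics": [...], "version": 1}` format printed in the script's help output.

```python
import json


def build_topics_to_move(topics):
    """Build the --topics-to-move-json-file structure for the given topic names."""
    return {"topics": [{"topic": name} for name in topics], "version": 1}


if __name__ == "__main__":
    # Print the document; redirect stdout to any file name you like.
    print(json.dumps(build_topics_to_move(["foo", "foo1"])))
```

Redirecting the output to a file such as tp-file.json gives something that can be passed to --topics-to-move-json-file.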

--reassignment-json-file

The reassignment plan generated from the topics file above. It has the following format and can be edited by hand.

{"partitions":
[{"topic": "foo",
"partition": 1,
"replicas": [1,2,3] }],
"version":1
}

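Two cluster parameters

--zookeeper

The ZooKeeper connection string, for example zk1:2181,zk2:2181

--broker-list

The value is the list of broker IDs as configured on each broker (broker.id), for example "1,2,3,4,5". This parameter names the brokers that partitions will be assigned to after the move. For instance, if the original brokers are "1,2,3,4" and one broker is added, the value is "1,2,3,4,5".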

Official procedure

  • Write the topics-to-move-json-file configuration file, tp-file.json
  • Generate the reassignment plan, i.e. the reassignment-json-file (the command prints the plan, which then has to be saved to a file)
    >./kafka-reassign-partitions.sh --zookeeper zk1:2181 --topics-to-move-json-file tp-file.json --broker-list "1,2,3,4" --generate
  • (Optional) Write your own reassignment-json-file, or adjust the generated one as needed
  • Trigger execution of the plan
    >./kafka-reassign-partitions.sh --zookeeper zk1:2181 --reassignment-json-file reassignment-json-file.json --execute
  • Verify the result
    >./kafka-reassign-partitions.sh --zookeeper zk1:2181 --reassignment-json-file reassignment-json-file.json --verify
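
The three invocations differ only in the mode flag and in which file option they take. The sketch below assembles the argument list for each mode without running anything; `reassign_argv` is a hypothetical helper, and it uses only the flags shown in the help output above.

```python
def reassign_argv(zookeeper, mode, json_file, broker_list=None):
    """Return the argument list for one kafka-reassign-partitions.sh call."""
    if mode not in ("generate", "execute", "verify"):
        raise ValueError(f"unknown mode: {mode}")
    argv = ["./kafka-reassign-partitions.sh", "--zookeeper", zookeeper]
    if mode == "generate":
        # --generate reads the topics file and needs the target brokers.
        if not broker_list:
            raise ValueError("--generate requires a broker list")
        argv += ["--topics-to-move-json-file", json_file,
                 "--broker-list", ",".join(str(b) for b in broker_list)]
    else:
        # --execute and --verify both read the reassignment plan.
        argv += ["--reassignment-json-file", json_file]
    argv.append(f"--{mode}")
    return argv


if __name__ == "__main__":
    print(" ".join(reassign_argv("zk1:2181", "generate", "tp-file.json", [1, 2, 3, 4])))
    print(" ".join(reassign_argv("zk1:2181", "execute", "reassignment-json-file.json")))
    print(" ".join(reassign_argv("zk1:2181", "verify", "reassignment-json-file.json")))
```

Returning a list rather than a string keeps the result usable with `subprocess.run` if you later decide to execute it.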

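How expansion works

In general, a topic on our Kafka cluster has more partitions than there are brokers, so a topic's partitions are spread across essentially all brokers.
Expansion therefore comes down to migrating the replicas of existing partitions. The migration may involve both leader and follower replicas.

The overall process is:
  1. Add one extra replica (regardless of how many brokers are being added)
  2. The new replica copies data from the partition leader from the very beginning, i.e. from the earliest offset
  3. Once the new replica has caught up to the latest offset, it is added to the ISR list
  4. The replicas on the brokers being vacated are removed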
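
The four steps can be pictured as a sequence of (replicas, ISR) snapshots for one partition. The following is a toy model only, not Kafka's controller logic: `simulate_move` is a made-up name, and it assumes that every current replica starts in sync and that all new replicas join at once.

```python
def simulate_move(current, target):
    """Return (replicas, isr) snapshots for a simplified partition move."""
    new = [b for b in target if b not in current]
    expanded = list(current) + new
    return [
        # Steps 1-2: new replicas are added and start copying from the leader.
        (expanded, list(current)),
        # Step 3: the new replicas have caught up and join the ISR.
        (expanded, list(expanded)),
        # Step 4: replicas that are not part of the target are removed.
        (list(target), list(target)),
    ]


if __name__ == "__main__":
    # Example: a partition moving from brokers [0, 1] to brokers [2, 3].
    for replicas, isr in simulate_move([0, 1], [2, 3]):
        print("replicas:", replicas, "isr:", isr)
```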
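
Drawback of this approach

When the cluster holds a large amount of data, expansion has to copy that data from the beginning. This causes heavy reads on the source disks and consumes a lot of I/O, which slows down producers and tends to cause jitter.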
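
To get a feel for the cost, a back-of-the-envelope estimate helps. The sketch below computes how long a full copy would take when it is capped with the --throttle option (bytes/sec, per the help output). The data size and rate are invented numbers, and treating the throttle as one cap on the whole move is a simplifying assumption.

```python
def estimated_copy_seconds(bytes_to_move, throttle_bytes_per_sec):
    """Rough duration of a throttled copy, ignoring new incoming traffic."""
    if throttle_bytes_per_sec <= 0:
        raise ValueError("throttle must be a positive rate in bytes/sec")
    return bytes_to_move / throttle_bytes_per_sec


if __name__ == "__main__":
    data = 500 * 2**30   # hypothetical: 500 GiB of partition data to move
    rate = 50 * 2**20    # hypothetical: --throttle set to 50 MiB/s
    seconds = estimated_copy_seconds(data, rate)
    print(f"{seconds:.0f} s, about {seconds / 3600:.1f} h")
```

A lower throttle protects producers from the I/O spike but stretches the move, so the rate is a trade-off rather than a fix for the underlying full copy.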

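Improved approach

The optimization targets step 2, the copy from the earliest offset. Suppose the new replica starts copying from the latest offset instead. After it has stayed stable for a period of time, it is added to the ISR list, and the old replica is then removed.
For the code, see the official issue:
https://issues.apache.org/jira/browse/KAFKA-8328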

Testing the official procedure

View the topic information
[root@localhost bin]# ./kafka-topics.sh --describe --topic FLINK-TOPIC-1  --zookeeper tnode3:2181
Topic:FLINK-TOPIC-1 PartitionCount:4 ReplicationFactor:2 Configs:
Topic: FLINK-TOPIC-1 Partition: 0 Leader: 0 Replicas: 0,2 Isr: 2,0
Topic: FLINK-TOPIC-1 Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0
Topic: FLINK-TOPIC-1 Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: FLINK-TOPIC-1 Partition: 3 Leader: 0 Replicas: 0,1 Isr: 1,0

Edit the JSON file required by topics-to-move-json-file
[root@tnode3 kafka_2.11-0.10.2.1]# cat flink-topic-json 
{"topics": [{"topic": "FLINK-TOPIC-1"}],
"version":1
}

Generate the reassignment plan, i.e. the file required by reassignment-json-file
[root@tnode3 kafka_2.11-0.10.2.1]# bin/kafka-reassign-partitions.sh --zookeeper tnode3:2181 --topics-to-move-json-file flink-topic-json --broker-list "0,1,2,3" --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"FLINK-TOPIC-1","partition":3,"replicas":[0,1]}, {"topic":"FLINK-TOPIC-1","partition":2,"replicas":[2,1]}, {"topic":"FLINK-TOPIC-1","partition":1,"replicas":[1,0]}, {"topic":"FLINK-TOPIC-1","partition":0,"replicas":[0,2]}
]}

Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"FLINK-TOPIC-1","partition":3,"replicas":[2,3]}, {"topic":"FLINK-TOPIC-1","partition":2,"replicas":[1,2]}, {"topic":"FLINK-TOPIC-1","partition":1,"replicas":[0,1]}, {"topic":"FLINK-TOPIC-1","partition":0,"replicas":[3,0]}
]}

Save the content under "Proposed partition reassignment configuration" to flink-topic-result.json
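
Copying the proposed plan by hand is easy to get wrong, so it can be pulled out of the captured --generate output instead. This is a sketch under the assumption that the output looks like the transcript above (a marker line followed by one JSON document); `extract_proposed_plan` is a hypothetical helper, and other Kafka versions may print the plan differently.

```python
import json

PROPOSED_MARKER = "Proposed partition reassignment configuration"


def extract_proposed_plan(generate_output):
    """Return the proposed plan parsed from captured --generate output."""
    _, found, tail = generate_output.partition(PROPOSED_MARKER)
    if not found:
        raise ValueError("proposed plan marker not found in output")
    # raw_decode parses the first JSON value and ignores anything after it.
    plan, _ = json.JSONDecoder().raw_decode(tail.lstrip())
    return plan


if __name__ == "__main__":
    sample = (
        "Current partition replica assignment\n"
        '{"version":1,"partitions":[{"topic":"FLINK-TOPIC-1","partition":0,"replicas":[0,2]}\n]}\n\n'
        "Proposed partition reassignment configuration\n"
        '{"version":1,"partitions":[{"topic":"FLINK-TOPIC-1","partition":0,"replicas":[3,0]}\n]}\n'
    )
    print(json.dumps(extract_proposed_plan(sample)))
```

The returned dictionary can be written with `json.dump` to flink-topic-result.json and reviewed before it is passed to --execute.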

Execute the plan
[root@tnode3 kafka_2.11-0.10.2.1]# bin/kafka-reassign-partitions.sh --zookeeper tnode3:2181 --reassignment-json-file flink-topic-result.json --execute
Current partition replica assignment

{"version":1,"partitions":[{"topic":"FLINK-TOPIC-1","partition":3,"replicas":[0,1]}, {"topic":"FLINK-TOPIC-1","partition":2,"replicas":[2,1]}, {"topic":"FLINK-TOPIC-1","partition":1,"replicas":[1,0]}, {"topic":"FLINK-TOPIC-1","partition":0,"replicas":[0,2]}
]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.

As the output above notes, the current assignment can be saved as another plan file and used to roll back the reassignment.

Verify that the reassignment succeeded
[root@tnode3 kafka_2.11-0.10.2.1]# bin/kafka-reassign-partitions.sh --zookeeper tnode3:2181 --reassignment-json-file flink-topic-result.json --verify
Status of partition reassignment:
Reassignment of partition [FLINK-TOPIC-1,3] completed successfully
Reassignment of partition [FLINK-TOPIC-1,2] completed successfully
Reassignment of partition [FLINK-TOPIC-1,1] completed successfully
Reassignment of partition [FLINK-TOPIC-1,0] completed successfully
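
For topics with many partitions it is convenient to check the --verify output programmatically. The sketch below assumes the status lines look like the transcript above ("Reassignment of partition ... completed successfully"); `reassignment_finished` is a made-up helper, and the wording may differ in other Kafka versions.

```python
def reassignment_finished(verify_output):
    """True if every reported partition status says it completed successfully."""
    prefix = "Reassignment of partition"
    statuses = [line.strip() for line in verify_output.splitlines()
                if line.strip().startswith(prefix)]
    # No status lines at all is treated as "not finished", not as success.
    return bool(statuses) and all(
        status.endswith("completed successfully") for status in statuses)


if __name__ == "__main__":
    sample = (
        "Status of partition reassignment:\n"
        "Reassignment of partition [FLINK-TOPIC-1,3] completed successfully\n"
        "Reassignment of partition [FLINK-TOPIC-1,2] completed successfully\n"
    )
    print(reassignment_finished(sample))
```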
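
Adjust the replica order (skip this part if you use a plan generated by Kafka, because the generated plan already has the replica order adjusted)

At this point the topic's replicas have been migrated, but all partition leaders are still on the original brokers. Some leaders need to move to the new brokers, so the leaders still have to be rebalanced.

A hand-edited plan (the order can also be adjusted while editing; the plan shown here only migrates replicas and does not adjust the replica order)
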
{"version":1,"partitions":[{"topic":"WATCH-LOCATION","partition":0, "replicas":[4,2,6]},
{"topic":"WATCH-LOCATION","partition":1, "replicas":[5,3,7]},
{"topic":"WATCH-LOCATION","partition":2, "replicas":[0,4,8]},
{"topic":"WATCH-LOCATION","partition":3, "replicas":[1,5,6]},
{"topic":"WATCH-LOCATION","partition":4, "replicas":[2,0,7]},
{"topic":"WATCH-LOCATION","partition":5, "replicas":[3,1,8]},
{"topic":"WATCH-LOCATION","partition":6, "replicas":[4,3,6]},
{"topic":"WATCH-LOCATION","partition":7, "replicas":[5,4,7]},
{"topic":"WATCH-LOCATION","partition":8, "replicas":[0,5,8]},
{"topic":"WATCH-LOCATION","partition":9, "replicas":[1,0,6]},
{"topic":"WATCH-LOCATION","partition":10,"replicas":[2,1,7]},
{"topic":"WATCH-LOCATION","partition":11,"replicas":[3,2,8]},
{"topic":"WATCH-LOCATION","partition":12,"replicas":[4,5,6]},
{"topic":"WATCH-LOCATION","partition":13,"replicas":[5,0,7]},
{"topic":"WATCH-LOCATION","partition":14,"replicas":[0,1,8]},
{"topic":"WATCH-LOCATION","partition":15,"replicas":[1,2,6]},
{"topic":"WATCH-LOCATION","partition":16,"replicas":[2,3,7]},
{"topic":"WATCH-LOCATION","partition":17,"replicas":[3,4,8]}
]}

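Adjust the replica order. Kafka treats the first replica in the list as the preferred leader, so after rebalancing the first replica is chosen as the leader. The file topicPartition.json below has the same format; only the replica order has been adjusted.
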
{"version":1,"partitions":[{"topic":"WATCH-LOCATION","partition":0, "replicas":[4,2,6]},
{"topic":"WATCH-LOCATION","partition":1, "replicas":[5,3,7]},
{"topic":"WATCH-LOCATION","partition":2, "replicas":[0,4,8]},
{"topic":"WATCH-LOCATION","partition":3, "replicas":[1,5,6]},
{"topic":"WATCH-LOCATION","partition":4, "replicas":[2,0,7]},
{"topic":"WATCH-LOCATION","partition":5, "replicas":[3,1,8]},
{"topic":"WATCH-LOCATION","partition":6, "replicas":[4,3,6]},
{"topic":"WATCH-LOCATION","partition":7, "replicas":[5,4,7]},
{"topic":"WATCH-LOCATION","partition":8, "replicas":[0,5,8]},
{"topic":"WATCH-LOCATION","partition":9, "replicas":[6,0,1]},
{"topic":"WATCH-LOCATION","partition":10,"replicas":[7,1,2]},
{"topic":"WATCH-LOCATION","partition":11,"replicas":[8,2,3]},
{"topic":"WATCH-LOCATION","partition":12,"replicas":[6,5,4]},
{"topic":"WATCH-LOCATION","partition":13,"replicas":[7,0,5]},
{"topic":"WATCH-LOCATION","partition":14,"replicas":[8,1,0]},
{"topic":"WATCH-LOCATION","partition":15,"replicas":[1,2,6]},
{"topic":"WATCH-LOCATION","partition":16,"replicas":[2,3,7]},
{"topic":"WATCH-LOCATION","partition":17,"replicas":[3,4,8]}
]}

The two steps above can also be done in a single edit.
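
Because the first replica in each list is the preferred leader, counting first replicas per broker shows how leaders will be spread after rebalancing. The following is a small Python sketch; `preferred_leader_counts` is a hypothetical helper, and the two short plans are made-up examples rather than the full WATCH-LOCATION plans.

```python
from collections import Counter


def preferred_leader_counts(plan):
    """Count how many partitions name each broker as their first replica."""
    return Counter(p["replicas"][0] for p in plan["partitions"])


if __name__ == "__main__":
    moved_only = {"version": 1, "partitions": [
        {"topic": "t", "partition": 0, "replicas": [0, 1, 2]},
        {"topic": "t", "partition": 1, "replicas": [0, 2, 1]},
    ]}
    reordered = {"version": 1, "partitions": [
        {"topic": "t", "partition": 0, "replicas": [0, 1, 2]},
        {"topic": "t", "partition": 1, "replicas": [2, 0, 1]},
    ]}
    print(dict(preferred_leader_counts(moved_only)))
    print(dict(preferred_leader_counts(reordered)))
```

Applied to the two WATCH-LOCATION plans above, the replica-only plan gives brokers 0 to 5 three preferred leaders each and brokers 6 to 8 none, while the reordered plan gives each of the nine brokers two.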

Rebalance leaders

Rebalance all topics at once:

bin/kafka-preferred-replica-election.sh --zookeeper gs-kafka1:2181

Or add this setting to the broker configuration:

auto.leader.rebalance.enable=true

Or rebalance a single topic:

bin/kafka-preferred-replica-election.sh --zookeeper gs-kafka1:2181 --path-to-json-file topicPartition.json
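
If the election file should list only the partitions of one topic, it can be derived from a reassignment plan. This sketch rests on an assumption to confirm against the tool's own help for your Kafka version: that --path-to-json-file expects a `{"partitions": [{"topic": ..., "partition": ...}]}` document. `election_partitions` is a made-up helper name.

```python
import json


def election_partitions(plan, topic):
    """Keep only topic/partition pairs of one topic from a reassignment plan."""
    return {"partitions": [
        {"topic": p["topic"], "partition": p["partition"]}
        for p in plan["partitions"] if p["topic"] == topic
    ]}


if __name__ == "__main__":
    plan = {"version": 1, "partitions": [
        {"topic": "WATCH-LOCATION", "partition": 0, "replicas": [4, 2, 6]},
        {"topic": "WATCH-LOCATION", "partition": 1, "replicas": [5, 3, 7]},
    ]}
    print(json.dumps(election_partitions(plan, "WATCH-LOCATION")))
```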