根据官方脚本:kafka-reassign-partitions.sh ,这里先不研究源码,直接看脚本帮助
1 | [root@localhost bin]# ./kafka-reassign-partitions.sh |
几个重要的参数
三种模式
–generate
给定需要重新分配的Topic,自动生成reassign plan,并不会执行
–execute
根据指定的reassign plan重新分配Partition
–verify
验证重新分配Partition是否成功
两个文件参数
–topics-to-move-json-file
需要进行重新分配的Topic配置文件,格式如下:
1 | {"partitions": |
文件名不重要,保证内容是json格式就行了。
–reassignment-json-file
根据上面的配置文件生成的reassign plan。格式如下,可以自己修改。
1 | {"topics": |
两个集群参数
–zookeeper
zk1:2181,zk2:2181
–broker-list
具体数值为各个broker下面的配置ID,”1,2,3,4,5”。该参数为指定新分配到的broker节点,即,假设原始broker是 “1,2,3,4”,扩容一台,则为”1,2,3,4,5”。
官方方案
- 写好 topics-to-move-json-file 配置文件 tp-file.json
- 使用命令生成 reassign plan即 reassignment-json-file
1 >./kafka-reassign-partitions.sh --zookeeper zk1:2181 --topics-to-move-json-file tp-file.json --broker-list "1,2,3,4" --generate- (可选)根据需求自己定义或修改上面的 reassignment-json-file 文件
- 使用命令触发分配计划执行
1 >./kafka-reassign-partitions.sh --zookeeper zk1:2181 --reassignment-json-file reassignment-json-file.json --execute- 校验分配结果
1 >./kafka-reassign-partitions.sh --zookeeper zk1:2181 --reassignment-json-file reassignment-json-file.json --verify
扩容原理
一般来说,我们Kafka集群上的Topic的Partition数是大于broker数的,所以基本上,一个Topic的partition基本是遍布所有broker的。
所以我们扩容的根本原理就在于,将原来的Partition的副本进行迁移。整个迁移过程可能会涉及到副本leader和follower的迁移。
整体原理如下:
- 将副本数增加一份(无论扩容多少台)
- 新增副本开始从副本leader开始从头开始复制,即从earliest offset开始
- 等新增副本跟随上最新offset,将新增的副本添加到 ISR 列表
- 移除需要移除的broker上的副本
这种方案的缺陷
在数据量大的时候进行扩容是,因为要从头拷贝数据,会造成大量读原磁盘,消耗大量的I/O,造成producer操作缓慢,容易产生抖动。
改良方案
优化点主要在第二步的从earliest offset。假设我们从latest offset开始复制数据,然后等待新副本保持稳定一段时间后,添加到ISR列表,再移除就副本。
具体代码可以看官方的issues,链接:
官方代码:https://issues.apache.org/jira/browse/KAFKA-8328
就官方方案进行测试
查看 topic 信息
1 | [root@localhost bin]# ./kafka-topics.sh --describe --topic FLINK-TOPIC-1 --zookeeper tnode3:2181 |
编辑 topics-to-move-json-file 需要的json格式文件
1 | [root@tnode3 kafka_2.11-0.10.2.1]# cat flink-topic-json |
生成重分配计划 即 reassignment-json-file 所需要的文件
1 | [root@tnode3 kafka_2.11-0.10.2.1]# bin/kafka-reassign-partitions.sh --zookeeper tnode3:2181 --topics-to-move-json-file flink-topic-json --broker-list "0,1,2,3" --generate |
将 Proposed partition reassignment configuration 下面的内容保存到 flink-topic-result.json
执行修改计划
1 | [root@tnode3 kafka_2.11-0.10.2.1]# bin/kafka-reassign-partitions.sh --zookeeper tnode3:2181 --reassignment-json-file flink-topic-result.json --execute |
上面有提到。可以再讲 当前的分配情况保存成新的计划文件,用来回滚重分配的操作。
校验重分配是否成功
1 | [root@tnode3 kafka_2.11-0.10.2.1]# bin/kafka-reassign-partitions.sh --zookeeper tnode3:2181 --reassignment-json-file flink-topic-result.json --verify |
修改副本顺序 (利用kafka自动生成计划的跳过下面部分,因为自动生成的计划,已经调整了副本顺序)
此时,topic的副本已经迁移完成,但是所有副本的leader还在原来的broker上,我们需要分配一定的leader到新的broker上,所以还需要进行平衡leader。
手动编辑的计划(也可在编辑的时候就调整顺序,此处为只迁移副本的计划,不含调整副本顺序)
1 | {"version":1,"partitions":[{"topic":"WATCH-LOCATION","partition":0, "replicas":[4,2,6]}, |
修改副本顺序,因为副本顺序中,kafka默认第一个为首选leader,经过平衡后会首选使用第一个副本作为leader。topicPartition.json 格式,只是调整了副本顺序
1 | {"version":1,"partitions":[{"topic":"WATCH-LOCATION","partition":0, "replicas":[4,2,6]}, |
以上两步也可一次性编辑完成。
平衡leader
一次性平衡所有Topic:
1 | bin/kafka-preferred-replica-election.sh --zookeeper gs-kafka1:2181 |
或者 添加配置
1 | auto.leader.rebalance.enable=true |
或者平衡单个Topic
1 | bin/kafka-preferred-replica-election.sh --zookeeper gs-kafka1:2181 --path-to-json-file topicPartition.json |