Topic 管理
kafka-topics.sh
的参数
参数名称 | 解释及其作用 |
---|---|
–alter | 用于修改主题,包括分区数及Topic的配置 |
–config | 创建或修改Topic时,用于设置 Topic 级别的配置 |
–create | 创建 Topic |
–delete | 删除 Topic |
–delete-config | 删除 Topic 级别被覆盖的配置 |
–describe | 查看 Topic 的详细信息 |
–disable-rack-aware | 创建 Topic 时不考虑机架信息 |
–help | 打印帮助信息文档 |
–if-exists | 修改或删除 Topic时,只有 Topic 存在时才会执行操作 |
–if-not-exists | 创建 Topic 时,只有 Topic 不存在才会执行操作 |
–list | 列出所有可用的 Topic |
–partitions | 创建 Topic 或增加分区时 指定分区数 |
–replica-assignment | 手工制定分区副本的分配方案 |
–replication-factor | 创建 Topic 时指定副本数 |
–topic | 指定 Topic 名称 |
–topics-with-overrides | 使用describe查看 Topic 时,只展示包含覆盖配置的 Topic |
–unavailable-partition | 使用describe查看 Topic 时,只展示包含无leader的分区 |
–under-replicated-partitions | 使用describe查看 Topic 时,只展示包含失效副本的分区 |
–zookeeper | 指定连接的zookeeper信息(必填)(zk1:2181/kafka) |
创建 Topic
首先确认一个参数 auto.create.topics.enable=true
,此参数用来自动创建 Topic,也就说当发送消费等操作用到了未创建的 Topic
的时候,此参数会帮忙自动创建一个默认配置的 Topic。强烈建议在 Kafka 配置关闭这个参数。
接下来看看怎么在客户端手动创建一个 Topic
,使用
-topics.sh,指定
Zookeeper、partition、replica` 完成创建即可。
1 | [root@tnode1 bin]# pwd |
查看 Topic
查看所有 Topic
查看所有 Topic
信息
1 | [root@tnode1 bin]# ./kafka-topics.sh --zookeeper tnode3:2181 --describe |
查看单个 Topic
信息
1 | [root@tnode1 bin]# ./kafka-topics.sh --zookeeper tnode3:2181 --describe --topic LxmTest |
删除 Topic
删除 Topic 分为手动和自动两种方式
- 手动删除 Topic :需要先删除 Zookeeper 上的相关元数据,然后删除各个 Broker 节点上日志目录的下的此 Topic 的分区日志。
- 自动删除 Topic :首先
delete.topic.enable=true
参数必须配置,然后通过 客户端命令删除 Topic,将会自动将元数据和各分区的数据日志删除。如果未配置,将只会删除元数据,不会自动删除日志。
1 | [root@tnode1 bin]# ./kafka-topics.sh --zookeeper tnode3:2181 --delete --topic LxmTest |
增加分区
一般来说,我们在需要提升较大吞吐的情况下,可以选择增加 partition 。扩展分区命令如下:
1 | [root@tnode1 bin]# ./kafka-topics.sh --zookeeper tnode3:2181 --topic LxmTest --alter --partitions 3 |
目前分区只能增加,不能减少或者不变的进行修改。
配置管理
配置管理分为 Topic 级别和 Client 级别两种
Topics 级别
1 | [root@tnode1 bin]# ./kafka-configs.sh --zookeeper tnode3:2181 --entity-type topics --entity-name LxmTest --describe |
这里的结果表示没有做单独配置,均使用的集群默认配置。
增加一个配置
1 | [root@tnode1 bin]# ./kafka-configs.sh --zookeeper tnode3:2181 --entity-type topics --entity-name LxmTest --alter --add-config flush.messages=2 |
再增加一个配置
1 | [root@tnode1 bin]# ./kafka-configs.sh --zookeeper tnode3:2181 --entity-type topics --entity-name LxmTest --alter --add-config retention.ms=259200000 |
一次性添加多个配置,配置用逗号分隔即可
1 | [root@tnode1 bin]# ./kafka-configs.sh --zookeeper tnode3:2181 --entity-type topics --entity-name LxmTest --alter --add-config retention.ms=259200000,flush.messages=6 |
修改其中一个配置,其实增加没有区别
1 | [root@tnode1 bin]# ./kafka-configs.sh --zookeeper tnode3:2181 --entity-type topics --entity-name LxmTest --alter --add-config flush.messages=5 |
删除一个配置
1 | [root@tnode1 bin]# ./kafka-configs.sh --zookeeper tnode3:2181 --entity-type topics --entity-name LxmTest --alter --delete-config retention.ms |
Clients 级别
分区管理
分区平衡
分区平衡分为自动和手动两种:
- 自动:在 server.properties 中配置参数
auto.leader.rebalance.enable = true
,那么集群将会定时进行分区平衡,将 prefer-replica 副本即AR列表第一个副本,置为 leader 副本。 (leader.imbalance.per.broker.percentage = 10
此参数来决定自动平衡的阈值即失衡率超过10%会开始平衡) - 手动:执行平衡命令:
./kafka-preferred-replica-election.sh --zookeeper tnode3:2181
分区迁移
如果我们需要下线一个 broker 节点,那么就需要将该节点上的相关 Topic 的 partition 迁移到其他 broker 节点上去,因为 Kafka 不会自动进行分区迁移的操作,如果不进行手动迁移,就会发生副本实效和数据丢失的情况。同样,如果是我们新上线一个节点,只有新增 Topic 的分区才会分配到新节点,此时也需要进行分区迁移操作。
这个 Topic 目前三个分区分别在 0、1、2 三个 broker 上。
1 | [root@tnode1 bin]# ./kafka-topics.sh --zookeeper tnode3:2181 --describe --topic LxmTest |
现在,我们需要下线 broker2,也就是我们需要将 partition 2 迁移到 0,1 broker 上,步骤如下:
先创建一个分区迁移的配置文件,这里叫
topic-move.json
1
2
3
4
5
6
7
8
9{
"topics":
[
{
"topic":"LxmTest"
}
],
"version":1
}然后执行命令生成迁移计划
1
2
3
4
5
6[root@tnode1 bin]# ./kafka-reassign-partitions.sh --zookeeper tnode3:2181 --topics-to-move-json-file topic-move.json --broker-list "0,1" --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"LxmTest","partition":2,"replicas":[2,1]}, {"topic":"LxmTest","partition":0,"replicas":[0,2]}, {"topic":"LxmTest","partition":1,"replicas":[1,0]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"LxmTest","partition":2,"replicas":[1,0]}, {"topic":"LxmTest","partition":0,"replicas":[1,0]}, {"topic":"LxmTest","partition":1,"replicas":[0,1]}]}然后将上面的
Proposed partition reassignment configuration
下面的内容复制到topic-repartition.json
文件中执行迁移计划
1
2
3
4
5
6
7
8[root@tnode1 bin]# vi topic-repartition.json
[root@tnode1 bin]# ./kafka-reassign-partitions.sh --zookeeper tnode3:2181 --reassignment-json-file topic-repartition.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"LxmTest","partition":2,"replicas":[2,1]}, {"topic":"LxmTest","partition":0,"replicas":[0,2]}, {"topic":"LxmTest","partition":1,"replicas":[1,0]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.验证迁移是否成功
1
2
3
4
5[root@tnode1 bin]# ./kafka-reassign-partitions.sh --zookeeper tnode3:2181 --reassignment-json-file topic-repartition.json --verify
Status of partition reassignment:
Reassignment of partition [LxmTest,2] completed successfully
Reassignment of partition [LxmTest,0] completed successfully
Reassignment of partition [LxmTest,1] completed successfully再次查看 topic 信息
1
2
3
4
5[root@tnode1 bin]# ./kafka-topics.sh --zookeeper tnode3:2181 --describe --topic LxmTest
Topic:LxmTest PartitionCount:3 ReplicationFactor:2 Configs:retention.ms=259200000,flush.messages=6
Topic: LxmTest Partition: 0 Leader: 0 Replicas: 1,0 Isr: 0,1
Topic: LxmTest Partition: 1 Leader: 1 Replicas: 0,1 Isr: 1,0
Topic: LxmTest Partition: 2 Leader: 1 Replicas: 1,0 Isr: 1,0发现已经没有 broker 2 节点的分区了
集群扩容
集群扩容很好理解,就是Kafka磁盘容量不足了,需要增加 broker 节点,然后迁移分区。和上面的 broker下线操作类似,只不是生成迁移计划的时候,增加 broker 而已。具体代码就不贴了。
总得来说,其实集群迁移,无论是 broker 下线 还是 集群扩容 或者是 增加副本,我们都只要手动梳理迁移计划,尽量将各个分区平衡分配,同时将 AR 列表第一个副本 (即默认的 leader 副本) 平均分配,然后进行迁移即可。
!## !大量 topic 进行迁移时,最好分批进行,避免影响正常业务。另外,可以限制迁移的流量。
1 | [root@tnode1 bin]# ./kafka-reassign-partitions.sh --zookeeper tnode3:2181 --reassignment-json-file topic-repartition.json --execute --throttle 1024 |
!## !如果做了流量限制,需要使用 –verify 来检查是否完成迁移,在完成时,–verify 参数会解除限流
kafka设置某个topic的数据过期时间
全局设置
修改 server.properties
1 | log.retention.hours=72 |
单独对某一个topic设置过期时间
但如果只有某一个topic数据量过大。想单独对这个topic的过期时间设置短点:
1 | ./kafka-configs.sh --zookeeper localhost:2181 --alter --entity-name mytopic --entity-type topics --add-config retention.ms=86400000 |
retention.ms = 86400000 为一天,单位是毫秒。
查看设置:
1 | ./kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name mytopic --entity-type topics |
立即删除某个topic下的数据
1 | ./kafka-topics.sh --zookeeper localhost:2181 --alter --topic mytopic --config cleanup.policy=delete |