Topic 管理
kafka-topics.sh  的参数
| 参数名称 | 解释及其作用 | 
|---|---|
| –alter | 用于修改主题,包括分区数及Topic的配置 | 
| –config | 创建或修改Topic时,用于设置 Topic 级别的配置 | 
| –create | 创建 Topic | 
| –delete | 删除 Topic | 
| –delete-config | 删除 Topic 级别被覆盖的配置 | 
| –describe | 查看 Topic 的详细信息 | 
| –disable-rack-aware | 创建 Topic 时不考虑机架信息 | 
| –help | 打印帮助信息文档 | 
| –if-exists | 修改或删除 Topic时,只有 Topic 存在时才会执行操作 | 
| –if-not-exists | 创建 Topic 时,只有 Topic 不存在才会执行操作 | 
| –list | 列出所有可用的 Topic | 
| –partitions | 创建 Topic 或增加分区时 指定分区数 | 
| –replica-assignment | 手工制定分区副本的分配方案 | 
| –replication-factor | 创建 Topic 时指定副本数 | 
| –topic | 指定 Topic 名称 | 
| –topics-with-overrides | 使用describe查看 Topic 时,只展示包含覆盖配置的 Topic | 
| –unavailable-partition | 使用describe查看 Topic 时,只展示包含无leader的分区 | 
| –under-replicated-partitions | 使用describe查看 Topic 时,只展示包含失效副本的分区 | 
| –zookeeper | 指定连接的zookeeper信息(必填)(zk1:2181/kafka) | 
创建 Topic
首先确认一个参数 auto.create.topics.enable=true ,此参数用来自动创建 Topic,也就说当发送消费等操作用到了未创建的 Topic 的时候,此参数会帮忙自动创建一个默认配置的 Topic。强烈建议在 Kafka 配置关闭这个参数。
 接下来看看怎么在客户端手动创建一个 Topic,使用  
-topics.sh,指定Zookeeper、partition、replica`  完成创建即可。
| 1 | [root@tnode1 bin]# pwd | 
查看 Topic
查看所有 Topic
查看所有 Topic 信息
| 1 | [root@tnode1 bin]# ./kafka-topics.sh --zookeeper tnode3:2181 --describe | 
查看单个 Topic 信息
| 1 | [root@tnode1 bin]# ./kafka-topics.sh --zookeeper tnode3:2181 --describe --topic LxmTest | 
删除 Topic
删除 Topic 分为手动和自动两种方式
- 手动删除 Topic :需要先删除 Zookeeper 上的相关元数据,然后删除各个 Broker 节点上日志目录的下的此 Topic 的分区日志。
- 自动删除 Topic :首先 delete.topic.enable=true参数必须配置,然后通过 客户端命令删除 Topic,将会自动将元数据和各分区的数据日志删除。如果未配置,将只会删除元数据,不会自动删除日志。
| 1 | [root@tnode1 bin]# ./kafka-topics.sh --zookeeper tnode3:2181 --delete --topic LxmTest | 
增加分区
一般来说,我们在需要提升较大吞吐的情况下,可以选择增加 partition 。扩展分区命令如下:
| 1 | [root@tnode1 bin]# ./kafka-topics.sh --zookeeper tnode3:2181 --topic LxmTest --alter --partitions 3 | 
目前分区只能增加,不能减少或者不变的进行修改。
配置管理
配置管理分为 Topic 级别和 Client 级别两种
Topics 级别
| 1 | [root@tnode1 bin]# ./kafka-configs.sh --zookeeper tnode3:2181 --entity-type topics --entity-name LxmTest --describe | 
这里的结果表示没有做单独配置,均使用的集群默认配置。
增加一个配置
| 1 | [root@tnode1 bin]# ./kafka-configs.sh --zookeeper tnode3:2181 --entity-type topics --entity-name LxmTest --alter --add-config flush.messages=2 | 
再增加一个配置
| 1 | [root@tnode1 bin]# ./kafka-configs.sh --zookeeper tnode3:2181 --entity-type topics --entity-name LxmTest --alter --add-config retention.ms=259200000 | 
一次性添加多个配置,配置用逗号分隔即可
| 1 | [root@tnode1 bin]# ./kafka-configs.sh --zookeeper tnode3:2181 --entity-type topics --entity-name LxmTest --alter --add-config retention.ms=259200000,flush.messages=6 | 
修改其中一个配置,其实增加没有区别
| 1 | [root@tnode1 bin]# ./kafka-configs.sh --zookeeper tnode3:2181 --entity-type topics --entity-name LxmTest --alter --add-config flush.messages=5 | 
删除一个配置
| 1 | [root@tnode1 bin]# ./kafka-configs.sh --zookeeper tnode3:2181 --entity-type topics --entity-name LxmTest --alter --delete-config retention.ms | 
Clients 级别
分区管理
分区平衡
分区平衡分为自动和手动两种:
- 自动:在 server.properties 中配置参数 auto.leader.rebalance.enable = true,那么集群将会定时进行分区平衡,将 prefer-replica 副本即AR列表第一个副本,置为 leader 副本。 (leader.imbalance.per.broker.percentage = 10此参数来决定自动平衡的阈值即失衡率超过10%会开始平衡)
- 手动:执行平衡命令:./kafka-preferred-replica-election.sh --zookeeper tnode3:2181
分区迁移
如果我们需要下线一个 broker 节点,那么就需要将该节点上的相关 Topic 的 partition 迁移到其他 broker 节点上去,因为 Kafka 不会自动进行分区迁移的操作,如果不进行手动迁移,就会发生副本实效和数据丢失的情况。同样,如果是我们新上线一个节点,只有新增 Topic 的分区才会分配到新节点,此时也需要进行分区迁移操作。
这个 Topic 目前三个分区分别在 0、1、2 三个 broker 上。
| 1 | [root@tnode1 bin]# ./kafka-topics.sh --zookeeper tnode3:2181 --describe --topic LxmTest | 
现在,我们需要下线 broker2,也就是我们需要将 partition 2 迁移到 0,1 broker 上,步骤如下:
- 先创建一个分区迁移的配置文件,这里叫 - topic-move.json- 1 
 2
 3
 4
 5
 6
 7
 8
 9- { 
 "topics":
 [
 {
 "topic":"LxmTest"
 }
 ],
 "version":1
 }
- 然后执行命令生成迁移计划 - 1 
 2
 3
 4
 5
 6- [root@tnode1 bin]# ./kafka-reassign-partitions.sh --zookeeper tnode3:2181 --topics-to-move-json-file topic-move.json --broker-list "0,1" --generate 
 Current partition replica assignment
 {"version":1,"partitions":[{"topic":"LxmTest","partition":2,"replicas":[2,1]}, {"topic":"LxmTest","partition":0,"replicas":[0,2]}, {"topic":"LxmTest","partition":1,"replicas":[1,0]}]}
 Proposed partition reassignment configuration
 {"version":1,"partitions":[{"topic":"LxmTest","partition":2,"replicas":[1,0]}, {"topic":"LxmTest","partition":0,"replicas":[1,0]}, {"topic":"LxmTest","partition":1,"replicas":[0,1]}]}
- 然后将上面的 - Proposed partition reassignment configuration下面的内容复制到- topic-repartition.json文件中
- 执行迁移计划 - 1 
 2
 3
 4
 5
 6
 7
 8- [root@tnode1 bin]# vi topic-repartition.json 
 [root@tnode1 bin]# ./kafka-reassign-partitions.sh --zookeeper tnode3:2181 --reassignment-json-file topic-repartition.json --execute
 Current partition replica assignment
 {"version":1,"partitions":[{"topic":"LxmTest","partition":2,"replicas":[2,1]}, {"topic":"LxmTest","partition":0,"replicas":[0,2]}, {"topic":"LxmTest","partition":1,"replicas":[1,0]}]}
 Save this to use as the --reassignment-json-file option during rollback
 Successfully started reassignment of partitions.
- 验证迁移是否成功 - 1 
 2
 3
 4
 5- [root@tnode1 bin]# ./kafka-reassign-partitions.sh --zookeeper tnode3:2181 --reassignment-json-file topic-repartition.json --verify 
 Status of partition reassignment:
 Reassignment of partition [LxmTest,2] completed successfully
 Reassignment of partition [LxmTest,0] completed successfully
 Reassignment of partition [LxmTest,1] completed successfully
- 再次查看 topic 信息 - 1 
 2
 3
 4
 5- [root@tnode1 bin]# ./kafka-topics.sh --zookeeper tnode3:2181 --describe --topic LxmTest 
 Topic:LxmTest PartitionCount:3 ReplicationFactor:2 Configs:retention.ms=259200000,flush.messages=6
 Topic: LxmTest Partition: 0 Leader: 0 Replicas: 1,0 Isr: 0,1
 Topic: LxmTest Partition: 1 Leader: 1 Replicas: 0,1 Isr: 1,0
 Topic: LxmTest Partition: 2 Leader: 1 Replicas: 1,0 Isr: 1,0- 发现已经没有 broker 2 节点的分区了 
集群扩容
集群扩容很好理解,就是Kafka磁盘容量不足了,需要增加 broker 节点,然后迁移分区。和上面的 broker下线操作类似,只不是生成迁移计划的时候,增加 broker 而已。具体代码就不贴了。
总得来说,其实集群迁移,无论是 broker 下线 还是 集群扩容 或者是 增加副本,我们都只要手动梳理迁移计划,尽量将各个分区平衡分配,同时将 AR 列表第一个副本 (即默认的 leader 副本) 平均分配,然后进行迁移即可。
!## !大量 topic 进行迁移时,最好分批进行,避免影响正常业务。另外,可以限制迁移的流量。
| 1 | [root@tnode1 bin]# ./kafka-reassign-partitions.sh --zookeeper tnode3:2181 --reassignment-json-file topic-repartition.json --execute --throttle 1024 | 
!## !如果做了流量限制,需要使用 –verify 来检查是否完成迁移,在完成时,–verify 参数会解除限流
kafka设置某个topic的数据过期时间
全局设置
修改 server.properties
| 1 | log.retention.hours=72 | 
单独对某一个topic设置过期时间
但如果只有某一个topic数据量过大。想单独对这个topic的过期时间设置短点:
| 1 | ./kafka-configs.sh --zookeeper localhost:2181 --alter --entity-name mytopic --entity-type topics --add-config retention.ms=86400000 | 
retention.ms = 86400000 为一天,单位是毫秒。
查看设置:
| 1 | ./kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name mytopic --entity-type topics | 
立即删除某个topic下的数据
| 1 | ./kafka-topics.sh --zookeeper localhost:2181 --alter --topic mytopic --config cleanup.policy=delete |