Kafka客户端命令操作

Topic 管理

kafka-topics.sh 的参数

参数名称 解释及其作用
–alter 用于修改主题,包括分区数及Topic的配置
–config 创建或修改Topic时,用于设置 Topic 级别的配置
–create 创建 Topic
–delete 删除 Topic
–delete-config 删除 Topic 级别被覆盖的配置
–describe 查看 Topic 的详细信息
–disable-rack-aware 创建 Topic 时不考虑机架信息
–help 打印帮助信息文档
–if-exists 修改或删除 Topic时,只有 Topic 存在时才会执行操作
–if-not-exists 创建 Topic 时,只有 Topic 不存在才会执行操作
–list 列出所有可用的 Topic
–partitions 创建 Topic 或增加分区时 指定分区数
–replica-assignment 手工制定分区副本的分配方案
–replication-factor 创建 Topic 时指定副本数
–topic 指定 Topic 名称
–topics-with-overrides 使用describe查看 Topic 时,只展示包含覆盖配置的 Topic
–unavailable-partition 使用describe查看 Topic 时,只展示包含无leader的分区
–under-replicated-partitions 使用describe查看 Topic 时,只展示包含失效副本的分区
–zookeeper 指定连接的zookeeper信息(必填)(zk1:2181/kafka)
创建 Topic

首先确认一个参数 auto.create.topics.enable=true ,此参数用来自动创建 Topic,也就说当发送消费等操作用到了未创建的 Topic 的时候,此参数会帮忙自动创建一个默认配置的 Topic。强烈建议在 Kafka 配置关闭这个参数。

接下来看看怎么在客户端手动创建一个 Topic,使用

-topics.sh,指定Zookeeper、partition、replica` 完成创建即可。

1
2
3
4
[root@tnode1 bin]# pwd
/data/kafka/kafka_2.11-0.10.2.1/bin
[root@tnode1 bin]# ./kafka-topics.sh --create --zookeeper tnode3:2181 --replication-factor 2 --partitions 3 --topic LxmTest
Created topic "LxmTest".
查看 Topic

查看所有 Topic

查看所有 Topic 信息

1
2
3
4
5
6
7
[root@tnode1 bin]# ./kafka-topics.sh --zookeeper tnode3:2181  --describe
Topic:LxmTest PartitionCount:3 ReplicationFactor:2 Configs:
Topic: LxmTest Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1,0
Topic: LxmTest Partition: 1 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: LxmTest Partition: 2 Leader: 0 Replicas: 0,2 Isr: 0,2
...
(其他 topic 信息省略)

查看单个 Topic 信息

1
2
3
4
5
[root@tnode1 bin]# ./kafka-topics.sh --zookeeper tnode3:2181  --describe --topic LxmTest
Topic:LxmTest PartitionCount:3 ReplicationFactor:2 Configs:
Topic: LxmTest Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1,0
Topic: LxmTest Partition: 1 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: LxmTest Partition: 2 Leader: 0 Replicas: 0,2 Isr: 0,2
删除 Topic

删除 Topic 分为手动和自动两种方式

  • 手动删除 Topic :需要先删除 Zookeeper 上的相关元数据,然后删除各个 Broker 节点上日志目录的下的此 Topic 的分区日志。
  • 自动删除 Topic :首先 delete.topic.enable=true 参数必须配置,然后通过 客户端命令删除 Topic,将会自动将元数据和各分区的数据日志删除。如果未配置,将只会删除元数据,不会自动删除日志。
1
2
3
[root@tnode1 bin]# ./kafka-topics.sh --zookeeper tnode3:2181  --delete --topic LxmTest
Topic LxmTest is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
增加分区

一般来说,我们在需要提升较大吞吐的情况下,可以选择增加 partition 。扩展分区命令如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
[root@tnode1 bin]# ./kafka-topics.sh --zookeeper tnode3:2181 --topic LxmTest  --alter --partitions 3
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Error while executing topic command : The number of partitions for a topic can only be increased
[2019-08-23 08:42:20,176] ERROR kafka.admin.AdminOperationException: The number of partitions for a topic can only be increased
at kafka.admin.AdminUtils$.addPartitions(AdminUtils.scala:271)
at kafka.admin.TopicCommand$$anonfun$alterTopic$1.apply(TopicCommand.scala:145)
at kafka.admin.TopicCommand$$anonfun$alterTopic$1.apply(TopicCommand.scala:122)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at kafka.admin.TopicCommand$.alterTopic(TopicCommand.scala:122)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:62)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
(kafka.admin.TopicCommand$)
[root@tnode1 bin]# ./kafka-topics.sh --zookeeper tnode3:2181 --topic LxmTest --alter --partitions 4
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

目前分区只能增加,不能减少或者不变的进行修改。

配置管理

配置管理分为 Topic 级别和 Client 级别两种

Topics 级别
1
2
[root@tnode1 bin]# ./kafka-configs.sh --zookeeper tnode3:2181 --entity-type topics --entity-name LxmTest --describe
Configs for topic 'LxmTest' are

这里的结果表示没有做单独配置,均使用的集群默认配置。

增加一个配置

1
2
3
4
[root@tnode1 bin]# ./kafka-configs.sh --zookeeper tnode3:2181 --entity-type topics --entity-name LxmTest --alter --add-config flush.messages=2
Completed Updating config for entity: topic 'LxmTest'.
[root@tnode1 bin]# ./kafka-configs.sh --zookeeper tnode3:2181 --entity-type topics --entity-name LxmTest --describe
Configs for topic 'LxmTest' are flush.messages=2

再增加一个配置

1
2
3
4
[root@tnode1 bin]# ./kafka-configs.sh --zookeeper tnode3:2181 --entity-type topics --entity-name LxmTest --alter --add-config retention.ms=259200000
Completed Updating config for entity: topic 'LxmTest'.
[root@tnode1 bin]# ./kafka-configs.sh --zookeeper tnode3:2181 --entity-type topics --entity-name LxmTest --describe
Configs for topic 'LxmTest' are retention.ms=259200000,flush.messages=2

一次性添加多个配置,配置用逗号分隔即可

1
2
3
4
[root@tnode1 bin]# ./kafka-configs.sh --zookeeper tnode3:2181 --entity-type topics --entity-name LxmTest --alter --add-config retention.ms=259200000,flush.messages=6
Completed Updating config for entity: topic 'LxmTest'.
[root@tnode1 bin]# ./kafka-configs.sh --zookeeper tnode3:2181 --entity-type topics --entity-name LxmTest --describe
Configs for topic 'LxmTest' are retention.ms=259200000,flush.messages=6

修改其中一个配置,其实增加没有区别

1
2
3
4
[root@tnode1 bin]# ./kafka-configs.sh --zookeeper tnode3:2181 --entity-type topics --entity-name LxmTest --alter --add-config flush.messages=5
Completed Updating config for entity: topic 'LxmTest'.
[root@tnode1 bin]# ./kafka-configs.sh --zookeeper tnode3:2181 --entity-type topics --entity-name LxmTest --describe
Configs for topic 'LxmTest' are retention.ms=259200000,flush.messages=5

删除一个配置

1
2
3
4
[root@tnode1 bin]# ./kafka-configs.sh --zookeeper tnode3:2181 --entity-type topics --entity-name LxmTest --alter --delete-config retention.ms
Completed Updating config for entity: topic 'LxmTest'.
[root@tnode1 bin]# ./kafka-configs.sh --zookeeper tnode3:2181 --entity-type topics --entity-name LxmTest --describe
Configs for topic 'LxmTest' are flush.messages=5
Clients 级别

分区管理

分区平衡

分区平衡分为自动和手动两种:

  • 自动:在 server.properties 中配置参数 auto.leader.rebalance.enable = true ,那么集群将会定时进行分区平衡,将 prefer-replica 副本即AR列表第一个副本,置为 leader 副本。 (leader.imbalance.per.broker.percentage = 10 此参数来决定自动平衡的阈值即失衡率超过10%会开始平衡)
  • 手动:执行平衡命令:./kafka-preferred-replica-election.sh --zookeeper tnode3:2181
分区迁移

如果我们需要下线一个 broker 节点,那么就需要将该节点上的相关 Topic 的 partition 迁移到其他 broker 节点上去,因为 Kafka 不会自动进行分区迁移的操作,如果不进行手动迁移,就会发生副本实效和数据丢失的情况。同样,如果是我们新上线一个节点,只有新增 Topic 的分区才会分配到新节点,此时也需要进行分区迁移操作。

这个 Topic 目前三个分区分别在 0、1、2 三个 broker 上。

1
2
3
4
5
[root@tnode1 bin]# ./kafka-topics.sh  --zookeeper tnode3:2181 --describe --topic LxmTest
Topic:LxmTest PartitionCount:3 ReplicationFactor:2 Configs:retention.ms=259200000,flush.messages=6
Topic: LxmTest Partition: 0 Leader: 0 Replicas: 0,2 Isr: 0,2
Topic: LxmTest Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0
Topic: LxmTest Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1

现在,我们需要下线 broker2,也就是我们需要将 partition 2 迁移到 0,1 broker 上,步骤如下:

  1. 先创建一个分区迁移的配置文件,这里叫 topic-move.json

    1
    2
    3
    4
    5
    6
    7
    8
    9
    {
    "topics":
    [
    {
    "topic":"LxmTest"
    }
    ],
    "version":1
    }
  2. 然后执行命令生成迁移计划

    1
    2
    3
    4
    5
    6
    [root@tnode1 bin]# ./kafka-reassign-partitions.sh --zookeeper tnode3:2181 --topics-to-move-json-file topic-move.json --broker-list "0,1" --generate
    Current partition replica assignment
    {"version":1,"partitions":[{"topic":"LxmTest","partition":2,"replicas":[2,1]}, {"topic":"LxmTest","partition":0,"replicas":[0,2]}, {"topic":"LxmTest","partition":1,"replicas":[1,0]}]}

    Proposed partition reassignment configuration
    {"version":1,"partitions":[{"topic":"LxmTest","partition":2,"replicas":[1,0]}, {"topic":"LxmTest","partition":0,"replicas":[1,0]}, {"topic":"LxmTest","partition":1,"replicas":[0,1]}]}
  3. 然后将上面的 Proposed partition reassignment configuration 下面的内容复制到 topic-repartition.json 文件中

  4. 执行迁移计划

    1
    2
    3
    4
    5
    6
    7
    8
    [root@tnode1 bin]# vi topic-repartition.json
    [root@tnode1 bin]# ./kafka-reassign-partitions.sh --zookeeper tnode3:2181 --reassignment-json-file topic-repartition.json --execute
    Current partition replica assignment

    {"version":1,"partitions":[{"topic":"LxmTest","partition":2,"replicas":[2,1]}, {"topic":"LxmTest","partition":0,"replicas":[0,2]}, {"topic":"LxmTest","partition":1,"replicas":[1,0]}]}

    Save this to use as the --reassignment-json-file option during rollback
    Successfully started reassignment of partitions.
  5. 验证迁移是否成功

    1
    2
    3
    4
    5
    [root@tnode1 bin]# ./kafka-reassign-partitions.sh --zookeeper tnode3:2181 --reassignment-json-file topic-repartition.json --verify
    Status of partition reassignment:
    Reassignment of partition [LxmTest,2] completed successfully
    Reassignment of partition [LxmTest,0] completed successfully
    Reassignment of partition [LxmTest,1] completed successfully
  6. 再次查看 topic 信息

    1
    2
    3
    4
    5
    [root@tnode1 bin]# ./kafka-topics.sh  --zookeeper tnode3:2181 --describe --topic LxmTest
    Topic:LxmTest PartitionCount:3 ReplicationFactor:2 Configs:retention.ms=259200000,flush.messages=6
    Topic: LxmTest Partition: 0 Leader: 0 Replicas: 1,0 Isr: 0,1
    Topic: LxmTest Partition: 1 Leader: 1 Replicas: 0,1 Isr: 1,0
    Topic: LxmTest Partition: 2 Leader: 1 Replicas: 1,0 Isr: 1,0

    发现已经没有 broker 2 节点的分区了

集群扩容

集群扩容很好理解,就是Kafka磁盘容量不足了,需要增加 broker 节点,然后迁移分区。和上面的 broker下线操作类似,只不是生成迁移计划的时候,增加 broker 而已。具体代码就不贴了。

总得来说,其实集群迁移,无论是 broker 下线 还是 集群扩容 或者是 增加副本,我们都只要手动梳理迁移计划,尽量将各个分区平衡分配,同时将 AR 列表第一个副本 (即默认的 leader 副本) 平均分配,然后进行迁移即可。

!## !大量 topic 进行迁移时,最好分批进行,避免影响正常业务。另外,可以限制迁移的流量。

1
[root@tnode1 bin]# ./kafka-reassign-partitions.sh --zookeeper tnode3:2181 --reassignment-json-file topic-repartition.json --execute --throttle 1024

!## !如果做了流量限制,需要使用 –verify 来检查是否完成迁移,在完成时,–verify 参数会解除限流

kafka设置某个topic的数据过期时间

全局设置

修改 server.properties

1
2
log.retention.hours=72
log.cleanup.policy=delete
单独对某一个topic设置过期时间

但如果只有某一个topic数据量过大。想单独对这个topic的过期时间设置短点:

1
./kafka-configs.sh --zookeeper localhost:2181 --alter --entity-name mytopic --entity-type topics --add-config retention.ms=86400000

retention.ms = 86400000 为一天,单位是毫秒。

查看设置:

1
2
3
./kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name mytopic --entity-type topics

Configs for mytopic are retention.ms=86400000
立即删除某个topic下的数据
1
./kafka-topics.sh --zookeeper localhost:2181 --alter --topic mytopic --config cleanup.policy=delete