Topic的管理

Topic的管理

Topic的管理包括创建、查看、修改和删除 topic 等操作。四个操作均可以用 Kafka 提供的脚本 kafka-topic.sh 来执行。

1
2
3
4
5
[root@tnode1 bin]# cat kafka-topics.sh 
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more

exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"

可以看到实际上 脚本是通过调用这个类来实现的,我们也可以直接通过 KafkaAdminClient 的方式来实现这些操作。脚本的操作方式可以看 本笔记 Kafka主目录下的
[Kafka客户端命令操作]: F:\myGit\DT-Learner\Kafka\Kafka客户端命令操作.md “文档1”

然后看 KafkaAdminClient 的实现

初始化 KafkaAdminClient 配置

1
2
3
4
5
6
7
8
private static Properties initKafkaClient(){

String brokerList = "tnode3:9092";
Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
return properties;
}

创建 Topic

单个topic,最简配置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// init client
AdminClient adminClient = AdminClient.create(initKafkaClient());

String topic0011 = "topic001";
// new topic
NewTopic topic001 = new NewTopic(topic0011, 3, (short) 2);
// create topic
CreateTopicsResult topicsResult = adminClient.createTopics(Collections.singleton(topic001));
// 获取创建结果是否有异常
try {
topicsResult.all().get();
System.out.println("Topic:" + topic0011 + "创建成功");
} catch (InterruptedException | ExecutionException e){
e.printStackTrace();
}
adminClient.close();

多个 topic ,复杂配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
AdminClient adminClient = AdminClient.create(initKafkaClient());

String topic001 = "topic001";
String topic002 = "topic002";

// 删除 Topic
adminClient.deleteTopics(Arrays.asList(topic001,topic002));

// 第一个 Topic 配置
NewTopic newTopic001 = new NewTopic(topic001, 3, (short) 2);
// 第二个 Topic 配置,指定明细的副本和分区策略
HashMap<Integer, List<Integer>> replicasAssignments002 = new HashMap<>();
replicasAssignments002.put(0, Arrays.asList(0,1));
replicasAssignments002.put(1, Arrays.asList(1,0));
NewTopic newTopic002 = new NewTopic(topic002, replicasAssignments002);

try {
CreateTopicsResult topicsResult = adminClient.createTopics(Arrays.asList(newTopic001, newTopic002));
topicsResult.all().get();
System.out.println("Topic:" + topic001 + "创建成功");
} catch (InterruptedException | ExecutionException e){
System.out.println(e.getMessage());
}
adminClient.close();