消息发送

消息发送

一个完整的正常的生产者逻辑有以下几个步骤:

  1. 配置生产者实例:包括生产者参数配置和生产者实例创建
  2. 构建需要发送的消息 message 发送消息
  3. 关闭生产者实例

先看一下消息发送的完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package kafka;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class MyProducer {

public static void main(String[] args){

String brokerList = "kafka1:9092";
Properties properties = new Properties();

// 增加 Producer 参数时,最好不要手写参数,使用类里面定义好的最好,不用担心写错。
// 另外,序列化的参数值也可以通过获取 class name 的方式。
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); // broker list
properties.put(ProducerConfig.RETRIES_CONFIG, 10); // 重试次数
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.myId");

KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

// 构建消息 有几个构造方法,按需选择。topic, value 是必选项。最好加上 key,通过 key 来 hash 分 partition
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("topic", "1", "value");

// 第一种 发后即忘 fire-and-forget。正常情况下不会有问题,但是发送失败,数据丢失。
kafkaProducer.send(producerRecord);


// 第二种 同步发送,如果没有 get 方法,则为 发过即忘 ,即发送失败会重试,重试失败丢数据。
// 有了 get 之后,发送失败会重试,重试不行抛异常,可以在异常里面处理下一步
try {
kafkaProducer.send(producerRecord).get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}

// 第三种 异步发送 自定义 Callback,在 Callback 里面处理消息发送失败的逻辑。
kafkaProducer.send(producerRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null){
System.out.println("中止程序");
}
else {
System.out.println(
recordMetadata.topic() + ":" +
recordMetadata.partition() + ":" +
recordMetadata.offset());
}
}
});

// 关闭 Producer,实际应用中,通常选择无参的 close。
kafkaProducer.close();
kafkaProducer.close(10000, TimeUnit.MILLISECONDS) ;

}
}
配置 kafkaProducer 实例

如代码注释所说,我们在配置参数时,手写参数可能会出错,因此最好使用 ProducerConfig 类里面定义好的参数,比手写快、方便、准确。同时,对于 key value 的序列化类还可以通过 获取对应类的 类名来实现,不容易写错,毕竟这里是需要全路径的。配置完参数后就可以通过 KafkaProducer 来实例化一个 Producer 了。

1
2
3
4
5
6
7
8
9
10
11
12
13
String brokerList = "kafka1:9092";

Properties properties = new Properties();

// 增加 Producer 参数时,最好不要手写参数,使用类里面定义好的最好,不用担心写错。
// 另外,序列化的参数值也可以通过获取 class name 的方式。
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); // broker list
properties.put(ProducerConfig.RETRIES_CONFIG, 10); // 重试次数
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.myId");

KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

其中 ,KafkaProducer<String, String> 的结构 <String, String> 指的是 key value 的类型。BOOTSTRAP_SERVERS_CONFIG :给定两个及以上就OK,producer 会从给定的broker找到其他broker。但是不建议给一个,防止宕机无法联系上kafka集群。CLIENT_ID_CONFIGproducer 的自有 ID,默认为空,不设置producer 会自动生成一个 producer 开头的字符串。

此外,KafkaProducer 是线程安全的,可以在多个线程中共享单个 KafkaProducer 实例,也可以将实例进行池化。

构建消息 message

首先,看一下 message 的结构,即 ProducerRecord 的结构 。包括五个值,topicpartitionkeyvaluetimestamp。也就是说,我们构建 ProducerRecord 的时候,需要在构造方法中这几个值。

1
2
3
4
5
6
7
8
9
10
public class ProducerRecord<K, V> {

private final String topic; // 记录topic
private final Integer partition; // 记录partition
private final K key; // 记录key
private final V value; // 记录数据值
private final Long timestamp; // 记录时间戳

...
}

构建 message 的构造方法有几个,如下代码所示。可以看出,必要的两个参数是 topicvalue ,通常情况下指定这两个也够用。对于 key,可以根据分区或业务来指定不同的 key,以便 kafka 根据 key 来做 hashmessage 均衡发送到不同的 partition,另外,有key的数据还可以支持日志压缩的功能。对于 partition,如果指定了,那么该条 message 只能发送到对应的 partition,而不会发送到其他 partition

1
2
3
4
ProducerRecord(String topic, V value)
ProducerRecord(String topic, K key, V value)
ProducerRecord(String topic, Integer partition, K key, V value)
ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value)

这边我们只指定 topickeyvalue。当其他属性不设置时,默认为 null

1
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("topic", "1", "value");
发送消息 message

发送消息分为两类,一类是同步发送,一类是异步发送。

  • 发后即忘:Producer 管发不管结果,发送成功与否不负责,存在数据丢失的风险,特别是 ack = -1 时。性能最高,可靠性最差。
  • 同步发送:kafkaProducer.send(producerRecord) 的返回值不是 void 类型,可以直接通过其返回值 recordMetadataget 方法可以阻塞等待 kafka 响应,或者接受 send 方法的返回值recordMetadata 后,再调用其 get方法。如果发送失败,会进行重试 (假设 producer 的重试次数有设置的话) ,重试失败则直接抛异常。然后在异常里面处理 异常逻辑,recordMetadata 中记录了topicpartitionoffset等信息。
  • 异步发送:使用自定义 Callbackkafka有响应时就会回调,要么成功,要么抛异常。异常 Exception 和 RecordMetadata 总有一个为 null。对于同一个分区来说,Callback也是有序的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 第一种 发后即忘 fire-and-forget。正常情况下不会有问题,但是发送失败,数据丢失。
kafkaProducer.send(producerRecord);

// 第二种 同步发送,如果没有 get 方法,则为 发过即忘 ,即发送失败会重试,重试失败丢数据。
// 有了 get 之后,发送失败会重试,重试不行抛异常,可以在异常里面处理下一步
try {
kafkaProducer.send(producerRecord).get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}

// 第三种 异步发送 自定义 Callback,在 Callback 里面处理消息发送失败的逻辑。
kafkaProducer.send(producerRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null){
System.out.println("中止程序");
}
else {
System.out.println(
recordMetadata.topic() + ":" +
recordMetadata.partition() + ":" +
recordMetadata.offset());
}
}
});
关闭 Producer

关闭有两个重载方法,无参的重载方法 close() 会等待所有的消息发送完之后进行 Producer 的关闭。有超时时间参数的方法 close() 超时后会强行退出。

1
2
3
// 关闭 Producer,实际应用中,通常选择无参的 close。
kafkaProducer.close();
kafkaProducer.close(10000, TimeUnit.MILLISECONDS);