消息发送
一个完整的正常的生产者逻辑有以下几个步骤:
- 配置生产者实例:包括生产者参数配置和生产者实例创建
- 构建需要发送的消息
message
发送消息 - 关闭生产者实例
先看一下消息发送的完整代码
1 | package kafka; |
配置 kafkaProducer 实例
如代码注释所说,我们在配置参数时,手写参数可能会出错,因此最好使用 ProducerConfig 类里面定义好的参数,比手写快、方便、准确。同时,对于 key
value
的序列化类还可以通过 获取对应类的 类名来实现,不容易写错,毕竟这里是需要全路径的。配置完参数后就可以通过 KafkaProducer 来实例化一个 Producer 了。
1 | String brokerList = "kafka1:9092"; |
其中 ,KafkaProducer<String, String>
的结构 <String, String>
指的是 key
value 的类型。BOOTSTRAP_SERVERS_CONFIG
:给定两个及以上就OK,producer
会从给定的broker找到其他broker。但是不建议给一个,防止宕机无法联系上kafka集群。CLIENT_ID_CONFIG
:producer
的自有 ID
,默认为空,不设置producer
会自动生成一个 producer
开头的字符串。
此外,KafkaProducer
是线程安全的,可以在多个线程中共享单个 KafkaProducer
实例,也可以将实例进行池化。
构建消息 message
首先,看一下 message
的结构,即 ProducerRecord
的结构 。包括五个值,topic
、partition
、key
、value
、timestamp
。也就是说,我们构建 ProducerRecord
的时候,需要在构造方法中这几个值。
1 | public class ProducerRecord<K, V> { |
构建 message
的构造方法有几个,如下代码所示。可以看出,必要的两个参数是 topic
和 value
,通常情况下指定这两个也够用。对于 key,可以根据分区或业务来指定不同的 key,以便 kafka
根据 key
来做 hash
将 message
均衡发送到不同的 partition
,另外,有key的数据还可以支持日志压缩的功能。对于 partition
,如果指定了,那么该条 message
只能发送到对应的 partition
,而不会发送到其他 partition
。
1 | ProducerRecord(String topic, V value) |
这边我们只指定 topic
、key
、value
。当其他属性不设置时,默认为 null
。
1 | ProducerRecord<String, String> producerRecord = new ProducerRecord<>("topic", "1", "value"); |
发送消息 message
发送消息分为两类,一类是同步发送,一类是异步发送。
- 发后即忘:
Producer
管发不管结果,发送成功与否不负责,存在数据丢失的风险,特别是ack = -1
时。性能最高,可靠性最差。 - 同步发送:
kafkaProducer.send(producerRecord)
的返回值不是void
类型,可以直接通过其返回值recordMetadata
的get
方法可以阻塞等待kafka
响应,或者接受send
方法的返回值recordMetadata
后,再调用其get
方法。如果发送失败,会进行重试 (假设producer
的重试次数有设置的话) ,重试失败则直接抛异常。然后在异常里面处理 异常逻辑,recordMetadata
中记录了topic
、partition
、offset
等信息。 - 异步发送:使用自定义
Callback
,kafka
有响应时就会回调,要么成功,要么抛异常。异常 Exception 和RecordMetadata
总有一个为null
。对于同一个分区来说,Callback
也是有序的。
1 | // 第一种 发后即忘 fire-and-forget。正常情况下不会有问题,但是发送失败,数据丢失。 |
关闭 Producer
关闭有两个重载方法,无参的重载方法 close()
会等待所有的消息发送完之后进行 Producer
的关闭。有超时时间参数的方法 close()
超时后会强行退出。
1 | // 关闭 Producer,实际应用中,通常选择无参的 close。 |