消息发送
一个完整的正常的生产者逻辑有以下几个步骤:
- 配置生产者实例:包括生产者参数配置和生产者实例创建
- 构建需要发送的消息 message发送消息
- 关闭生产者实例
先看一下消息发送的完整代码
| 1 | package kafka; | 
配置 kafkaProducer 实例
如代码注释所说,我们在配置参数时,手写参数可能会出错,因此最好使用 ProducerConfig 类里面定义好的参数,比手写快、方便、准确。同时,对于 key  value 的序列化类还可以通过 获取对应类的 类名来实现,不容易写错,毕竟这里是需要全路径的。配置完参数后就可以通过 KafkaProducer 来实例化一个 Producer 了。
| 1 | String brokerList = "kafka1:9092"; | 
其中 ,KafkaProducer<String, String>  的结构 <String, String> 指的是 key value 的类型。BOOTSTRAP_SERVERS_CONFIG :给定两个及以上就OK,producer 会从给定的broker找到其他broker。但是不建议给一个,防止宕机无法联系上kafka集群。CLIENT_ID_CONFIG :producer 的自有 ID,默认为空,不设置producer 会自动生成一个 producer 开头的字符串。
此外,KafkaProducer 是线程安全的,可以在多个线程中共享单个 KafkaProducer 实例,也可以将实例进行池化。
构建消息 message
首先,看一下 message 的结构,即 ProducerRecord 的结构 。包括五个值,topic、partition、key、value、timestamp。也就是说,我们构建  ProducerRecord 的时候,需要在构造方法中这几个值。
| 1 | public class ProducerRecord<K, V> { | 
构建 message 的构造方法有几个,如下代码所示。可以看出,必要的两个参数是 topic 和 value ,通常情况下指定这两个也够用。对于 key,可以根据分区或业务来指定不同的 key,以便 kafka 根据 key 来做 hash 将 message 均衡发送到不同的 partition,另外,有key的数据还可以支持日志压缩的功能。对于 partition,如果指定了,那么该条 message 只能发送到对应的 partition,而不会发送到其他 partition 。
| 1 | ProducerRecord(String topic, V value) | 
这边我们只指定 topic、key、value。当其他属性不设置时,默认为 null 。
| 1 | ProducerRecord<String, String> producerRecord = new ProducerRecord<>("topic", "1", "value"); | 
发送消息 message
发送消息分为两类,一类是同步发送,一类是异步发送。
- 发后即忘:Producer管发不管结果,发送成功与否不负责,存在数据丢失的风险,特别是ack = -1时。性能最高,可靠性最差。
- 同步发送:kafkaProducer.send(producerRecord)的返回值不是void类型,可以直接通过其返回值recordMetadata的get方法可以阻塞等待kafka响应,或者接受send方法的返回值recordMetadata后,再调用其get方法。如果发送失败,会进行重试 (假设producer的重试次数有设置的话) ,重试失败则直接抛异常。然后在异常里面处理 异常逻辑,recordMetadata中记录了topic、partition、offset等信息。
- 异步发送:使用自定义 Callback,kafka有响应时就会回调,要么成功,要么抛异常。异常 Exception 和RecordMetadata总有一个为null。对于同一个分区来说,Callback也是有序的。
| 1 | // 第一种 发后即忘 fire-and-forget。正常情况下不会有问题,但是发送失败,数据丢失。 | 
关闭 Producer
关闭有两个重载方法,无参的重载方法 close() 会等待所有的消息发送完之后进行 Producer 的关闭。有超时时间参数的方法 close()  超时后会强行退出。
| 1 | // 关闭 Producer,实际应用中,通常选择无参的 close。 |