序列化 Serializer
在 Kafka 中,生产者 Producer 发送消息给 Broker 之前,需要先将消息转换成字节数组才能进行发送操作。同样,消费者 Consumer 消费消息时,也需要将来自 Broker 的字节数组转换成具体消息。这就需要序列化和反序列化了。目前来说,Kafka 中用到的序列化接口只有一个:org.apache.kafka.common.serialization.Serializer。Serializer 的实现类有以下几种:
StringSerializerByteArraySerializerByteBufferSerializerBytesSerializerDoubleSerializerIntegerSerializerLongSerializer
先看 Serializer 接口
Serializer 接口
该接口只有三个方法。configure() 方法用来配置当前类,serializer() 方法用来执行序列化操作,close() 方法则用来关闭当前序列化器。在源码注释有提到,如果实现了 close() 方法,则必须确保该方法的幂等性,因为此方法可能会被 KafkaProducer 调用多次。
1 | public interface Serializer<T> extends Closeable { |
接着看一下常用的 StringSerializer 类。
实现类:StringSerializer
1 | public class StringSerializer implements Serializer<String> { |
自定义实现 Serializer 类:PersonSerializer
1 | package kafka.serializer; |
是用自定义的序列化器
使用自定义的序列化器也很简单,只需要将 生产者 Producer 的 value 序列化参数修改为 自定义的序列化类。另外,初始化生产者 Producer 和 ~时,注意泛型的变动。核心代码如下:
1 | // 实体类 |