序列化 Serializer
在 Kafka
中,生产者 Producer
发送消息给 Broker
之前,需要先将消息转换成字节数组才能进行发送操作。同样,消费者 Consumer
消费消息时,也需要将来自 Broker
的字节数组转换成具体消息。这就需要序列化和反序列化了。目前来说,Kafka
中用到的序列化接口只有一个:org.apache.kafka.common.serialization.Serializer
。Serializer
的实现类有以下几种:
StringSerializer
ByteArraySerializer
ByteBufferSerializer
BytesSerializer
DoubleSerializer
IntegerSerializer
LongSerializer
先看 Serializer
接口
Serializer 接口
该接口只有三个方法。configure()
方法用来配置当前类,serializer()
方法用来执行序列化操作,close()
方法则用来关闭当前序列化器。在源码注释有提到,如果实现了 close()
方法,则必须确保该方法的幂等性,因为此方法可能会被 KafkaProducer
调用多次。
1 | public interface Serializer<T> extends Closeable { |
接着看一下常用的 StringSerializer
类。
实现类:StringSerializer
1 | public class StringSerializer implements Serializer<String> { |
自定义实现 Serializer
类:PersonSerializer
1 | package kafka.serializer; |
是用自定义的序列化器
使用自定义的序列化器也很简单,只需要将 生产者 Producer
的 value
序列化参数修改为 自定义的序列化类。另外,初始化生产者 Producer
和 ~时,注意泛型的变动。核心代码如下:
1 | // 实体类 |