生产者序列化

序列化 Serializer

Kafka 中,生产者 Producer 发送消息给 Broker 之前,需要先将消息转换成字节数组才能进行发送操作。同样,消费者 Consumer 消费消息时,也需要将来自 Broker 的字节数组转换成具体消息。这就需要序列化和反序列化了。目前来说,Kafka 中用到的序列化接口只有一个:org.apache.kafka.common.serialization.SerializerSerializer 的实现类有以下几种:

  • StringSerializer

  • ByteArraySerializer

  • ByteBufferSerializer

  • BytesSerializer

  • DoubleSerializer

  • IntegerSerializer

  • LongSerializer

先看 Serializer 接口

Serializer 接口

该接口只有三个方法。configure() 方法用来配置当前类,serializer() 方法用来执行序列化操作,close() 方法则用来关闭当前序列化器。在源码注释有提到,如果实现了 close() 方法,则必须确保该方法的幂等性,因为此方法可能会被 KafkaProducer 调用多次。

1
2
3
4
5
6
7
public interface Serializer<T> extends Closeable {

public void configure(Map<String, ?> configs, boolean isKey);
public byte[] serialize(String topic, T data);
@Override
public void close();
}

接着看一下常用的 StringSerializer 类。

实现类:StringSerializer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class StringSerializer implements Serializer<String> {
// 默认的编码 UTF-8
private String encoding = "UTF8";


// configure方法,用来确定序列化的编码格式。
// 但是由于 key.serializer.encoding 和 value.serializer.encoding 以及 serializer.encoding 在 KafkaProducer 基本不会配置。所以基本编码都是该实现类的默认编码。
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
Object encodingValue = configs.get(propertyName);
if (encodingValue == null)
encodingValue = configs.get("serializer.encoding");
if (encodingValue != null && encodingValue instanceof String)
encoding = (String) encodingValue;
}

// serializer 方法。将String类型数据转为 byte[] 类型
@Override
public byte[] serialize(String topic, String data) {
try {
if (data == null)
return null;
else
return data.getBytes(encoding); // 将String类型数据根据编码类型转换成byte[]
} catch (UnsupportedEncodingException e) {
throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
}
}

@Override
public void close() {
// nothing to do 基本close方法都不需要做什么,如果要实现,必须实现幂等性。
}
}

自定义实现 Serializer 类:PersonSerializer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package kafka.serializer;

import kafka.utils.Person;
import org.apache.kafka.common.serialization.Serializer;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.Map;

/**
* created by lxm on 2019-08-15
*
*/

// 指定泛型为 Person
public class PersonSerializer implements Serializer<Person> {

private String encoding = "UTF-8";

@Override
public void configure(Map configs, boolean isKey) {
// 不做什么操作,可以仿照 StringSerializer 来实现
}

@Override
public byte[] serialize(String topic, Person person) {

// 如果数据为空直接返回空
if (person == null){
return null;
}
// 外部定义好需要返回的结果类型
byte[] byteArray = null;
try {

ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
// 序列化对象 Person (被序列化的对象列需要实现 java.io.Serializer 接口)
objectOutputStream.writeObject(person);
byteArray = byteArrayOutputStream.toByteArray();

} catch (IOException e){
e.printStackTrace();
}
return byteArray;
}

@Override
public void close() {
// 照常什么都不做。
}
}
是用自定义的序列化器

使用自定义的序列化器也很简单,只需要将 生产者 Producervalue 序列化参数修改为 自定义的序列化类。另外,初始化生产者 Producer 和 ~时,注意泛型的变动。核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 实体类
public class Person {

private int id;
private String name;
private String address;

public Person(int id, String name, String address){
this.id = id;
this.name = name;
this.address = address;
}
...
}

properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, PersonSerializer.class.getName());

KafkaProducer<String, Person> kafkaProducer = new KafkaProducer<>(properties);

ProducerRecord<String, Person> producerRecord = new ProducerRecord<String,

Person>(topic, person);

Future<RecordMetadata> future = kafkaProducer.send(producerRecord);
RecordMetadata recordMetadata = future.get();