反序列化

反序列化

和生产者端的序列化器类似,消费者端对应的是反序列化器。Kafka 中反序列化器实现的都是 Deserializer 接口,默认实现有:

  • ByteBufferDeserializer
  • BytesDeserializer
  • DoubleDeserializer
  • FloatDeserializer
  • IntegerDeserializer
  • LongDeserializer
  • ShortDeserializer
  • StringDeserializer

反序列化 Deserializer 接口也和 Serializer 接口一样,需要实现三个方法

1
2
3
4
5
6
7
8
public interface Deserializer<T> extends Closeable {
// 用来配置当前类
public void configure(Map<String, ?> configs, boolean isKey);
// 用来反序列化,如果 data 为 null,需要返回 null,而不是抛异常
public T deserialize(String topic, byte[] data);
// 关闭序列化器,一般是空方法
public void close();
}

常用的 StringDeserializer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class StringDeserializer implements Deserializer<String> {
// 默认字符编码
private String encoding = "UTF8";

// 获取配置中设置的字符编码,如果设置了的话
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding";
Object encodingValue = configs.get(propertyName);
if (encodingValue == null)
encodingValue = configs.get("deserializer.encoding");
if (encodingValue != null && encodingValue instanceof String)
encoding = (String) encodingValue;
}

// 如果数据是 null,返回 null。否则进行反序列化
@Override
public String deserialize(String topic, byte[] data) {
try {
if (data == null)
return null;
else
return new String(data, encoding);
} catch (UnsupportedEncodingException e) {
throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
}
}

@Override
public void close() {
// nothing to do
}
}

自定义反序列化器 PersonDeserializer :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package kafka.serializer;

import kafka.utils.Person;
import org.apache.commons.lang.SerializationException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Deserializer;

import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;

public class PersonDeserializer implements Deserializer {

// 默认字符编码
private String encoding = "UTF-8";

@Override
public void configure(Map configs, boolean isKey) {
String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding";
Object encodingValue = configs.get(propertyName);
if (encodingValue == null){
encodingValue = configs.get("deserializer.encoding");
}
if (encodingValue != null && encodingValue instanceof String){
encoding = (String) encodingValue;
}
}

@Override
public Object deserialize(String topic, byte[] data) {

// 数据为空直接返回
if (data == null)
return null;
// 定义返回类型
Person person = null;
try {
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data);
ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream);
// 反序列化对象,此对象需要实现 java.io.Serializer 接口
person = (Person)objectInputStream.readObject();

} catch (Exception e){
System.out.println(e);
}
return person;
}

@Override
public void close() {
// 空方法
}
}

使用反序列化器只需要在 consumer 的配置中加上即可。需要注意的是,consumer 端的反序列化器需要和 producer 端的序列化器解耦,不然无法正常消费。同时,如果有拦截器的话,那么需要注意拦截器中的操作,不能使得序列化器无法序列。

1
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, PersonDeserializer.class.getName());