消费者拦截器

消费者拦截器

原理和生产者的原理一样,主要用于在消费到消息时或者提交 offset 时进行定制化操作。先看看接口的代码

1
2
3
4
5
6
7
8
9
10
public interface ConsumerInterceptor<K, V> extends Configurable {
// poll 返回数据之前会回调此方法。
// 这里对消息进行定制化,过滤等操作
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
// 提交完 offset 之后会回调这个方法
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
// 释放资源
public void close();

}

简单的实现,根据 ID 进行消息过滤,并去掉消息开头的 first。另外,打印每次提交的offset信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package kafka.interceptor;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MyConsumerInterceptor implements ConsumerInterceptor<String,String> {

@Override
public void configure(Map<String, ?> configs) {
// 不做任何配置
}

@Override
public ConsumerRecords<String,String> onConsume(ConsumerRecords<String,String> records) {
// 初始化一个 List,来收集需要返回的 ConsumerRecord
HashMap<TopicPartition, List<ConsumerRecord<String,String>>> recordsHashMap = new HashMap<>();
// 遍历每一个分区
for (TopicPartition tp:records.partitions()){
// List 收集每个分区内需要消费的数据
ArrayList<ConsumerRecord<String, String>> tpRecords = new ArrayList<>();
// 遍历分区内的消息集合
for (ConsumerRecord<String,String> record:records.records(tp)){
// ID 能整除 2 的才消费
if (Integer.valueOf(record.value().split(":")[1])%2 == 0){
// value 部分,去掉开头的 first:
tpRecords.add(new ConsumerRecord<String,String>(
record.topic(),
record.partition(),
record.offset(),
record.timestamp(),
record.timestampType(),
record.checksum(),
record.serializedKeySize(),
record.serializedValueSize(),
record.key(),
record.value().substring(6)));
}
}
// 将每个分区的消息集合添加到 Map
recordsHashMap.put(tp, tpRecords);
}
// 将 Map 转成 ConsumerRecords 并返回
return new ConsumerRecords<>(recordsHashMap);
}

@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {

offsets.forEach((tp,offset) -> System.out.println(tp+":"+offset.offset()));
}

@Override
public void close() {
// 空方法
}
}

使用该拦截器也很简单,只需要在 KafkaConsumer 中的配置中添加即可,如果是拦截链,方法和生产者的拦截链一样。

1
prop.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName());