生产者拦截器

拦截器 Interceptor

目前,在 kafka 中有两种拦截器,一个是生产者拦截器,也就是马上要看的。另一个是对应的消费者拦截器,后面再研究。生产者拦截器作用发生在消息序列化之前,可以提前做一些准备工作,比如按照规则过滤或修改消息值(不建议修改其他部分,如 keypartition等),也可以做类似消息统计这样的工作。

生产者拦截器主

要是实现 org.apache.kafka.clients.producer.ProducerInterceptor 接口。

ProducerInterceptor 接口
1
2
3
4
5
6
7
8
public interface ProducerInterceptor<K, V> extends Configurable {
// 在这个方法中对消息进行相应的定制修改操作。
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
// 这个方法会在会在消息被应答前或者发送失败时调用
public void onAcknowledgement(RecordMetadata metadata, Exception exception);
// 关闭拦截器时做资源清理工作
public void close();
}

KafkaProducer在将消息序列化和计算分区之前,会先调用生产者拦截器的 onSend() 方法。另外,KafkaProducer 会在消息发送成功或者失败时调用 onAcknowledgement() 方法,可以做一些简单的统计类操作,因为这个方法是在 I/O 线程中,所以逻辑越简单越好,不然会影响消息发送速度。

另外接口也继承了 Configurable 接口,因此如果要实现该接口,还需实现 Configurable 接口中的 configure方法。

自定义拦截器 MyProducerInterceptor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package kafka.interceptor;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class MyProducerInterceptor implements ProducerInterceptor {

private static final Logger LOG = LoggerFactory.getLogger(MyProducerInterceptor.class);

// 定义两个全局变量,来统计成功和失败消息数
private long successSend = 0;
private long failureSend = 0;

@Override
public void configure(Map<String, ?> configs) {
// 空方法
}

// 在这里对消息进行规则定制化
// 暂时只对消息的值即value加上 first 前缀。
@Override
public ProducerRecord onSend(ProducerRecord record) {
return new ProducerRecord<>(record.topic(),
record.partition(),
record.timestamp(),
record.key(),
"first-"+record.value());
}

// 在这里进行消息发送状态进行统计,成功失败都累加到对应的累加器上
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
if (exception == null)
successSend ++;
else
failureSend ++;
}

@Override
public void close() {
LOG.info("成功发送:"+successSend +"条,发送失败:"+ failureSend + "条,成功率:"+
String.format("%f",(double)(successSend/(successSend+failureSend))*100)+"%");
}
}

拦截器的 close() 方法会在 KafkaProducer 关闭时调用。

自定义拦截器的使用

KafkaProducer的配置属性中,加上拦截器的配置参数即可。如果有多个拦截器,注意拦截器的顺序。

1
2
3
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyProducerInterceptor.class.getName());

properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyProducerInterceptor.class.getName() + "," + MyProducerInterceptor1.class.getName());