拦截器 Interceptor
目前,在 kafka 中有两种拦截器,一个是生产者拦截器,也就是马上要看的。另一个是对应的消费者拦截器,后面再研究。生产者拦截器作用发生在消息序列化之前,可以提前做一些准备工作,比如按照规则过滤或修改消息值(不建议修改其他部分,如 key,partition等),也可以做类似消息统计这样的工作。
生产者拦截器主
要是实现 org.apache.kafka.clients.producer.ProducerInterceptor 接口。
ProducerInterceptor 接口
| 1 | public interface ProducerInterceptor<K, V> extends Configurable { | 
KafkaProducer在将消息序列化和计算分区之前,会先调用生产者拦截器的 onSend() 方法。另外,KafkaProducer 会在消息发送成功或者失败时调用 onAcknowledgement() 方法,可以做一些简单的统计类操作,因为这个方法是在 I/O 线程中,所以逻辑越简单越好,不然会影响消息发送速度。
另外接口也继承了 Configurable 接口,因此如果要实现该接口,还需实现 Configurable 接口中的 configure方法。
自定义拦截器 MyProducerInterceptor
| 1 | package kafka.interceptor; | 
拦截器的 close() 方法会在 KafkaProducer 关闭时调用。
自定义拦截器的使用
在KafkaProducer的配置属性中,加上拦截器的配置参数即可。如果有多个拦截器,注意拦截器的顺序。
| 1 | properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyProducerInterceptor.class.getName()); |