拦截器 Interceptor
目前,在 kafka
中有两种拦截器,一个是生产者拦截器,也就是马上要看的。另一个是对应的消费者拦截器,后面再研究。生产者拦截器作用发生在消息序列化之前,可以提前做一些准备工作,比如按照规则过滤或修改消息值(不建议修改其他部分,如 key
,partition
等),也可以做类似消息统计这样的工作。
生产者拦截器主
要是实现 org.apache.kafka.clients.producer.ProducerInterceptor
接口。
ProducerInterceptor 接口
1 | public interface ProducerInterceptor<K, V> extends Configurable { |
KafkaProducer
在将消息序列化和计算分区之前,会先调用生产者拦截器的 onSend()
方法。另外,KafkaProducer
会在消息发送成功或者失败时调用 onAcknowledgement()
方法,可以做一些简单的统计类操作,因为这个方法是在 I/O 线程中,所以逻辑越简单越好,不然会影响消息发送速度。
另外接口也继承了 Configurable
接口,因此如果要实现该接口,还需实现 Configurable
接口中的 configure方法。
自定义拦截器 MyProducerInterceptor
1 | package kafka.interceptor; |
拦截器的 close()
方法会在 KafkaProducer
关闭时调用。
自定义拦截器的使用
在KafkaProducer
的配置属性中,加上拦截器的配置参数即可。如果有多个拦截器,注意拦截器的顺序。
1 | properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyProducerInterceptor.class.getName()); |