消费者拦截器
原理和生产者的原理一样,主要用于在消费到消息时或者提交 offset 时进行定制化操作。先看看接口的代码
1 | public interface ConsumerInterceptor<K, V> extends Configurable { |
简单的实现,根据 ID 进行消息过滤,并去掉消息开头的 first。另外,打印每次提交的offset
信息
1 | package kafka.interceptor; |
使用该拦截器也很简单,只需要在 KafkaConsumer
中的配置中添加即可,如果是拦截链,方法和生产者的拦截链一样。
1 | prop.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName()); |