指定位移消费
在看指定 Offset 消费之前,先看一个参数 auto.offset.reset
,这个参数有三个值:earliest
,latest
,none
,默认值是 latest。通常情况下,我们在配置 consumer 时,我们都会指定 earliest 或 latest。因为当consumer消费时找不到正常的offset时,前者让 consumer 从头 (最早 offset) 开始消费,而后者让 consumer 从尾 (最新 offset) 开始消费。但是 none 在这种情况下就会直接抛异常了,即找不到 offset。
上面所述都是在消费整个 topic 的情况下,如果需要指定消费某个分区的话,那么就需要具体指定到 TopicPartition 的 offset了。指定的方式为 KafkaConsumer 中的 seek() 方法。
1 | // seek 只能重置消费者分配到的TopicPartition的offset,所以使用seek之前需要先使用一次poll或者使用一次assign方法 |
如代码所述,因为使用 seek()
方法的时候需要知道当前消费者所消费的分区,所以必须再 subscribe
订阅经过一次poll 或者 直接使用 assign
指定分区之后才能使用 seek。
seek() 的使用场景:在消费者重启时,如果能找到对应的 offset,此时如果 offset 无越界情况,那么 auto.offset.reset
参数就不会生效。但是又想让 消费者 从头、从尾或者指定位移消费时,就只能通过上面三个方法进行消费了。
另外,通过 seek 方案,我们还可以将 offset
自由保存在 ZK
,HBase
等之上,实现 exactly-once
的语义保证。
1 | // 从 Zookeeper 对应路径 /$zookeeperNameSpace/$baseZkPath/topic 下获取各分区offset |