指定位移消费

指定位移消费

在看指定 Offset 消费之前,先看一个参数 auto.offset.reset ,这个参数有三个值:earliestlatestnone,默认值是 latest。通常情况下,我们在配置 consumer 时,我们都会指定 earliest 或 latest。因为当consumer消费时找不到正常的offset时,前者让 consumer 从头 (最早 offset) 开始消费,而后者让 consumer 从尾 (最新 offset) 开始消费。但是 none 在这种情况下就会直接抛异常了,即找不到 offset。

上面所述都是在消费整个 topic 的情况下,如果需要指定消费某个分区的话,那么就需要具体指定到 TopicPartition 的 offset了。指定的方式为 KafkaConsumer 中的 seek() 方法。

1
2
3
4
5
6
// seek 只能重置消费者分配到的TopicPartition的offset,所以使用seek之前需要先使用一次poll或者使用一次assign方法
public void seek(TopicPartition partition, long offset)
//
public void seekToBeginning(Collection<TopicPartition> partitions)
//
public void seekToEnd(Collection<TopicPartition> partitions)

如代码所述,因为使用 seek() 方法的时候需要知道当前消费者所消费的分区,所以必须再 subscribe 订阅经过一次poll 或者 直接使用 assign 指定分区之后才能使用 seek。

seek() 的使用场景:在消费者重启时,如果能找到对应的 offset,此时如果 offset 无越界情况,那么 auto.offset.reset 参数就不会生效。但是又想让 消费者 从头、从尾或者指定位移消费时,就只能通过上面三个方法进行消费了。

另外,通过 seek 方案,我们还可以将 offset 自由保存在 ZKHBase等之上,实现 exactly-once 的语义保证。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 从 Zookeeper 对应路径 /$zookeeperNameSpace/$baseZkPath/topic 下获取各分区offset
HashMap<TopicPartition, Long> topicPartitionLongHashMap = MyUtils.getOffsetFromZk(client, groupID, topicConsumer, baseZkPath);
// consumer 指定partition offset消费
if(!topicPartitionLongHashMap.isEmpty()){
// assign 指定 consumer 消费的分区
kafkaConsumer.assign(topicPartitionLongHashMap.keySet());
for (TopicPartition tp:topicPartitionLongHashMap.keySet()){
// 指定分区的具体 offset+1
kafkaConsumer.seek(tp,topicPartitionLongHashMap.get(tp)+1);
}
}
else {
kafkaConsumer.subscribe(Arrays.asList(topicConsumer));
}
// Map 存储 最新的offset
HashMap<String, Long> topicOffsetHashMap = new HashMap<>();

while (true) {
ConsumerRecords<String,String> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
// 获取消息数据,并转成json
String data =record.value();
JSONObject json = JSONObject.parseObject(data);
// 当前的topic和partition 装成 TopicPartition
String topicPartition = record.topic()+"_"+record.partition();
// 覆盖 topicOffsetHashMap 中对应的 offset 信息
topicOffsetHashMap.put(topicPartition, record.offset());

System.out.println(json);
}

// 保存 offSet 到 Zookeeper
for (String s:topicOffsetHashMap.keySet()){
Long offset = topicOffsetHashMap.get(s);
MyUtils.storeOffsetToZk(client, baseZkPath,groupID, s,offset);
}
}