消费
消费者和消费组
对于每一个消费者 Consumer
而言,都有其对应的消费组 Consumer Group
。每条消息都只会发送到每个消费组中的一个消费者。消费者数量不应该大于 Topic
分区数量,否则会有消费者消费不到消息的情况。
客户端开发
和生产者类似,一个完整的消费逻辑如下:
- 配置消费者
Consumer
参数以及创建相应的消费者实例 - 消费者订阅
Topic
- 消费者拉取消息进行消费
- 消费者提交
offset
- 关闭消费者实例
先来看一个简单的 Consumer demo
1 | package kafka.consumer; |
配置消费者 Consumer 参数以及创建相应的消费者实例
参考上面代码,类比 生产者 Producer
的配置,这边配置也类似,只不过多了一个 group id
即 Consumer Group
的配置,其他配置都类似,不再赘述。
额外提一点,KafkaProducer
和 KafkaConsumer
都有两个构造方法,第一个都是直接加载 参数的,即我们代码中使用的;第二个是 加载参数和 序列化反序列化器的。因为我们一般都在参数中设置序列化和反序列化器,所以常用的构造方法都是第一种,第二种不怎么用。
1 | public KafkaConsumer(Map<String, Object> configs) |
消费者订阅 Topic
kafkaConsumer
订阅消息有两种类型:
subscribe
直接订阅 Topic,可直接指定Topic
集合,也可使用正则表达式。如果不指定ReBalanceListener
,则使用默认的ReBalanceListener
。1
2public void subscribe(Pattern pattern, ConsumerRebalanceListener listener)
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener)对于使用集合方式订阅
topic
的,”后发制人”,即最后订阅有效,前面的不生效。如1
2kafkaConsumer.subscribe(Arrays.asList("topic1"));
kafkaConsumer.subscribe(Arrays.asList("topic2"));最终消费订阅的是
topic2
对于使用 正则表达式的 方式订阅
topic
的,符合正则表达式的所有topic
都会被消费,无论此 topic是在消费程序启动之前创建还是消费程序启动之后创建的。前期的版本,使用正则时,必须显示指定ReBalanceListener
。1
kafkaConsumer.subscribe(Pattern.compile("topic-.*"));
assign
订阅指定分区,参数为TopicPartition
的集合。另外,这种消费方式不能自动 rebanlance。1
public void assign(Collection<TopicPartition> partitions)
先看以下
TopicPartition
类。主要属性就两个,一个topic
,一个partition
。作用就是映射topic-partition
。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20public final class TopicPartition implements Serializable {
private int hash = 0;
private final int partition;
private final String topic;
public TopicPartition(String topic, int partition) {
this.partition = partition;
this.topic = topic;
}
public int partition() {
return partition;
}
public String topic() {
return topic;
}
...
}例子如下:
kafkaConsumer.assign(Arrays.asList(new TopicPartition("topic",0)));
kafkaConsumer
取消订阅
几种取消的方式,效果一样。
1
2
3kafkaConsumer.unsubscribe();
kafkaConsumer.subscribe(Arrays.asList());
kafkaConsumer.assign(new ArrayList<TopicPartition>());如果取消了订阅,那么程序会抛异常 :
java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
此外,三种订阅方式(基于集合、基于正则和指定分区的)都会使消费者具有不同的状态,且互斥。因此同一个消费者只能使用其中的一种。
消费者拉取消息进行消费
一般消息队列的消费有两种模式:推模式和拉模式。见名知意,推模式就是服务端主动将消息推送给消费者,拉模式就是消费者主动向服务端发起拉取消息的请求。而 Kafka
就是基于拉模式的,通过上面的 demo
代码可见,kafka
中消费者消费是一个持续循环的 poll
的过程,poll()
方法返回的就是所订阅的主题 (分区) 的上一组消息,具体为 ConsumerRecords
。然后轮询 ConsumerRecords
就可以对每一条消息 ConsumerRecord
进行处理。
1 | while (true){ |
ConsumerRecord
的属性有很多
1 | public class ConsumerRecord<K, V> { |
消费者提交 offset
在上面消费消息时,每次 poll 的数据都是未被消费过的数据,要做到标记未被消费过,就需要每次记录上一次消费后的消费的 offset
,并且这个记录需要持久化保存,因为存在缓存或者内存可能会丢失。另外,在 Consumer Group
成员变动时,对于同一分区,可能在发生 rebanlance
之后,被不同消费者消费了,也需要持久化记录消费的 offset
。
注意!!提交的 offset 是下次拉取的 offset ,而不是现在消费完的 offset 。
当然,在 Kafka 中也有不同的 offset 提交方式,有默认的自动提交,也有显示的手动提交。手动提交又分同步和异步两种:
自动提交
offset
:Kafka
周期性的自动提交 offset。enable.auto.commit = true
同步提交
offset
:enable.auto.commit = false
每次提交 offset 时,会阻塞线程直到 offset 提交成功才能进行下一次poll
消息。无参的commitSync()
是每次poll
的批量提交,带参数 (TP 和offset
的 Map) 的是针对每条或每个分区的消息的offset
进行提交,如果是每条消息进行提交,那么性能将最低;如果是每个分区提交,那么可以类似这样子做:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25try {
// 持续不断消费数据
while (true){
// 每隔 1 秒拉取一批数据
ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
// 遍历分区
for (TopicPartition tp:records.partitions()){
// 获取每个分区的消息
List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
// 遍历每个分区的消息集合
for (ConsumerRecord<String, String> record:tpRecords){
// 业务逻辑处理
LOG.info(record.topic()+":"+record.partition()+":"+record.offset()+" ::: "+record.key()+":"+record.value());
}
// 同步提交每个分区的 offset
// tpRecords.get(tpRecords.size()-1).offset() 最后一条消息的offset
kafkaConsumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata( tpRecords.get(tpRecords.size()-1).offset() + 1)));
}
}
} catch (Exception e) {
LOG.error("", e);
} finally {
// 关闭消费者
kafkaConsumer.close();
}
异步提交
offset
:enable.auto.commit = false
和同步的相反,异步提交offset
时,不会阻塞消费,可能还没提交成功就已经拉取完下一批数据了。异步提交可以一定程度的提高性能。异步提交有三个重载方法:1
2
3public void commitAsync()
public void commitAsync(OffsetCommitCallback callback)
public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)第一个和第三个中的
offset
和同步提交的一样,不一样的就是第二个和第三个的OffsetCommitCallback
。异步和同步最大的不同就是,有可能前一批offset
提交失败,但是后一批提交成功,这样子容易造成重复消费。如下代码,OffsetCommitCallback
类,在提交完成之后会回调其中的onComplete()
方法,我们可以在这个方法里面进行判断成功或者失败,然后进行相应的处理。假设提交失败的话,如果需要进行重试,那么要注意判断此次提交的 offset 和已经提交成功的 offset 的大小,然后才能知道是否需要提交offset
(如果后一批提交成功,那么已经提交成功的 offset 就肯定会比前一批提交失败的offset
大)。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24while (true){
// 每隔 1 秒拉取一批数据
ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
// 遍历拉取的消息集
for (TopicPartition tp:records.partitions()){
List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
for (ConsumerRecord<String, String> record:tpRecords){
LOG.info(record.topic()+":"+record.partition()+":"+record.offset()+" ::: "+record.key()+":"+record.value());
}
// 异步提交每个分区的 offset
// tpRecords.get(tpRecords.size()-1).offset() 最后一条消息的offset
kafkaConsumer.commitAsync( Collections.singletonMap(tp, new OffsetAndMetadata(tpRecords.get(tpRecords.size() - 1).offset() + 1)),
new OffsetCommitCallback() {
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception == null) {
LOG.info("offset 提交成功:" + offsets);
} else {
LOG.error("kafka commit offset failure!!");
}
}
});
}
}
关闭消费者实例
正常情况下,消费者是一直都在运行的,但是如果我们需要暂定或者关闭消费者,就需要考虑怎么退出消费的循环了。如果是暂定,则需要考虑如何恢复。
先看简单的暂定和恢复,KafkaConsumer
提供了 pause()
和 resume()
来暂停和恢复 kafkaConsumer
对某些分区的消费。还有一个无参的 paused()
的方法来返回已经暂定的方法。
1 | public void pause(Collection<TopicPartition> partitions) // 暂定某些分区的消费 |
然后,如何优雅的退出 poll
的那层循环呢。一般来说有两种方式:
- 开始
poll
的那个循环不用试固定的true
,而是做成一个能开关的全局变量,这样子就能在其他地方也可以直接中断循环 - 直接调用
Kafkaconsumer
的wakeup()
方法。该方法会退出poll
的循环并抛出一个wakeup
的异常
在退出循环之后,需要显示的关闭 KafkaConsumer
,以释放资源。关闭实例有三个重载方法:
1 | kafkaConsumer.close(); // 默认 30 秒,也是调用的后面的方法 |