消费
消费者和消费组
对于每一个消费者 Consumer 而言,都有其对应的消费组 Consumer Group。每条消息都只会发送到每个消费组中的一个消费者。消费者数量不应该大于 Topic 分区数量,否则会有消费者消费不到消息的情况。
客户端开发
和生产者类似,一个完整的消费逻辑如下:
- 配置消费者 Consumer参数以及创建相应的消费者实例
- 消费者订阅 Topic
- 消费者拉取消息进行消费
- 消费者提交 offset
- 关闭消费者实例
先来看一个简单的 Consumer demo
| 1 | package kafka.consumer; | 
配置消费者 Consumer 参数以及创建相应的消费者实例
参考上面代码,类比 生产者 Producer 的配置,这边配置也类似,只不过多了一个 group id 即 Consumer Group 的配置,其他配置都类似,不再赘述。
额外提一点,KafkaProducer 和 KafkaConsumer 都有两个构造方法,第一个都是直接加载 参数的,即我们代码中使用的;第二个是 加载参数和 序列化反序列化器的。因为我们一般都在参数中设置序列化和反序列化器,所以常用的构造方法都是第一种,第二种不怎么用。
| 1 | public KafkaConsumer(Map<String, Object> configs) | 
消费者订阅 Topic
kafkaConsumer 订阅消息有两种类型:
- subscribe直接订阅 Topic,可直接指定- Topic集合,也可使用正则表达式。如果不指定- ReBalanceListener,则使用默认的- ReBalanceListener。- 1 
 2- public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) 
 public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener)- 对于使用集合方式订阅 - topic的,”后发制人”,即最后订阅有效,前面的不生效。如- 1 
 2- kafkaConsumer.subscribe(Arrays.asList("topic1")); 
 kafkaConsumer.subscribe(Arrays.asList("topic2"));- 最终消费订阅的是 - topic2
- 对于使用 正则表达式的 方式订阅 - topic的,符合正则表达式的所有- topic都会被消费,无论此 topic是在消费程序启动之前创建还是消费程序启动之后创建的。前期的版本,使用正则时,必须显示指定- ReBalanceListener。- 1 - kafkaConsumer.subscribe(Pattern.compile("topic-.*")); 
 
- assign订阅指定分区,参数为- TopicPartition的集合。另外,这种消费方式不能自动 rebanlance。- 1 - public void assign(Collection<TopicPartition> partitions) - 先看以下 - TopicPartition类。主要属性就两个,一个- topic,一个- partition。作用就是映射- topic-partition。- 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20- public final class TopicPartition implements Serializable { 
 private int hash = 0;
 private final int partition;
 private final String topic;
 public TopicPartition(String topic, int partition) {
 this.partition = partition;
 this.topic = topic;
 }
 public int partition() {
 return partition;
 }
 public String topic() {
 return topic;
 }
 ...
 }- 例子如下: - kafkaConsumer.assign(Arrays.asList(new TopicPartition("topic",0)));
kafkaConsumer  取消订阅
- 几种取消的方式,效果一样。 - 1 
 2
 3- kafkaConsumer.unsubscribe(); 
 kafkaConsumer.subscribe(Arrays.asList());
 kafkaConsumer.assign(new ArrayList<TopicPartition>());- 如果取消了订阅,那么程序会抛异常 : - java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
此外,三种订阅方式(基于集合、基于正则和指定分区的)都会使消费者具有不同的状态,且互斥。因此同一个消费者只能使用其中的一种。
消费者拉取消息进行消费
一般消息队列的消费有两种模式:推模式和拉模式。见名知意,推模式就是服务端主动将消息推送给消费者,拉模式就是消费者主动向服务端发起拉取消息的请求。而 Kafka 就是基于拉模式的,通过上面的 demo 代码可见,kafka 中消费者消费是一个持续循环的 poll 的过程,poll() 方法返回的就是所订阅的主题 (分区) 的上一组消息,具体为 ConsumerRecords。然后轮询 ConsumerRecords 就可以对每一条消息 ConsumerRecord 进行处理。 
| 1 | while (true){ | 
ConsumerRecord 的属性有很多
| 1 | public class ConsumerRecord<K, V> { | 
消费者提交 offset
在上面消费消息时,每次 poll 的数据都是未被消费过的数据,要做到标记未被消费过,就需要每次记录上一次消费后的消费的 offset,并且这个记录需要持久化保存,因为存在缓存或者内存可能会丢失。另外,在 Consumer Group 成员变动时,对于同一分区,可能在发生 rebanlance 之后,被不同消费者消费了,也需要持久化记录消费的 offset。
注意!!提交的 offset 是下次拉取的 offset ,而不是现在消费完的 offset 。
当然,在 Kafka 中也有不同的 offset 提交方式,有默认的自动提交,也有显示的手动提交。手动提交又分同步和异步两种:
- 自动提交 - offset:- Kafka周期性的自动提交 offset。- enable.auto.commit = true
- 同步提交 - offset:- enable.auto.commit = false每次提交 offset 时,会阻塞线程直到 offset 提交成功才能进行下一次- poll消息。无参的- commitSync()是每次- poll的批量提交,带参数 (TP 和- offset的 Map) 的是针对每条或每个分区的消息的- offset进行提交,如果是每条消息进行提交,那么性能将最低;如果是每个分区提交,那么可以类似这样子做:- 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25- try { 
 // 持续不断消费数据
 while (true){
 // 每隔 1 秒拉取一批数据
 ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
 // 遍历分区
 for (TopicPartition tp:records.partitions()){
 // 获取每个分区的消息
 List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
 // 遍历每个分区的消息集合
 for (ConsumerRecord<String, String> record:tpRecords){
 // 业务逻辑处理
 LOG.info(record.topic()+":"+record.partition()+":"+record.offset()+" ::: "+record.key()+":"+record.value());
 }
 // 同步提交每个分区的 offset
 // tpRecords.get(tpRecords.size()-1).offset() 最后一条消息的offset
 kafkaConsumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata( tpRecords.get(tpRecords.size()-1).offset() + 1)));
 }
 }
 } catch (Exception e) {
 LOG.error("", e);
 } finally {
 // 关闭消费者
 kafkaConsumer.close();
 }
- 异步提交 - offset:- enable.auto.commit = false和同步的相反,异步提交- offset时,不会阻塞消费,可能还没提交成功就已经拉取完下一批数据了。异步提交可以一定程度的提高性能。异步提交有三个重载方法:- 1 
 2
 3- public void commitAsync() 
 public void commitAsync(OffsetCommitCallback callback)
 public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)- 第一个和第三个中的 - offset和同步提交的一样,不一样的就是第二个和第三个的- OffsetCommitCallback。异步和同步最大的不同就是,有可能前一批- offset提交失败,但是后一批提交成功,这样子容易造成重复消费。如下代码,- OffsetCommitCallback类,在提交完成之后会回调其中的- onComplete()方法,我们可以在这个方法里面进行判断成功或者失败,然后进行相应的处理。假设提交失败的话,如果需要进行重试,那么要注意判断此次提交的 offset 和已经提交成功的 offset 的大小,然后才能知道是否需要提交- offset(如果后一批提交成功,那么已经提交成功的 offset 就肯定会比前一批提交失败的- offset大)。- 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24- while (true){ 
 // 每隔 1 秒拉取一批数据
 ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
 // 遍历拉取的消息集
 for (TopicPartition tp:records.partitions()){
 List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
 for (ConsumerRecord<String, String> record:tpRecords){
 LOG.info(record.topic()+":"+record.partition()+":"+record.offset()+" ::: "+record.key()+":"+record.value());
 }
 // 异步提交每个分区的 offset
 // tpRecords.get(tpRecords.size()-1).offset() 最后一条消息的offset
 kafkaConsumer.commitAsync( Collections.singletonMap(tp, new OffsetAndMetadata(tpRecords.get(tpRecords.size() - 1).offset() + 1)),
 new OffsetCommitCallback() {
 
 public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
 if (exception == null) {
 LOG.info("offset 提交成功:" + offsets);
 } else {
 LOG.error("kafka commit offset failure!!");
 }
 }
 });
 }
 }
关闭消费者实例
正常情况下,消费者是一直都在运行的,但是如果我们需要暂定或者关闭消费者,就需要考虑怎么退出消费的循环了。如果是暂定,则需要考虑如何恢复。
先看简单的暂定和恢复,KafkaConsumer 提供了 pause() 和 resume() 来暂停和恢复 kafkaConsumer 对某些分区的消费。还有一个无参的 paused() 的方法来返回已经暂定的方法。
| 1 | public void pause(Collection<TopicPartition> partitions) // 暂定某些分区的消费 | 
然后,如何优雅的退出 poll 的那层循环呢。一般来说有两种方式:
- 开始 poll的那个循环不用试固定的true,而是做成一个能开关的全局变量,这样子就能在其他地方也可以直接中断循环
- 直接调用 Kafkaconsumer的wakeup()方法。该方法会退出poll的循环并抛出一个wakeup的异常
在退出循环之后,需要显示的关闭 KafkaConsumer,以释放资源。关闭实例有三个重载方法:  
| 1 | kafkaConsumer.close(); // 默认 30 秒,也是调用的后面的方法 |