消费者和消费组

消费

消费者和消费组

对于每一个消费者 Consumer 而言,都有其对应的消费组 Consumer Group。每条消息都只会发送到每个消费组中的一个消费者。消费者数量不应该大于 Topic 分区数量,否则会有消费者消费不到消息的情况。

客户端开发

和生产者类似,一个完整的消费逻辑如下:

  1. 配置消费者 Consumer 参数以及创建相应的消费者实例
  2. 消费者订阅 Topic
  3. 消费者拉取消息进行消费
  4. 消费者提交 offset
  5. 关闭消费者实例

先来看一个简单的 Consumer demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Properties;

public class MyConsumer {

private static final Logger LOG = LoggerFactory.getLogger(MyConsumer.class);

// 参数初始化
private static Properties initConfig(String brokerList, String groupID, String clientID){

Properties prop = new Properties();
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // key 反序列化
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // value 反序列化
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList); // Kafka 服务
prop.put(ConsumerConfig.GROUP_ID_CONFIG, groupID); // Consumer Group
prop.put(ConsumerConfig.CLIENT_ID_CONFIG, clientID); // client id

return prop;
}

public static void main(String[] args){

// consumer基本信息
String topic = "TEST-TOPIC";
String brokerList = "tnode1:9092";
String groupID = "group.demo";
String clientID = "consumer.client.id.demo";

// 初始化配置
Properties prop = initConfig(brokerList, groupID, clientID);
// 生产 Consumer 实例
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(prop);
// 订阅指定的 TOPIC,可以多个,组装成 List 就行。
kafkaConsumer.subscribe(Arrays.asList(topic));

try {
// 持续不断消费数据
while (true){
// 每隔 1 秒拉取一批数据
ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
// 遍历拉取的消息集
for (ConsumerRecord<String, String> record:records){
// 打印每条消息的topic、partition、offset、key、value
LOG.info(record.topic()+":"+record.partition()+":"+record.offset()+" ::: "+record.key()+":"+record.value());
}
}
} catch (Exception e){
LOG.error("", e);
} finally {
// 关闭消费者
kafkaConsumer.close();
}
}
}
配置消费者 Consumer 参数以及创建相应的消费者实例

参考上面代码,类比 生产者 Producer 的配置,这边配置也类似,只不过多了一个 group idConsumer Group 的配置,其他配置都类似,不再赘述。

额外提一点,KafkaProducerKafkaConsumer 都有两个构造方法,第一个都是直接加载 参数的,即我们代码中使用的;第二个是 加载参数和 序列化反序列化器的。因为我们一般都在参数中设置序列化和反序列化器,所以常用的构造方法都是第一种,第二种不怎么用。

1
2
3
4
5
6
public KafkaConsumer(Map<String, Object> configs)
public KafkaConsumer(Map<String, Object> configs, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer)
public KafkaConsumer(Properties properties)
public KafkaConsumer(Properties properties, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer)
// 最终的调用
private KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer)
消费者订阅 Topic

kafkaConsumer 订阅消息有两种类型:

  • subscribe 直接订阅 Topic,可直接指定 Topic 集合,也可使用正则表达式。如果不指定 ReBalanceListener ,则使用默认的 ReBalanceListener

    1
    2
    public void subscribe(Pattern pattern, ConsumerRebalanceListener listener)
    public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener)
    1. 对于使用集合方式订阅 topic 的,”后发制人”,即最后订阅有效,前面的不生效。如

      1
      2
      kafkaConsumer.subscribe(Arrays.asList("topic1"));
      kafkaConsumer.subscribe(Arrays.asList("topic2"));

      最终消费订阅的是 topic2

    2. 对于使用 正则表达式的 方式订阅 topic的,符合正则表达式的所有 topic 都会被消费,无论此 topic是在消费程序启动之前创建还是消费程序启动之后创建的。前期的版本,使用正则时,必须显示指定ReBalanceListener

      1
      kafkaConsumer.subscribe(Pattern.compile("topic-.*"));
  • assign 订阅指定分区,参数为 TopicPartition 的集合。另外,这种消费方式不能自动 rebanlance

    1
    public void assign(Collection<TopicPartition> partitions)

    先看以下 TopicPartition 类。主要属性就两个,一个 topic,一个 partition。作用就是映射 topic-partition

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    public final class TopicPartition implements Serializable {

    private int hash = 0;
    private final int partition;
    private final String topic;

    public TopicPartition(String topic, int partition) {
    this.partition = partition;
    this.topic = topic;
    }

    public int partition() {
    return partition;
    }

    public String topic() {
    return topic;
    }
    ...

    例子如下:kafkaConsumer.assign(Arrays.asList(new TopicPartition("topic",0)));

kafkaConsumer 取消订阅

  1. 几种取消的方式,效果一样。

    1
    2
    3
    kafkaConsumer.unsubscribe();
    kafkaConsumer.subscribe(Arrays.asList());
    kafkaConsumer.assign(new ArrayList<TopicPartition>());

    如果取消了订阅,那么程序会抛异常 :

    java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions

此外,三种订阅方式(基于集合、基于正则和指定分区的)都会使消费者具有不同的状态,且互斥。因此同一个消费者只能使用其中的一种。

消费者拉取消息进行消费

一般消息队列的消费有两种模式:推模式和拉模式。见名知意,推模式就是服务端主动将消息推送给消费者,拉模式就是消费者主动向服务端发起拉取消息的请求。而 Kafka 就是基于拉模式的,通过上面的 demo 代码可见,kafka 中消费者消费是一个持续循环的 poll 的过程,poll() 方法返回的就是所订阅的主题 (分区) 的上一组消息,具体为 ConsumerRecords。然后轮询 ConsumerRecords 就可以对每一条消息 ConsumerRecord 进行处理。

1
2
3
4
5
6
7
8
9
while (true){
// 每隔 1 秒拉取一批数据
ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
// 遍历拉取的消息集
for (ConsumerRecord<String, String> record:records){
// 打印每条消息的topic、partition、offset、key、value
LOG.info(record.topic() + ":" + record.partition() + ":" + record.offset() + " ::: " + record.key() + ":" + record.value());
}
}

ConsumerRecord 的属性有很多

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class ConsumerRecord<K, V> {
public static final long NO_TIMESTAMP = Record.NO_TIMESTAMP;
public static final int NULL_SIZE = -1;
public static final int NULL_CHECKSUM = -1;

private final String topic;
private final int partition;
private final long offset;
private final long timestamp;
private final TimestampType timestampType;
private final long checksum;
private final int serializedKeySize;
private final int serializedValueSize;
private final K key;
private final V value;
... // 省略其他方法
}
消费者提交 offset

在上面消费消息时,每次 poll 的数据都是未被消费过的数据,要做到标记未被消费过,就需要每次记录上一次消费后的消费的 offset,并且这个记录需要持久化保存,因为存在缓存或者内存可能会丢失。另外,在 Consumer Group 成员变动时,对于同一分区,可能在发生 rebanlance 之后,被不同消费者消费了,也需要持久化记录消费的 offset

注意!!提交的 offset 是下次拉取的 offset ,而不是现在消费完的 offset 。

当然,在 Kafka 中也有不同的 offset 提交方式,有默认的自动提交,也有显示的手动提交。手动提交又分同步和异步两种:

  • 自动提交 offsetKafka 周期性的自动提交 offset。enable.auto.commit = true

  • 同步提交 offsetenable.auto.commit = false 每次提交 offset 时,会阻塞线程直到 offset 提交成功才能进行下一次 poll 消息。无参的 commitSync() 是每次 poll 的批量提交,带参数 (TP 和 offset 的 Map) 的是针对每条或每个分区的消息的 offset 进行提交,如果是每条消息进行提交,那么性能将最低;如果是每个分区提交,那么可以类似这样子做:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    try {
    // 持续不断消费数据
    while (true){
    // 每隔 1 秒拉取一批数据
    ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
    // 遍历分区
    for (TopicPartition tp:records.partitions()){
    // 获取每个分区的消息
    List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
    // 遍历每个分区的消息集合
    for (ConsumerRecord<String, String> record:tpRecords){
    // 业务逻辑处理
    LOG.info(record.topic()+":"+record.partition()+":"+record.offset()+" ::: "+record.key()+":"+record.value());
    }
    // 同步提交每个分区的 offset
    // tpRecords.get(tpRecords.size()-1).offset() 最后一条消息的offset
    kafkaConsumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata( tpRecords.get(tpRecords.size()-1).offset() + 1)));
    }
    }
    } catch (Exception e) {
    LOG.error("", e);
    } finally {
    // 关闭消费者
    kafkaConsumer.close();
    }
  • 异步提交 offsetenable.auto.commit = false 和同步的相反,异步提交 offset 时,不会阻塞消费,可能还没提交成功就已经拉取完下一批数据了。异步提交可以一定程度的提高性能。异步提交有三个重载方法:

    1
    2
    3
    public void commitAsync()
    public void commitAsync(OffsetCommitCallback callback)
    public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)

    第一个和第三个中的 offset 和同步提交的一样,不一样的就是第二个和第三个的 OffsetCommitCallback。异步和同步最大的不同就是,有可能前一批 offset 提交失败,但是后一批提交成功,这样子容易造成重复消费。如下代码,OffsetCommitCallback 类,在提交完成之后会回调其中的 onComplete() 方法,我们可以在这个方法里面进行判断成功或者失败,然后进行相应的处理。假设提交失败的话,如果需要进行重试,那么要注意判断此次提交的 offset 和已经提交成功的 offset 的大小,然后才能知道是否需要提交 offset (如果后一批提交成功,那么已经提交成功的 offset 就肯定会比前一批提交失败的 offset 大)。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    while (true){
    // 每隔 1 秒拉取一批数据
    ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
    // 遍历拉取的消息集
    for (TopicPartition tp:records.partitions()){
    List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
    for (ConsumerRecord<String, String> record:tpRecords){
    LOG.info(record.topic()+":"+record.partition()+":"+record.offset()+" ::: "+record.key()+":"+record.value());
    }
    // 异步提交每个分区的 offset
    // tpRecords.get(tpRecords.size()-1).offset() 最后一条消息的offset
    kafkaConsumer.commitAsync( Collections.singletonMap(tp, new OffsetAndMetadata(tpRecords.get(tpRecords.size() - 1).offset() + 1)),
    new OffsetCommitCallback() {
    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
    if (exception == null) {
    LOG.info("offset 提交成功:" + offsets);
    } else {
    LOG.error("kafka commit offset failure!!");
    }
    }
    });
    }
    }
关闭消费者实例

正常情况下,消费者是一直都在运行的,但是如果我们需要暂定或者关闭消费者,就需要考虑怎么退出消费的循环了。如果是暂定,则需要考虑如何恢复。

先看简单的暂定和恢复,KafkaConsumer 提供了 pause()resume() 来暂停和恢复 kafkaConsumer 对某些分区的消费。还有一个无参的 paused() 的方法来返回已经暂定的方法。

1
2
3
public void pause(Collection<TopicPartition> partitions)   // 暂定某些分区的消费
public void resume(Collection<TopicPartition> partitions) // 恢复某些已经暂定的分区的消费
public Set<TopicPartition> paused() // 返回已经暂定的分区列表

然后,如何优雅的退出 poll 的那层循环呢。一般来说有两种方式:

  • 开始 poll 的那个循环不用试固定的 true,而是做成一个能开关的全局变量,这样子就能在其他地方也可以直接中断循环
  • 直接调用 Kafkaconsumerwakeup() 方法。该方法会退出 poll 的循环并抛出一个 wakeup 的异常

在退出循环之后,需要显示的关闭 KafkaConsumer,以释放资源。关闭实例有三个重载方法:

1
2
3
kafkaConsumer.close();     // 默认 30 秒,也是调用的后面的方法
kafkaConsumer.close(30000, TimeUnit.MILLISECONDS); // 因为第三种方式的出现,现已经取消。
kafkaConsumer.close(Duration.ofMillis(30000));