您的位置:首页 > 其它

(四)Kafka 学习笔记之消费者

2019-03-31 18:00 423 查看

Consumer 实践

开发 Consumer 程序与 Producer 程序不同,Consumer 程序涉及到另外一些重要的概念,理解这些概念是开发稳定的 Consumer 程序的前提。

ConsumerGroup(消费组)

在 Producer 发送消息的频率不大的情况下,Consumer 可以及时读取 Broker 消息,并处理消息。但是如果 Producer 发送消息的频率很快,Consumer 无法及时处理,此时就会造成消息积压,进而产生了时延。随着时间的推移,积压的消息越来越多。此时,就需要扩大消费者的规模。就像多个Producer 可以向同一个 Topic 写入消息一样,多个 Consumer 也可以读取同一个 Topic 消息。每个 Consumer 只可以读取一部分消息,而不能重复读取其他 Consumer 已读取的消息。

在 Kafka 中,一个 Consumer 属于一个消费者组(Consumer Group,CG)。当多个 Consumer 属于同一消费者组,此时读取一个 Topic 中的消息,消费者组中的消费者可以读取某一个(或多个) Partition 中的消息。

即:消费者组中的消费者可以读取一个或多个 Topic 中的 partition 消息,但是一个消费者组中的消费者不可以处理其它消费者已经处理过的消息。

所以:如果一个 Topic 接收到的消息量很大,那么最好多创建几个 partition,以提高消息的吞吐量。

Consumer 开发步骤

创建 KafkaConsumer 对象

类似于创建 KafkaProducer 对象。KafkaConsumer 的创建也需要:

  1. 创建一个 Properties 对象,,用于设置需要传入的 KafkaConsumer 对象的配置参数(配置参数有是四个是必须存在的:bootstrap.servers、key.deserializer、value.deserializer 和一个重要参数 group.id)。
  2. 传入参数,创建 KafkaConsumer
  3. 订阅 Topic,并 poll。
    http://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// ConsumerGroup,当前Consumer属于哪个ConsumerGroup。
props.put("group.id", "test");
// 是否自动提交offset,默认为true。
props.put("enable.auto.commit", "true");
// 自动提交offset的事件间隔。
props.put("auto.commit.interval.ms", "1000");
// zk 会话超时。
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅 Topic
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
// 获取消息,其使用ConsumerRecord来封装的。
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
}

Consumer 消费消息的三种语义

当一个 Consumer Group 首次创建时(即 Consumer 首次消费消息) ,消费消息的位置 offset 的初始值由 auto.offset.reset 参数设置。一旦消费者开始读取消息,其会根据应用程序业务逻辑的需求,定期提交 offset(enable.auto.commit,auto.commit.interval.ms)。一个消费者组中的消费者消费一个 partition 中的消息,消费到哪个 offset 了,都会根据最后一次提交的 offset 确定。

如果一个 ConsumerA 在消费消息后,在提交 offset 之前 down 掉了,此时另一个 consumerB 在接替 ConsumerA 的工作时,就会重复消费部分 ConsumerA 已经消费过的消息,因为 ConsumerA 最后消费的消息的 offset 没有提交到 Broker。所以提交 offset 的操作越频繁,重复消费消息的可能性越小。

如果要 KafkaConsumer 手动提交 offset,那么首先禁用自动提交:props.put(“enable.auto.commit”, “false”);

根据提交 offset 的时间不同,产生了三种消费语义:

  • 至少一个(at-least-once):先消费,再提交。
  • 至多一个(at-most-once):先提交,再消费。
  • 有且仅有一次(Exactly-once):边消费边 offset 持久化,当 Consumer down掉了,Broker 可以读取持久化的 offset 信息来确定 offset。

至少一次 (at-least-once)

至少一次: 是先消费,后提交 offset。因此,消息会存在重复消费的情况。如 ConsumerA 在消费一个 Topic 中的 partition 消息时,在处理完消息时,恰好 down 掉了,此时 offset 还未被提交,那么ConsumerB 在接替 ConsumerA 消费时,只能重复消费部分消息(前一次已经提交了 offset 开始到这次还未提交 offset 的消息)。所以出现了至少一次的消费消息的情况。

实现过程:

  1. 设置 enable.auto.commit 为 false;
  2. 消息处理(process())完之后调用 consumer.commitSync();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {}
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records){
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
}
process();    //先消费消息
consumer.commitSync();  // 再提交offset
}

当然至少一次的实现方式不仅仅只有一种,可以看:
一文精通 kafka 消费者的三种语义
Kafka client 消息接收的三种模式

至多一次(at-most-once)

最多一次:是在获取数据后,先提交 offset,然后再消费处理数据。因此消息可能存在丢失。例如:当 ConsumerA 在消费消息时,如果提交了 offset,恰好 ConsumerA down 掉了,此消息未被消费。但是先 ConsumerB 只能从当前的 offset 消费消息。因此就存在一部分消息的丢失。也就出现了消息可能只消费一次,或者一次都不会被消费的情况。

实现过程:

  1. 设置 enable.auto.commit 为 false;
  2. 消息处理(process())完之前调用 consumer.commitSync();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {}
ConsumerRecords<String, String> records = consumer.poll(100);
consumer.commitSync(); //先提交offset
for (ConsumerRecord<String, String> record : records){
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
}
process();   //再消费消息
}

当然至多一次的实现方式不仅仅只有一种,比如可以通过:

  1. 设置 enable.auto.commit 为 true;
  2. auto.commit.interval.ms 设置一个较大的事件。

具体可以看:
一文精通 kafka 消费者的三种语义
Kafka client 消息接收的三种模式

有且仅有一次(Exactly-once)

有且仅有一次:说明当前消息仅仅会被消费一次。具体实现可以参看
一文精通 kafka 消费者的三种语义
Kafka client 消息接收的三种模式

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: