KafkaConsumer一些概念解释(从官方文档整理而来)

2023-12-13 14:38

本文主要是介绍KafkaConsumer一些概念解释(从官方文档整理而来),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

阅读之前假设您已经对kafka有了一定的了解

详情和API请参考http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html


KafkaConsumer类简介

public class KafkaConsumer<K,V>extends Object implements Consumer<K,V> 
(从官方文档copy而来)

KafkaConsumer是kafka服务的一个java版本的客户端。它会自动处理kafka集群中出现的错误,自动适应kafka集群中数据分区的迁移。它还可以以消费群组(consumer groups)的方式同服务器交互,从而实现消息处理负载均衡( load balance consumption)。 
消费者(consumer )跟代理服务器(broker)之间保持着一个TCP连接来获取数据。如果在使用完consumer之后没有调用close()方法来关闭的话,就会导致连接泄露。KafkaConsumer不是线程安全的,切记。(Producer类是线程安全的,通常我们将Producer以单例的模式实现)


偏移量(Offsets )和消费者位置(Consumer Position)

简短解说,偏移量就相当于数据库中的ID,是一个唯一标识符,代表着一条消息在一个主题(topic)的某个分区(partition)中的位置,消费者位置就是某个订阅该主题的消费者在它所使用的那个主题分区中的位置,也就是它目前处理消息的位置。数据库游标知道吧,消息偏移量就相当于数据库中的数据ID,而消费者位置就相当于数据库游标位置。如果消费者不小心挂掉了,再重启还是会从当前消费者位置来读取数据,这就是Consumer Position的作用。


消费者群组(Consumer Groups)和主题订阅(Topic Subscriptions)

kafka使用消费者群组(Consumer Groups)的概念来分割处理消息的工作。一个consumer group中可以有多个消费者,这多个消费者可以在同一台机器上运行,也可以在不同的机器上运行。这样,就有了一定的扩展性和容错功能。 
每个kafka的consumer都可以配置一个群组名(consumer group),并且可以通过subscribe()方法动态设置想要订阅的topic列表。kafka服务器将把每个topic中的每条消息都发送给订阅它的群组,每个群组中只会有一个consumer来处理这条消息。具体的实现机制就是kafka服务器将每个topic的每个partition都分配给订阅它的群组中的一个consumer,从而实现并发处理和负载均衡。简短解说,我们将概念抽象出来,把topic抽象成一包糖,partition相当于一个一个的糖豆,一个consumer group抽象成一堆熊孩子,那一个consumer就是这堆熊孩子中的一个熊孩子了。假设现在这包糖有90个,这堆熊孩子有30个,那每个熊孩子能飞到3个糖果。假设这包糖有89个,熊孩子还是30个,你该说分不匀了,这就不是你管的事了,kafka服务器会去决定哪个熊孩子倒霉,少分一个。这是在一对一的情况下,将这个概念延伸开来,在多对一,一对多,多对多的情况下也是适用的。你只需要将每包糖果看成彼此独立,每堆熊孩子彼此独立,每包糖果对每堆熊孩子之间都相互独立,互不影响应该就能理解。


kafka服务器检测consumer的失效(Detecting Consumer Failures)

当一个consumer订阅一个topic,在该consumer调用poll(long)方法后就会自动加入它所属于的那个群组。poll(long)方法在设计上可以维持该consumer的活性,只要该consumer持续调用poll(方法)方法。表象之下,poll(long)方法在每次被调用的时候都会向kafka服务器发送一个心跳(heartbeat)来告诉kafka服务器自己依然健在。如果你停止调用poll(long)方法(可能是因为异常导致程序挂掉了),那consumer就不会再向kafka服务器发送heartbeat,然后过一段时间,服务器就会认为该consumer挂掉了,然后就会被踢出consumer所属的群组,然后本来被分配给该consumer的partition就会被重新分配就其他的consumer(就相当于是某个熊孩子不小心把自己玩死了,然后他的糖果就会被拿回去,重新分给其他的熊孩子)。这样设计是为了防止某些consumer挂掉之后依旧握着partition不松手,导致某些消息无法被其他健在的consumer处理的情况发生。 
在单线程情况下,这种设计就要求consumer处理接受到的消息的时间要小于调用poll(long)的周期,从而保证heartbeat的正常发送,从而让服务器知道自己依然健在。这里有一个session timeout的概念,session timeout就是consumer发送两次有效heartbeat的最长时间间隔,严格来说就是在不超过多长的时间内,你让服务器接收到heartbeat,从而确定你的consumer的活性。如果你接收到了消息,然后用来处理这些消息的时间过长,从而导致无法调用poll(long)而无法发送heartbeat,那在session timeout之后,服务器就会认为你的consumer挂掉了。如果服务器认为你的consumer挂掉了,那你consumer相应的partition就会被kafka服务器的负载均衡机制给均衡掉,重新分配给其他的consumer。 
KafkaConsumer类有两项配置可以控制这种行为: 
1、session.timeout.ms:从名称就可以看出来,就是heartbeat超时时间,增加该设置值可以给consumer更多处理poll(long)返回的消息的时间。唯一的缺点是如果你的consumer不小心玩脱挂掉了,服务器可能不能及时检测到,这就会导致服务器不能及时根据你consumer的情况进行负载均衡。如果你调用close()方法来告知服务器你的consumer要退出了,服务器会及时进行负载均衡,这种情况不受该设置的影响。 
2、max.poll.records:意思也显而易见,就是每次调用poll(long)方法的时候,最多返回多少条消息记录。消息处理时间通常跟要处理的消息记录的条数是成比例的,所以通常人们希望在每次调用poll(long)的返回条数上做限制。默认情况下,该设置的值为无限制(no limit)。


使用示例

自动提交位置
     /*从官方文档copy而来*/Properties props = new Properties();/*配置broker*/props.put("bootstrap.servers", "localhost:9092");/*配置group id*/props.put("group.id", "test");/*配置自动提交位置*/props.put("enable.auto.commit", "true");/*配置自动提交的时间,以毫秒为单位*/props.put("auto.commit.interval.ms", "1000");/*配置session timeout时间,以毫秒为单位*/props.put("session.timeout.ms", "30000");/*这两个deserializer一般不要动,直接拿来用就行了*/props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");/*创建consumer*/KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);/*配置consumer订阅的主题,这里用 foo 和 bar 做为例子*/consumer.subscribe(Arrays.asList("foo", "bar"));/*一般我们在一个死循环里调用poll(long)和处理消息*/while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records)System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
手动提交位置
     /*从官方文档copy而来*/Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test");/*这里关掉自动提交*/props.put("enable.auto.commit", "false");props.put("auto.commit.interval.ms", "1000");props.put("session.timeout.ms", "30000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("foo", "bar"));/*配置一个限制,当消息数量达到这个限制,我们处理消息*/final int minBatchSize = 200;/*消息缓存链表*/List<ConsumerRecord<String, String>> buffer = new ArrayList<>();while (true) {/*调用poll(long)来获取消息数据*/ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {buffer.add(record);}if (buffer.size() >= minBatchSize) {/*达到限制,开始处理消息*/insertIntoDb(buffer);/*处理消息后,用同步方法提交consumer position,也就是消费者位置*/consumer.commitSync();buffer.clear();}}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

上面的例子是在所有的消息都成功处理完之后一次性提交所有consumer所关联的分区位置,我们还可以更进一步,更细化的控制位置提交的时机,比如我们可以一个分区一个分区的来处理消息,然后每处理完一个分区的消息,我们就提交一下consumer在当前分区的位置。代码如下:

/*从官方文档copy而来*/try {while(running) {/*获取消息数据*/ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);/*遍历消息数据中有关的分区*/for (TopicPartition partition : records.partitions()) {/*取出当前消息分区中的消息*/List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);/*处理消息*/for (ConsumerRecord<String, String> record : partitionRecords) {System.out.println(record.offset() + ": " + record.value());}/*计算当前位置*/long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();/*用同步方法提交当前位置*/consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));}}} finally {/*最后别忘了关闭consumer*/consumer.close();}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

手动控制分区分配

在之前的例子中,我们只是订阅了我们感兴趣的topic,然后kafka服务器会自动为我们的consumer分配topic下的partition。然而某些情况下我们可能需要手动配置我们的consumer所要使用的partition,例如我们想要重新获取以前已经使用过的消息,我们就需要手动来配置partition。 
如果想要手动配置分区,就不能再调用subscribe()方法,需要调用assign(Collection)来配置,Collection表示所有想要配置的分区的集合。示例代码如下:

     /*代码是从官方文档copy而来*//*想要订阅的topic*/String topic = "foo";/*想要配置的分区们*/TopicPartition partition0 = new TopicPartition(topic, 0);TopicPartition partition1 = new TopicPartition(topic, 1);/*配置分区*/consumer.assign(Arrays.asList(partition0, partition1));
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

配置完分区之后,具体的使用就跟前面介绍过的一样了。如果想要更换分区,只需重新调用assign()方法就行了。手动配置的分区是没有consumer group的自动负载均衡功能的,所以如果你的consumer挂掉了,并不会引起群组的负载均衡,也就没有其他的consumer自动接管你的consumer的作用,那么消息就不能被该群组处理了。同时,如果一个群组中有多个consumer分配了同一个topic下的同一个分区,那么可能会导致consumer position的commit问题,可能一个consumer提交了一个靠前的位置,而两一个consumer随后提交了一个靠后的位置,从而导致消息重复。为了避免这种冲突,你应该确保使用手动分配partition的群组只有一个consumer,同时这个consumer要分配它所订阅的topic下的所有partition来接受所有的消息。这样,consumer就可以安全的读取任意partition的任意位置的消息了。

注意:

kafka不支持将手动分区分配和自动动态分区分配混合使用,也就是说如果你的群组中有一个consumer是手动分配,则其他的都会成为手动分配,所以建议手动分配的consumer group只配置一个consumer。


控制consumer分配的partition

假设你使用了手动配置分区,且你的consumer group中只有一个consumer,这时你就可以调用seek(TopicPartition,long)方法来读取任意位置的消息了。

转载自  https://blog.csdn.net/lianjunzongsiling/article/details/52622864

这篇关于KafkaConsumer一些概念解释(从官方文档整理而来)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/488829

相关文章

MyBatis Plus 中 update_time 字段自动填充失效的原因分析及解决方案(最新整理)

《MyBatisPlus中update_time字段自动填充失效的原因分析及解决方案(最新整理)》在使用MyBatisPlus时,通常我们会在数据库表中设置create_time和update... 目录前言一、问题现象二、原因分析三、总结:常见原因与解决方法对照表四、推荐写法前言在使用 MyBATis

MySQL复杂SQL之多表联查/子查询详细介绍(最新整理)

《MySQL复杂SQL之多表联查/子查询详细介绍(最新整理)》掌握多表联查(INNERJOIN,LEFTJOIN,RIGHTJOIN,FULLJOIN)和子查询(标量、列、行、表子查询、相关/非相关、... 目录第一部分:多表联查 (JOIN Operations)1. 连接的类型 (JOIN Types)

C#实现将Office文档(Word/Excel/PDF/PPT)转为Markdown格式

《C#实现将Office文档(Word/Excel/PDF/PPT)转为Markdown格式》Markdown凭借简洁的语法、优良的可读性,以及对版本控制系统的高度兼容性,逐渐成为最受欢迎的文档格式... 目录为什么要将文档转换为 Markdown 格式使用工具将 Word 文档转换为 Markdown(.

详解如何使用Python构建从数据到文档的自动化工作流

《详解如何使用Python构建从数据到文档的自动化工作流》这篇文章将通过真实工作场景拆解,为大家展示如何用Python构建自动化工作流,让工具代替人力完成这些数字苦力活,感兴趣的小伙伴可以跟随小编一起... 目录一、Excel处理:从数据搬运工到智能分析师二、PDF处理:文档工厂的智能生产线三、邮件自动化:

Python实现自动化Word文档样式复制与内容生成

《Python实现自动化Word文档样式复制与内容生成》在办公自动化领域,高效处理Word文档的样式和内容复制是一个常见需求,本文将展示如何利用Python的python-docx库实现... 目录一、为什么需要自动化 Word 文档处理二、核心功能实现:样式与表格的深度复制1. 表格复制(含样式与内容)2

Maven项目中集成数据库文档生成工具的操作步骤

《Maven项目中集成数据库文档生成工具的操作步骤》在Maven项目中,可以通过集成数据库文档生成工具来自动生成数据库文档,本文为大家整理了使用screw-maven-plugin(推荐)的完... 目录1. 添加插件配置到 pom.XML2. 配置数据库信息3. 执行生成命令4. 高级配置选项5. 注意事

MySQL 事务的概念及ACID属性和使用详解

《MySQL事务的概念及ACID属性和使用详解》MySQL通过多线程实现存储工作,因此在并发访问场景中,事务确保了数据操作的一致性和可靠性,下面通过本文给大家介绍MySQL事务的概念及ACID属性和... 目录一、什么是事务二、事务的属性及使用2.1 事务的 ACID 属性2.2 为什么存在事务2.3 事务

Python使用python-docx实现自动化处理Word文档

《Python使用python-docx实现自动化处理Word文档》这篇文章主要为大家展示了Python如何通过代码实现段落样式复制,HTML表格转Word表格以及动态生成可定制化模板的功能,感兴趣的... 目录一、引言二、核心功能模块解析1. 段落样式与图片复制2. html表格转Word表格3. 模板生

JAVA数组中五种常见排序方法整理汇总

《JAVA数组中五种常见排序方法整理汇总》本文给大家分享五种常用的Java数组排序方法整理,每种方法结合示例代码给大家介绍的非常详细,感兴趣的朋友跟随小编一起看看吧... 目录前言:法一:Arrays.sort()法二:冒泡排序法三:选择排序法四:反转排序法五:直接插入排序前言:几种常用的Java数组排序

Spring Boot 常用注解整理(最全收藏版)

《SpringBoot常用注解整理(最全收藏版)》本文系统整理了常用的Spring/SpringBoot注解,按照功能分类进行介绍,每个注解都会涵盖其含义、提供来源、应用场景以及代码示例,帮助开发... 目录Spring & Spring Boot 常用注解整理一、Spring Boot 核心注解二、Spr