kafka的offset存储位置以及offset的提交方式

2024-05-29 07:48

本文主要是介绍kafka的offset存储位置以及offset的提交方式,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一 offset的存储位置

1.1 存储位置

1.从 0.9 版本开始, consumer 默认将 offset 保存在 Kafka 一个内置的topic 中,该 topic __consumer_offsets
2. Kafka0.9 版本之前, consumer 默认将 offset 保存在 Zookeeper 中。

 

__consumer_offsets 主题里面采 用 key 和 value 的方式存储数据 key 是 group.id+topic+
分区号 value 就是当前 offset 的值。每隔一段时间, kafka 内部会对这个 topic 进行
compact ,也就是每个 group.id+topic+分区号就保留最新数据

1.2 消费offset案例

1.首先在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false

默认是 true ,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为 false。
2. 采用命令行方式,创建一个新的 topic
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic atguigu --partitions 2 --
replication-factor 2
3.启动生产者往 atguigu 生产数据
bin/kafka-console-producer.sh --topic  atguigu --bootstrap-server hadoop102:9092
4.消费数据
bin/kafka-console-consumer.sh  -- bootstrap-server hadoop102:9092 --topic atguigu --group test
注意:指定消费者组名称,更好观察数据存储位置( key group.id+topic+ 分区号)。
5. 查看消费者消费主题 __consumer_offsets

 二  offset的提交方式

2.1 自动提交方式

为了使我们能够专注于自己的业务逻辑, Kafka提供了自动提交offset的功能。
自动提交 offset 的相关参数:
enable.auto.commit 是否开启自动提交 offset 功能,默认是 true
auto.commit.interval.ms 自动提交 offset 的时间间隔,默认是 5s

 

 2.1.1 代码部分设置

// 是否自动提交 offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
true);
// 提交 offset 的时间周期 1000ms ,默认 5s
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
1000);

2.2 手动提交方式

虽然自动提交 offset 十分简单便利,但由于其是基于时间提交的,开发人员难以把握 offset 提交的时机。因 此Kafka 还提供了手动提交 offset API
手动提交 offset 的方法有两种:分别是 commitSync (同步提交) commitAsync (异步提交) 。两者的相 同点是,都会将 本次提交的一批数据最高的偏移量提交 ;不同点是, 同步提交阻塞当前线程 ,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而 异步提交则没有失败重试机制,故 有可能提交失败。
commitSync (同步提交):必须等待 offset提交完毕,再去消费下一批数据 。并且会自动失败重试
commitAsync (异步提交) :发送完提交 offset请求后,就开始消费下一批数据了。没有失败重试机制

2.2.1 同步提交

由于同步提交 offset 有失败重试机制,故更加可靠,但是由于一直等待提交结果,提
交的效率比较低。以下为同步提交 offset 的示例

 

 2.3.2 异步提交

虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此
吞吐量会受到很大的影响。 因此更多的情况下,会选用异步提交 offset 的方式。

 

这篇关于kafka的offset存储位置以及offset的提交方式的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1013065

相关文章

C++中零拷贝的多种实现方式

《C++中零拷贝的多种实现方式》本文主要介绍了C++中零拷贝的实现示例,旨在在减少数据在内存中的不必要复制,从而提高程序性能、降低内存使用并减少CPU消耗,零拷贝技术通过多种方式实现,下面就来了解一下... 目录一、C++中零拷贝技术的核心概念二、std::string_view 简介三、std::stri

Linux脚本(shell)的使用方式

《Linux脚本(shell)的使用方式》:本文主要介绍Linux脚本(shell)的使用方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录概述语法详解数学运算表达式Shell变量变量分类环境变量Shell内部变量自定义变量:定义、赋值自定义变量:引用、修改、删

python判断文件是否存在常用的几种方式

《python判断文件是否存在常用的几种方式》在Python中我们在读写文件之前,首先要做的事情就是判断文件是否存在,否则很容易发生错误的情况,:本文主要介绍python判断文件是否存在常用的几种... 目录1. 使用 os.path.exists()2. 使用 os.path.isfile()3. 使用

MySQL存储过程之循环遍历查询的结果集详解

《MySQL存储过程之循环遍历查询的结果集详解》:本文主要介绍MySQL存储过程之循环遍历查询的结果集,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录前言1. 表结构2. 存储过程3. 关于存储过程的SQL补充总结前言近来碰到这样一个问题:在生产上导入的数据发现

Mybatis的分页实现方式

《Mybatis的分页实现方式》MyBatis的分页实现方式主要有以下几种,每种方式适用于不同的场景,且在性能、灵活性和代码侵入性上有所差异,对Mybatis的分页实现方式感兴趣的朋友一起看看吧... 目录​1. 原生 SQL 分页(物理分页)​​2. RowBounds 分页(逻辑分页)​​3. Page

Linux链表操作方式

《Linux链表操作方式》:本文主要介绍Linux链表操作方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、链表基础概念与内核链表优势二、内核链表结构与宏解析三、内核链表的优点四、用户态链表示例五、双向循环链表在内核中的实现优势六、典型应用场景七、调试技巧与

Linux实现线程同步的多种方式汇总

《Linux实现线程同步的多种方式汇总》本文详细介绍了Linux下线程同步的多种方法,包括互斥锁、自旋锁、信号量以及它们的使用示例,通过这些同步机制,可以解决线程安全问题,防止资源竞争导致的错误,示例... 目录什么是线程同步?一、互斥锁(单人洗手间规则)适用场景:特点:二、条件变量(咖啡厅取餐系统)工作流

RedisTemplate默认序列化方式显示中文乱码的解决

《RedisTemplate默认序列化方式显示中文乱码的解决》本文主要介绍了SpringDataRedis默认使用JdkSerializationRedisSerializer导致数据乱码,文中通过示... 目录1. 问题原因2. 解决方案3. 配置类示例4. 配置说明5. 使用示例6. 验证存储结果7.

Python程序打包exe,单文件和多文件方式

《Python程序打包exe,单文件和多文件方式》:本文主要介绍Python程序打包exe,单文件和多文件方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录python 脚本打成exe文件安装Pyinstaller准备一个ico图标打包方式一(适用于文件较少的程

Python验证码识别方式(使用pytesseract库)

《Python验证码识别方式(使用pytesseract库)》:本文主要介绍Python验证码识别方式(使用pytesseract库),具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全... 目录1、安装Tesseract-OCR2、在python中使用3、本地图片识别4、结合playwrigh