kafka的offset存储位置以及offset的提交方式

2024-05-29 07:48

本文主要是介绍kafka的offset存储位置以及offset的提交方式,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一 offset的存储位置

1.1 存储位置

1.从 0.9 版本开始, consumer 默认将 offset 保存在 Kafka 一个内置的topic 中,该 topic __consumer_offsets
2. Kafka0.9 版本之前, consumer 默认将 offset 保存在 Zookeeper 中。

 

__consumer_offsets 主题里面采 用 key 和 value 的方式存储数据 key 是 group.id+topic+
分区号 value 就是当前 offset 的值。每隔一段时间, kafka 内部会对这个 topic 进行
compact ,也就是每个 group.id+topic+分区号就保留最新数据

1.2 消费offset案例

1.首先在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false

默认是 true ,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为 false。
2. 采用命令行方式,创建一个新的 topic
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic atguigu --partitions 2 --
replication-factor 2
3.启动生产者往 atguigu 生产数据
bin/kafka-console-producer.sh --topic  atguigu --bootstrap-server hadoop102:9092
4.消费数据
bin/kafka-console-consumer.sh  -- bootstrap-server hadoop102:9092 --topic atguigu --group test
注意:指定消费者组名称,更好观察数据存储位置( key group.id+topic+ 分区号)。
5. 查看消费者消费主题 __consumer_offsets

 二  offset的提交方式

2.1 自动提交方式

为了使我们能够专注于自己的业务逻辑, Kafka提供了自动提交offset的功能。
自动提交 offset 的相关参数:
enable.auto.commit 是否开启自动提交 offset 功能,默认是 true
auto.commit.interval.ms 自动提交 offset 的时间间隔,默认是 5s

 

 2.1.1 代码部分设置

// 是否自动提交 offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
true);
// 提交 offset 的时间周期 1000ms ,默认 5s
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
1000);

2.2 手动提交方式

虽然自动提交 offset 十分简单便利,但由于其是基于时间提交的,开发人员难以把握 offset 提交的时机。因 此Kafka 还提供了手动提交 offset API
手动提交 offset 的方法有两种:分别是 commitSync (同步提交) commitAsync (异步提交) 。两者的相 同点是,都会将 本次提交的一批数据最高的偏移量提交 ;不同点是, 同步提交阻塞当前线程 ,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而 异步提交则没有失败重试机制,故 有可能提交失败。
commitSync (同步提交):必须等待 offset提交完毕,再去消费下一批数据 。并且会自动失败重试
commitAsync (异步提交) :发送完提交 offset请求后,就开始消费下一批数据了。没有失败重试机制

2.2.1 同步提交

由于同步提交 offset 有失败重试机制,故更加可靠,但是由于一直等待提交结果,提
交的效率比较低。以下为同步提交 offset 的示例

 

 2.3.2 异步提交

虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此
吞吐量会受到很大的影响。 因此更多的情况下,会选用异步提交 offset 的方式。

 

这篇关于kafka的offset存储位置以及offset的提交方式的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1013065

相关文章

Java AOP面向切面编程的概念和实现方式

《JavaAOP面向切面编程的概念和实现方式》AOP是面向切面编程,通过动态代理将横切关注点(如日志、事务)与核心业务逻辑分离,提升代码复用性和可维护性,本文给大家介绍JavaAOP面向切面编程的概... 目录一、AOP 是什么?二、AOP 的核心概念与实现方式核心概念实现方式三、Spring AOP 的关

Linux挂载linux/Windows共享目录实现方式

《Linux挂载linux/Windows共享目录实现方式》:本文主要介绍Linux挂载linux/Windows共享目录实现方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地... 目录文件共享协议linux环境作为服务端(NFS)在服务器端安装 NFS创建要共享的目录修改 NFS 配

Vue3视频播放组件 vue3-video-play使用方式

《Vue3视频播放组件vue3-video-play使用方式》vue3-video-play是Vue3的视频播放组件,基于原生video标签开发,支持MP4和HLS流,提供全局/局部引入方式,可监听... 目录一、安装二、全局引入三、局部引入四、基本使用五、事件监听六、播放 HLS 流七、更多功能总结在 v

Java发送SNMP至交换机获取交换机状态实现方式

《Java发送SNMP至交换机获取交换机状态实现方式》文章介绍使用SNMP4J库(2.7.0)通过RCF1213-MIB协议获取交换机单/多路状态,需开启SNMP支持,重点对比SNMPv1、v2c、v... 目录交换机协议SNMP库获取交换机单路状态获取交换机多路状态总结交换机协议这里使用的交换机协议为常

k8s admin用户生成token方式

《k8sadmin用户生成token方式》用户使用Kubernetes1.28创建admin命名空间并部署,通过ClusterRoleBinding为jenkins用户授权集群级权限,生成并获取其t... 目录k8s admin用户生成token创建一个admin的命名空间查看k8s namespace 的

uni-app小程序项目中实现前端图片压缩实现方式(附详细代码)

《uni-app小程序项目中实现前端图片压缩实现方式(附详细代码)》在uni-app开发中,文件上传和图片处理是很常见的需求,但也经常会遇到各种问题,下面:本文主要介绍uni-app小程序项目中实... 目录方式一:使用<canvas>实现图片压缩(推荐,兼容性好)示例代码(小程序平台):方式二:使用uni

MyBatis/MyBatis-Plus同事务循环调用存储过程获取主键重复问题分析及解决

《MyBatis/MyBatis-Plus同事务循环调用存储过程获取主键重复问题分析及解决》MyBatis默认开启一级缓存,同一事务中循环调用查询方法时会重复使用缓存数据,导致获取的序列主键值均为1,... 目录问题原因解决办法如果是存储过程总结问题myBATis有如下代码获取序列作为主键IdMappe

Pandas处理缺失数据的方式汇总

《Pandas处理缺失数据的方式汇总》许多教程中的数据与现实世界中的数据有很大不同,现实世界中的数据很少是干净且同质的,本文我们将讨论处理缺失数据的一些常规注意事项,了解Pandas如何表示缺失数据,... 目录缺失数据约定的权衡Pandas 中的缺失数据None 作为哨兵值NaN:缺失的数值数据Panda

k8s搭建nfs共享存储实践

《k8s搭建nfs共享存储实践》本文介绍NFS服务端搭建与客户端配置,涵盖安装工具、目录设置及服务启动,随后讲解K8S中NFS动态存储部署,包括创建命名空间、ServiceAccount、RBAC权限... 目录1. NFS搭建1.1 部署NFS服务端1.1.1 下载nfs-utils和rpcbind1.1

java读取excel文件为base64实现方式

《java读取excel文件为base64实现方式》文章介绍使用ApachePOI和EasyExcel处理Excel文件并转换为Base64的方法,强调EasyExcel适合大文件且内存占用低,需注意... 目录使用 Apache POI 读取 Excel 并转换为 Base64使用 EasyExcel 处