kafka的offset存储位置以及offset的提交方式

2024-05-29 07:48

本文主要是介绍kafka的offset存储位置以及offset的提交方式,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一 offset的存储位置

1.1 存储位置

1.从 0.9 版本开始, consumer 默认将 offset 保存在 Kafka 一个内置的topic 中,该 topic __consumer_offsets
2. Kafka0.9 版本之前, consumer 默认将 offset 保存在 Zookeeper 中。

 

__consumer_offsets 主题里面采 用 key 和 value 的方式存储数据 key 是 group.id+topic+
分区号 value 就是当前 offset 的值。每隔一段时间, kafka 内部会对这个 topic 进行
compact ,也就是每个 group.id+topic+分区号就保留最新数据

1.2 消费offset案例

1.首先在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false

默认是 true ,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为 false。
2. 采用命令行方式,创建一个新的 topic
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic atguigu --partitions 2 --
replication-factor 2
3.启动生产者往 atguigu 生产数据
bin/kafka-console-producer.sh --topic  atguigu --bootstrap-server hadoop102:9092
4.消费数据
bin/kafka-console-consumer.sh  -- bootstrap-server hadoop102:9092 --topic atguigu --group test
注意:指定消费者组名称,更好观察数据存储位置( key group.id+topic+ 分区号)。
5. 查看消费者消费主题 __consumer_offsets

 二  offset的提交方式

2.1 自动提交方式

为了使我们能够专注于自己的业务逻辑, Kafka提供了自动提交offset的功能。
自动提交 offset 的相关参数:
enable.auto.commit 是否开启自动提交 offset 功能,默认是 true
auto.commit.interval.ms 自动提交 offset 的时间间隔,默认是 5s

 

 2.1.1 代码部分设置

// 是否自动提交 offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
true);
// 提交 offset 的时间周期 1000ms ,默认 5s
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
1000);

2.2 手动提交方式

虽然自动提交 offset 十分简单便利,但由于其是基于时间提交的,开发人员难以把握 offset 提交的时机。因 此Kafka 还提供了手动提交 offset API
手动提交 offset 的方法有两种:分别是 commitSync (同步提交) commitAsync (异步提交) 。两者的相 同点是,都会将 本次提交的一批数据最高的偏移量提交 ;不同点是, 同步提交阻塞当前线程 ,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而 异步提交则没有失败重试机制,故 有可能提交失败。
commitSync (同步提交):必须等待 offset提交完毕,再去消费下一批数据 。并且会自动失败重试
commitAsync (异步提交) :发送完提交 offset请求后,就开始消费下一批数据了。没有失败重试机制

2.2.1 同步提交

由于同步提交 offset 有失败重试机制,故更加可靠,但是由于一直等待提交结果,提
交的效率比较低。以下为同步提交 offset 的示例

 

 2.3.2 异步提交

虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此
吞吐量会受到很大的影响。 因此更多的情况下,会选用异步提交 offset 的方式。

 

这篇关于kafka的offset存储位置以及offset的提交方式的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1013065

相关文章

SpringBoot多环境配置数据读取方式

《SpringBoot多环境配置数据读取方式》SpringBoot通过环境隔离机制,支持properties/yaml/yml多格式配置,结合@Value、Environment和@Configura... 目录一、多环境配置的核心思路二、3种配置文件格式详解2.1 properties格式(传统格式)1.

Oracle数据库定时备份脚本方式(Linux)

《Oracle数据库定时备份脚本方式(Linux)》文章介绍Oracle数据库自动备份方案,包含主机备份传输与备机解压导入流程,强调需提前全量删除原库数据避免报错,并需配置无密传输、定时任务及验证脚本... 目录说明主机脚本备机上自动导库脚本整个自动备份oracle数据库的过程(建议全程用root用户)总结

Debian系和Redhat系防火墙配置方式

《Debian系和Redhat系防火墙配置方式》文章对比了Debian系UFW和Redhat系Firewalld防火墙的安装、启用禁用、端口管理、规则查看及注意事项,强调SSH端口需开放、规则持久化,... 目录Debian系UFW防火墙1. 安装2. 启用与禁用3. 基本命令4. 注意事项5. 示例配置R

最新Spring Security的基于内存用户认证方式

《最新SpringSecurity的基于内存用户认证方式》本文讲解SpringSecurity内存认证配置,适用于开发、测试等场景,通过代码创建用户及权限管理,支持密码加密,虽简单但不持久化,生产环... 目录1. 前言2. 因何选择内存认证?3. 基础配置实战❶ 创建Spring Security配置文件

Python获取浏览器Cookies的四种方式小结

《Python获取浏览器Cookies的四种方式小结》在进行Web应用程序测试和开发时,获取浏览器Cookies是一项重要任务,本文我们介绍四种用Python获取浏览器Cookies的方式,具有一定的... 目录什么是 Cookie?1.使用Selenium库获取浏览器Cookies2.使用浏览器开发者工具

Java获取当前时间String类型和Date类型方式

《Java获取当前时间String类型和Date类型方式》:本文主要介绍Java获取当前时间String类型和Date类型方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,... 目录Java获取当前时间String和Date类型String类型和Date类型输出结果总结Java获取

C#监听txt文档获取新数据方式

《C#监听txt文档获取新数据方式》文章介绍通过监听txt文件获取最新数据,并实现开机自启动、禁用窗口关闭按钮、阻止Ctrl+C中断及防止程序退出等功能,代码整合于主函数中,供参考学习... 目录前言一、监听txt文档增加数据二、其他功能1. 设置开机自启动2. 禁止控制台窗口关闭按钮3. 阻止Ctrl +

linux批量替换文件内容的实现方式

《linux批量替换文件内容的实现方式》本文总结了Linux中批量替换文件内容的几种方法,包括使用sed替换文件夹内所有文件、单个文件内容及逐行字符串,强调使用反引号和绝对路径,并分享个人经验供参考... 目录一、linux批量替换文件内容 二、替换文件内所有匹配的字符串 三、替换每一行中全部str1为st

Python实现终端清屏的几种方式详解

《Python实现终端清屏的几种方式详解》在使用Python进行终端交互式编程时,我们经常需要清空当前终端屏幕的内容,本文为大家整理了几种常见的实现方法,有需要的小伙伴可以参考下... 目录方法一:使用 `os` 模块调用系统命令方法二:使用 `subprocess` 模块执行命令方法三:打印多个换行符模拟

RabbitMQ消息总线方式刷新配置服务全过程

《RabbitMQ消息总线方式刷新配置服务全过程》SpringCloudBus通过消息总线与MQ实现微服务配置统一刷新,结合GitWebhooks自动触发更新,避免手动重启,提升效率与可靠性,适用于配... 目录前言介绍环境准备代码示例测试验证总结前言介绍在微服务架构中,为了更方便的向微服务实例广播消息,