Kafka 新的消费组默认的偏移量设置和消费行为

2024-06-19 18:36

本文主要是介绍Kafka 新的消费组默认的偏移量设置和消费行为,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

个人名片
在这里插入图片描述
🎓作者简介:java领域优质创作者
🌐个人主页:码农阿豪
📞工作室:新空间代码工作室(提供各种软件服务)
💌个人邮箱:[2435024119@qq.com]
📱个人微信:15279484656
🌐个人导航网站:www.forff.top
💡座右铭:总有人要赢。为什么不能是我呢?

  • 专栏导航:

码农阿豪系列专栏导航
面试专栏:收集了java相关高频面试题,面试实战总结🍻🎉🖥️
Spring5系列专栏:整理了Spring5重要知识点与实战演练,有案例可直接使用🚀🔧💻
Redis专栏:Redis从零到一学习分享,经验总结,案例实战💐📝💡
全栈系列专栏:海纳百川有容乃大,可能你想要的东西里面都有🤸🌱🚀

Kafka 新的消费组默认的偏移量设置和消费行为由 auto-offset-reset 配置项决定。以下是详细说明:

目录

      • 默认消费行为
      • 是否需要设置偏移量
      • 不设置偏移量是否会重复消费
        • 1. 新的消费者组
        • 2. 现有的消费者组
        • 3. 配置 `enable-auto-commit`
        • 避免重复消费的建议
        • 例外情况
      • 小结

默认消费行为

当一个新的消费者组第一次订阅一个主题时,它会根据 auto-offset-reset 的配置来决定从哪里开始消费消息。auto-offset-reset 有三个选项:

  1. earliest:如果消费者组没有已提交的偏移量(即新的消费者组),则从主题的最早消息开始消费。
  2. latest:如果消费者组没有已提交的偏移量,则从最新的消息开始消费(即从消费者启动之后生成的消息)。
  3. none:如果消费者组没有已提交的偏移量,则抛出异常。

例如,默认配置可以是:

kafka:bootstrap-servers: 10.206.*.*:9092,10.206.*.*:9092,10.206.*.*:9092consumer:value-deserializer: org.apache.kafka.common.serialization.StringDeserializergroup-id: new-consumer-group  # 新的消费者组IDauto-offset-reset: earliest  # 从最早的消息开始消费enable-auto-commit: truekey-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:partition:assignment:strategy: org.apache.kafka.clients.consumer.RoundRobinAssignorfetch-min-size: 100000

是否需要设置偏移量

  • 默认情况下:如果你使用 auto-offset-reset: earliestauto-offset-reset: latest,并且 enable-auto-commit: true,新的消费者组会自动从最早或最新的偏移量开始消费,不需要手动设置偏移量。

  • 手动设置偏移量:如果你有特定的需求,需要从某个特定的位置(比如某个标签消息)开始消费,则需要手动设置偏移量。手动设置偏移量的步骤如下:

    1. 禁用自动提交偏移量:设置 enable-auto-commit: false

    2. 在代码中手动查找并设置偏移量

      例如,在 Java 中:

      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
      consumer.subscribe(Collections.singletonList("your-topic"));// 查找特定偏移量
      while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {if (record.value().contains("your-tag")) {consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());break;}}break;
      }// 从设定的偏移量开始消费
      while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}consumer.commitSync();
      }
      

不设置偏移量是否会重复消费

是否会重复消费取决于消费者组的配置和消息处理的具体场景。以下是几种可能的情况及其影响:

1. 新的消费者组
  • 第一次消费:如果一个新的消费者组第一次订阅一个主题,Kafka 会根据 auto-offset-reset 配置决定从哪里开始消费:

    • earliest:从最早的消息开始消费。
    • latest:从最新的消息开始消费(即从消费者启动之后生成的消息)。
    • none:如果没有已提交的偏移量,则抛出异常。

    在这种情况下,不会出现重复消费的情况,因为没有先前的消费记录。

2. 现有的消费者组
  • 已有偏移量:如果消费者组已经有已提交的偏移量,Kafka 将从最后提交的偏移量继续消费,不会出现重复消费。
  • 未提交偏移量:如果消费者实例崩溃且未能提交偏移量,重启后可能会从上次提交的偏移量开始重新消费,从而导致部分消息被重复消费。
3. 配置 enable-auto-commit
  • 启用自动提交(enable-auto-commit: true:偏移量会自动提交,通常不会重复消费消息,除非在自动提交间隔内发生消费者崩溃。
  • 禁用自动提交(enable-auto-commit: false:需要手动提交偏移量,如果在消费完成后未能及时提交偏移量,可能会导致重启后从最后提交的偏移量开始重复消费。
避免重复消费的建议
  1. 定期提交偏移量:确保在消费完成后及时提交偏移量。可以使用 consumer.commitSync()consumer.commitAsync() 方法。

    while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}// 同步提交偏移量consumer.commitSync();
    }
    
  2. 使用幂等性操作:确保消费者对消息的处理是幂等的,即多次处理同一条消息不会产生副作用。这样即使发生重复消费,也不会影响系统的正确性。

  3. 监控和日志记录:在日志中记录偏移量信息,便于在出现问题时进行调试和修复。

  4. 适当的自动提交间隔:如果启用了自动提交,设置合适的自动提交间隔(auto-commit-interval),确保偏移量能及时提交。

例外情况

在某些高可用或低延迟要求的场景下,可以考虑启用 Kafka 的事务性生产者和消费者,以确保消息消费和处理的准确性和一致性。

总结来说,不设置偏移量本身并不会直接导致重复消费,但需要确保合理的偏移量提交机制和幂等性操作来避免可能的重复消费问题。

小结

  • 默认情况下:新的消费者组根据 auto-offset-reset 配置自动决定从哪里开始消费,不需要手动设置偏移量。
  • 特殊需求:如果需要从特定的消息位置开始消费,则需要手动管理偏移量,包括禁用自动提交和手动设置偏移量。

根据你的需求,配置和管理消费者组的偏移量以确保消息的正确消费。

这篇关于Kafka 新的消费组默认的偏移量设置和消费行为的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1075818

相关文章

python设置环境变量路径实现过程

《python设置环境变量路径实现过程》本文介绍设置Python路径的多种方法:临时设置(Windows用`set`,Linux/macOS用`export`)、永久设置(系统属性或shell配置文件... 目录设置python路径的方法临时设置环境变量(适用于当前会话)永久设置环境变量(Windows系统

RabbitMQ消费端单线程与多线程案例讲解

《RabbitMQ消费端单线程与多线程案例讲解》文章解析RabbitMQ消费端单线程与多线程处理机制,说明concurrency控制消费者数量,max-concurrency控制最大线程数,prefe... 目录 一、基础概念详细解释:举个例子:✅ 单消费者 + 单线程消费❌ 单消费者 + 多线程消费❌ 多

Spring Boot 中的默认异常处理机制及执行流程

《SpringBoot中的默认异常处理机制及执行流程》SpringBoot内置BasicErrorController,自动处理异常并生成HTML/JSON响应,支持自定义错误路径、配置及扩展,如... 目录Spring Boot 异常处理机制详解默认错误页面功能自动异常转换机制错误属性配置选项默认错误处理

Go语言编译环境设置教程

《Go语言编译环境设置教程》Go语言支持高并发(goroutine)、自动垃圾回收,编译为跨平台二进制文件,云原生兼容且社区活跃,开发便捷,内置测试与vet工具辅助检测错误,依赖模块化管理,提升开发效... 目录Go语言优势下载 Go  配置编译环境配置 GOPROXYIDE 设置(VS Code)一些基本

小白也能轻松上手! 路由器设置优化指南

《小白也能轻松上手!路由器设置优化指南》在日常生活中,我们常常会遇到WiFi网速慢的问题,这主要受到三个方面的影响,首要原因是WiFi产品的配置优化不合理,其次是硬件性能的不足,以及宽带线路本身的质... 在数字化时代,网络已成为生活必需品,追剧、游戏、办公、学习都离不开稳定高速的网络。但很多人面对新路由器

c++ 类成员变量默认初始值的实现

《c++类成员变量默认初始值的实现》本文主要介绍了c++类成员变量默认初始值,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧... 目录C++类成员变量初始化c++类的变量的初始化在C++中,如果使用类成员变量时未给定其初始值,那么它将被

linux hostname设置全过程

《linuxhostname设置全过程》:本文主要介绍linuxhostname设置全过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录查询hostname设置步骤其它相关点hostid/etc/hostsEDChina编程A工具license破解注意事项总结以RHE

Python设置Cookie永不超时的详细指南

《Python设置Cookie永不超时的详细指南》Cookie是一种存储在用户浏览器中的小型数据片段,用于记录用户的登录状态、偏好设置等信息,下面小编就来和大家详细讲讲Python如何设置Cookie... 目录一、Cookie的作用与重要性二、Cookie过期的原因三、实现Cookie永不超时的方法(一)

Qt 设置软件版本信息的实现

《Qt设置软件版本信息的实现》本文介绍了Qt项目中设置版本信息的三种常用方法,包括.pro文件和version.rc配置、CMakeLists.txt与version.h.in结合,具有一定的参考... 目录在运行程序期间设置版本信息可以参考VS在 QT 中设置软件版本信息的几种方法方法一:通过 .pro

nginx启动命令和默认配置文件的使用

《nginx启动命令和默认配置文件的使用》:本文主要介绍nginx启动命令和默认配置文件的使用,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录常见命令nginx.conf配置文件location匹配规则图片服务器总结常见命令# 默认配置文件启动./nginx