Kafka的Offset(偏移量)详解

2024-08-25 16:52
文章标签 详解 kafka 偏移量 offset

本文主要是介绍Kafka的Offset(偏移量)详解,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Kafka的Offset详解

  • 1、生产者Offset
  • 2、消费者Offset
    • 2.1、消费者
    • 2.2、生产者
    • 2.3、实体类对象
    • 2.4、JSON工具类
    • 2.5、项目配置文件
    • 2.6、测试类
    • 2.7、测试
    • 2.8、总结

1、生产者Offset

在这里插入图片描述
在这里插入图片描述

2、消费者Offset

在这里插入图片描述

2.1、消费者

package com.power.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
public class EventConsumer {/*** topics 用于指定从哪个主题中消费消息* concurrency 用于指定有多少个消费者* @param record*/@KafkaListener(topics = {"offSetTopic"}, groupId = "offSetGroup")public void onEventA(ConsumerRecord<String, String> record) {System.out.println(Thread.currentThread().getId()+"---> 消费消息 record = " + record);}
}

2.2、生产者

package com.power.producer;import com.power.model.User;
import com.power.util.JSONUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.Date;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,Object> kafkaTemplate;public void sendEvent(){for (int i = 0; i < 2; i++) {User user = User.builder().id(i).phone("1567676767"+i).birthday(new Date()).build();String userJson = JSONUtils.toJSON(user);kafkaTemplate.send("offSetTopic","k"+i, userJson);}}}

2.3、实体类对象

package com.power.model;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;import java.util.Date;@Builder
@AllArgsConstructor
@NoArgsConstructor
@Data
public class User {private Integer id;private String phone;private Date birthday;}

2.4、JSON工具类

package com.power.util;import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;public class JSONUtils {private static final ObjectMapper OBJECTMAPPER = new ObjectMapper();public static String toJSON(Object object){try {return OBJECTMAPPER.writeValueAsString(object);} catch (JsonProcessingException e) {throw new RuntimeException(e);}}public static <T> T toBean(String json,Class<T> clazz){try {return OBJECTMAPPER.readValue(json,clazz);} catch (JsonProcessingException e) {throw new RuntimeException(e);}}
}

2.5、项目配置文件

spring:application:#应用名称name: spring-boot-06-kafka-offset#kafka连接地址(ip+port)kafka:bootstrap-servers: <你的kafka服务器IP>:9092#配置消费者的反序列化consumer:key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

2.6、测试类

package com.power;import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
public class SpringBoot07KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid sendInterceptor(){eventProducer.sendEvent();}}

2.7、测试

  • 先启动生产者,会发送两条消息到kafka服务器

  • 再启动消费者监听,此时我们发现,启动后的消费者并不会监听到生产者已发送的两条消息

  • 在kafka安装目录的bin文件夹下执行命令:

./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group offSetGroup --describe
  • 根据命令结果:查看kafka消费者的偏移量offset,我们发现当前消费者偏移量CURRENT-OFFSET值为2 ,当前日志记录的生产者消息偏移量LOG-END-OFFSET值为2,消费者偏移量和日志记录的生产者消息偏移量差值LAG值为0 ,所以消费者查询不到生产者发送的消息。

在这里插入图片描述

  • 关闭消费者,再次使用生产者发送消息,再次执行命令查看消费者偏移量

在这里插入图片描述

  • 此时我们发现消费者偏移量为4,日志记录的偏移量为6,两者差值为2,此时启动消费者,读取到了差值为2的数据

2.8、总结

在这里插入图片描述

  • 消费者从什么地方开始消费,就看消费者的offset是多少,消费者启动后他的offset是多少。
  • 消费者offset是多少,可以通过命令查看
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group offSetGroup --describe

这篇关于Kafka的Offset(偏移量)详解的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1106111

相关文章

Python标准库之数据压缩和存档的应用详解

《Python标准库之数据压缩和存档的应用详解》在数据处理与存储领域,压缩和存档是提升效率的关键技术,Python标准库提供了一套完整的工具链,下面小编就来和大家简单介绍一下吧... 目录一、核心模块架构与设计哲学二、关键模块深度解析1.tarfile:专业级归档工具2.zipfile:跨平台归档首选3.

idea的终端(Terminal)cmd的命令换成linux的命令详解

《idea的终端(Terminal)cmd的命令换成linux的命令详解》本文介绍IDEA配置Git的步骤:安装Git、修改终端设置并重启IDEA,强调顺序,作为个人经验分享,希望提供参考并支持脚本之... 目录一编程、设置前二、前置条件三、android设置四、设置后总结一、php设置前二、前置条件

python中列表应用和扩展性实用详解

《python中列表应用和扩展性实用详解》文章介绍了Python列表的核心特性:有序数据集合,用[]定义,元素类型可不同,支持迭代、循环、切片,可执行增删改查、排序、推导式及嵌套操作,是常用的数据处理... 目录1、列表定义2、格式3、列表是可迭代对象4、列表的常见操作总结1、列表定义是处理一组有序项目的

python使用try函数详解

《python使用try函数详解》Pythontry语句用于异常处理,支持捕获特定/多种异常、else/final子句确保资源释放,结合with语句自动清理,可自定义异常及嵌套结构,灵活应对错误场景... 目录try 函数的基本语法捕获特定异常捕获多个异常使用 else 子句使用 finally 子句捕获所

C++11范围for初始化列表auto decltype详解

《C++11范围for初始化列表autodecltype详解》C++11引入auto类型推导、decltype类型推断、统一列表初始化、范围for循环及智能指针,提升代码简洁性、类型安全与资源管理效... 目录C++11新特性1. 自动类型推导auto1.1 基本语法2. decltype3. 列表初始化3

SQL Server 中的 WITH (NOLOCK) 示例详解

《SQLServer中的WITH(NOLOCK)示例详解》SQLServer中的WITH(NOLOCK)是一种表提示,等同于READUNCOMMITTED隔离级别,允许查询在不获取共享锁的情... 目录SQL Server 中的 WITH (NOLOCK) 详解一、WITH (NOLOCK) 的本质二、工作

springboot自定义注解RateLimiter限流注解技术文档详解

《springboot自定义注解RateLimiter限流注解技术文档详解》文章介绍了限流技术的概念、作用及实现方式,通过SpringAOP拦截方法、缓存存储计数器,结合注解、枚举、异常类等核心组件,... 目录什么是限流系统架构核心组件详解1. 限流注解 (@RateLimiter)2. 限流类型枚举 (

Java Thread中join方法使用举例详解

《JavaThread中join方法使用举例详解》JavaThread中join()方法主要是让调用改方法的thread完成run方法里面的东西后,在执行join()方法后面的代码,这篇文章主要介绍... 目录前言1.join()方法的定义和作用2.join()方法的三个重载版本3.join()方法的工作原

Spring AI使用tool Calling和MCP的示例详解

《SpringAI使用toolCalling和MCP的示例详解》SpringAI1.0.0.M6引入ToolCalling与MCP协议,提升AI与工具交互的扩展性与标准化,支持信息检索、行动执行等... 目录深入探索 Spring AI聊天接口示例Function CallingMCPSTDIOSSE结束语

C语言进阶(预处理命令详解)

《C语言进阶(预处理命令详解)》文章讲解了宏定义规范、头文件包含方式及条件编译应用,强调带参宏需加括号避免计算错误,头文件应声明函数原型以便主函数调用,条件编译通过宏定义控制代码编译,适用于测试与模块... 目录1.宏定义1.1不带参宏1.2带参宏2.头文件的包含2.1头文件中的内容2.2工程结构3.条件编