Kafka简单入门02——ISR机制

2023-10-26 13:15
文章标签 简单 入门 02 机制 kafka isr

本文主要是介绍Kafka简单入门02——ISR机制,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

ISR机制

ISR 关键概念

HW和LEO

Java使用Kafka通信

Kafka 生产者示例

Kafka 消费者示例


ISR机制

Kafka 中的 ISR(In-Sync Replicas)机制是一种用于确保数据可靠性和一致性的重要机制。ISR 是一组副本,它包括分区的领导者(Leader)和追随者(Follower)副本,这些副本与领导者保持数据同步。

ISR 关键概念
  1. 领导者和追随者:每个分区有一个领导者和零个或多个追随者。领导者负责处理客户端的写请求,而追随者主要用于数据复制。

  2. ISR 集合:ISR 集合是分区领导者的一组追随者副本,它们与领导者保持数据同步。只有在 ISR 集合中的追随者副本可以参与数据的写入和读取操作。

  3. 数据复制:领导者将消息写入其本地日志,并定期将这些消息发送给 ISR 集合中的追随者。追随者接收消息后,将其写入本地日志,以保持数据同步。

  4. Leader Epoch 和 Log Start Offset:ISR 集合中的每个追随者都维护了领导者的日志信息,包括领导者的 Leader Epoch 和 Log Start Offset。这些信息用于确保数据的正确复制和同步。

  5. 数据一致性:只有在 ISR 集合中的所有追随者都成功复制了一条消息后,领导者才会将该消息标记为已提交,确保数据的一致性。

  6. 故障处理:如果某个追随者发生故障或者追赶进度过慢,那么该追随者可能会被从 ISR 集合中移除。这有助于保持数据的可靠性和避免影响性能。

其中,需要注意的的概念:

  • 分区中的所有副本统称为AR(Assigned Replicas)。

  • 所有Leader副本加上和Leader副本保持同步的Follower副本组成ISR(In-Sync Replicas)。

  • 所有没有保持同步的Follower副本组成OSR(Out-of-Sync Replicas)。

  • AR = ISR + OSR。正常情况下,所有Follower副本都应该和Leader副本一致,即AR=ISR。

  • 当Leader故障时,在ISR集合中的Follower才有资格被选举为新的Leader。

HW和LEO

在 Kafka 中,HW(High Watermark)和 LEO(Log End Offset)是与数据复制和消费有关的两个重要概念。

HW(High Watermark):HW 是指在分区中,已经被所有追随者(Follower)副本复制的消息的位置。HW 是每个分区的属性,它表示已经提交的消息。只有在 HW 之前的消息才被认为是已经提交的,这些消息已经被写入分区的所有追随者副本,并且被认为是安全的,不会丢失。HW 是为了确保数据一致性和可靠性而引入的。

LEO(Log End Offset):LEO 是指在分区中当前最新消息的位置。LEO 表示分区日志中的最后一条消息的偏移量。LEO 包括已经被写入但尚未被所有追随者副本复制的消息,以及正在等待被写入的消息。LEO 是一个动态的属性,它会随着新消息的写入而逐渐增加。

HW 和 LEO 之间的关系非常重要,它们可以帮助确保数据的可靠性和一致性:

  • HW 之前的消息是已经提交的消息,它们在数据复制中是安全的,不会丢失。

  • LEO 之前的消息是已经写入但尚未被所有追随者副本复制的消息。这些消息可能会在 HW 之前被提交,也可能会在之后被提交。

  • 一旦 HW 追赶上 LEO,表示所有的消息都已经提交,分区的数据一致性得到了保障。

Kafka的消息同步流程:

  1. 初始状态,HW和LEO在同一位置。消费者可以读取的有效消息为0,1,2,3.

  2. 消息写入Leader,LEO位置改变。Follower进行同步。

  3. Follower同步进度决定HW位置,消费者可读的有效消息0,1,2,3,4。

  4. 完成同步,消费者可读的有效消息0,1,2,3,4,5,6。

可以看出,Kafka的复制机制既不是完全的同步复制,也不是单纯异步复制。

  • 同步复制要求所有Follower副本都复制完,太影响性能了。

  • 异步复制只要数据被写入Leader副本就认为提交成功,在这种情况下,如果Leader宕机时候Follower还是落后于Leader就会造成数据丢失。

而Kafka使用的ISR机制则有效地权衡了数据可靠性和性能之间的关系。

Java使用Kafka通信

以下是 Kafka 生产者和消费者的简单示例,使用 Kafka 的 Java 客户端库(Kafka Producer 和 Kafka Consumer)来创建一个基本的消息传递示例。

Kafka 生产者示例
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
​
public class KafkaProducerExample {public static void main(String[] args) {String bootstrapServers = "localhost:9092"; // Kafka 服务器地址String topic = "my-topic"; // Kafka 主题名称
​Properties properties = new Properties();properties.put("bootstrap.servers", bootstrapServers);properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
​Producer<String, String> producer = new KafkaProducer<>(properties);
​// 发送消息producer.send(new ProducerRecord<>(topic, "key", "Hello, Kafka!"), (metadata, exception) -> {if (exception == null) {System.out.println("Message sent to partition " + metadata.partition() + ", offset " + metadata.offset());} else {System.err.println("Error sending message: " + exception.getMessage());}});
​producer.close();}
}
Kafka 消费者示例
import org.apache.kafka.clients.consumer.*;
import java.util.Properties;
import java.time.Duration;
import java.util.Collections;
​
public class KafkaConsumerExample {public static void main(String[] args) {String bootstrapServers = "localhost:9092"; // Kafka 服务器地址String groupId = "my-group"; // 消费者组 IDString topic = "my-topic"; // Kafka 主题名称
​Properties properties = new Properties();properties.put("bootstrap.servers", bootstrapServers);properties.put("group.id", groupId);properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
​KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);consumer.subscribe(Collections.singletonList(topic));
​while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.println("Received message: key = " + record.key() + ", value = " + record.value());}}}
}

这篇关于Kafka简单入门02——ISR机制的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/285630

相关文章

Python yield与yield from的简单使用方式

《Pythonyield与yieldfrom的简单使用方式》生成器通过yield定义,可在处理I/O时暂停执行并返回部分结果,待其他任务完成后继续,yieldfrom用于将一个生成器的值传递给另一... 目录python yield与yield from的使用代码结构总结Python yield与yield

Redis客户端连接机制的实现方案

《Redis客户端连接机制的实现方案》本文主要介绍了Redis客户端连接机制的实现方案,包括事件驱动模型、非阻塞I/O处理、连接池应用及配置优化,具有一定的参考价值,感兴趣的可以了解一下... 目录1. Redis连接模型概述2. 连接建立过程详解2.1 连php接初始化流程2.2 关键配置参数3. 最大连

Spring WebClient从入门到精通

《SpringWebClient从入门到精通》本文详解SpringWebClient非阻塞响应式特性及优势,涵盖核心API、实战应用与性能优化,对比RestTemplate,为微服务通信提供高效解决... 目录一、WebClient 概述1.1 为什么选择 WebClient?1.2 WebClient 与

Spring Security 单点登录与自动登录机制的实现原理

《SpringSecurity单点登录与自动登录机制的实现原理》本文探讨SpringSecurity实现单点登录(SSO)与自动登录机制,涵盖JWT跨系统认证、RememberMe持久化Token... 目录一、核心概念解析1.1 单点登录(SSO)1.2 自动登录(Remember Me)二、代码分析三、

Go语言并发之通知退出机制的实现

《Go语言并发之通知退出机制的实现》本文主要介绍了Go语言并发之通知退出机制的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧... 目录1、通知退出机制1.1 进程/main函数退出1.2 通过channel退出1.3 通过cont

Spring Boot 中的默认异常处理机制及执行流程

《SpringBoot中的默认异常处理机制及执行流程》SpringBoot内置BasicErrorController,自动处理异常并生成HTML/JSON响应,支持自定义错误路径、配置及扩展,如... 目录Spring Boot 异常处理机制详解默认错误页面功能自动异常转换机制错误属性配置选项默认错误处理

Java中使用 @Builder 注解的简单示例

《Java中使用@Builder注解的简单示例》@Builder简化构建但存在复杂性,需配合其他注解,导致可变性、抽象类型处理难题,链式编程非最佳实践,适合长期对象,避免与@Data混用,改用@G... 目录一、案例二、不足之处大多数同学使用 @Builder 无非就是为了链式编程,然而 @Builder

Java中的xxl-job调度器线程池工作机制

《Java中的xxl-job调度器线程池工作机制》xxl-job通过快慢线程池分离短时与长时任务,动态降级超时任务至慢池,结合异步触发和资源隔离机制,提升高频调度的性能与稳定性,支撑高并发场景下的可靠... 目录⚙️ 一、调度器线程池的核心设计 二、线程池的工作流程 三、线程池配置参数与优化 四、总结:线程

Spring Boot 与微服务入门实战详细总结

《SpringBoot与微服务入门实战详细总结》本文讲解SpringBoot框架的核心特性如快速构建、自动配置、零XML与微服务架构的定义、演进及优缺点,涵盖开发环境准备和HelloWorld实战... 目录一、Spring Boot 核心概述二、微服务架构详解1. 微服务的定义与演进2. 微服务的优缺点三

从入门到精通详解LangChain加载HTML内容的全攻略

《从入门到精通详解LangChain加载HTML内容的全攻略》这篇文章主要为大家详细介绍了如何用LangChain优雅地处理HTML内容,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录引言:当大语言模型遇见html一、HTML加载器为什么需要专门的HTML加载器核心加载器对比表二