kafka: 基础概念回顾(生产者客户端和机架感知相关内容)

本文主要是介绍kafka: 基础概念回顾(生产者客户端和机架感知相关内容),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、kafka生产者客户端

1、整体架构:数据发送流程

在这里插入图片描述
(1)生产者

  • 拦截器
    生产者的拦截器可以在消息发送前做一些拦截工作对数据进行相应的处理,比如:消息过滤、消息内容修改等。
package org.apache.kafka.clients.producer;
import org.apache.kafka.common.Configurable;
public interface ProducerInterceptor<K, V> extends Configurable {//在将消息序列化和计算分区之前会调⽤该⽅法,⽤来对消息进⾏相应的定制化操作,如修改消息内容public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);//在消息被应答之前或者消息发送失败时调⽤该⽅法,优先于⽤⼾设定的Callback之前执⾏,如统计消息发送成功或失败的次数public void onAcknowledgement(RecordMetadata metadata, Exception exception);public void close();
}
  • 序列化器
  • 分区器

二、kafka数据可靠性保证

1、LEO和HW
2、工作流程
3、Leader Epoch

三、粘性分区策略

四、机架感知

1、概念
2、机架感知分区分配策略
3、验证

(1)验证目标

  • 机架感知特性将同⼀分区的副本分散到不同的机架上
  • rack机制消费者可以消费到follower副本中的数据

(2)参数配置
broker端配置:

  • 配置名:broker.rack=my-rack-id
    • 解释:broker属于的rack
  • 配置名:replica.selector.class
    • 解释:ReplicaSelector实现类的全名,包括路径 (⽐如 RackAwareReplicaSelector 即按 rack id 指定消费)

Client端配置:
client.rack

  • consumer端配置
  • 配置名:client.rack
  • 解释:这个参数需要和broker端指定的 broker.rack 相同,表⽰去哪个rack中获取数据。
  • 默认:null

(3)环境准备:kafka集群

  • kafka实例数: 4
  • 两个kafka实例broker.rack配置为0,另外两个kafka实例broker.rack配置为了2,broker端配置如下:
server1:
broker.id=0broker.rack=0
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelectorserver2:
broker.id=1
broker.rack=0
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelectorserver3
broker.id=2
broker.rack=2
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelectorserver4
broker.id=3
broker.rack=2
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector

启动kafka集群,服务端⽇志信息:
在这里插入图片描述
在这里插入图片描述
验证一:机架感知特性将同一分区的副本分散到不同的机架上
在这里插入图片描述
创建topic rack02,副本被分配到了broker1和2
在这里插入图片描述
创建topic rack03 副本被分配到了0和3
在这里插入图片描述
在这里插入图片描述

验证二:客⼾端(消费者)验证:rack机制消费者可以消费到follower副本中的数据

验证代码如下:

package person.xsc.train.producer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import person.xsc.train.client.KafkaConsumerClient;
import person.xsc.train.constant.KafkaConstant;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class Demo {public static KafkaConsumer<String, String> kafkaConsumer;public static void main(String[] args) {Properties properties = new Properties();properties.put(KafkaConstant.BOOTSTRAP_SERVERS, "localhost:9093,localhosproperties.put(KafkaConstant.GROUP_ID, "test01");properties.put(KafkaConstant.ENABLE_AUTO_COMMIT, "true");properties.put(KafkaConstant.AUTO_COMMIT_INTERVAL_MS, "1000");properties.put(KafkaConstant.KEY_DESERIALIZER, StringDeserializer.class.properties.put(KafkaConstant.VALUE_DESERIALIZER, StringDeserializer.clasproperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");properties.put(ConsumerConfig.CLIENT_RACK_CONFIG, "0");properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");kafkaConsumer = KafkaConsumerClient.createKafkaClient(properties);receiveMessage("rack02");}public static void receiveMessage(String topic) {TopicPartition topicPartition0 = new TopicPartition(topic, 0);kafkaConsumer.assign(Arrays.asList(topicPartition0));while(true) {// Kafka的消费者⼀次拉取⼀批的数据ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll//System.out.println("开始打印消息!");// 5.将将记录(record)的offset、key、value都打印出来for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {// 主题String topicName = consumerRecord.topic();int partition = consumerRecord.partition();// offset:这条消息处于Kafka分区中的哪个位置long offset = consumerRecord.offset();// key\valueString key = consumerRecord.key();String value = consumerRecord.value();System.out.println(String.format("topic: %s, partition: %s, offs}}}
}

前置背景:
Topic rack02的partition 0分区的副本为broker2(对应的rack为2)和broker1(对应的rack为0),其中broker2为leader(在⾮rack机制下仅能消费到leader中的数据)。

在上述代码中,消费者配置中限制了rack为0,消费的分区为0,因此映射到broker1。通过测试可验证在rack机制下消费者可以消费到folloer副本中的数据,测试如下:
在这里插入图片描述

五、机架感知存在的问题

这篇关于kafka: 基础概念回顾(生产者客户端和机架感知相关内容)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/589875

相关文章

CSS3中的字体及相关属性详解

《CSS3中的字体及相关属性详解》:本文主要介绍了CSS3中的字体及相关属性,详细内容请阅读本文,希望能对你有所帮助... 字体网页字体的三个来源:用户机器上安装的字体,放心使用。保存在第三方网站上的字体,例如Typekit和Google,可以link标签链接到你的页面上。保存在你自己Web服务器上的字

从基础到进阶详解Pandas时间数据处理指南

《从基础到进阶详解Pandas时间数据处理指南》Pandas构建了完整的时间数据处理生态,核心由四个基础类构成,Timestamp,DatetimeIndex,Period和Timedelta,下面我... 目录1. 时间数据类型与基础操作1.1 核心时间对象体系1.2 时间数据生成技巧2. 时间索引与数据

MYSQL查询结果实现发送给客户端

《MYSQL查询结果实现发送给客户端》:本文主要介绍MYSQL查询结果实现发送给客户端方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录mysql取数据和发数据的流程(边读边发)Sending to clientSending DataLRU(Least Rec

安装centos8设置基础软件仓库时出错的解决方案

《安装centos8设置基础软件仓库时出错的解决方案》:本文主要介绍安装centos8设置基础软件仓库时出错的解决方案,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐... 目录安装Centos8设置基础软件仓库时出错版本 8版本 8.2.200android4版本 javas

Python FastMCP构建MCP服务端与客户端的详细步骤

《PythonFastMCP构建MCP服务端与客户端的详细步骤》MCP(Multi-ClientProtocol)是一种用于构建可扩展服务的通信协议框架,本文将使用FastMCP搭建一个支持St... 目录简介环境准备服务端实现(server.py)客户端实现(client.py)运行效果扩展方向常见问题结

Python实现自动化Word文档样式复制与内容生成

《Python实现自动化Word文档样式复制与内容生成》在办公自动化领域,高效处理Word文档的样式和内容复制是一个常见需求,本文将展示如何利用Python的python-docx库实现... 目录一、为什么需要自动化 Word 文档处理二、核心功能实现:样式与表格的深度复制1. 表格复制(含样式与内容)2

Linux基础命令@grep、wc、管道符的使用详解

《Linux基础命令@grep、wc、管道符的使用详解》:本文主要介绍Linux基础命令@grep、wc、管道符的使用,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐... 目录grep概念语法作用演示一演示二演示三,带选项 -nwc概念语法作用wc,不带选项-c,统计字节数-

MySQL 事务的概念及ACID属性和使用详解

《MySQL事务的概念及ACID属性和使用详解》MySQL通过多线程实现存储工作,因此在并发访问场景中,事务确保了数据操作的一致性和可靠性,下面通过本文给大家介绍MySQL事务的概念及ACID属性和... 目录一、什么是事务二、事务的属性及使用2.1 事务的 ACID 属性2.2 为什么存在事务2.3 事务

python操作redis基础

《python操作redis基础》Redis(RemoteDictionaryServer)是一个开源的、基于内存的键值对(Key-Value)存储系统,它通常用作数据库、缓存和消息代理,这篇文章... 目录1. Redis 简介2. 前提条件3. 安装 python Redis 客户端库4. 连接到 Re

SpringBoot实现Kafka动态反序列化的完整代码

《SpringBoot实现Kafka动态反序列化的完整代码》在分布式系统中,Kafka作为高吞吐量的消息队列,常常需要处理来自不同主题(Topic)的异构数据,不同的业务场景可能要求对同一消费者组内的... 目录引言一、问题背景1.1 动态反序列化的需求1.2 常见问题二、动态反序列化的核心方案2.1 ht