【Kafka】Kafka提高生产者吞吐量、数据可靠性-06

2024-06-16 14:12

本文主要是介绍【Kafka】Kafka提高生产者吞吐量、数据可靠性-06,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

【Kafka】Kafka提高生产者吞吐量-06

  • 1. 提高生产者吞吐量
  • 2.数据可靠性
    • 2.1 回顾数据的发送流程
    • 2.2 ack应答级别
      • 2.2.1 acks:0
      • 2.2.2 acks:1
      • 2.2.2 acks:-1(all)
        • 2.2.2.1 数据可靠性分析
        • 2.2.2.2 数据完全可靠
    • 2.3 可靠性总结
    • 2.4 可靠性代码配置

1. 提高生产者吞吐量

在这里插入图片描述

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class CustomProducerParameters {public static void main(String[] args) throws InterruptedException {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// batch.size:批次大小,默认 16Kproperties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);// linger.ms:等待时间,默认 0properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);// RecordAccumulator:缓冲区大小,默认 32M:buffer.memoryproperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);// compression.type:压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstdproperties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = newKafkaProducer<String, String>(properties);// 4. 调用 send 方法,发送消息for (int i = 0; i < 5; i++) {kafkaProducer.send(newProducerRecord<>("first", "atguigu " + i));}// 5. 关闭资源kafkaProducer.close();}
}

测试:
①在 hadoop102 上开启 Kafka 消费者

[atguigu@hadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息

[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
atguigu 0
atguigu 1
atguigu 2
atguigu 3
atguigu 4

2.数据可靠性

2.1 回顾数据的发送流程

在这里插入图片描述

2.2 ack应答级别

在这里插入图片描述

2.2.1 acks:0

在这里插入图片描述
当应答级别为0时,生产者发送过来的数据,不需要等数据落盘应答。
工作:生产者将数据发送到leader中,如果leader突然挂掉了,leader还没有与follower同步,那么整个数据就全部都丢了。

2.2.2 acks:1

在这里插入图片描述
当应答级别为1时,生产者发送过来的数据,Leader收到数据后应答。

工作:如果应答完毕之后,leader还未与follower同步,leader挂了,新的leader会产生,原来的一条数据不会再次发送,造成了数据的丢失。

2.2.2 acks:-1(all)

在这里插入图片描述
当应答级别为-1时,生产者发送过来的数据,Leader+和isr队列
里面的所有节点收齐数据后应答。-1和all等价。

工作:leader收到数据,所有follower都开始同步数据,但有一个follower,因为某种故障,迟迟不能与leader同步,那这个问题怎么解决呢?

Leader维护了一个动态的in-sync replica set (ISR),意为和Leader保持问步的Folower + Leader集合(leader:0, isr:0,1,2)

如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。例如2超时,(leader:0, isr:0,1)。

这样就不用等长期联系不上或者己经故障的节点。

2.2.2.1 数据可靠性分析

如果分区副本设置为1个,或者ISR里应答的最小副本数量(min.insync.replicas 默认为1)设置为1,和ack=1的效果是一样的,仍然有丢数的风险(leader:0,isr:0)


2.2.2.2 数据完全可靠

数据完全可靠条件 = ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2

2.3 可靠性总结

可靠性总结:

  1. acks=0,生产者发送过来数据就不管了,可靠性差,效率高

  2. acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等

  3. acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低

  4. 在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据,对可靠性要求比较高的场景。

2.4 可靠性代码配置

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class CustomProducerAck {public static void main(String[] args) throwsInterruptedException {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 设置 acksproperties.put(ProducerConfig.ACKS_CONFIG, "all");// 重试次数 retries,默认是 int 最大值,2147483647properties.put(ProducerConfig.RETRIES_CONFIG, 3);// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 4. 调用 send 方法,发送消息for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<>("first","atguigu " + i));}// 5. 关闭资源kafkaProducer.close();}
}

这篇关于【Kafka】Kafka提高生产者吞吐量、数据可靠性-06的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1066656

相关文章

SpringBoot多环境配置数据读取方式

《SpringBoot多环境配置数据读取方式》SpringBoot通过环境隔离机制,支持properties/yaml/yml多格式配置,结合@Value、Environment和@Configura... 目录一、多环境配置的核心思路二、3种配置文件格式详解2.1 properties格式(传统格式)1.

解决pandas无法读取csv文件数据的问题

《解决pandas无法读取csv文件数据的问题》本文讲述作者用Pandas读取CSV文件时因参数设置不当导致数据错位,通过调整delimiter和on_bad_lines参数最终解决问题,并强调正确参... 目录一、前言二、问题复现1. 问题2. 通过 on_bad_lines=‘warn’ 跳过异常数据3

C#监听txt文档获取新数据方式

《C#监听txt文档获取新数据方式》文章介绍通过监听txt文件获取最新数据,并实现开机自启动、禁用窗口关闭按钮、阻止Ctrl+C中断及防止程序退出等功能,代码整合于主函数中,供参考学习... 目录前言一、监听txt文档增加数据二、其他功能1. 设置开机自启动2. 禁止控制台窗口关闭按钮3. 阻止Ctrl +

java如何实现高并发场景下三级缓存的数据一致性

《java如何实现高并发场景下三级缓存的数据一致性》这篇文章主要为大家详细介绍了java如何实现高并发场景下三级缓存的数据一致性,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 下面代码是一个使用Java和Redisson实现的三级缓存服务,主要功能包括:1.缓存结构:本地缓存:使

在MySQL中实现冷热数据分离的方法及使用场景底层原理解析

《在MySQL中实现冷热数据分离的方法及使用场景底层原理解析》MySQL冷热数据分离通过分表/分区策略、数据归档和索引优化,将频繁访问的热数据与冷数据分开存储,提升查询效率并降低存储成本,适用于高并发... 目录实现冷热数据分离1. 分表策略2. 使用分区表3. 数据归档与迁移在mysql中实现冷热数据分

C#解析JSON数据全攻略指南

《C#解析JSON数据全攻略指南》这篇文章主要为大家详细介绍了使用C#解析JSON数据全攻略指南,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、为什么jsON是C#开发必修课?二、四步搞定网络JSON数据1. 获取数据 - HttpClient最佳实践2. 动态解析 - 快速

MyBatis-Plus通用中等、大量数据分批查询和处理方法

《MyBatis-Plus通用中等、大量数据分批查询和处理方法》文章介绍MyBatis-Plus分页查询处理,通过函数式接口与Lambda表达式实现通用逻辑,方法抽象但功能强大,建议扩展分批处理及流式... 目录函数式接口获取分页数据接口数据处理接口通用逻辑工具类使用方法简单查询自定义查询方法总结函数式接口

SQL中如何添加数据(常见方法及示例)

《SQL中如何添加数据(常见方法及示例)》SQL全称为StructuredQueryLanguage,是一种用于管理关系数据库的标准编程语言,下面给大家介绍SQL中如何添加数据,感兴趣的朋友一起看看吧... 目录在mysql中,有多种方法可以添加数据。以下是一些常见的方法及其示例。1. 使用INSERT I

Python使用vllm处理多模态数据的预处理技巧

《Python使用vllm处理多模态数据的预处理技巧》本文深入探讨了在Python环境下使用vLLM处理多模态数据的预处理技巧,我们将从基础概念出发,详细讲解文本、图像、音频等多模态数据的预处理方法,... 目录1. 背景介绍1.1 目的和范围1.2 预期读者1.3 文档结构概述1.4 术语表1.4.1 核

MySQL 删除数据详解(最新整理)

《MySQL删除数据详解(最新整理)》:本文主要介绍MySQL删除数据的相关知识,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录一、前言二、mysql 中的三种删除方式1.DELETE语句✅ 基本语法: 示例:2.TRUNCATE语句✅ 基本语