kafka-producer异地性能损耗

2024-01-01 10:48

本文主要是介绍kafka-producer异地性能损耗,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

以下内容摘自雪球,在公司内部的docs上的内容总结,部分隐私信息已经处理改动


https://xueqiu.com/

 

背景:

在进行服务上云的时候发生了性能损耗问题,一步步从网络带宽问题、JDK版本问题、公网时延问题、CPU和内存问题走了很多弯路,最后才定位到kafka-producer,当然这也是由于业务排查过程中对于机房之间时延的几毫秒不重视造成

问题:

对服务本地机房和阿里云压测时,压测结果如下

本地机房

阿里云

 

 

TPS:150K

TPS:3K

从可以看到的问题就是阿里云的TPS比本地的机器低好几倍,

解决:

JDK版本统一,外网带宽绝对大于服务历史峰值,公网时延检测,CPU进行了4核8核的比对(不是性能的瓶颈,因为相同线程数和CPU的使用率都没升上去),内存进行了8GB和16GB对比(因为担心对外内存,合着堆外内存也就占了几MB,也没有FullGC)

以上一大通花费了大量时间之后,业务代码里面有一个推送状态回传的操作,需要将消息发送至kafka,之前一直监控了kafka-consumer(consumer是批量拉取的,而且频率不高所以各项指标都很正常)。但是把kafka-producer的监控指标给忽略了,通过方法耗时统计,找到了性能损耗发生在kafka-producer状态回传,以下内容主要是深入的解析kafka-producer的运行原理并评估在双机房下对性能的影响

1.一条消息发送的过程:send阶段→batching阶段→await-send阶段→inflight阶段→retry阶段

max.block.ms:控制KafkaProducer.send()和KafkaProducer.partitionsFor()的阻塞时间,如果消息速度大于producer交付到server端的阻塞时间, 将会抛出异常

batch.size:默认16Kb,太小降低吞吐率

linger.ms:默认0ms没有延迟,正常情况下想要减小请求的数量,合理设置类似TCP中的Nagle算法,当然batch.size优先

2.服务压测下性能比对

(注意到这一步,已经定位到时机房间的时延问题,主要对比时延的影响,以及如何优化)

batch-size
linger-ms

request-count

阿里云/延迟(ms)

星光/延迟(ms)

默认值(16K)

默认值(0ms)100327231
  10003516779
  10000371027474
32K0ms100515248
  10003934914
  10000407197526
64K0ms100380118
  10003577695
  10000377536665
64K5ms100468132
  10004014654
  10000384576524
64K10ms100388199
  100039671018
  10000396716338
160K100ms100461184
  100041871032
  10000402357253

不要盲目的调大这俩参数,可以看到当batch-size增大对producer有一定的性能提升,但是linger-ms对性能的提升不符合理论依据(本次实验的数据不一定能说明问题)

3.问:但是producer是异步的,怎么调大了batch-size作用还是不大?

答:原因是producer的Record在进入Accumulator之前,首先会先从bootstrap servers获取最新的topic-partition信息,这个过程会阻塞生产线程,直到MetadataRequest完成。所以每一个metadata消耗一个延迟,那么随着消息数量的递增,延时将会被无限放大(这里就在想,怎么来控制metadata的有效期,不要每次都从server端获取就好了)

KafkaProducer.ClusterAndWaitTime waitOnMetadata方法 展开源码

private KafkaProducer.ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {this.metadata.add(topic);Cluster cluster = this.metadata.fetch();Integer partitionsCount = cluster.partitionCountForTopic(topic);if (partitionsCount == null || partition != null && partition >= partitionsCount) {long begin = this.time.milliseconds();long remainingWaitMs = maxWaitMs;long elapsed;do {this.log.trace("Requesting metadata update for topic {}.", topic);this.metadata.add(topic);int version = this.metadata.requestUpdate();this.sender.wakeup();try {this.metadata.awaitUpdate(version, remainingWaitMs);} catch (TimeoutException var15) {throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");}cluster = this.metadata.fetch();elapsed = this.time.milliseconds() - begin;if (elapsed >= maxWaitMs) {throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");}if (cluster.unauthorizedTopics().contains(topic)) {throw new TopicAuthorizationException(topic);}remainingWaitMs = maxWaitMs - elapsed;partitionsCount = cluster.partitionCountForTopic(topic);} while(partitionsCount == null);if (partition != null && partition >= partitionsCount) {throw new KafkaException(String.format("Invalid partition given with record: %d is not in the range [0...%d).", partition, partitionsCount));} else {return new KafkaProducer.ClusterAndWaitTime(cluster, elapsed);}} else {return new KafkaProducer.ClusterAndWaitTime(cluster, 0L);}}

metadata.max.age.ms:就是这个参数,控制着metadata的有效时间,把它调大就好了 (错误,这个意思理解错了)

在一个函数中有这么一个调用关系:

1.把needUpdate置为true
2.唤起sender
3.阻塞awaitUpdate

也就是说当Sender成功更新meatadata之后,version加1。否则会wait个maxWaitMs时间,欲哭无泪丧尽天良,每次都要强制从server端获取过metadata之后才允许往下一步进行。。。。

Metadata的awaitUpdate方法毁灭了我的幻想 展开源码

public synchronized void awaitUpdate(int lastVersion, long maxWaitMs) throws InterruptedException {if (maxWaitMs < 0L) {throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milliseconds");} else {long begin = System.currentTimeMillis();long elapsed;for(long remainingWaitMs = maxWaitMs; this.version <= lastVersion; remainingWaitMs = maxWaitMs - elapsed) {AuthenticationException ex = this.getAndClearAuthenticationException();if (ex != null) {throw ex;}if (remainingWaitMs != 0L) {this.wait(remainingWaitMs);}elapsed = System.currentTimeMillis() - begin;if (elapsed >= maxWaitMs) {throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");}}}}

结论:

只要时延存在,没有银弹

只不过会在低请求时不会暴露问题,而随着请求数的增长,这个时延问题会一直被放大(xueqiu-push项目中50个以下就看不出来)

目前对于这个问题的解决路径是调大了metadata的expired-time,让producer在异步send的时候不在waitOnMetadata方法阻塞太长时间(错误,这个意思理解错了,请看上面解释)

所以要么在外面再添加一层异步调用,要么把kafka的server给换成本地的,网络延时kafka-client-1.X版本下目前还是会阻塞业务的

that's all!注意kafka的所有参数都有用,辛亏把matric监控指标打的全!!!!!欧耶!!

这篇关于kafka-producer异地性能损耗的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/559025

相关文章

从原理到实战解析Java Stream 的并行流性能优化

《从原理到实战解析JavaStream的并行流性能优化》本文给大家介绍JavaStream的并行流性能优化:从原理到实战的全攻略,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的... 目录一、并行流的核心原理与适用场景二、性能优化的核心策略1. 合理设置并行度:打破默认阈值2. 避免装箱

Java Kafka消费者实现过程

《JavaKafka消费者实现过程》Kafka消费者通过KafkaConsumer类实现,核心机制包括偏移量管理、消费者组协调、批量拉取消息及多线程处理,手动提交offset确保数据可靠性,自动提交... 目录基础KafkaConsumer类分析关键代码与核心算法2.1 订阅与分区分配2.2 拉取消息2.3

深度剖析SpringBoot日志性能提升的原因与解决

《深度剖析SpringBoot日志性能提升的原因与解决》日志记录本该是辅助工具,却为何成了性能瓶颈,SpringBoot如何用代码彻底破解日志导致的高延迟问题,感兴趣的小伙伴可以跟随小编一起学习一下... 目录前言第一章:日志性能陷阱的底层原理1.1 日志级别的“双刃剑”效应1.2 同步日志的“吞吐量杀手”

Python利用PySpark和Kafka实现流处理引擎构建指南

《Python利用PySpark和Kafka实现流处理引擎构建指南》本文将深入解剖基于Python的实时处理黄金组合:Kafka(分布式消息队列)与PySpark(分布式计算引擎)的化学反应,并构建一... 目录引言:数据洪流时代的生存法则第一章 Kafka:数据世界的中央神经系统消息引擎核心设计哲学高吞吐

Java慢查询排查与性能调优完整实战指南

《Java慢查询排查与性能调优完整实战指南》Java调优是一个广泛的话题,它涵盖了代码优化、内存管理、并发处理等多个方面,:本文主要介绍Java慢查询排查与性能调优的相关资料,文中通过代码介绍的非... 目录1. 事故全景:从告警到定位1.1 事故时间线1.2 关键指标异常1.3 排查工具链2. 深度剖析:

深入解析Java NIO在高并发场景下的性能优化实践指南

《深入解析JavaNIO在高并发场景下的性能优化实践指南》随着互联网业务不断演进,对高并发、低延时网络服务的需求日益增长,本文将深入解析JavaNIO在高并发场景下的性能优化方法,希望对大家有所帮助... 目录简介一、技术背景与应用场景二、核心原理深入分析2.1 Selector多路复用2.2 Buffer

基于Python Playwright进行前端性能测试的脚本实现

《基于PythonPlaywright进行前端性能测试的脚本实现》在当今Web应用开发中,性能优化是提升用户体验的关键因素之一,本文将介绍如何使用Playwright构建一个自动化性能测试工具,希望... 目录引言工具概述整体架构核心实现解析1. 浏览器初始化2. 性能数据收集3. 资源分析4. 关键性能指

Zabbix在MySQL性能监控方面的运用及最佳实践记录

《Zabbix在MySQL性能监控方面的运用及最佳实践记录》Zabbix通过自定义脚本和内置模板监控MySQL核心指标(连接、查询、资源、复制),支持自动发现多实例及告警通知,结合可视化仪表盘,可有效... 目录一、核心监控指标及配置1. 关键监控指标示例2. 配置方法二、自动发现与多实例管理1. 实践步骤

MySQL深分页进行性能优化的常见方法

《MySQL深分页进行性能优化的常见方法》在Web应用中,分页查询是数据库操作中的常见需求,然而,在面对大型数据集时,深分页(deeppagination)却成为了性能优化的一个挑战,在本文中,我们将... 目录引言:深分页,真的只是“翻页慢”那么简单吗?一、背景介绍二、深分页的性能问题三、业务场景分析四、

MySQL 多列 IN 查询之语法、性能与实战技巧(最新整理)

《MySQL多列IN查询之语法、性能与实战技巧(最新整理)》本文详解MySQL多列IN查询,对比传统OR写法,强调其简洁高效,适合批量匹配复合键,通过联合索引、分批次优化提升性能,兼容多种数据库... 目录一、基础语法:多列 IN 的两种写法1. 直接值列表2. 子查询二、对比传统 OR 的写法三、性能分析