kafka-producer异地性能损耗

2024-01-01 10:48

本文主要是介绍kafka-producer异地性能损耗,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

以下内容摘自雪球,在公司内部的docs上的内容总结,部分隐私信息已经处理改动


https://xueqiu.com/

 

背景:

在进行服务上云的时候发生了性能损耗问题,一步步从网络带宽问题、JDK版本问题、公网时延问题、CPU和内存问题走了很多弯路,最后才定位到kafka-producer,当然这也是由于业务排查过程中对于机房之间时延的几毫秒不重视造成

问题:

对服务本地机房和阿里云压测时,压测结果如下

本地机房

阿里云

 

 

TPS:150K

TPS:3K

从可以看到的问题就是阿里云的TPS比本地的机器低好几倍,

解决:

JDK版本统一,外网带宽绝对大于服务历史峰值,公网时延检测,CPU进行了4核8核的比对(不是性能的瓶颈,因为相同线程数和CPU的使用率都没升上去),内存进行了8GB和16GB对比(因为担心对外内存,合着堆外内存也就占了几MB,也没有FullGC)

以上一大通花费了大量时间之后,业务代码里面有一个推送状态回传的操作,需要将消息发送至kafka,之前一直监控了kafka-consumer(consumer是批量拉取的,而且频率不高所以各项指标都很正常)。但是把kafka-producer的监控指标给忽略了,通过方法耗时统计,找到了性能损耗发生在kafka-producer状态回传,以下内容主要是深入的解析kafka-producer的运行原理并评估在双机房下对性能的影响

1.一条消息发送的过程:send阶段→batching阶段→await-send阶段→inflight阶段→retry阶段

max.block.ms:控制KafkaProducer.send()和KafkaProducer.partitionsFor()的阻塞时间,如果消息速度大于producer交付到server端的阻塞时间, 将会抛出异常

batch.size:默认16Kb,太小降低吞吐率

linger.ms:默认0ms没有延迟,正常情况下想要减小请求的数量,合理设置类似TCP中的Nagle算法,当然batch.size优先

2.服务压测下性能比对

(注意到这一步,已经定位到时机房间的时延问题,主要对比时延的影响,以及如何优化)

batch-size
linger-ms

request-count

阿里云/延迟(ms)

星光/延迟(ms)

默认值(16K)

默认值(0ms)100327231
  10003516779
  10000371027474
32K0ms100515248
  10003934914
  10000407197526
64K0ms100380118
  10003577695
  10000377536665
64K5ms100468132
  10004014654
  10000384576524
64K10ms100388199
  100039671018
  10000396716338
160K100ms100461184
  100041871032
  10000402357253

不要盲目的调大这俩参数,可以看到当batch-size增大对producer有一定的性能提升,但是linger-ms对性能的提升不符合理论依据(本次实验的数据不一定能说明问题)

3.问:但是producer是异步的,怎么调大了batch-size作用还是不大?

答:原因是producer的Record在进入Accumulator之前,首先会先从bootstrap servers获取最新的topic-partition信息,这个过程会阻塞生产线程,直到MetadataRequest完成。所以每一个metadata消耗一个延迟,那么随着消息数量的递增,延时将会被无限放大(这里就在想,怎么来控制metadata的有效期,不要每次都从server端获取就好了)

KafkaProducer.ClusterAndWaitTime waitOnMetadata方法 展开源码

private KafkaProducer.ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {this.metadata.add(topic);Cluster cluster = this.metadata.fetch();Integer partitionsCount = cluster.partitionCountForTopic(topic);if (partitionsCount == null || partition != null && partition >= partitionsCount) {long begin = this.time.milliseconds();long remainingWaitMs = maxWaitMs;long elapsed;do {this.log.trace("Requesting metadata update for topic {}.", topic);this.metadata.add(topic);int version = this.metadata.requestUpdate();this.sender.wakeup();try {this.metadata.awaitUpdate(version, remainingWaitMs);} catch (TimeoutException var15) {throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");}cluster = this.metadata.fetch();elapsed = this.time.milliseconds() - begin;if (elapsed >= maxWaitMs) {throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");}if (cluster.unauthorizedTopics().contains(topic)) {throw new TopicAuthorizationException(topic);}remainingWaitMs = maxWaitMs - elapsed;partitionsCount = cluster.partitionCountForTopic(topic);} while(partitionsCount == null);if (partition != null && partition >= partitionsCount) {throw new KafkaException(String.format("Invalid partition given with record: %d is not in the range [0...%d).", partition, partitionsCount));} else {return new KafkaProducer.ClusterAndWaitTime(cluster, elapsed);}} else {return new KafkaProducer.ClusterAndWaitTime(cluster, 0L);}}

metadata.max.age.ms:就是这个参数,控制着metadata的有效时间,把它调大就好了 (错误,这个意思理解错了)

在一个函数中有这么一个调用关系:

1.把needUpdate置为true
2.唤起sender
3.阻塞awaitUpdate

也就是说当Sender成功更新meatadata之后,version加1。否则会wait个maxWaitMs时间,欲哭无泪丧尽天良,每次都要强制从server端获取过metadata之后才允许往下一步进行。。。。

Metadata的awaitUpdate方法毁灭了我的幻想 展开源码

public synchronized void awaitUpdate(int lastVersion, long maxWaitMs) throws InterruptedException {if (maxWaitMs < 0L) {throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milliseconds");} else {long begin = System.currentTimeMillis();long elapsed;for(long remainingWaitMs = maxWaitMs; this.version <= lastVersion; remainingWaitMs = maxWaitMs - elapsed) {AuthenticationException ex = this.getAndClearAuthenticationException();if (ex != null) {throw ex;}if (remainingWaitMs != 0L) {this.wait(remainingWaitMs);}elapsed = System.currentTimeMillis() - begin;if (elapsed >= maxWaitMs) {throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");}}}}

结论:

只要时延存在,没有银弹

只不过会在低请求时不会暴露问题,而随着请求数的增长,这个时延问题会一直被放大(xueqiu-push项目中50个以下就看不出来)

目前对于这个问题的解决路径是调大了metadata的expired-time,让producer在异步send的时候不在waitOnMetadata方法阻塞太长时间(错误,这个意思理解错了,请看上面解释)

所以要么在外面再添加一层异步调用,要么把kafka的server给换成本地的,网络延时kafka-client-1.X版本下目前还是会阻塞业务的

that's all!注意kafka的所有参数都有用,辛亏把matric监控指标打的全!!!!!欧耶!!

这篇关于kafka-producer异地性能损耗的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/559025

相关文章

Linux系统性能检测命令详解

《Linux系统性能检测命令详解》本文介绍了Linux系统常用的监控命令(如top、vmstat、iostat、htop等)及其参数功能,涵盖进程状态、内存使用、磁盘I/O、系统负载等多维度资源监控,... 目录toppsuptimevmstatIOStatiotopslabtophtopdstatnmon

JVisualVM之Java性能监控与调优利器详解

《JVisualVM之Java性能监控与调优利器详解》本文将详细介绍JVisualVM的使用方法,并结合实际案例展示如何利用它进行性能调优,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全... 目录1. JVisualVM简介2. JVisualVM的安装与启动2.1 启动JVisualVM2

Java使用MethodHandle来替代反射,提高性能问题

《Java使用MethodHandle来替代反射,提高性能问题》:本文主要介绍Java使用MethodHandle来替代反射,提高性能问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑... 目录一、认识MethodHandle1、简介2、使用方式3、与反射的区别二、示例1、基本使用2、(重要)

SpringBoot实现Kafka动态反序列化的完整代码

《SpringBoot实现Kafka动态反序列化的完整代码》在分布式系统中,Kafka作为高吞吐量的消息队列,常常需要处理来自不同主题(Topic)的异构数据,不同的业务场景可能要求对同一消费者组内的... 目录引言一、问题背景1.1 动态反序列化的需求1.2 常见问题二、动态反序列化的核心方案2.1 ht

PyTorch高级特性与性能优化方式

《PyTorch高级特性与性能优化方式》:本文主要介绍PyTorch高级特性与性能优化方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、自动化机制1.自动微分机制2.动态计算图二、性能优化1.内存管理2.GPU加速3.多GPU训练三、分布式训练1.分布式数据

Java的"伪泛型"变"真泛型"后对性能的影响

《Java的伪泛型变真泛型后对性能的影响》泛型擦除本质上就是擦除与泛型相关的一切信息,例如参数化类型、类型变量等,Javac还将在需要时进行类型检查及强制类型转换,甚至在必要时会合成桥方法,这篇文章主... 目录1、真假泛型2、性能影响泛型存在于Java源代码中,在编译为字节码文件之前都会进行泛型擦除(ty

深入理解Apache Kafka(分布式流处理平台)

《深入理解ApacheKafka(分布式流处理平台)》ApacheKafka作为现代分布式系统中的核心中间件,为构建高吞吐量、低延迟的数据管道提供了强大支持,本文将深入探讨Kafka的核心概念、架构... 目录引言一、Apache Kafka概述1.1 什么是Kafka?1.2 Kafka的核心概念二、Ka

Python如何使用__slots__实现节省内存和性能优化

《Python如何使用__slots__实现节省内存和性能优化》你有想过,一个小小的__slots__能让你的Python类内存消耗直接减半吗,没错,今天咱们要聊的就是这个让人眼前一亮的技巧,感兴趣的... 目录背景:内存吃得满满的类__slots__:你的内存管理小助手举个大概的例子:看看效果如何?1.

Redis中高并发读写性能的深度解析与优化

《Redis中高并发读写性能的深度解析与优化》Redis作为一款高性能的内存数据库,广泛应用于缓存、消息队列、实时统计等场景,本文将深入探讨Redis的读写并发能力,感兴趣的小伙伴可以了解下... 目录引言一、Redis 并发能力概述1.1 Redis 的读写性能1.2 影响 Redis 并发能力的因素二、

Golang中拼接字符串的6种方式性能对比

《Golang中拼接字符串的6种方式性能对比》golang的string类型是不可修改的,对于拼接字符串来说,本质上还是创建一个新的对象将数据放进去,主要有6种拼接方式,下面小编就来为大家详细讲讲吧... 目录拼接方式介绍性能对比测试代码测试结果源码分析golang的string类型是不可修改的,对于拼接字