kafka 消费能力小小见解及解决方案

2024-03-19 01:10

本文主要是介绍kafka 消费能力小小见解及解决方案,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1.kafka 消费能力低的原因

kafka的速度是很快,所以一般来说producer的生产消息的逻辑速度都会比consumer的消费消息的逻辑速度快,查看topic情况发现:

MUC_EMP_CHANGE_NOTIFY

MUC_ORG

app_action

h5_action

topic的分区数partitions都是1 副本数replication-factor都是1,如下图

查看topic情况

./kafka-topics.sh --zookeeper 172.28.21.250:2181,172.28.21.249:2181,172.28.21.248:2181 --topic MUC_ORG --describe

在这里插入图片描述

查看消费情况,lag出现延迟1万

./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper 172.28.21.250:2181,172.28.21.249:2181,172.28.21.248:2181 --group mop-consumer --topic MUC_ORG

在这里插入图片描述
但是新创建的topic DEMO_KAFKA_SERVICE,如下图

./kafka-topics.sh --zookeeper 172.28.21.250:2181,172.28.21.249:2181,172.28.21.248:2181 --topic DEMO_KAFKA_SERVICE --describe

在这里插入图片描述

造成这种现象是server.properties修改了,新的server.properties增加了如下配置:

default.replication.factor=3replica.fetch.max.bytes=5242880delete.topic.enable=true

增加了分区和副本数,但是旧的topic还是之前配置,造成了消费能力还是一样低

2.解决方案(提高了partition的数量)

提高了partition的数量,从而提高了consumer的并行能力,从而提高数据的消费能力
对于单partition的消费线程,增加了一个固定长度的阻塞队列和工作线程池进一步提高并行消费的能力(暂不考虑)
使用spring-kafka,则把kafka-client的enable.auto.commit设置成了false,表示禁止kafka-client自动提交offset,因为就是之前的自动提交失败,导致offset永远没更新,从而转向使用spring-kafka的offset提交机制。并且spring-kafka提供了多种提交策略(暂不考虑)

1) 修改server.properties配置,添加如下配置,重启kafka(已配置)
default.replication.factor=3replica.fetch.max.bytes=5242880delete.topic.enable=true
2) 删除topic,kafka暂时不支持修改topic的副本数
 ./kafka-topics.sh --zookeeper 172.28.21.250:2181,172.28.21.249:2181,172.28.21.248:2181 --delete –topic topicname
3) 修复后的消费情况

在这里插入图片描述

4)进入 ./zookeeper-shell.sh 172.28.21.250:2181 查看group

ls /consumers

删除 lag偏差的group rmr /consumers/group1

5)重启 mx-apps 服务重新创建group
6) 查看 消费情况

./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper 172.28.21.250:2181,172.28.21.249:2181,172.28.21.248:2181 --group apps-consumer --topic MUC_ORG

这篇关于kafka 消费能力小小见解及解决方案的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/824322

相关文章

线上Java OOM问题定位与解决方案超详细解析

《线上JavaOOM问题定位与解决方案超详细解析》OOM是JVM抛出的错误,表示内存分配失败,:本文主要介绍线上JavaOOM问题定位与解决方案的相关资料,文中通过代码介绍的非常详细,需要的朋... 目录一、OOM问题核心认知1.1 OOM定义与技术定位1.2 OOM常见类型及技术特征二、OOM问题定位工具

Python一次性将指定版本所有包上传PyPI镜像解决方案

《Python一次性将指定版本所有包上传PyPI镜像解决方案》本文主要介绍了一个安全、完整、可离线部署的解决方案,用于一次性准备指定Python版本的所有包,然后导出到内网环境,感兴趣的小伙伴可以跟随... 目录为什么需要这个方案完整解决方案1. 项目目录结构2. 创建智能下载脚本3. 创建包清单生成脚本4

java.sql.SQLTransientConnectionException连接超时异常原因及解决方案

《java.sql.SQLTransientConnectionException连接超时异常原因及解决方案》:本文主要介绍java.sql.SQLTransientConnectionExcep... 目录一、引言二、异常信息分析三、可能的原因3.1 连接池配置不合理3.2 数据库负载过高3.3 连接泄漏

C#文件复制异常:"未能找到文件"的解决方案与预防措施

《C#文件复制异常:未能找到文件的解决方案与预防措施》在C#开发中,文件操作是基础中的基础,但有时最基础的File.Copy()方法也会抛出令人困惑的异常,当targetFilePath设置为D:2... 目录一个看似简单的文件操作问题问题重现与错误分析错误代码示例错误信息根本原因分析全面解决方案1. 确保

Java Kafka消费者实现过程

《JavaKafka消费者实现过程》Kafka消费者通过KafkaConsumer类实现,核心机制包括偏移量管理、消费者组协调、批量拉取消息及多线程处理,手动提交offset确保数据可靠性,自动提交... 目录基础KafkaConsumer类分析关键代码与核心算法2.1 订阅与分区分配2.2 拉取消息2.3

C# LiteDB处理时间序列数据的高性能解决方案

《C#LiteDB处理时间序列数据的高性能解决方案》LiteDB作为.NET生态下的轻量级嵌入式NoSQL数据库,一直是时间序列处理的优选方案,本文将为大家大家简单介绍一下LiteDB处理时间序列数... 目录为什么选择LiteDB处理时间序列数据第一章:LiteDB时间序列数据模型设计1.1 核心设计原则

Python利用PySpark和Kafka实现流处理引擎构建指南

《Python利用PySpark和Kafka实现流处理引擎构建指南》本文将深入解剖基于Python的实时处理黄金组合:Kafka(分布式消息队列)与PySpark(分布式计算引擎)的化学反应,并构建一... 目录引言:数据洪流时代的生存法则第一章 Kafka:数据世界的中央神经系统消息引擎核心设计哲学高吞吐

SpringBoot3匹配Mybatis3的错误与解决方案

《SpringBoot3匹配Mybatis3的错误与解决方案》文章指出SpringBoot3与MyBatis3兼容性问题,因未更新MyBatis-Plus依赖至SpringBoot3专用坐标,导致类冲... 目录SpringBoot3匹配MyBATis3的错误与解决mybatis在SpringBoot3如果

C++ vector越界问题的完整解决方案

《C++vector越界问题的完整解决方案》在C++开发中,std::vector作为最常用的动态数组容器,其便捷性与性能优势使其成为处理可变长度数据的首选,然而,数组越界访问始终是威胁程序稳定性的... 目录引言一、vector越界的底层原理与危害1.1 越界访问的本质原因1.2 越界访问的实际危害二、基

Python 字符串裁切与提取全面且实用的解决方案

《Python字符串裁切与提取全面且实用的解决方案》本文梳理了Python字符串处理方法,涵盖基础切片、split/partition分割、正则匹配及结构化数据解析(如BeautifulSoup、j... 目录python 字符串裁切与提取的完整指南 基础切片方法1. 使用切片操作符[start:end]2