kafka优化(系列四):kafka配置优化和kafka批量消费,提高分区数量

2024-08-28 04:48

本文主要是介绍kafka优化(系列四):kafka配置优化和kafka批量消费,提高分区数量,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

往期精选

  • 第一篇我们讲到了docker的单机搭建。

  • 第二篇我们讲到了与springboot的整合。

  • 第三篇我们讲到了kafka的原理。

    这一篇我们将叙述,我是怎么在项目中进行对kafka优化的我们将从三方面进行考虑,一是代码;二是    配置;三是集群。项目背景,做数据迁移工作后面我将写几篇文章讲诉我们是怎么对百万数据进行迁移的工作)。主要场景利用kafka做读写分离,一直请求源数据写入到kafka生产者,然后kafka消费者进行写入数据到新数据。

一、配置优化《报错》节选:

[2018-09-25 11:23:59.370] ERROR [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] LoggingErrorHandler.java:37 - Error while processing: nullorg.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:722)at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:600)at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1250)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:1324)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:1185)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:688)at java.util.concurrent.Executors$RunnableAdapter.call$$$capture(Executors.java:511)at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java)at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)at java.util.concurrent.FutureTask.run(FutureTask.java)at java.lang.Thread.run(Thread.java:748)

这是由于kafka一直生产数据,导致kafka消费太慢了。我们主要优化也是对消费者进行优化。根据上面的报错,我们可以看到一个参数:max-poll-records,所以我们首先将对提交数,进行调大。具体的需要根据项目进行测试,我们把数进行调大到100,同时对下面的参数进行:

#自动提交offset到zookeeper的时间间隔

auto-commit-interval: 1000

#earliest 

#当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 

#latest 

#当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 

#none 

#topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

auto-offset-reset: latest

#提交方式改为false,是否自动周期性提交已经拉取到消费端的消息offset

enable-auto-commit: false

由于使用了spring-kafka,则把kafka-client的enable.auto.commit设置成了false,表示禁止kafka-client自动提交offset,因为就是之前的自动提交失败,导致offset永远没更新,从而转向使用spring-kafka的offset提交机制。并且spring-kafka提供了多种提交策略:

  然后我修改了kafka的配置(spring-kafka),需要到安装的文件(config)下进行修改,分别是生产文件和配置文件。

1.session.timeout.ms=100000(增大session超时时间)。

2.request.timeout.ms=110000(socket握手超时时间,默认是3000 但是kafka配置要求大于session.timeout.ms时间).

同时Kafka的生产端可以压缩消息,如果原始消息是XML,当通过压缩之后,消息可能会变得不那么大。在生产端的配置参数中使用compression.codec和commpressed.topics可以开启压缩功能,压缩算法可以使用GZip或Snappy。

二、代码优化:《日志》节选

如果进行了的配置调优,差不多会提高kafka的消费能力,但是写入过大,控制台还是打印下面日志信息:

2018-09-25 14:39:53.193] INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] LogContext.java:341 - [Consumer clientId=consumer-4, groupId=test-consumer-group] Revoking previously assigned partitions [XXXXX-0][2018-09-25 14:39:53.193] INFO [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] LogContext.java:341 - [Consumer clientId=consumer-1, groupId=test-consumer-group] Revoking previously assigned partitions [XXXXX-0][2018-09-25 14:39:53.193] INFO [org.springframework.kafka.KafkaListenerEndpointContainer#4-0-C-1] LogContext.java:341 - [Consumer clientId=consumer-3, groupId=test-consumer-group] Revoking previously assigned partitions [XXXXXX-0][2018-09-25 14:39:53.193] INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] AbstractMessageListenerContainer.java:343 - partitions revoked: [XXXX-0][2018-09-25 14:39:53.193] INFO [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] AbstractMessageListenerContainer.java:343 - partitions revoked: [XXXXXX-0][2018-09-25 14:39:53.193] INFO [org.springframework.kafka.KafkaListenerEndpointContainer#4-0-C-1] AbstractMessageListenerContainer.java:343 - partitions revoked: [XXXXXX-0][2018-09-25 14:39:53.193] INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] LogContext.java:336 - [Consumer clientId=consumer-4, groupId=test-consumer-group] 

也是就说机制一直打印这些信息,但是又不报错,但是又不写入数据,我们就想,除了配置优化之后,能不能像数据库一样,批量提交或者说是批量消费呢?看了官网资料,发现确实可以,以下是我们对代码的优化,由单一的消费,改为批量消费:

一:增加一个config类。

@Configuration
@EnableKafkapublic class KafkaConfig {@Beanpublic KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(10);factory.getContainerProperties().setPollTimeout(1500);factory.setBatchListener(true);//@KafkaListener 批量消费  每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIGfactory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);//设置提交偏移量的方式return factory;}public ConsumerFactory<String, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}public Map<String, Object> consumerConfigs() {Map<String, Object> propsMap = new HashMap<>(16);propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP地址需要修改");propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 100000);propsMap.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,110000);propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 150);//每个批次获取数return propsMap;}}

二、更改消费接受代码。

 @KafkaListener(topics = {"消费名称需要改"})public void listen(List<ConsumerRecord> records, Acknowledgment ack) {try {for (ConsumerRecord record : records) {Optional<?> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {Object message = kafkaMessage.get();log.info("----------------- record =" + record);log.info("------------------ message =" + message);}}} catch (Exception e) {log.error("kafka失败,当前失败的批次。data:{}", records);e.printStackTrace();} finally {ack.acknowledge();}}

集群搭建

前面虽然优化配置和代码,但是代码执行还是不够快,网上寻找资料(提高了partition的数量,从而提高了consumer的并行能力,从而提高数据的消费能力),说可以提高分区数量,如果单机怎么提高还是一样的(我们试过了),后来搭建了一个集群。注意我们是使用docker搭建kafka集群的,搭建过程如下。docker-compose.yml内容:

version: '2'services:zookeeper:image: wurstmeister/zookeeperports:- "2181:2181"kafka:image: wurstmeister/kafkaports:- "9095:9095"environment:KAFKA_ADVERTISED_HOST_NAME: IP地址KAFKA_ADVERTISED_PORT: 9095KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://XXXXXXXX:9095KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9095KAFKA_DELETE_TOPIC_ENABLE: "true"KAFKA_LOG_RETENTION_HOURS: 1KAFKA_MESSAGE_MAX_BYTES: 10000000KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS: 100000KAFKA_NUM_PARTITIONS: 2KAFKA_DELETE_RETENTION_MS: 1000KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181volumes:- /var/run/docker.sock:/var/run/docker.sockkafka-manager:image: sheepkiller/kafka-managerlinks:- kafka- zookeeperenvironment:ZK_HOSTS: zookeeper:2181APPLICATION_SECRET: letmeinKM_ARGS: -Djava.net.preferIPv4Stack=trueports:- "9000:9000"

1.启动的命令:

docker-compose up -d

2.先去修改配置文件的端口,然后再启动相关的命令:

docker-compose scale kafka=2

3.再次修改文件袋的端口,然后再启动相关的命令:

docker-compose scale kafka=3

以上就是我所总结的kafka优化,欢迎有更好的方案进行交流,欢迎关注微信号:繁荣Aaron和转发。

这篇关于kafka优化(系列四):kafka配置优化和kafka批量消费,提高分区数量的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1113773

相关文章

SpringBoot多环境配置数据读取方式

《SpringBoot多环境配置数据读取方式》SpringBoot通过环境隔离机制,支持properties/yaml/yml多格式配置,结合@Value、Environment和@Configura... 目录一、多环境配置的核心思路二、3种配置文件格式详解2.1 properties格式(传统格式)1.

Debian系和Redhat系防火墙配置方式

《Debian系和Redhat系防火墙配置方式》文章对比了Debian系UFW和Redhat系Firewalld防火墙的安装、启用禁用、端口管理、规则查看及注意事项,强调SSH端口需开放、规则持久化,... 目录Debian系UFW防火墙1. 安装2. 启用与禁用3. 基本命令4. 注意事项5. 示例配置R

PyCharm中配置PyQt的实现步骤

《PyCharm中配置PyQt的实现步骤》PyCharm是JetBrains推出的一款强大的PythonIDE,结合PyQt可以进行pythion高效开发桌面GUI应用程序,本文就来介绍一下PyCha... 目录1. 安装China编程PyQt1.PyQt 核心组件2. 基础 PyQt 应用程序结构3. 使用 Q

Redis MCP 安装与配置指南

《RedisMCP安装与配置指南》本文将详细介绍如何安装和配置RedisMCP,包括快速启动、源码安装、Docker安装、以及相关的配置参数和环境变量设置,感兴趣的朋友一起看看吧... 目录一、Redis MCP 简介二、安www.chinasem.cn装 Redis MCP 服务2.1 快速启动(推荐)2.

RabbitMQ消费端单线程与多线程案例讲解

《RabbitMQ消费端单线程与多线程案例讲解》文章解析RabbitMQ消费端单线程与多线程处理机制,说明concurrency控制消费者数量,max-concurrency控制最大线程数,prefe... 目录 一、基础概念详细解释:举个例子:✅ 单消费者 + 单线程消费❌ 单消费者 + 多线程消费❌ 多

Python实现批量提取BLF文件时间戳

《Python实现批量提取BLF文件时间戳》BLF(BinaryLoggingFormat)作为Vector公司推出的CAN总线数据记录格式,被广泛用于存储车辆通信数据,本文将使用Python轻松提取... 目录一、为什么需要批量处理 BLF 文件二、核心代码解析:从文件遍历到数据导出1. 环境准备与依赖库

go动态限制并发数量的实现示例

《go动态限制并发数量的实现示例》本文主要介绍了Go并发控制方法,通过带缓冲通道和第三方库实现并发数量限制,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面... 目录带有缓冲大小的通道使用第三方库其他控制并发的方法因为go从语言层面支持并发,所以面试百分百会问到

Spring Boot配置和使用两个数据源的实现步骤

《SpringBoot配置和使用两个数据源的实现步骤》本文详解SpringBoot配置双数据源方法,包含配置文件设置、Bean创建、事务管理器配置及@Qualifier注解使用,强调主数据源标记、代... 目录Spring Boot配置和使用两个数据源技术背景实现步骤1. 配置数据源信息2. 创建数据源Be

小白也能轻松上手! 路由器设置优化指南

《小白也能轻松上手!路由器设置优化指南》在日常生活中,我们常常会遇到WiFi网速慢的问题,这主要受到三个方面的影响,首要原因是WiFi产品的配置优化不合理,其次是硬件性能的不足,以及宽带线路本身的质... 在数字化时代,网络已成为生活必需品,追剧、游戏、办公、学习都离不开稳定高速的网络。但很多人面对新路由器

linux批量替换文件内容的实现方式

《linux批量替换文件内容的实现方式》本文总结了Linux中批量替换文件内容的几种方法,包括使用sed替换文件夹内所有文件、单个文件内容及逐行字符串,强调使用反引号和绝对路径,并分享个人经验供参考... 目录一、linux批量替换文件内容 二、替换文件内所有匹配的字符串 三、替换每一行中全部str1为st