kafka优化(系列四):kafka配置优化和kafka批量消费,提高分区数量

2024-08-28 04:48

本文主要是介绍kafka优化(系列四):kafka配置优化和kafka批量消费,提高分区数量,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

往期精选

  • 第一篇我们讲到了docker的单机搭建。

  • 第二篇我们讲到了与springboot的整合。

  • 第三篇我们讲到了kafka的原理。

    这一篇我们将叙述,我是怎么在项目中进行对kafka优化的我们将从三方面进行考虑,一是代码;二是    配置;三是集群。项目背景,做数据迁移工作后面我将写几篇文章讲诉我们是怎么对百万数据进行迁移的工作)。主要场景利用kafka做读写分离,一直请求源数据写入到kafka生产者,然后kafka消费者进行写入数据到新数据。

一、配置优化《报错》节选:

[2018-09-25 11:23:59.370] ERROR [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] LoggingErrorHandler.java:37 - Error while processing: nullorg.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:722)at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:600)at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1250)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:1324)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:1185)at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:688)at java.util.concurrent.Executors$RunnableAdapter.call$$$capture(Executors.java:511)at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java)at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)at java.util.concurrent.FutureTask.run(FutureTask.java)at java.lang.Thread.run(Thread.java:748)

这是由于kafka一直生产数据,导致kafka消费太慢了。我们主要优化也是对消费者进行优化。根据上面的报错,我们可以看到一个参数:max-poll-records,所以我们首先将对提交数,进行调大。具体的需要根据项目进行测试,我们把数进行调大到100,同时对下面的参数进行:

#自动提交offset到zookeeper的时间间隔

auto-commit-interval: 1000

#earliest 

#当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 

#latest 

#当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 

#none 

#topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

auto-offset-reset: latest

#提交方式改为false,是否自动周期性提交已经拉取到消费端的消息offset

enable-auto-commit: false

由于使用了spring-kafka,则把kafka-client的enable.auto.commit设置成了false,表示禁止kafka-client自动提交offset,因为就是之前的自动提交失败,导致offset永远没更新,从而转向使用spring-kafka的offset提交机制。并且spring-kafka提供了多种提交策略:

  然后我修改了kafka的配置(spring-kafka),需要到安装的文件(config)下进行修改,分别是生产文件和配置文件。

1.session.timeout.ms=100000(增大session超时时间)。

2.request.timeout.ms=110000(socket握手超时时间,默认是3000 但是kafka配置要求大于session.timeout.ms时间).

同时Kafka的生产端可以压缩消息,如果原始消息是XML,当通过压缩之后,消息可能会变得不那么大。在生产端的配置参数中使用compression.codec和commpressed.topics可以开启压缩功能,压缩算法可以使用GZip或Snappy。

二、代码优化:《日志》节选

如果进行了的配置调优,差不多会提高kafka的消费能力,但是写入过大,控制台还是打印下面日志信息:

2018-09-25 14:39:53.193] INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] LogContext.java:341 - [Consumer clientId=consumer-4, groupId=test-consumer-group] Revoking previously assigned partitions [XXXXX-0][2018-09-25 14:39:53.193] INFO [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] LogContext.java:341 - [Consumer clientId=consumer-1, groupId=test-consumer-group] Revoking previously assigned partitions [XXXXX-0][2018-09-25 14:39:53.193] INFO [org.springframework.kafka.KafkaListenerEndpointContainer#4-0-C-1] LogContext.java:341 - [Consumer clientId=consumer-3, groupId=test-consumer-group] Revoking previously assigned partitions [XXXXXX-0][2018-09-25 14:39:53.193] INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] AbstractMessageListenerContainer.java:343 - partitions revoked: [XXXX-0][2018-09-25 14:39:53.193] INFO [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] AbstractMessageListenerContainer.java:343 - partitions revoked: [XXXXXX-0][2018-09-25 14:39:53.193] INFO [org.springframework.kafka.KafkaListenerEndpointContainer#4-0-C-1] AbstractMessageListenerContainer.java:343 - partitions revoked: [XXXXXX-0][2018-09-25 14:39:53.193] INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] LogContext.java:336 - [Consumer clientId=consumer-4, groupId=test-consumer-group] 

也是就说机制一直打印这些信息,但是又不报错,但是又不写入数据,我们就想,除了配置优化之后,能不能像数据库一样,批量提交或者说是批量消费呢?看了官网资料,发现确实可以,以下是我们对代码的优化,由单一的消费,改为批量消费:

一:增加一个config类。

@Configuration
@EnableKafkapublic class KafkaConfig {@Beanpublic KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(10);factory.getContainerProperties().setPollTimeout(1500);factory.setBatchListener(true);//@KafkaListener 批量消费  每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIGfactory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);//设置提交偏移量的方式return factory;}public ConsumerFactory<String, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}public Map<String, Object> consumerConfigs() {Map<String, Object> propsMap = new HashMap<>(16);propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP地址需要修改");propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 100000);propsMap.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,110000);propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 150);//每个批次获取数return propsMap;}}

二、更改消费接受代码。

 @KafkaListener(topics = {"消费名称需要改"})public void listen(List<ConsumerRecord> records, Acknowledgment ack) {try {for (ConsumerRecord record : records) {Optional<?> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {Object message = kafkaMessage.get();log.info("----------------- record =" + record);log.info("------------------ message =" + message);}}} catch (Exception e) {log.error("kafka失败,当前失败的批次。data:{}", records);e.printStackTrace();} finally {ack.acknowledge();}}

集群搭建

前面虽然优化配置和代码,但是代码执行还是不够快,网上寻找资料(提高了partition的数量,从而提高了consumer的并行能力,从而提高数据的消费能力),说可以提高分区数量,如果单机怎么提高还是一样的(我们试过了),后来搭建了一个集群。注意我们是使用docker搭建kafka集群的,搭建过程如下。docker-compose.yml内容:

version: '2'services:zookeeper:image: wurstmeister/zookeeperports:- "2181:2181"kafka:image: wurstmeister/kafkaports:- "9095:9095"environment:KAFKA_ADVERTISED_HOST_NAME: IP地址KAFKA_ADVERTISED_PORT: 9095KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://XXXXXXXX:9095KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9095KAFKA_DELETE_TOPIC_ENABLE: "true"KAFKA_LOG_RETENTION_HOURS: 1KAFKA_MESSAGE_MAX_BYTES: 10000000KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS: 100000KAFKA_NUM_PARTITIONS: 2KAFKA_DELETE_RETENTION_MS: 1000KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181volumes:- /var/run/docker.sock:/var/run/docker.sockkafka-manager:image: sheepkiller/kafka-managerlinks:- kafka- zookeeperenvironment:ZK_HOSTS: zookeeper:2181APPLICATION_SECRET: letmeinKM_ARGS: -Djava.net.preferIPv4Stack=trueports:- "9000:9000"

1.启动的命令:

docker-compose up -d

2.先去修改配置文件的端口,然后再启动相关的命令:

docker-compose scale kafka=2

3.再次修改文件袋的端口,然后再启动相关的命令:

docker-compose scale kafka=3

以上就是我所总结的kafka优化,欢迎有更好的方案进行交流,欢迎关注微信号:繁荣Aaron和转发。

这篇关于kafka优化(系列四):kafka配置优化和kafka批量消费,提高分区数量的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1113773

相关文章

SpringBoot3.4配置校验新特性的用法详解

《SpringBoot3.4配置校验新特性的用法详解》SpringBoot3.4对配置校验支持进行了全面升级,这篇文章为大家详细介绍了一下它们的具体使用,文中的示例代码讲解详细,感兴趣的小伙伴可以参考... 目录基本用法示例定义配置类配置 application.yml注入使用嵌套对象与集合元素深度校验开发

SpringBoot整合mybatisPlus实现批量插入并获取ID详解

《SpringBoot整合mybatisPlus实现批量插入并获取ID详解》这篇文章主要为大家详细介绍了SpringBoot如何整合mybatisPlus实现批量插入并获取ID,文中的示例代码讲解详细... 目录【1】saveBATch(一万条数据总耗时:2478ms)【2】集合方式foreach(一万条数

IntelliJ IDEA 中配置 Spring MVC 环境的详细步骤及问题解决

《IntelliJIDEA中配置SpringMVC环境的详细步骤及问题解决》:本文主要介绍IntelliJIDEA中配置SpringMVC环境的详细步骤及问题解决,本文分步骤结合实例给大... 目录步骤 1:创建 Maven Web 项目步骤 2:添加 Spring MVC 依赖1、保存后执行2、将新的依赖

SpringBoot基于配置实现短信服务策略的动态切换

《SpringBoot基于配置实现短信服务策略的动态切换》这篇文章主要为大家详细介绍了SpringBoot在接入多个短信服务商(如阿里云、腾讯云、华为云)后,如何根据配置或环境切换使用不同的服务商,需... 目录目标功能示例配置(application.yml)配置类绑定短信发送策略接口示例:阿里云 & 腾

如何为Yarn配置国内源的详细教程

《如何为Yarn配置国内源的详细教程》在使用Yarn进行项目开发时,由于网络原因,直接使用官方源可能会导致下载速度慢或连接失败,配置国内源可以显著提高包的下载速度和稳定性,本文将详细介绍如何为Yarn... 目录一、查询当前使用的镜像源二、设置国内源1. 设置为淘宝镜像源2. 设置为其他国内源三、还原为官方

CentOS7更改默认SSH端口与配置指南

《CentOS7更改默认SSH端口与配置指南》SSH是Linux服务器远程管理的核心工具,其默认监听端口为22,由于端口22众所周知,这也使得服务器容易受到自动化扫描和暴力破解攻击,本文将系统性地介绍... 目录引言为什么要更改 SSH 默认端口?步骤详解:如何更改 Centos 7 的 SSH 默认端口1

Maven的使用和配置国内源的保姆级教程

《Maven的使用和配置国内源的保姆级教程》Maven是⼀个项目管理工具,基于POM(ProjectObjectModel,项目对象模型)的概念,Maven可以通过一小段描述信息来管理项目的构建,报告... 目录1. 什么是Maven?2.创建⼀个Maven项目3.Maven 核心功能4.使用Maven H

SpringBoot多数据源配置完整指南

《SpringBoot多数据源配置完整指南》在复杂的企业应用中,经常需要连接多个数据库,SpringBoot提供了灵活的多数据源配置方式,以下是详细的实现方案,需要的朋友可以参考下... 目录一、基础多数据源配置1. 添加依赖2. 配置多个数据源3. 配置数据源Bean二、JPA多数据源配置1. 配置主数据

MySQL索引的优化之LIKE模糊查询功能实现

《MySQL索引的优化之LIKE模糊查询功能实现》:本文主要介绍MySQL索引的优化之LIKE模糊查询功能实现,本文通过示例代码给大家介绍的非常详细,感兴趣的朋友一起看看吧... 目录一、前缀匹配优化二、后缀匹配优化三、中间匹配优化四、覆盖索引优化五、减少查询范围六、避免通配符开头七、使用外部搜索引擎八、分

Spring 基于XML配置 bean管理 Bean-IOC的方法

《Spring基于XML配置bean管理Bean-IOC的方法》:本文主要介绍Spring基于XML配置bean管理Bean-IOC的方法,本文给大家介绍的非常详细,对大家的学习或工作具有一... 目录一. spring学习的核心内容二. 基于 XML 配置 bean1. 通过类型来获取 bean2. 通过