使用Kafka时一定要注意防止消费速度过慢触发rebalance而导致的重复消费

本文主要是介绍使用Kafka时一定要注意防止消费速度过慢触发rebalance而导致的重复消费,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在Java应用中,我们往往会使用spring-kafka组件简单的设置一下group_id, topic就开始消费消息了,其实这样会埋下巨大的安全隐患,即当消费速度过慢时有可能会触发rebalance, 这批消息被分配到另一个消费者,然后新的消费者还会消费过慢,再次rebalance, 这样一直恶性循环下去。发生这种情况最明显的标志就是日志里能看到CommitFailedException异常,然后还会带上下面一段话:

Commit cannot be completed since the group has already rebalanced and
assigned the partitions to another member. This means that the time
between subsequent calls to poll() was longer than the configured
max.poll.interval.ms, which typically implies that the poll loop is
spending too much time message processing. You can address this either
by increasing the session timeout or by reducing the maximum size of
batches returned in poll() with max.poll.records.

其实这段话已经很走心了,kafka的开发者已经预料到了这可能是个很容易出现的问题,所以连解决方案都给你列出来了。这里我们需要明确一下,在Kafka 0.10.1.0以后的版本中,影响rebalance触发的参数有三个,说明如下:

  • session.timeout.ms
    这个参数定义了当broker多久没有收到consumer的心跳请求后就触发rebalance,默认值是10s。在0.10.1.0之前的版本中,由于心跳请求是在poll()拉取消息的方法中执行的,因此如果当前批次处理消息耗时太长,就会导致consumer没有机会按时发送心跳,broker认为消费者已死,触发rebalance。在0.10.1.0或更新的版本中解决了这个问题,心跳请求会在单独的线程中发送,因此就不会出现因为消息处理过长而发不出心跳的问题了。

  • max.poll.interval.ms
    这个参数定义了两次poll()之间的最大间隔,默认值为5分钟。如果超过这个间隔同样会触发rebalance。在多数情况下这个参数是导致rebalance消息重复的关键,即业务处理消息耗时太长。有人可能会疑惑,如果5分钟都没处理完消息那肯定时出了问题,其实不然。能否在5min内处理完还取决于你每次拉取了多少条消息,如果一次拿到了成千上万条的话,5min就够呛了。

  • max.poll.records
    这个参数定义了poll()方法最多可以返回多少条消息,默认值为500。注意这里的用词是"最多",也就是说如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,就只返回500。这个默认值是比较坑人的,如果你的消息处理逻辑比较重,比如需要查数据库,调用接口,甚至是复杂计算,那么你很难保证能够在5min内处理完500条消息,也就是说,如果上游真的突然大爆发生产了成千上万条消息,而平摊到每个消费者身上的消息达到了500的又无法按时消费完成的话就会触发rebalance, 然后这批消息会被分配到另一个消费者中,还是会处理不完,又会触发rebalance, 这样这批消息就永远也处理不完,而且一直在重复处理。

要避免出现上述问题也很简单,那就是提前评估好处理一条消息最长需要多少时间,然后务必覆盖默认的max.poll.records参数。在spring-kafka中这个原生参数对应的参数项是max-poll-records。对于消息处理比较重的操作,建议把这个值改到50以下会保险一些。

这篇关于使用Kafka时一定要注意防止消费速度过慢触发rebalance而导致的重复消费的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/922221

相关文章

使用Go调用第三方API的方法详解

《使用Go调用第三方API的方法详解》在现代应用开发中,调用第三方API是非常常见的场景,比如获取天气预报、翻译文本、发送短信等,Go作为一门高效并发的编程语言,拥有强大的标准库和丰富的第三方库,可以... 目录引言一、准备工作二、案例1:调用天气查询 API1. 注册并获取 API Key2. 代码实现3

MySQL8.0临时表空间的使用及解读

《MySQL8.0临时表空间的使用及解读》MySQL8.0+引入会话级(temp_N.ibt)和全局(ibtmp1)InnoDB临时表空间,用于存储临时数据及事务日志,自动创建与回收,重启释放,管理高... 目录一、核心概念:为什么需要“临时表空间”?二、InnoDB 临时表空间的两种类型1. 会话级临时表

MySQL之复合查询使用及说明

《MySQL之复合查询使用及说明》文章讲解了SQL复合查询中emp、dept、salgrade三张表的使用,涵盖多表连接、自连接、子查询(单行/多行/多列)及合并查询(UNION/UNIONALL)等... 目录复合查询基本查询回顾多表查询笛卡尔积自连接子查询单行子查询多行子查询多列子查询在from子句中使

Kotlin 协程之Channel的概念和基本使用详解

《Kotlin协程之Channel的概念和基本使用详解》文章介绍协程在复杂场景中使用Channel进行数据传递与控制,涵盖创建参数、缓冲策略、操作方式及异常处理,适用于持续数据流、多协程协作等,需注... 目录前言launch / async 适合的场景Channel 的概念和基本使用概念Channel 的

C#使用SendMessage实现进程间通信的示例代码

《C#使用SendMessage实现进程间通信的示例代码》在软件开发中,进程间通信(IPC)是关键技术之一,C#通过调用WindowsAPI的SendMessage函数实现这一功能,本文将通过实例介绍... 目录第一章:SendMessage的底层原理揭秘第二章:构建跨进程通信桥梁2.1 定义通信协议2.2

使用python制作一款文件粉碎工具

《使用python制作一款文件粉碎工具》这篇文章主要为大家详细介绍了如何使用python制作一款文件粉碎工具,能够有效粉碎密码文件和机密Excel表格等,感兴趣的小伙伴可以了解一下... 文件粉碎工具:适用于粉碎密码文件和机密的escel表格等等,主要作用就是防止 别人用数据恢复大师把你刚删除的机密的文件恢

MySQL使用EXISTS检查记录是否存在的详细过程

《MySQL使用EXISTS检查记录是否存在的详细过程》EXISTS是SQL中用于检查子查询是否返回至少一条记录的运算符,它通常用于测试是否存在满足特定条件的记录,从而在主查询中进行相应操作,本文给大... 目录基本语法示例数据库和表结构1. 使用 EXISTS 在 SELECT 语句中2. 使用 EXIS

在Android中使用WebView在线查看PDF文件的方法示例

《在Android中使用WebView在线查看PDF文件的方法示例》在Android应用开发中,有时我们需要在客户端展示PDF文件,以便用户可以阅读或交互,:本文主要介绍在Android中使用We... 目录简介:1. WebView组件介绍2. 在androidManifest.XML中添加Interne

Java Stream流与使用操作指南

《JavaStream流与使用操作指南》Stream不是数据结构,而是一种高级的数据处理工具,允许你以声明式的方式处理数据集合,类似于SQL语句操作数据库,本文给大家介绍JavaStream流与使用... 目录一、什么是stream流二、创建stream流1.单列集合创建stream流2.双列集合创建str

Python使用FastAPI实现大文件分片上传与断点续传功能

《Python使用FastAPI实现大文件分片上传与断点续传功能》大文件直传常遇到超时、网络抖动失败、失败后只能重传的问题,分片上传+断点续传可以把大文件拆成若干小块逐个上传,并在中断后从已完成分片继... 目录一、接口设计二、服务端实现(FastAPI)2.1 运行环境2.2 目录结构建议2.3 serv