如何管理Spark Streaming消费Kafka的偏移量(二)

2024-05-15 03:08

本文主要是介绍如何管理Spark Streaming消费Kafka的偏移量(二),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

上篇文章,讨论了在spark streaming中管理消费kafka的偏移量的方式,本篇就接着聊聊上次说升级失败的案例。

事情发生一个月前,由于当时我们想提高spark streaming程序的并行处理性能,于是需要增加kafka分区个数,,这里需要说下,在新版本spark streaming和kafka的集成中,按照官网的建议 spark streaming的executors的数量要和kafka的partition的个数保持相等,这样每一个executor处理一个kafka partition的数据,效率是最高的。如果executors的数量大于kafka的分区个数,其实多余的executors相当于是不会处理任何数据,这部分的进程其实是白白浪费性能。

如果executor的个数小于kafka partition的个数,那么其实有一些executors进程是需要处理多个partition分区的数据的,所以官网建议spark executors的进程数和kafka partition的个数要保持一致。

那么问题来了,如果想要提高spark streaming的并行处理性能,只能增加kafka的分区了,给kafka增加分区比较容易,直接执行一个命令即可,不过这里需要注意,kafka的分区只能增加不能减少,所以添加分区要考虑到底多少个才合适。

接下来我们便增加了kafka分区的数量,同时修改了spark streaming的executors的个数和kafka的分区个数一一对应,然后就启动了流程序,结果出现了比较诡异的问题,表现如下:

造几条测试数据打入kafka中,发现程序总是只能处理其中的一部分数据,而每次总有一些数据丢失。按理说代码没有任何改动,只是增加kafka的分区和spark streaming的executors的个数,应该不会出现问题才对,于是又重新测了原来的旧分区和程序,发现没有问题,经过对比发现问题只会出现在kafka新增分区后,然后出现这种丢数据的情况。然后和运维同学一起看了新增的kafka的分区的磁盘目录是否有数据落入,经查询发现新的分区确实已经有数据进入了,这就很奇怪了丢的数据到底是怎么丢的?

最后我又检查了我们自己保存的kafka的offset,发现里面的偏移量竟然没有新增kafka的分区的偏移量,至此,终于找到问题所在,也就是说,如果没有新增分区的偏移量,那么程序运行时是不会处理新增分区的数据,而我们新增的分区确确实实有数据落入了,这就是为啥前面说的诡异的丢失数据的原因,其实是因为新增kafka的分区的数据程序并没有处理过而这个原因正是我们的自己保存offset中没有记录新增分区的偏移量。

问题找到了,那么如何修复线上丢失的数据呢?

当时想了一个比较笨的方法,因为我们的kafka线上默认是保留7天的数据,旧分区的数据已经处理过,就是新增的分区数据没有处理,所以我们删除了已经处理过的旧的分区的数据,然后在业务流量底峰时期,重新启了流程序,让其从最早的数据开始消费处理,这样以来因为旧的分区被删除,只有新分区有数据,所以相当于是把丢失的那部分数据给修复了。修复完成后,又把程序停止,然后配置从最新的偏移量开始处理,这样偏移量里面就能识别到新增的分区,然后就继续正常处理即可。

注意这里面的删除kafka旧分区的数据,是一个比较危险的操作,它要求kafka的节点需要全部重启才能生效,所以除非特殊情况,不要使用这么危险的方式。

后来,仔细分析了我们使用的一个开源程序管理offset的源码,发现这个程序有一点bug,没有考虑到kafka新增分区的情况,也就是说如果你的kafka分区增加了,你的程序在重启后是识别不到新增的分区的,所以如果新增的分区还有数据进入,那么你的程序一定会丢数据,因为扩展kafka分区这个操作,并不常见,所以这个bug比较难易触发。

知道原因后,解决起来比较容易了,就是每次启动流程序前,对比一下当前我们自己保存的kafka的分区的个数和从zookeeper里面的存的topic的分区个数是否一致,如果不一致,就把新增的分区给添加到我们自己保存的信息中,并发偏移量初始化成0,这样以来在程序启动后,就会自动识别新增分区的数据。

所以,回过头来看上面的那个问题,最简单优雅的解决方法就是,直接手动修改我们自己的保存的kafka的分区偏移量信息,把新增的分区给加入进去,然后重启流程序即可。

这个案例也就是我上篇文章所说的第三个场景的case,如果是自己手动管理kafka的offset一定要注意兼容新增分区后的这种情况,否则程序可能会出现丢失数据的问题。

这篇关于如何管理Spark Streaming消费Kafka的偏移量(二)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/990635

相关文章

使用jenv工具管理多个JDK版本的方法步骤

《使用jenv工具管理多个JDK版本的方法步骤》jenv是一个开源的Java环境管理工具,旨在帮助开发者在同一台机器上轻松管理和切换多个Java版本,:本文主要介绍使用jenv工具管理多个JD... 目录一、jenv到底是干啥的?二、jenv的核心功能(一)管理多个Java版本(二)支持插件扩展(三)环境隔

Python中bisect_left 函数实现高效插入与有序列表管理

《Python中bisect_left函数实现高效插入与有序列表管理》Python的bisect_left函数通过二分查找高效定位有序列表插入位置,与bisect_right的区别在于处理重复元素时... 目录一、bisect_left 基本介绍1.1 函数定义1.2 核心功能二、bisect_left 与

Spring中管理bean对象的方式(专业级说明)

《Spring中管理bean对象的方式(专业级说明)》在Spring框架中,Bean的管理是核心功能,主要通过IoC(控制反转)容器实现,下面给大家介绍Spring中管理bean对象的方式,感兴趣的朋... 目录1.Bean的声明与注册1.1 基于XML配置1.2 基于注解(主流方式)1.3 基于Java

基于Python+PyQt5打造一个跨平台Emoji表情管理神器

《基于Python+PyQt5打造一个跨平台Emoji表情管理神器》在当今数字化社交时代,Emoji已成为全球通用的视觉语言,本文主要为大家详细介绍了如何使用Python和PyQt5开发一个功能全面的... 目录概述功能特性1. 全量Emoji集合2. 智能搜索系统3. 高效交互设计4. 现代化UI展示效果

Mysql中的用户管理实践

《Mysql中的用户管理实践》:本文主要介绍Mysql中的用户管理实践,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录13. 用户管理13.1 用户 13.1.1 用户信息 13.1.2 创建用户 13.1.3 删除用户 13.1.4 修改用户

查看MySql主从同步的偏移量方式

《查看MySql主从同步的偏移量方式》:本文主要介绍查看MySql主从同步的偏移量方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 1.mysql的主从同步方案mysqlphp为了在实现读写分离,主库写,从库读mysql的同步方案主要是通过从库读取主库的binl

SpringBoot实现Kafka动态反序列化的完整代码

《SpringBoot实现Kafka动态反序列化的完整代码》在分布式系统中,Kafka作为高吞吐量的消息队列,常常需要处理来自不同主题(Topic)的异构数据,不同的业务场景可能要求对同一消费者组内的... 目录引言一、问题背景1.1 动态反序列化的需求1.2 常见问题二、动态反序列化的核心方案2.1 ht

linux服务之NIS账户管理服务方式

《linux服务之NIS账户管理服务方式》:本文主要介绍linux服务之NIS账户管理服务方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、所需要的软件二、服务器配置1、安装 NIS 服务2、设定 NIS 的域名 (NIS domain name)3、修改主

Python+PyQt5开发一个Windows电脑启动项管理神器

《Python+PyQt5开发一个Windows电脑启动项管理神器》:本文主要介绍如何使用PyQt5开发一款颜值与功能并存的Windows启动项管理工具,不仅能查看/删除现有启动项,还能智能添加新... 目录开篇:为什么我们需要启动项管理工具功能全景图核心技术解析1. Windows注册表操作2. 启动文件

gradle第三方Jar包依赖统一管理方式

《gradle第三方Jar包依赖统一管理方式》:本文主要介绍gradle第三方Jar包依赖统一管理方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录背景实现1.顶层模块build.gradle添加依赖管理插件2.顶层模块build.gradle添加所有管理依赖包