记录一次flink代码优化

2024-05-27 21:12

本文主要是介绍记录一次flink代码优化,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

背景介绍

业务中有用到flink读取mysql数据,然后实时处理回写到mysql和doris供c端使用,代码的大体逻辑都是
1.创建执行环境
2.写需要使用到表的flinksql ddl语句
3.通过flink sql 写核心的业务逻辑
4.将sql处理的结果通过toRetractStream来转换成一个可撤回流
5.对这个流进行其他处理,比如通过异步IO关联一些高基维表,数据解密,脱敏,过滤脏数据等工作
6.将处理后的结果数据写入到mysql和doris
但是在之前的代码中存在了很多的冗余代码和硬编码,存在如下问题
1.每个任务环境构建的大部分代码都是一样的
2.每一个表都需要手动去写flinksql ddl语句,而且直接写死在代码中,导致代码很长
3.核心的sql处理逻辑也是直接写到了代码中,不方便阅读和修改
4.表转流后的数据类型都需要手动写一个对应的实体类,每次加字段或者修改会很麻烦
5.JdbcSink 的sql语句需要写很长,而且还有写对应的PreparedStatement
6.存在多个库多个环境,数据库的配置信息太多没有一个统一管理的地方

优化思路

1.重复的代码抽取成工具类
2.数据库配置写到配置文件中而不是代码中
3.flink sql ddl语句根据表名和数据库名自动生成
4.核心的sql语句放到文件中,用的时候读取
5.表转流后统一转换成Row类型
6.写一个通用的JdbcSink方法,传入数据库配置,表名称,写入的字段即可自动生成一个SinkFunction
7.mybatis SqlSessionFactory存在多个,写一个工厂类型,通过传入不同的配置文件来生成不同的SqlSessionFactory
8.工具类的开发,数据库连接工具类,线程池工具类,文件读取工具类,环境构建工具类
9.将代码中的一些常量抽取出来放到一个全局的常量类中

遇到的一些问题

长时间没有数据到来,数据库连接失效的问题

解决方案 :
1.jdbc url 添加参数 &autoReconnect=true&maxReconnects=3
2.用完的连接要通过close方法归还到连接池,而不是其放到一个成员变量上长时间使用

mybatis sqlSession失效

使用mybatis的sqlSession用完后应该使用close方法释放资源,这里和数据库连接池的使用是一样的道理

flinkcdc server-id 冲突的问题

不知道之前是哪位大哥在网上写的server-id必须是5400-6400之间,所以一直在使用这个范围内的server-id,经常会遇到server-id冲突的问题,直到前段时间咨询了社区大佬并且也阅读了相关的源码才发现,server-id的范围是整型数字的方位,最大2147483647,所以可以使用的范围是极大的.

这篇关于记录一次flink代码优化的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1008607

相关文章

Python UV安装、升级、卸载详细步骤记录

《PythonUV安装、升级、卸载详细步骤记录》:本文主要介绍PythonUV安装、升级、卸载的详细步骤,uv是Astral推出的下一代Python包与项目管理器,主打单一可执行文件、极致性能... 目录安装检查升级设置自动补全卸载UV 命令总结 官方文档详见:https://docs.astral.sh/

统一返回JsonResult踩坑的记录

《统一返回JsonResult踩坑的记录》:本文主要介绍统一返回JsonResult踩坑的记录,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录统一返回jsonResult踩坑定义了一个统一返回类在使用时,JsonResult没有get/set方法时响应总结统一返回

Go学习记录之runtime包深入解析

《Go学习记录之runtime包深入解析》Go语言runtime包管理运行时环境,涵盖goroutine调度、内存分配、垃圾回收、类型信息等核心功能,:本文主要介绍Go学习记录之runtime包的... 目录前言:一、runtime包内容学习1、作用:① Goroutine和并发控制:② 垃圾回收:③ 栈和

java对接海康摄像头的完整步骤记录

《java对接海康摄像头的完整步骤记录》在Java中调用海康威视摄像头通常需要使用海康威视提供的SDK,下面这篇文章主要给大家介绍了关于java对接海康摄像头的完整步骤,文中通过代码介绍的非常详细,需... 目录一、开发环境准备二、实现Java调用设备接口(一)加载动态链接库(二)结构体、接口重定义1.类型

SpringBoot整合Apache Flink的详细指南

《SpringBoot整合ApacheFlink的详细指南》这篇文章主要为大家详细介绍了SpringBoot整合ApacheFlink的详细过程,涵盖环境准备,依赖配置,代码实现及运行步骤,感兴趣的... 目录1. 背景与目标2. 环境准备2.1 开发工具2.2 技术版本3. 创建 Spring Boot

Spring Boot 整合 Apache Flink 的详细过程

《SpringBoot整合ApacheFlink的详细过程》ApacheFlink是一个高性能的分布式流处理框架,而SpringBoot提供了快速构建企业级应用的能力,下面给大家介绍Spri... 目录Spring Boot 整合 Apache Flink 教程一、背景与目标二、环境准备三、创建项目 & 添

apache的commons-pool2原理与使用实践记录

《apache的commons-pool2原理与使用实践记录》ApacheCommonsPool2是一个高效的对象池化框架,通过复用昂贵资源(如数据库连接、线程、网络连接)优化系统性能,这篇文章主... 目录一、核心原理与组件二、使用步骤详解(以数据库连接池为例)三、高级配置与优化四、典型应用场景五、注意事

SpringBoot实现文件记录日志及日志文件自动归档和压缩

《SpringBoot实现文件记录日志及日志文件自动归档和压缩》Logback是Java日志框架,通过Logger收集日志并经Appender输出至控制台、文件等,SpringBoot配置logbac... 目录1、什么是Logback2、SpringBoot实现文件记录日志,日志文件自动归档和压缩2.1、

qtcreater配置opencv遇到的坑及实践记录

《qtcreater配置opencv遇到的坑及实践记录》我配置opencv不管是按照网上的教程还是deepseek发现都有些问题,下面是我的配置方法以及实践成功的心得,感兴趣的朋友跟随小编一起看看吧... 目录电脑环境下载环境变量配置qmake加入外部库测试配置我配置opencv不管是按照网上的教程还是de

使用nohup和--remove-source-files在后台运行rsync并记录日志方式

《使用nohup和--remove-source-files在后台运行rsync并记录日志方式》:本文主要介绍使用nohup和--remove-source-files在后台运行rsync并记录日... 目录一、什么是 --remove-source-files?二、示例命令三、命令详解1. nohup2.