记录一次flink代码优化

2024-05-27 21:12

本文主要是介绍记录一次flink代码优化,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

背景介绍

业务中有用到flink读取mysql数据,然后实时处理回写到mysql和doris供c端使用,代码的大体逻辑都是
1.创建执行环境
2.写需要使用到表的flinksql ddl语句
3.通过flink sql 写核心的业务逻辑
4.将sql处理的结果通过toRetractStream来转换成一个可撤回流
5.对这个流进行其他处理,比如通过异步IO关联一些高基维表,数据解密,脱敏,过滤脏数据等工作
6.将处理后的结果数据写入到mysql和doris
但是在之前的代码中存在了很多的冗余代码和硬编码,存在如下问题
1.每个任务环境构建的大部分代码都是一样的
2.每一个表都需要手动去写flinksql ddl语句,而且直接写死在代码中,导致代码很长
3.核心的sql处理逻辑也是直接写到了代码中,不方便阅读和修改
4.表转流后的数据类型都需要手动写一个对应的实体类,每次加字段或者修改会很麻烦
5.JdbcSink 的sql语句需要写很长,而且还有写对应的PreparedStatement
6.存在多个库多个环境,数据库的配置信息太多没有一个统一管理的地方

优化思路

1.重复的代码抽取成工具类
2.数据库配置写到配置文件中而不是代码中
3.flink sql ddl语句根据表名和数据库名自动生成
4.核心的sql语句放到文件中,用的时候读取
5.表转流后统一转换成Row类型
6.写一个通用的JdbcSink方法,传入数据库配置,表名称,写入的字段即可自动生成一个SinkFunction
7.mybatis SqlSessionFactory存在多个,写一个工厂类型,通过传入不同的配置文件来生成不同的SqlSessionFactory
8.工具类的开发,数据库连接工具类,线程池工具类,文件读取工具类,环境构建工具类
9.将代码中的一些常量抽取出来放到一个全局的常量类中

遇到的一些问题

长时间没有数据到来,数据库连接失效的问题

解决方案 :
1.jdbc url 添加参数 &autoReconnect=true&maxReconnects=3
2.用完的连接要通过close方法归还到连接池,而不是其放到一个成员变量上长时间使用

mybatis sqlSession失效

使用mybatis的sqlSession用完后应该使用close方法释放资源,这里和数据库连接池的使用是一样的道理

flinkcdc server-id 冲突的问题

不知道之前是哪位大哥在网上写的server-id必须是5400-6400之间,所以一直在使用这个范围内的server-id,经常会遇到server-id冲突的问题,直到前段时间咨询了社区大佬并且也阅读了相关的源码才发现,server-id的范围是整型数字的方位,最大2147483647,所以可以使用的范围是极大的.

这篇关于记录一次flink代码优化的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1008607

相关文章

Python中4大日志记录库比较的终极PK

《Python中4大日志记录库比较的终极PK》日志记录框架是一种工具,可帮助您标准化应用程序中的日志记录过程,:本文主要介绍Python中4大日志记录库比较的相关资料,文中通过代码介绍的非常详细,... 目录一、logging库1、优点2、缺点二、LogAid库三、Loguru库四、Structlogphp

docker编写java的jar完整步骤记录

《docker编写java的jar完整步骤记录》在平常的开发工作中,我们经常需要部署项目,开发测试完成后,最关键的一步就是部署,:本文主要介绍docker编写java的jar的相关资料,文中通过代... 目录all-docker/生成Docker打包部署文件配置服务A的Dockerfile (a/Docke

MySQL使用EXISTS检查记录是否存在的详细过程

《MySQL使用EXISTS检查记录是否存在的详细过程》EXISTS是SQL中用于检查子查询是否返回至少一条记录的运算符,它通常用于测试是否存在满足特定条件的记录,从而在主查询中进行相应操作,本文给大... 目录基本语法示例数据库和表结构1. 使用 EXISTS 在 SELECT 语句中2. 使用 EXIS

基于Spring Boot 的小区人脸识别与出入记录管理系统功能

《基于SpringBoot的小区人脸识别与出入记录管理系统功能》文章介绍基于SpringBoot框架与百度AI人脸识别API的小区出入管理系统,实现自动识别、记录及查询功能,涵盖技术选型、数据模型... 目录系统功能概述技术栈选择核心依赖配置数据模型设计出入记录实体类出入记录查询表单出入记录 VO 类(用于

java中pdf模版填充表单踩坑实战记录(itextPdf、openPdf、pdfbox)

《java中pdf模版填充表单踩坑实战记录(itextPdf、openPdf、pdfbox)》:本文主要介绍java中pdf模版填充表单踩坑的相关资料,OpenPDF、iText、PDFBox是三... 目录准备Pdf模版方法1:itextpdf7填充表单(1)加入依赖(2)代码(3)遇到的问题方法2:pd

Zabbix在MySQL性能监控方面的运用及最佳实践记录

《Zabbix在MySQL性能监控方面的运用及最佳实践记录》Zabbix通过自定义脚本和内置模板监控MySQL核心指标(连接、查询、资源、复制),支持自动发现多实例及告警通知,结合可视化仪表盘,可有效... 目录一、核心监控指标及配置1. 关键监控指标示例2. 配置方法二、自动发现与多实例管理1. 实践步骤

在Spring Boot中集成RabbitMQ的实战记录

《在SpringBoot中集成RabbitMQ的实战记录》本文介绍SpringBoot集成RabbitMQ的步骤,涵盖配置连接、消息发送与接收,并对比两种定义Exchange与队列的方式:手动声明(... 目录前言准备工作1. 安装 RabbitMQ2. 消息发送者(Producer)配置1. 创建 Spr

k8s上运行的mysql、mariadb数据库的备份记录(支持x86和arm两种架构)

《k8s上运行的mysql、mariadb数据库的备份记录(支持x86和arm两种架构)》本文记录在K8s上运行的MySQL/MariaDB备份方案,通过工具容器执行mysqldump,结合定时任务实... 目录前言一、获取需要备份的数据库的信息二、备份步骤1.准备工作(X86)1.准备工作(arm)2.手

SpringBoot3应用中集成和使用Spring Retry的实践记录

《SpringBoot3应用中集成和使用SpringRetry的实践记录》SpringRetry为SpringBoot3提供重试机制,支持注解和编程式两种方式,可配置重试策略与监听器,适用于临时性故... 目录1. 简介2. 环境准备3. 使用方式3.1 注解方式 基础使用自定义重试策略失败恢复机制注意事项

Python UV安装、升级、卸载详细步骤记录

《PythonUV安装、升级、卸载详细步骤记录》:本文主要介绍PythonUV安装、升级、卸载的详细步骤,uv是Astral推出的下一代Python包与项目管理器,主打单一可执行文件、极致性能... 目录安装检查升级设置自动补全卸载UV 命令总结 官方文档详见:https://docs.astral.sh/