【Table/SQL Api】Flink Table/SQL Api表转流读取MySQL

2023-12-10 22:28

本文主要是介绍【Table/SQL Api】Flink Table/SQL Api表转流读取MySQL,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

引入依赖

jdbc依赖

flink-connector-jdbc + mysql-jdbc-driver 操作mysql数据库

        <!-- Flink-Connector-Jdbc --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId></dependency><!-- mysql jdbc driver --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency>

Table/SQL Api依赖

  1. Table/SQL Api 扩展依赖
  2. Table/SQL Api 基础依赖
  3. Table/SQL Api 和 DataStream Api 交互的依赖 bridge
  4. Flink Planner 依赖
        <!-- Table/SQL Api 依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId></dependency><!-- Table/SQL Api 扩展依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId></dependency><!-- bridge桥接器,主要负责Table API和 DataStream API的连接支持 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId></dependency><!-- Flink Planner 依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.binary.version}</artifactId></dependency>

对应版本在这 (项目Flink版本为1.14.5

image-20231210161727111

Flink读写MySQL工具类

Table Api 环境加载

Table API和SQL Api都是基于Table接口

Table Api上下文环境有3种类型

  1. TableEnvironment:只支持Batch作业
  2. BatchTableEnvironment:只支持Batch作业
  3. StreamTableEnvironment: 支持流计算【用这个】

Planner(查询处理器)

Planner(查询处理器):解析sql、优化sql和执行sql

Flink Planner的类型:

  1. Flink Planner (Old Planner)
  2. Blink Planner (Flink 1.14之前需要手动导入依赖)

Blink Planner从Flink 1.11版本开始为Flink-table的默认查询处理器

Blink Planner使得Table Api & SQL 层实现了流批统一

Catalog对象

Catalog对象是提供了元数据信息,数据源与数据表的信息则存储在Catalog中

// 创建Catalog对象
new JdbcCatalog(catalog_name, database, username, passwd, url);

Catalog对象是接口

Catalog接口的实现:(Flink 1.14版本之前)

  1. PG (PostgresSQL) Catalog
  2. HiveCatalog
  3. Mysql Catalog (Flink 1.15 才有)

DDL与数据库表结构必须一模一样,建立映射,这种方式数据库表结构如果变化,代码也必须随之变化重新打包,因此这种方式用的不多,一般catalog会用的比较多。

但由于项目Flink依赖用的是1.14.5,因此还是使用DDL语句实现。

代码实现

public class MysqlUtil {/*** 数据库连接对象*/private static Connection connection = null;/*** SQL语句对象*/private static PreparedStatement preparedStatement = null;/*** 结果集对象*/private static ResultSet rs = null;/*** 使用 Flink Table/SQL Api 读取Mysql** @param env:           流计算上下文环境* @param parameterTool: 参数工具* @param clazz:         流水线输出对象的类* @param tableName:     表名* @param ddlString:     DDL字符串* @param sql:           SQL查询语句* @return DataStream<T>:DataStream对象*/public static <T> DataStream<T> readWithTableOrSQLApi(StreamExecutionEnvironment env,ParameterTool parameterTool,Class<T> clazz,String tableName,String ddlString,String sql) throws Exception {// 创建TableApi运行环境EnvironmentSettings bsSettings =EnvironmentSettings.newInstance()// Flink 1.14不需要再设置 Planner//.useBlinkPlanner()// 设置流计算模式.inStreamingMode().build();// 创建StreamTableEnvironment实例StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);// 指定方言 (选择使用SQL语法还是HQL语法)tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);// 编写DDL ( 数据定义语言 )String ddl = buildMysqlDDL(parameterTool, tableName, ddlString);// StreamTableEnvironment注册虚拟表tableEnv.executeSql(ddl);// 查询结果是Table对象Table table = tableEnv.sqlQuery(sql);// 将Table对象转换为DataStream对象return tableEnv.toDataStream(table, clazz);}/*** 根据参数生成MySQL的DDL语句** @param parameterTool  参数工具,用于获取MySQL连接信息* @param tableName      要创建的表名* @param ddlFieldString 表字段的DDL语句* @return 生成的完整的MySQL DDL语句*/public static String buildMysqlDDL(ParameterTool parameterTool,String tableName,String ddlFieldString) {// 从参数工具中获取mysql连接的urlString url = parameterTool.get(ParameterConstants.Mysql_URL);// 从参数工具中获取mysql连接的用户名String username = parameterTool.get(ParameterConstants.Mysql_USERNAME);// 从参数工具中获取mysql连接的密码String passwd = parameterTool.get(ParameterConstants.Mysql_PASSWD);// 从参数工具中获取MySQL的驱动程序String driver = parameterTool.get(ParameterConstants.Mysql_DRIVER);// 返回完整的DDL语句return "CREATE TABLE IF NOT EXISTS " +tableName +" (\n" +ddlFieldString +")" +" WITH (\n" +"'connector' = 'jdbc',\n" +"'driver' = '" + driver + "',\n" +"'url' = '" + url + "',\n" +"'username' = '" + username + "',\n" +"'password' = '" + passwd + "',\n" +"'table-name' = '" + tableName + "'\n" +")";}/*** 初始化 jdbc Connection*/public static Connection init(ParameterTool parameterTool) {String _url = parameterTool.get(ParameterConstants.Mysql_URL);String _username = parameterTool.get(ParameterConstants.Mysql_USERNAME);String _passwd = parameterTool.get(ParameterConstants.Mysql_PASSWD);try {connection = DriverManager.getConnection(_url, _username, _passwd);} catch (Exception e) {throw new RuntimeException(e);}return connection;}/*** 生成 PreparedStatement*/public static PreparedStatement initPreparedStatement(String sql) {try {preparedStatement = connection.prepareStatement(sql);} catch (Exception e) {throw new RuntimeException(e);}return preparedStatement;}/*** 关闭 jdbc Connection*/public static void close() {try {if (preparedStatement != null) {preparedStatement.close();}if (connection != null) {connection.close();}} catch (Exception e) {throw new RuntimeException(e);}}/*** 关闭 PreparedStatement*/public static void closePreparedStatement() {try {if (preparedStatement != null) {preparedStatement.close();}} catch (Exception e) {throw new RuntimeException(e);}}/*** 关闭 ResultSet*/public static void closeResultSet() {try {if (rs != null) {rs.close();}} catch (Exception e) {throw new RuntimeException(e);}}/*** 执行 sql 语句*/public static ResultSet executeQuery(PreparedStatement ps) {preparedStatement = ps;try {rs = preparedStatement.executeQuery();} catch (Exception e) {throw new RuntimeException(e);}return rs;}}

测试一下

测试库中有个tb_user表

image-20231210174346826

创建与表映射的实体类

@Data
public class UserPO {private Long id;private String name;
}
class MysqlUtilTest {@DisplayName("测试使用 Flink Table/SQL Api 读取Mysql")@Testpublic void testReadWithTableOrSQLApi() throws Exception {// 初始化环境StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();// 设置并行度1env.setParallelism(1);// 获取参数工具实例ParameterTool parameterTool = ParameterUtil.getParameters();/* ************************ CREATE 语句用于向当前或指定的 Catalog 中注册表。* 注册后的表、视图和函数可以在 SQL 查询中使用** *********************/// 表名String tableName = "tb_user";// 表字段ddlString ddlFieldString ="id BIGINT,\n" +"name STRING \n";// 查询表的全部字段String sql = "SELECT * FROM " + tableName;DataStream<UserPO> rowDataStream =MysqlUtil.readWithTableOrSQLApi(env,parameterTool,UserPO.class,tableName,ddlFieldString,sql);rowDataStream.print("mysql");env.execute();}
}

image-20231210174720832

查询成功!

这篇关于【Table/SQL Api】Flink Table/SQL Api表转流读取MySQL的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/478575

相关文章

MySQL 中的 CAST 函数详解及常见用法

《MySQL中的CAST函数详解及常见用法》CAST函数是MySQL中用于数据类型转换的重要函数,它允许你将一个值从一种数据类型转换为另一种数据类型,本文给大家介绍MySQL中的CAST... 目录mysql 中的 CAST 函数详解一、基本语法二、支持的数据类型三、常见用法示例1. 字符串转数字2. 数字

Mysql实现范围分区表(新增、删除、重组、查看)

《Mysql实现范围分区表(新增、删除、重组、查看)》MySQL分区表的四种类型(范围、哈希、列表、键值),主要介绍了范围分区的创建、查询、添加、删除及重组织操作,具有一定的参考价值,感兴趣的可以了解... 目录一、mysql分区表分类二、范围分区(Range Partitioning1、新建分区表:2、分

MySQL 定时新增分区的实现示例

《MySQL定时新增分区的实现示例》本文主要介绍了通过存储过程和定时任务实现MySQL分区的自动创建,解决大数据量下手动维护的繁琐问题,具有一定的参考价值,感兴趣的可以了解一下... mysql创建好分区之后,有时候会需要自动创建分区。比如,一些表数据量非常大,有些数据是热点数据,按照日期分区MululbU

SQL Server配置管理器无法打开的四种解决方法

《SQLServer配置管理器无法打开的四种解决方法》本文总结了SQLServer配置管理器无法打开的四种解决方法,文中通过图文示例介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的... 目录方法一:桌面图标进入方法二:运行窗口进入检查版本号对照表php方法三:查找文件路径方法四:检查 S

MySQL 删除数据详解(最新整理)

《MySQL删除数据详解(最新整理)》:本文主要介绍MySQL删除数据的相关知识,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录一、前言二、mysql 中的三种删除方式1.DELETE语句✅ 基本语法: 示例:2.TRUNCATE语句✅ 基本语

MySQL中查找重复值的实现

《MySQL中查找重复值的实现》查找重复值是一项常见需求,比如在数据清理、数据分析、数据质量检查等场景下,我们常常需要找出表中某列或多列的重复值,具有一定的参考价值,感兴趣的可以了解一下... 目录技术背景实现步骤方法一:使用GROUP BY和HAVING子句方法二:仅返回重复值方法三:返回完整记录方法四:

从入门到精通MySQL联合查询

《从入门到精通MySQL联合查询》:本文主要介绍从入门到精通MySQL联合查询,本文通过实例代码给大家介绍的非常详细,需要的朋友可以参考下... 目录摘要1. 多表联合查询时mysql内部原理2. 内连接3. 外连接4. 自连接5. 子查询6. 合并查询7. 插入查询结果摘要前面我们学习了数据库设计时要满

MySQL查询JSON数组字段包含特定字符串的方法

《MySQL查询JSON数组字段包含特定字符串的方法》在MySQL数据库中,当某个字段存储的是JSON数组,需要查询数组中包含特定字符串的记录时传统的LIKE语句无法直接使用,下面小编就为大家介绍两种... 目录问题背景解决方案对比1. 精确匹配方案(推荐)2. 模糊匹配方案参数化查询示例使用场景建议性能优

mysql表操作与查询功能详解

《mysql表操作与查询功能详解》本文系统讲解MySQL表操作与查询,涵盖创建、修改、复制表语法,基本查询结构及WHERE、GROUPBY等子句,本文结合实例代码给大家介绍的非常详细,感兴趣的朋友跟随... 目录01.表的操作1.1表操作概览1.2创建表1.3修改表1.4复制表02.基本查询操作2.1 SE

MySQL中的锁机制详解之全局锁,表级锁,行级锁

《MySQL中的锁机制详解之全局锁,表级锁,行级锁》MySQL锁机制通过全局、表级、行级锁控制并发,保障数据一致性与隔离性,全局锁适用于全库备份,表级锁适合读多写少场景,行级锁(InnoDB)实现高并... 目录一、锁机制基础:从并发问题到锁分类1.1 并发访问的三大问题1.2 锁的核心作用1.3 锁粒度分