解析 flink sql 转化成flink job

2024-06-24 01:44

本文主要是介绍解析 flink sql 转化成flink job,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

    • 背景
    • 流程
    • flink实例
    • 实现细节
      • 定义的规则
      • 定义的物理算子
      • 定义的flink exec node

背景

在很多计算引擎里,都会把sql 这种标准语言,转成计算引擎下底层实际的算子,因此理解此转换的流程对于理解整个过程非常重要

流程

在这里插入图片描述

flink实例

public class BatchExample {public static void main(String[] args) {// 设置执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);// 创建一个内置示例源表String sourceDDL = "CREATE TABLE users (\n" +"    id INT,\n" +"    name STRING,\n" +"    age INT\n" +") WITH (\n" +"    'connector' = 'filesystem',\n" +"    'path' = 'file:///Users/leishuiyu/IdeaProjects/SpringFlink/data.csv',\n" +"    'format' = 'csv'\n" +");";tableEnv.executeSql(sourceDDL);Table table = tableEnv.sqlQuery("select * from users limit 1 ");String explanation = tableEnv.explainSql("select * from users limit 1 ");System.out.println(explanation);table.execute().print();}
}

输出结果

== Abstract Syntax Tree ==
LogicalSort(fetch=[1])
+- LogicalProject(id=[$0], name=[$1], age=[$2])+- LogicalTableScan(table=[[default_catalog, default_database, users]])== Optimized Physical Plan ==
Limit(offset=[0], fetch=[1], global=[true])
+- Exchange(distribution=[single])+- Limit(offset=[0], fetch=[1], global=[false])+- TableSourceScan(table=[[default_catalog, default_database, users, limit=[1]]], fields=[id, name, age])== Optimized Execution Plan ==
Limit(offset=[0], fetch=[1], global=[true])
+- Exchange(distribution=[single])+- Limit(offset=[0], fetch=[1], global=[false])+- TableSourceScan(table=[[default_catalog, default_database, users, limit=[1]]], fields=[id, name, age])

实现细节

主要是三个地方,在优化那一步,就把原生的relnode 转化成了自定义的relnode,自定义的relnode 就可以带物理转化的内容了,比如上面的LogicalTableScan 转成BatchPhysicalTableSourceScan 这个relnode

定义的规则

class BatchPhysicalTableSourceScanRule(config: Config) extends ConverterRule(config) {/** Rule must only match if TableScan targets a bounded [[ScanTableSource]] *///规则只匹配有界的ScanTableSourceoverride def matches(call: RelOptRuleCall): Boolean = {val scan: TableScan = call.rel(0).asInstanceOf[TableScan]val tableSourceTable = scan.getTable.unwrap(classOf[TableSourceTable])tableSourceTable match {case tst: TableSourceTable =>tst.tableSource match {case sts: ScanTableSource =>sts.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE).isBoundedcase _ => false}case _ => false}}def convert(rel: RelNode): RelNode = {val scan = rel.asInstanceOf[FlinkLogicalTableSourceScan]val newTrait = rel.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)//在这里转成自定义的relnode new BatchPhysicalTableSourceScan(rel.getCluster,newTrait,scan.getHints,scan.getTable.asInstanceOf[TableSourceTable])}
}

定义的物理算子

也是一个relnode,实现类BatchPhysicalTableSourceScan

class BatchPhysicalTableSourceScan(cluster: RelOptCluster,traitSet: RelTraitSet,hints: util.List[RelHint],tableSourceTable: TableSourceTable)extends CommonPhysicalTableSourceScan(cluster, traitSet, hints, tableSourceTable)with BatchPhysicalRel {
//主要是这个方法,转成 flink exec算子override def translateToExecNode(): ExecNode[_] = {val tableSourceSpec = new DynamicTableSourceSpec(tableSourceTable.contextResolvedTable,util.Arrays.asList(tableSourceTable.abilitySpecs: _*))tableSourceSpec.setTableSource(tableSourceTable.tableSource)new BatchExecTableSourceScan(unwrapTableConfig(this),tableSourceSpec,FlinkTypeFactory.toLogicalRowType(getRowType),getRelDetailedDescription)}
}

定义的flink exec node

BatchExecTableSourceScan 类

 /// 主要是这个方法,看下下面的实现就比较熟悉了public Transformation<RowData> createInputFormatTransformation(StreamExecutionEnvironment env,InputFormat<RowData, ?> inputFormat,InternalTypeInfo<RowData> outputTypeInfo,String operatorName) {// env.createInput will use ContinuousFileReaderOperator, but it do not support multiple// paths. If read partitioned source, after partition pruning, we need let InputFormat// to read multiple partitions which are multiple paths.// We can use InputFormatSourceFunction directly to support InputFormat.final InputFormatSourceFunction<RowData> function =new InputFormatSourceFunction<>(inputFormat, outputTypeInfo);return env.addSource(function, operatorName, outputTypeInfo).getTransformation();}

这里的转换是多种方式,一种是现成的比如source 这种,还有的是函数这种,要通过代码生成的方法实现。flink代码生成

这篇关于解析 flink sql 转化成flink job的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1088847

相关文章

MySQL常用字符串函数示例和场景介绍

《MySQL常用字符串函数示例和场景介绍》MySQL提供了丰富的字符串函数帮助我们高效地对字符串进行处理、转换和分析,本文我将全面且深入地介绍MySQL常用的字符串函数,并结合具体示例和场景,帮你熟练... 目录一、字符串函数概述1.1 字符串函数的作用1.2 字符串函数分类二、字符串长度与统计函数2.1

SQL Server跟踪自动统计信息更新实战指南

《SQLServer跟踪自动统计信息更新实战指南》本文详解SQLServer自动统计信息更新的跟踪方法,推荐使用扩展事件实时捕获更新操作及详细信息,同时结合系统视图快速检查统计信息状态,重点强调修... 目录SQL Server 如何跟踪自动统计信息更新:深入解析与实战指南 核心跟踪方法1️⃣ 利用系统目录

MySQL 内存使用率常用分析语句

《MySQL内存使用率常用分析语句》用户整理了MySQL内存占用过高的分析方法,涵盖操作系统层确认及数据库层bufferpool、内存模块差值、线程状态、performance_schema性能数据... 目录一、 OS层二、 DB层1. 全局情况2. 内存占js用详情最近连续遇到mysql内存占用过高导致

全面解析Golang 中的 Gorilla CORS 中间件正确用法

《全面解析Golang中的GorillaCORS中间件正确用法》Golang中使用gorilla/mux路由器配合rs/cors中间件库可以优雅地解决这个问题,然而,很多人刚开始使用时会遇到配... 目录如何让 golang 中的 Gorilla CORS 中间件正确工作一、基础依赖二、错误用法(很多人一开

Mysql中设计数据表的过程解析

《Mysql中设计数据表的过程解析》数据库约束通过NOTNULL、UNIQUE、DEFAULT、主键和外键等规则保障数据完整性,自动校验数据,减少人工错误,提升数据一致性和业务逻辑严谨性,本文介绍My... 目录1.引言2.NOT NULL——制定某列不可以存储NULL值2.UNIQUE——保证某一列的每一

解密SQL查询语句执行的过程

《解密SQL查询语句执行的过程》文章讲解了SQL语句的执行流程,涵盖解析、优化、执行三个核心阶段,并介绍执行计划查看方法EXPLAIN,同时提出性能优化技巧如合理使用索引、避免SELECT*、JOIN... 目录1. SQL语句的基本结构2. SQL语句的执行过程3. SQL语句的执行计划4. 常见的性能优

深度解析Nginx日志分析与499状态码问题解决

《深度解析Nginx日志分析与499状态码问题解决》在Web服务器运维和性能优化过程中,Nginx日志是排查问题的重要依据,本文将围绕Nginx日志分析、499状态码的成因、排查方法及解决方案展开讨论... 目录前言1. Nginx日志基础1.1 Nginx日志存放位置1.2 Nginx日志格式2. 499

SQL Server 中的 WITH (NOLOCK) 示例详解

《SQLServer中的WITH(NOLOCK)示例详解》SQLServer中的WITH(NOLOCK)是一种表提示,等同于READUNCOMMITTED隔离级别,允许查询在不获取共享锁的情... 目录SQL Server 中的 WITH (NOLOCK) 详解一、WITH (NOLOCK) 的本质二、工作

MySQL 强制使用特定索引的操作

《MySQL强制使用特定索引的操作》MySQL可通过FORCEINDEX、USEINDEX等语法强制查询使用特定索引,但优化器可能不采纳,需结合EXPLAIN分析执行计划,避免性能下降,注意版本差异... 目录1. 使用FORCE INDEX语法2. 使用USE INDEX语法3. 使用IGNORE IND

SQL Server安装时候没有中文选项的解决方法

《SQLServer安装时候没有中文选项的解决方法》用户安装SQLServer时界面全英文,无中文选项,通过修改安装设置中的国家或地区为中文中国,重启安装程序后界面恢复中文,解决了问题,对SQLSe... 你是不是在安装SQL Server时候发现安装界面和别人不同,并且无论如何都没有中文选项?这个问题也