解析 flink sql 转化成flink job

2024-06-24 01:44

本文主要是介绍解析 flink sql 转化成flink job,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

    • 背景
    • 流程
    • flink实例
    • 实现细节
      • 定义的规则
      • 定义的物理算子
      • 定义的flink exec node

背景

在很多计算引擎里,都会把sql 这种标准语言,转成计算引擎下底层实际的算子,因此理解此转换的流程对于理解整个过程非常重要

流程

在这里插入图片描述

flink实例

public class BatchExample {public static void main(String[] args) {// 设置执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);// 创建一个内置示例源表String sourceDDL = "CREATE TABLE users (\n" +"    id INT,\n" +"    name STRING,\n" +"    age INT\n" +") WITH (\n" +"    'connector' = 'filesystem',\n" +"    'path' = 'file:///Users/leishuiyu/IdeaProjects/SpringFlink/data.csv',\n" +"    'format' = 'csv'\n" +");";tableEnv.executeSql(sourceDDL);Table table = tableEnv.sqlQuery("select * from users limit 1 ");String explanation = tableEnv.explainSql("select * from users limit 1 ");System.out.println(explanation);table.execute().print();}
}

输出结果

== Abstract Syntax Tree ==
LogicalSort(fetch=[1])
+- LogicalProject(id=[$0], name=[$1], age=[$2])+- LogicalTableScan(table=[[default_catalog, default_database, users]])== Optimized Physical Plan ==
Limit(offset=[0], fetch=[1], global=[true])
+- Exchange(distribution=[single])+- Limit(offset=[0], fetch=[1], global=[false])+- TableSourceScan(table=[[default_catalog, default_database, users, limit=[1]]], fields=[id, name, age])== Optimized Execution Plan ==
Limit(offset=[0], fetch=[1], global=[true])
+- Exchange(distribution=[single])+- Limit(offset=[0], fetch=[1], global=[false])+- TableSourceScan(table=[[default_catalog, default_database, users, limit=[1]]], fields=[id, name, age])

实现细节

主要是三个地方,在优化那一步,就把原生的relnode 转化成了自定义的relnode,自定义的relnode 就可以带物理转化的内容了,比如上面的LogicalTableScan 转成BatchPhysicalTableSourceScan 这个relnode

定义的规则

class BatchPhysicalTableSourceScanRule(config: Config) extends ConverterRule(config) {/** Rule must only match if TableScan targets a bounded [[ScanTableSource]] *///规则只匹配有界的ScanTableSourceoverride def matches(call: RelOptRuleCall): Boolean = {val scan: TableScan = call.rel(0).asInstanceOf[TableScan]val tableSourceTable = scan.getTable.unwrap(classOf[TableSourceTable])tableSourceTable match {case tst: TableSourceTable =>tst.tableSource match {case sts: ScanTableSource =>sts.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE).isBoundedcase _ => false}case _ => false}}def convert(rel: RelNode): RelNode = {val scan = rel.asInstanceOf[FlinkLogicalTableSourceScan]val newTrait = rel.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)//在这里转成自定义的relnode new BatchPhysicalTableSourceScan(rel.getCluster,newTrait,scan.getHints,scan.getTable.asInstanceOf[TableSourceTable])}
}

定义的物理算子

也是一个relnode,实现类BatchPhysicalTableSourceScan

class BatchPhysicalTableSourceScan(cluster: RelOptCluster,traitSet: RelTraitSet,hints: util.List[RelHint],tableSourceTable: TableSourceTable)extends CommonPhysicalTableSourceScan(cluster, traitSet, hints, tableSourceTable)with BatchPhysicalRel {
//主要是这个方法,转成 flink exec算子override def translateToExecNode(): ExecNode[_] = {val tableSourceSpec = new DynamicTableSourceSpec(tableSourceTable.contextResolvedTable,util.Arrays.asList(tableSourceTable.abilitySpecs: _*))tableSourceSpec.setTableSource(tableSourceTable.tableSource)new BatchExecTableSourceScan(unwrapTableConfig(this),tableSourceSpec,FlinkTypeFactory.toLogicalRowType(getRowType),getRelDetailedDescription)}
}

定义的flink exec node

BatchExecTableSourceScan 类

 /// 主要是这个方法,看下下面的实现就比较熟悉了public Transformation<RowData> createInputFormatTransformation(StreamExecutionEnvironment env,InputFormat<RowData, ?> inputFormat,InternalTypeInfo<RowData> outputTypeInfo,String operatorName) {// env.createInput will use ContinuousFileReaderOperator, but it do not support multiple// paths. If read partitioned source, after partition pruning, we need let InputFormat// to read multiple partitions which are multiple paths.// We can use InputFormatSourceFunction directly to support InputFormat.final InputFormatSourceFunction<RowData> function =new InputFormatSourceFunction<>(inputFormat, outputTypeInfo);return env.addSource(function, operatorName, outputTypeInfo).getTransformation();}

这里的转换是多种方式,一种是现成的比如source 这种,还有的是函数这种,要通过代码生成的方法实现。flink代码生成

这篇关于解析 flink sql 转化成flink job的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1088847

相关文章

MySQL数据库双机热备的配置方法详解

《MySQL数据库双机热备的配置方法详解》在企业级应用中,数据库的高可用性和数据的安全性是至关重要的,MySQL作为最流行的开源关系型数据库管理系统之一,提供了多种方式来实现高可用性,其中双机热备(M... 目录1. 环境准备1.1 安装mysql1.2 配置MySQL1.2.1 主服务器配置1.2.2 从

Java中Redisson 的原理深度解析

《Java中Redisson的原理深度解析》Redisson是一个高性能的Redis客户端,它通过将Redis数据结构映射为Java对象和分布式对象,实现了在Java应用中方便地使用Redis,本文... 目录前言一、核心设计理念二、核心架构与通信层1. 基于 Netty 的异步非阻塞通信2. 编解码器三、

Java HashMap的底层实现原理深度解析

《JavaHashMap的底层实现原理深度解析》HashMap基于数组+链表+红黑树结构,通过哈希算法和扩容机制优化性能,负载因子与树化阈值平衡效率,是Java开发必备的高效数据结构,本文给大家介绍... 目录一、概述:HashMap的宏观结构二、核心数据结构解析1. 数组(桶数组)2. 链表节点(Node

Java 虚拟线程的创建与使用深度解析

《Java虚拟线程的创建与使用深度解析》虚拟线程是Java19中以预览特性形式引入,Java21起正式发布的轻量级线程,本文给大家介绍Java虚拟线程的创建与使用,感兴趣的朋友一起看看吧... 目录一、虚拟线程简介1.1 什么是虚拟线程?1.2 为什么需要虚拟线程?二、虚拟线程与平台线程对比代码对比示例:三

一文解析C#中的StringSplitOptions枚举

《一文解析C#中的StringSplitOptions枚举》StringSplitOptions是C#中的一个枚举类型,用于控制string.Split()方法分割字符串时的行为,核心作用是处理分割后... 目录C#的StringSplitOptions枚举1.StringSplitOptions枚举的常用

Python函数作用域与闭包举例深度解析

《Python函数作用域与闭包举例深度解析》Python函数的作用域规则和闭包是编程中的关键概念,它们决定了变量的访问和生命周期,:本文主要介绍Python函数作用域与闭包的相关资料,文中通过代码... 目录1. 基础作用域访问示例1:访问全局变量示例2:访问外层函数变量2. 闭包基础示例3:简单闭包示例4

MyBatis延迟加载与多级缓存全解析

《MyBatis延迟加载与多级缓存全解析》文章介绍MyBatis的延迟加载与多级缓存机制,延迟加载按需加载关联数据提升性能,一级缓存会话级默认开启,二级缓存工厂级支持跨会话共享,增删改操作会清空对应缓... 目录MyBATis延迟加载策略一对多示例一对多示例MyBatis框架的缓存一级缓存二级缓存MyBat

深入理解Mysql OnlineDDL的算法

《深入理解MysqlOnlineDDL的算法》本文主要介绍了讲解MysqlOnlineDDL的算法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小... 目录一、Online DDL 是什么?二、Online DDL 的三种主要算法2.1COPY(复制法)

mysql8.0.43使用InnoDB Cluster配置主从复制

《mysql8.0.43使用InnoDBCluster配置主从复制》本文主要介绍了mysql8.0.43使用InnoDBCluster配置主从复制,文中通过示例代码介绍的非常详细,对大家的学习或者... 目录1、配置Hosts解析(所有服务器都要执行)2、安装mysql shell(所有服务器都要执行)3、

前端缓存策略的自解方案全解析

《前端缓存策略的自解方案全解析》缓存从来都是前端的一个痛点,很多前端搞不清楚缓存到底是何物,:本文主要介绍前端缓存的自解方案,文中通过代码介绍的非常详细,需要的朋友可以参考下... 目录一、为什么“清缓存”成了技术圈的梗二、先给缓存“把个脉”:浏览器到底缓存了谁?三、设计思路:把“发版”做成“自愈”四、代码