Flink SQL自定义表值函数(Table Function)

2023-11-12 00:15

本文主要是介绍Flink SQL自定义表值函数(Table Function),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

使用场景: 表值函数即 UDTF,⽤于进⼀条数据,出多条数据的场景。

开发流程:

  • 实现 org.apache.flink.table.functions.TableFunction 接⼝
  • 实现⼀个或者多个⾃定义的 eval 函数,名称必须叫做 eval,eval ⽅法签名必须是 public 的
  • eval ⽅法的⼊参是直接体现在 eval 函数签名中,出参是体现在 TableFunction 类的泛型参数 T 中

注意:

eval 是没有返回值的,和标量函数不同,Flink TableFunction 接⼝提供了 collect(T) 来发送输出的数据,如果体现在函数签名上,就成了标量函数,使⽤ collect(T) 能体现出 进⼀条数据 出多条数据。

在 SQL 中是⽤ SQL 中的 LATERAL TABLE() 配合 JOIN 、 LEFT JOIN xxx ON TRUE 使⽤。

开发案例:

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;import static org.apache.flink.table.api.Expressions.*;/*** 输入数据:* nc -lk 8888* a,bb,cc* * 输出结果:* * res1=>:5> +I[a,bb,cc, a, 1]* res1=>:7> +I[a,bb,cc, cc, 2]* res1=>:6> +I[a,bb,cc, bb, 2]* res8=>:4> +I[a,bb,cc, a, 1]* res8=>:5> +I[a,bb,cc, bb, 2]* res8=>:6> +I[a,bb,cc, cc, 2]* res4=>:3> +I[a,bb,cc, cc, 2]* res4=>:1> +I[a,bb,cc, a, 1]* res4=>:2> +I[a,bb,cc, bb, 2]* res7=>:8> +I[a,bb,cc, bb, 2]* res7=>:1> +I[a,bb,cc, cc, 2]* res7=>:7> +I[a,bb,cc, a, 1]* res2=>:2> +I[a,bb,cc, cc, 2]* res2=>:8> +I[a,bb,cc, a, 1]* res2=>:1> +I[a,bb,cc, bb, 2]* res6=>:1> +I[a,bb,cc, cc, 2]* res6=>:7> +I[a,bb,cc, a, 1]* res6=>:8> +I[a,bb,cc, bb, 2]* res3=>:6> +I[a,bb,cc, bb, 2]* res3=>:7> +I[a,bb,cc, cc, 2]* res3=>:5> +I[a,bb,cc, a, 1]* res5=>:7> +I[a,bb,cc, bb, 2]* res5=>:8> +I[a,bb,cc, cc, 2]* res5=>:6> +I[a,bb,cc, a, 1]*/
public class TableFunctionTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);DataStreamSource<String> source = env.socketTextStream("localhost", 8888);Table table = tEnv.fromDataStream(source, "field");tEnv.createTemporaryView("SourceTable", table);// 在 Table API ⾥可以直接调⽤ UDFTable res1 = tEnv.from("SourceTable").joinLateral(call(SplitFunction.class, $("field"))).select($("field"), $("word"), $("length"));Table res2 = tEnv.from("SourceTable").leftOuterJoinLateral(call(SplitFunction.class, $("field"))).select($("field"), $("word"), $("length"));// 在 Table API ⾥重命名 UDF 的结果字段Table res3 = tEnv.from("SourceTable").leftOuterJoinLateral(call(SplitFunction.class, $("field"))).as("myField", "newWord", "newLength").select($("myField"), $("newWord"), $("newLength"));// 注册函数tEnv.createTemporarySystemFunction("SplitFunction", SplitFunction.class);// 在 Table API ⾥调⽤注册好的 UDFTable res4 = tEnv.from("SourceTable").joinLateral(call("SplitFunction", $("field"))).select($("field"), $("word"), $("length"));Table res5 = tEnv.from("SourceTable").leftOuterJoinLateral(call("SplitFunction", $("field"))).select($("field"), $("word"), $("length"));// 在 SQL ⾥调⽤注册好的 UDFTable res6 = tEnv.sqlQuery("SELECT field, word, length " +"FROM SourceTable, LATERAL TABLE(SplitFunction(field))");Table res7 = tEnv.sqlQuery("SELECT field, word, length " +"FROM SourceTable " +"LEFT JOIN LATERAL TABLE(SplitFunction(field)) ON TRUE");// 在 SQL ⾥重命名 UDF 字段Table res8 = tEnv.sqlQuery("SELECT field, newWord, newLength " +"FROM SourceTable " +"LEFT JOIN LATERAL TABLE(SplitFunction(field)) AS T(newWord, newLength) ON TRUE");tEnv.toDataStream(res1).print("res1=>");tEnv.toDataStream(res2).print("res2=>");tEnv.toDataStream(res3).print("res3=>");tEnv.toDataStream(res4).print("res4=>");tEnv.toDataStream(res5).print("res5=>");tEnv.toDataStream(res6).print("res6=>");tEnv.toDataStream(res7).print("res7=>");tEnv.toDataStream(res8).print("res8=>");env.execute();}@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))public static class SplitFunction extends TableFunction<Row> {public void eval(String str) {for (String s : str.split(",")) {// 输出结果collect(Row.of(s, s.length()));}}}
}

注意: 如果使⽤ Scala 实现函数,不要使⽤ Scala 中 object 实现 UDF,Scala object 是单例的,可能会导致并发问题。

测试结果:

在这里插入图片描述

这篇关于Flink SQL自定义表值函数(Table Function)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/393790

相关文章

MySQL 中的 CAST 函数详解及常见用法

《MySQL中的CAST函数详解及常见用法》CAST函数是MySQL中用于数据类型转换的重要函数,它允许你将一个值从一种数据类型转换为另一种数据类型,本文给大家介绍MySQL中的CAST... 目录mysql 中的 CAST 函数详解一、基本语法二、支持的数据类型三、常见用法示例1. 字符串转数字2. 数字

Mysql实现范围分区表(新增、删除、重组、查看)

《Mysql实现范围分区表(新增、删除、重组、查看)》MySQL分区表的四种类型(范围、哈希、列表、键值),主要介绍了范围分区的创建、查询、添加、删除及重组织操作,具有一定的参考价值,感兴趣的可以了解... 目录一、mysql分区表分类二、范围分区(Range Partitioning1、新建分区表:2、分

MySQL 定时新增分区的实现示例

《MySQL定时新增分区的实现示例》本文主要介绍了通过存储过程和定时任务实现MySQL分区的自动创建,解决大数据量下手动维护的繁琐问题,具有一定的参考价值,感兴趣的可以了解一下... mysql创建好分区之后,有时候会需要自动创建分区。比如,一些表数据量非常大,有些数据是热点数据,按照日期分区MululbU

SQL Server配置管理器无法打开的四种解决方法

《SQLServer配置管理器无法打开的四种解决方法》本文总结了SQLServer配置管理器无法打开的四种解决方法,文中通过图文示例介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的... 目录方法一:桌面图标进入方法二:运行窗口进入检查版本号对照表php方法三:查找文件路径方法四:检查 S

MySQL 删除数据详解(最新整理)

《MySQL删除数据详解(最新整理)》:本文主要介绍MySQL删除数据的相关知识,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录一、前言二、mysql 中的三种删除方式1.DELETE语句✅ 基本语法: 示例:2.TRUNCATE语句✅ 基本语

MySQL中查找重复值的实现

《MySQL中查找重复值的实现》查找重复值是一项常见需求,比如在数据清理、数据分析、数据质量检查等场景下,我们常常需要找出表中某列或多列的重复值,具有一定的参考价值,感兴趣的可以了解一下... 目录技术背景实现步骤方法一:使用GROUP BY和HAVING子句方法二:仅返回重复值方法三:返回完整记录方法四:

Python内置函数之classmethod函数使用详解

《Python内置函数之classmethod函数使用详解》:本文主要介绍Python内置函数之classmethod函数使用方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地... 目录1. 类方法定义与基本语法2. 类方法 vs 实例方法 vs 静态方法3. 核心特性与用法(1编程客

从入门到精通MySQL联合查询

《从入门到精通MySQL联合查询》:本文主要介绍从入门到精通MySQL联合查询,本文通过实例代码给大家介绍的非常详细,需要的朋友可以参考下... 目录摘要1. 多表联合查询时mysql内部原理2. 内连接3. 外连接4. 自连接5. 子查询6. 合并查询7. 插入查询结果摘要前面我们学习了数据库设计时要满

Python函数作用域示例详解

《Python函数作用域示例详解》本文介绍了Python中的LEGB作用域规则,详细解析了变量查找的四个层级,通过具体代码示例,展示了各层级的变量访问规则和特性,对python函数作用域相关知识感兴趣... 目录一、LEGB 规则二、作用域实例2.1 局部作用域(Local)2.2 闭包作用域(Enclos

MySQL查询JSON数组字段包含特定字符串的方法

《MySQL查询JSON数组字段包含特定字符串的方法》在MySQL数据库中,当某个字段存储的是JSON数组,需要查询数组中包含特定字符串的记录时传统的LIKE语句无法直接使用,下面小编就为大家介绍两种... 目录问题背景解决方案对比1. 精确匹配方案(推荐)2. 模糊匹配方案参数化查询示例使用场景建议性能优