Flink SQL自定义表值函数(Table Function)

2023-11-10 11:20

本文主要是介绍Flink SQL自定义表值函数(Table Function),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

使用场景: 表值函数即 UDTF,⽤于进⼀条数据,出多条数据的场景。

开发流程:

  • 实现 org.apache.flink.table.functions.TableFunction 接⼝
  • 实现⼀个或者多个⾃定义的 eval 函数,名称必须叫做 eval,eval ⽅法签名必须是 public 的
  • eval ⽅法的⼊参是直接体现在 eval 函数签名中,出参是体现在 TableFunction 类的泛型参数 T 中

注意:

eval 是没有返回值的,和标量函数不同,Flink TableFunction 接⼝提供了 collect(T) 来发送输出的数据,如果体现在函数签名上,就成了标量函数,使⽤ collect(T) 能体现出 进⼀条数据 出多条数据。

在 SQL 中是⽤ SQL 中的 LATERAL TABLE() 配合 JOIN 、 LEFT JOIN xxx ON TRUE 使⽤。

开发案例:

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;import static org.apache.flink.table.api.Expressions.*;/*** 输入数据:* nc -lk 8888* a,bb,cc* * 输出结果:* * res1=>:5> +I[a,bb,cc, a, 1]* res1=>:7> +I[a,bb,cc, cc, 2]* res1=>:6> +I[a,bb,cc, bb, 2]* res8=>:4> +I[a,bb,cc, a, 1]* res8=>:5> +I[a,bb,cc, bb, 2]* res8=>:6> +I[a,bb,cc, cc, 2]* res4=>:3> +I[a,bb,cc, cc, 2]* res4=>:1> +I[a,bb,cc, a, 1]* res4=>:2> +I[a,bb,cc, bb, 2]* res7=>:8> +I[a,bb,cc, bb, 2]* res7=>:1> +I[a,bb,cc, cc, 2]* res7=>:7> +I[a,bb,cc, a, 1]* res2=>:2> +I[a,bb,cc, cc, 2]* res2=>:8> +I[a,bb,cc, a, 1]* res2=>:1> +I[a,bb,cc, bb, 2]* res6=>:1> +I[a,bb,cc, cc, 2]* res6=>:7> +I[a,bb,cc, a, 1]* res6=>:8> +I[a,bb,cc, bb, 2]* res3=>:6> +I[a,bb,cc, bb, 2]* res3=>:7> +I[a,bb,cc, cc, 2]* res3=>:5> +I[a,bb,cc, a, 1]* res5=>:7> +I[a,bb,cc, bb, 2]* res5=>:8> +I[a,bb,cc, cc, 2]* res5=>:6> +I[a,bb,cc, a, 1]*/
public class TableFunctionTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);DataStreamSource<String> source = env.socketTextStream("localhost", 8888);Table table = tEnv.fromDataStream(source, "field");tEnv.createTemporaryView("SourceTable", table);// 在 Table API ⾥可以直接调⽤ UDFTable res1 = tEnv.from("SourceTable").joinLateral(call(SplitFunction.class, $("field"))).select($("field"), $("word"), $("length"));Table res2 = tEnv.from("SourceTable").leftOuterJoinLateral(call(SplitFunction.class, $("field"))).select($("field"), $("word"), $("length"));// 在 Table API ⾥重命名 UDF 的结果字段Table res3 = tEnv.from("SourceTable").leftOuterJoinLateral(call(SplitFunction.class, $("field"))).as("myField", "newWord", "newLength").select($("myField"), $("newWord"), $("newLength"));// 注册函数tEnv.createTemporarySystemFunction("SplitFunction", SplitFunction.class);// 在 Table API ⾥调⽤注册好的 UDFTable res4 = tEnv.from("SourceTable").joinLateral(call("SplitFunction", $("field"))).select($("field"), $("word"), $("length"));Table res5 = tEnv.from("SourceTable").leftOuterJoinLateral(call("SplitFunction", $("field"))).select($("field"), $("word"), $("length"));// 在 SQL ⾥调⽤注册好的 UDFTable res6 = tEnv.sqlQuery("SELECT field, word, length " +"FROM SourceTable, LATERAL TABLE(SplitFunction(field))");Table res7 = tEnv.sqlQuery("SELECT field, word, length " +"FROM SourceTable " +"LEFT JOIN LATERAL TABLE(SplitFunction(field)) ON TRUE");// 在 SQL ⾥重命名 UDF 字段Table res8 = tEnv.sqlQuery("SELECT field, newWord, newLength " +"FROM SourceTable " +"LEFT JOIN LATERAL TABLE(SplitFunction(field)) AS T(newWord, newLength) ON TRUE");tEnv.toDataStream(res1).print("res1=>");tEnv.toDataStream(res2).print("res2=>");tEnv.toDataStream(res3).print("res3=>");tEnv.toDataStream(res4).print("res4=>");tEnv.toDataStream(res5).print("res5=>");tEnv.toDataStream(res6).print("res6=>");tEnv.toDataStream(res7).print("res7=>");tEnv.toDataStream(res8).print("res8=>");env.execute();}@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))public static class SplitFunction extends TableFunction<Row> {public void eval(String str) {for (String s : str.split(",")) {// 输出结果collect(Row.of(s, s.length()));}}}
}

注意: 如果使⽤ Scala 实现函数,不要使⽤ Scala 中 object 实现 UDF,Scala object 是单例的,可能会导致并发问题。

测试结果:

在这里插入图片描述

这篇关于Flink SQL自定义表值函数(Table Function)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/382321

相关文章

MySQL的JDBC编程详解

《MySQL的JDBC编程详解》:本文主要介绍MySQL的JDBC编程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录前言一、前置知识1. 引入依赖2. 认识 url二、JDBC 操作流程1. JDBC 的写操作2. JDBC 的读操作总结前言本文介绍了mysq

java.sql.SQLTransientConnectionException连接超时异常原因及解决方案

《java.sql.SQLTransientConnectionException连接超时异常原因及解决方案》:本文主要介绍java.sql.SQLTransientConnectionExcep... 目录一、引言二、异常信息分析三、可能的原因3.1 连接池配置不合理3.2 数据库负载过高3.3 连接泄漏

Linux下MySQL数据库定时备份脚本与Crontab配置教学

《Linux下MySQL数据库定时备份脚本与Crontab配置教学》在生产环境中,数据库是核心资产之一,定期备份数据库可以有效防止意外数据丢失,本文将分享一份MySQL定时备份脚本,并讲解如何通过cr... 目录备份脚本详解脚本功能说明授权与可执行权限使用 Crontab 定时执行编辑 Crontab添加定

C++统计函数执行时间的最佳实践

《C++统计函数执行时间的最佳实践》在软件开发过程中,性能分析是优化程序的重要环节,了解函数的执行时间分布对于识别性能瓶颈至关重要,本文将分享一个C++函数执行时间统计工具,希望对大家有所帮助... 目录前言工具特性核心设计1. 数据结构设计2. 单例模式管理器3. RAII自动计时使用方法基本用法高级用法

Vite 打包目录结构自定义配置小结

《Vite打包目录结构自定义配置小结》在Vite工程开发中,默认打包后的dist目录资源常集中在asset目录下,不利于资源管理,本文基于Rollup配置原理,本文就来介绍一下通过Vite配置自定义... 目录一、实现原理二、具体配置步骤1. 基础配置文件2. 配置说明(1)js 资源分离(2)非 JS 资

MySQL中On duplicate key update的实现示例

《MySQL中Onduplicatekeyupdate的实现示例》ONDUPLICATEKEYUPDATE是一种MySQL的语法,它在插入新数据时,如果遇到唯一键冲突,则会执行更新操作,而不是抛... 目录1/ ON DUPLICATE KEY UPDATE的简介2/ ON DUPLICATE KEY UP

MySQL分库分表的实践示例

《MySQL分库分表的实践示例》MySQL分库分表适用于数据量大或并发压力高的场景,核心技术包括水平/垂直分片和分库,需应对分布式事务、跨库查询等挑战,通过中间件和解决方案实现,最佳实践为合理策略、备... 目录一、分库分表的触发条件1.1 数据量阈值1.2 并发压力二、分库分表的核心技术模块2.1 水平分

Python与MySQL实现数据库实时同步的详细步骤

《Python与MySQL实现数据库实时同步的详细步骤》在日常开发中,数据同步是一项常见的需求,本篇文章将使用Python和MySQL来实现数据库实时同步,我们将围绕数据变更捕获、数据处理和数据写入这... 目录前言摘要概述:数据同步方案1. 基本思路2. mysql Binlog 简介实现步骤与代码示例1

GO语言中函数命名返回值的使用

《GO语言中函数命名返回值的使用》在Go语言中,函数可以为其返回值指定名称,这被称为命名返回值或命名返回参数,这种特性可以使代码更清晰,特别是在返回多个值时,感兴趣的可以了解一下... 目录基本语法函数命名返回特点代码示例命名特点基本语法func functionName(parameters) (nam

使用shardingsphere实现mysql数据库分片方式

《使用shardingsphere实现mysql数据库分片方式》本文介绍如何使用ShardingSphere-JDBC在SpringBoot中实现MySQL水平分库,涵盖分片策略、路由算法及零侵入配置... 目录一、ShardingSphere 简介1.1 对比1.2 核心概念1.3 Sharding-Sp