flink udf 介绍

2024-08-28 14:58
文章标签 介绍 flink udf

本文主要是介绍flink udf 介绍,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

ScalarFunction:标量函数是实现将0,1,或者多个标量值转化为一个新值TableFunction:一个输入多个行或者多个列AggregateFunction:多个输入一个输出package org.fuwushe.sql;import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import org.fuwushe.sql.udf.FromUnixTimeUDF;
import org.fuwushe.sql.udf.Split;import java.util.Iterator;public class SqlUdfTest {public static void main(String []args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);tableEnv.registerFunction("split", new Split("#"));tableEnv.registerFunction("from_unixtime", new FromUnixTimeUDF());tableEnv.registerFunction("wAvg", new WeightedAvg());DataSet<String> input = env.readTextFile("/load/data/udf.txt");DataSet<UdfData> topInput = input.map(new MapFunction<String,UdfData>() {@Overridepublic UdfData map(String s) throws Exception {return JSONObject.parseObject(s,UdfData.class);}});Table udfTable = tableEnv.fromDataSet(topInput);tableEnv.registerTable("udf_table", udfTable);//ScalarFunctionTable udfResult = tableEnv.sqlQuery(" select from_unixtime(`time`) as creatTime,itemId FROM udf_table order by  creatTime desc  ");tableEnv.toDataSet(udfResult, UdfResult.class).print();//TableFunctionTable udtfResult1 =  tableEnv.sqlQuery("SELECT action, word, length FROM udf_table, LATERAL TABLE(split(action)) as T(word, length)");Table udtfResult2 =  tableEnv.sqlQuery("SELECT  action, word, length FROM udf_table LEFT JOIN LATERAL TABLE(split(action)) as T(word, length) ON TRUE");tableEnv.toDataSet(udtfResult1, UdtfResult.class).print();tableEnv.toDataSet(udtfResult2, UdtfResult.class).print();//AggregateFunction 6 1Table udafResult = tableEnv.sqlQuery("SELECT itemId, wAvg(price,wegiht) AS avgPoints FROM udf_table GROUP BY itemId");tableEnv.toDataSet(udafResult, UdafResult.class).print();}/*** Accumulator for WeightedAvg.*/public static class WeightedAvgAccum {public long sum = 0;public int count = 0;}/*** Weighted Average user-defined aggregate function.*/public static class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccum> {@Overridepublic WeightedAvgAccum createAccumulator() {return new WeightedAvgAccum();}@Overridepublic Long getValue(WeightedAvgAccum acc) {if (acc.count == 0) {return null;} else {return acc.sum / acc.count;}}public void accumulate(WeightedAvgAccum acc, long iValue, int iWeight) {acc.sum += iValue * iWeight;acc.count += iWeight;}public void retract(WeightedAvgAccum acc, long iValue, int iWeight) {acc.sum -= iValue * iWeight;acc.count -= iWeight;}public void merge(WeightedAvgAccum acc, Iterable<WeightedAvgAccum> it) {Iterator<WeightedAvgAccum> iter = it.iterator();while (iter.hasNext()) {WeightedAvgAccum a = iter.next();acc.count += a.count;acc.sum += a.sum;}}public void resetAccumulator(WeightedAvgAccum acc) {acc.count = 0;acc.sum = 0L;}}public static class UdafResult {public UdafResult() {super();}public String itemId;public long avgPoints;public UdafResult(String itemId, long avgPoints) {this.itemId = itemId;this.avgPoints = avgPoints;}@Overridepublic String toString() {return "UdafResult{" + "itemId='" + itemId + '\'' + ", avgPoints=" + avgPoints + '}';}}public static class UdtfResult {public UdtfResult() {super();}public String action;public String word;public int length;public UdtfResult(String action, String word, int length) {this.action = action;this.word = word;this.length = length;}@Overridepublic String toString() {return "UdtfResult{" + "action='" + action + '\'' + ", word='" + word + '\'' + ", length=" + length + '}';}}public static class UdfResult {public UdfResult() {super();}public String itemId;public String creatTime;public UdfResult(String itemId, String creatTime) {this.itemId = itemId;this.creatTime = creatTime;}@Overridepublic String toString() {return "Result{" + "itemId='" + itemId + '\'' + ", creatTime='" + creatTime + '\'' + '}';}}public static class UdfData {public UdfData(String action, String itemId, String time, String unionId, Integer rankIndex, Integer wegiht,long price) {this.action = action;this.itemId = itemId;this.time = time;this.unionId = unionId;this.rankIndex = rankIndex;this.wegiht = wegiht;this.price = price;}public String action;public String itemId;public String time;public String unionId;public Integer rankIndex;public Integer wegiht;public long price;public UdfData() {super();}}
}

这篇关于flink udf 介绍的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/1115087

相关文章

Java中 instanceof 的用法详细介绍

《Java中instanceof的用法详细介绍》在Java中,instanceof是一个二元运算符(类型比较操作符),用于检查一个对象是否是某个特定类、接口的实例,或者是否是其子类的实例,这篇文章... 目录引言基本语法基本作用1. 检查对象是否是指定类的实例2. 检查对象是否是子类的实例3. 检查对象是否

什么是ReFS 文件系统? ntfs和refs的优缺点区别介绍

《什么是ReFS文件系统?ntfs和refs的优缺点区别介绍》最近有用户在Win11Insider的安装界面中发现,可以使用ReFS来格式化硬盘,这是不是意味着,ReFS有望在未来成为W... 数十年以来,Windows 系统一直将 NTFS 作为「内置硬盘」的默认文件系统。不过近些年来,微软还在研发一款名

C#使用StackExchange.Redis实现分布式锁的两种方式介绍

《C#使用StackExchange.Redis实现分布式锁的两种方式介绍》分布式锁在集群的架构中发挥着重要的作用,:本文主要介绍C#使用StackExchange.Redis实现分布式锁的... 目录自定义分布式锁获取锁释放锁自动续期StackExchange.Redis分布式锁获取锁释放锁自动续期分布式

redis过期key的删除策略介绍

《redis过期key的删除策略介绍》:本文主要介绍redis过期key的删除策略,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录第一种策略:被动删除第二种策略:定期删除第三种策略:强制删除关于big key的清理UNLINK命令FLUSHALL/FLUSHDB命

Pytest多环境切换的常见方法介绍

《Pytest多环境切换的常见方法介绍》Pytest作为自动化测试的主力框架,如何实现本地、测试、预发、生产环境的灵活切换,本文总结了通过pytest框架实现自由环境切换的几种方法,大家可以根据需要进... 目录1.pytest-base-url2.hooks函数3.yml和fixture结论你是否也遇到过

MySQL中慢SQL优化的不同方式介绍

《MySQL中慢SQL优化的不同方式介绍》慢SQL的优化,主要从两个方面考虑,SQL语句本身的优化,以及数据库设计的优化,下面小编就来给大家介绍一下有哪些方式可以优化慢SQL吧... 目录避免不必要的列分页优化索引优化JOIN 的优化排序优化UNION 优化慢 SQL 的优化,主要从两个方面考虑,SQL 语

C++中函数模板与类模板的简单使用及区别介绍

《C++中函数模板与类模板的简单使用及区别介绍》这篇文章介绍了C++中的模板机制,包括函数模板和类模板的概念、语法和实际应用,函数模板通过类型参数实现泛型操作,而类模板允许创建可处理多种数据类型的类,... 目录一、函数模板定义语法真实示例二、类模板三、关键区别四、注意事项 ‌在C++中,模板是实现泛型编程

Python实现html转png的完美方案介绍

《Python实现html转png的完美方案介绍》这篇文章主要为大家详细介绍了如何使用Python实现html转png功能,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 1.增强稳定性与错误处理建议使用三层异常捕获结构:try: with sync_playwright(

Java使用多线程处理未知任务数的方案介绍

《Java使用多线程处理未知任务数的方案介绍》这篇文章主要为大家详细介绍了Java如何使用多线程实现处理未知任务数,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 知道任务个数,你可以定义好线程数规则,生成线程数去跑代码说明:1.虚拟线程池:使用 Executors.newVir

JAVA SE包装类和泛型详细介绍及说明方法

《JAVASE包装类和泛型详细介绍及说明方法》:本文主要介绍JAVASE包装类和泛型的相关资料,包括基本数据类型与包装类的对应关系,以及装箱和拆箱的概念,并重点讲解了自动装箱和自动拆箱的机制,文... 目录1. 包装类1.1 基本数据类型和对应的包装类1.2 装箱和拆箱1.3 自动装箱和自动拆箱2. 泛型2