flink udf 介绍

2024-08-28 14:58
文章标签 介绍 flink udf

本文主要是介绍flink udf 介绍,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

ScalarFunction:标量函数是实现将0,1,或者多个标量值转化为一个新值TableFunction:一个输入多个行或者多个列AggregateFunction:多个输入一个输出package org.fuwushe.sql;import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import org.fuwushe.sql.udf.FromUnixTimeUDF;
import org.fuwushe.sql.udf.Split;import java.util.Iterator;public class SqlUdfTest {public static void main(String []args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);tableEnv.registerFunction("split", new Split("#"));tableEnv.registerFunction("from_unixtime", new FromUnixTimeUDF());tableEnv.registerFunction("wAvg", new WeightedAvg());DataSet<String> input = env.readTextFile("/load/data/udf.txt");DataSet<UdfData> topInput = input.map(new MapFunction<String,UdfData>() {@Overridepublic UdfData map(String s) throws Exception {return JSONObject.parseObject(s,UdfData.class);}});Table udfTable = tableEnv.fromDataSet(topInput);tableEnv.registerTable("udf_table", udfTable);//ScalarFunctionTable udfResult = tableEnv.sqlQuery(" select from_unixtime(`time`) as creatTime,itemId FROM udf_table order by  creatTime desc  ");tableEnv.toDataSet(udfResult, UdfResult.class).print();//TableFunctionTable udtfResult1 =  tableEnv.sqlQuery("SELECT action, word, length FROM udf_table, LATERAL TABLE(split(action)) as T(word, length)");Table udtfResult2 =  tableEnv.sqlQuery("SELECT  action, word, length FROM udf_table LEFT JOIN LATERAL TABLE(split(action)) as T(word, length) ON TRUE");tableEnv.toDataSet(udtfResult1, UdtfResult.class).print();tableEnv.toDataSet(udtfResult2, UdtfResult.class).print();//AggregateFunction 6 1Table udafResult = tableEnv.sqlQuery("SELECT itemId, wAvg(price,wegiht) AS avgPoints FROM udf_table GROUP BY itemId");tableEnv.toDataSet(udafResult, UdafResult.class).print();}/*** Accumulator for WeightedAvg.*/public static class WeightedAvgAccum {public long sum = 0;public int count = 0;}/*** Weighted Average user-defined aggregate function.*/public static class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccum> {@Overridepublic WeightedAvgAccum createAccumulator() {return new WeightedAvgAccum();}@Overridepublic Long getValue(WeightedAvgAccum acc) {if (acc.count == 0) {return null;} else {return acc.sum / acc.count;}}public void accumulate(WeightedAvgAccum acc, long iValue, int iWeight) {acc.sum += iValue * iWeight;acc.count += iWeight;}public void retract(WeightedAvgAccum acc, long iValue, int iWeight) {acc.sum -= iValue * iWeight;acc.count -= iWeight;}public void merge(WeightedAvgAccum acc, Iterable<WeightedAvgAccum> it) {Iterator<WeightedAvgAccum> iter = it.iterator();while (iter.hasNext()) {WeightedAvgAccum a = iter.next();acc.count += a.count;acc.sum += a.sum;}}public void resetAccumulator(WeightedAvgAccum acc) {acc.count = 0;acc.sum = 0L;}}public static class UdafResult {public UdafResult() {super();}public String itemId;public long avgPoints;public UdafResult(String itemId, long avgPoints) {this.itemId = itemId;this.avgPoints = avgPoints;}@Overridepublic String toString() {return "UdafResult{" + "itemId='" + itemId + '\'' + ", avgPoints=" + avgPoints + '}';}}public static class UdtfResult {public UdtfResult() {super();}public String action;public String word;public int length;public UdtfResult(String action, String word, int length) {this.action = action;this.word = word;this.length = length;}@Overridepublic String toString() {return "UdtfResult{" + "action='" + action + '\'' + ", word='" + word + '\'' + ", length=" + length + '}';}}public static class UdfResult {public UdfResult() {super();}public String itemId;public String creatTime;public UdfResult(String itemId, String creatTime) {this.itemId = itemId;this.creatTime = creatTime;}@Overridepublic String toString() {return "Result{" + "itemId='" + itemId + '\'' + ", creatTime='" + creatTime + '\'' + '}';}}public static class UdfData {public UdfData(String action, String itemId, String time, String unionId, Integer rankIndex, Integer wegiht,long price) {this.action = action;this.itemId = itemId;this.time = time;this.unionId = unionId;this.rankIndex = rankIndex;this.wegiht = wegiht;this.price = price;}public String action;public String itemId;public String time;public String unionId;public Integer rankIndex;public Integer wegiht;public long price;public UdfData() {super();}}
}

这篇关于flink udf 介绍的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1115087

相关文章

5 种使用Python自动化处理PDF的实用方法介绍

《5种使用Python自动化处理PDF的实用方法介绍》自动化处理PDF文件已成为减少重复工作、提升工作效率的重要手段,本文将介绍五种实用方法,从内置工具到专业库,帮助你在Python中实现PDF任务... 目录使用内置库(os、subprocess)调用外部工具使用 PyPDF2 进行基本 PDF 操作使用

Java中HashMap的用法详细介绍

《Java中HashMap的用法详细介绍》JavaHashMap是一种高效的数据结构,用于存储键值对,它是基于哈希表实现的,提供快速的插入、删除和查找操作,:本文主要介绍Java中HashMap... 目录一.HashMap1.基本概念2.底层数据结构:3.HashCode和equals方法为什么重写Has

Springboot项目构建时各种依赖详细介绍与依赖关系说明详解

《Springboot项目构建时各种依赖详细介绍与依赖关系说明详解》SpringBoot通过spring-boot-dependencies统一依赖版本管理,spring-boot-starter-w... 目录一、spring-boot-dependencies1.简介2. 内容概览3.核心内容结构4.

setsid 命令工作原理和使用案例介绍

《setsid命令工作原理和使用案例介绍》setsid命令在Linux中创建独立会话,使进程脱离终端运行,适用于守护进程和后台任务,通过重定向输出和确保权限,可有效管理长时间运行的进程,本文给大家介... 目录setsid 命令介绍和使用案例基本介绍基本语法主要特点命令参数使用案例1. 在后台运行命令2.

MySQL常用字符串函数示例和场景介绍

《MySQL常用字符串函数示例和场景介绍》MySQL提供了丰富的字符串函数帮助我们高效地对字符串进行处理、转换和分析,本文我将全面且深入地介绍MySQL常用的字符串函数,并结合具体示例和场景,帮你熟练... 目录一、字符串函数概述1.1 字符串函数的作用1.2 字符串函数分类二、字符串长度与统计函数2.1

zookeeper端口说明及介绍

《zookeeper端口说明及介绍》:本文主要介绍zookeeper端口说明,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、zookeeper有三个端口(可以修改)aVNMqvZ二、3个端口的作用三、部署时注意总China编程结一、zookeeper有三个端口(可以

Python中win32包的安装及常见用途介绍

《Python中win32包的安装及常见用途介绍》在Windows环境下,PythonWin32模块通常随Python安装包一起安装,:本文主要介绍Python中win32包的安装及常见用途的相关... 目录前言主要组件安装方法常见用途1. 操作Windows注册表2. 操作Windows服务3. 窗口操作

c++中的set容器介绍及操作大全

《c++中的set容器介绍及操作大全》:本文主要介绍c++中的set容器介绍及操作大全,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录​​一、核心特性​​️ ​​二、基本操作​​​​1. 初始化与赋值​​​​2. 增删查操作​​​​3. 遍历方

HTML img标签和超链接标签详细介绍

《HTMLimg标签和超链接标签详细介绍》:本文主要介绍了HTML中img标签的使用,包括src属性(指定图片路径)、相对/绝对路径区别、alt替代文本、title提示、宽高控制及边框设置等,详细内容请阅读本文,希望能对你有所帮助... 目录img 标签src 属性alt 属性title 属性width/h

MybatisPlus service接口功能介绍

《MybatisPlusservice接口功能介绍》:本文主要介绍MybatisPlusservice接口功能介绍,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友... 目录Service接口基本用法进阶用法总结:Lambda方法Service接口基本用法MyBATisP