0基础学习PyFlink——用户自定义函数之UDTAF

2023-10-29 02:45

本文主要是介绍0基础学习PyFlink——用户自定义函数之UDTAF,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

大纲

  • UDTAF
  • TableAggregateFunction的实现
    • 累加器
      • 定义
      • 创建
      • 累加
    • 返回
      • 类型
      • 计算
  • 完整代码

在前面几篇文章中,我们分别介绍了UDF、UDTF和UDAF这三种用户自定义函数。本节我们将介绍最后一种函数:UDTAF——用户自定义表值聚合函数。
在这里插入图片描述

UDTAF

UDTAF函数即具备了UDTF的特点,也具备UDAF的特点。即它可以像《0基础学习PyFlink——用户自定义函数之UDTF》介绍的UDTF那样可以返回任意数量的行作为输,又可以像《0基础学习PyFlink——用户自定义函数之UDAF》介绍的UDAF那样通过聚合的数据(多组)计算出一个值
举一个例子:我们拿到一个学生成绩表,每行包括:

  • 学生姓名
  • 英语成绩
  • 数学成绩
  • 年级

现在我们需要把这张表调整为:

  • 学生姓名
  • 成绩
  • 科目
  • 科目年级平均成绩
  • 年级
    在这里插入图片描述
    将一行中的“英语成绩”和“数学成绩”,拆成“成绩”和“科目”,相当于把一行数据拆解成多行,如上图左侧“张三”只有一行,而右侧有两行“张三”信息。这种拆解操作就需要T类型的用户自定义函数,比如UDTF和UDTAF。
    而我们需要计算一个年级一科的平均成绩,比如1年级英语的平均成绩,则需要按年级聚合之后再做计算。这个就需要A类型的用户自定义函数,比如UDAF和UDTAF。
    同时要满足上述两种技术方案的就是UDTAF。我们先看下主体代码,它和《0基础学习PyFlink——用户自定义函数之UDAF》中的很像。但是有两个重要区别:
  • 要设置成in_streaming_mode模式,否则会报错
  • udtaf要修饰一个对象,而非一个方法;
def calc():config = Configuration()# write all the data to one fileconfig.set_string('parallelism.default', '1')env_settings = EnvironmentSettings \.new_instance() \.in_streaming_mode() \.with_configuration(config) \.build()t_env = TableEnvironment.create(env_settings)row_type_tab_source = DataTypes.ROW([DataTypes.FIELD('name', DataTypes.STRING()), DataTypes.FIELD('english', DataTypes.FLOAT()), DataTypes.FIELD('math', DataTypes.FLOAT()), DataTypes.FIELD('grade', DataTypes.STRING())])students_score = [("张三", 80.0, 60.0, "1"),("李四", 75.0, 95.0, "1"),("王五", 90.0, 90.0, "2"),("赵六", 85.0, 70.0, "2"),("孙七", 60.0, 0.0, "3"),]tab_source = t_env.from_elements(students_score, row_type_tab_source)split_class = udtaf(SplitClass())tab_source.group_by(col('grade')) \.flat_aggregate(split_class) \.select(col('*')) \.execute().print()

TableAggregateFunction的实现

用于计算的类要继承于TableAggregateFunction,即UDTAF中的TAF。

class SplitClass(TableAggregateFunction):_class_keys = ["english", "math"]

我们需要通过get_result_type告诉框架,UDTAF函数返回的是什么类型的数据。一般我们都是构造一个行类型——ROW,然后定义其每个字段的值和类型:

  • name:string类型,用户姓名;
  • score:float类型,考分;
  • avg score:float类型,科目年级平均分数;
  • class:sting类型,科目名称;

累加器

accumulator(累加器)是用于参与计算的中间数据。比如这个案例中,我们会向让accumulator保存拆解后的数据(即一行拆解成多行后的数据),然后再计算各年级每科的平均成绩。

定义

    def get_accumulator_type(self):return DataTypes.ARRAY(DataTypes.ROW([DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("score", DataTypes.FLOAT()), DataTypes.FIELD("class", DataTypes.STRING())])) 

因为只是为了保存展开的数据,于是我们只用定义均值计算之前的字段:

  • name:string类型,姓名;
  • score:float类型,分数;
  • class:string类型,科目名称;

创建

刚开始时,我们让其是一个空数组,对应上定义中的ARRAY类型。

    def create_accumulator(self):return []

累加

我们对科目进行遍历,进行行的拆分。即将(“张三”, 80.0, 60.0, “1”)拆解成(“张三”, 80.0, “english”)和(“张三”, 60.0, “math”)这样的两组数据。

    def accumulate(self, accumulator, row):for i in self._class_keys:accumulator.append(Row(row["name"], row[i], i))

返回

类型

    def get_result_type(self):return DataTypes.ROW([DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("score", DataTypes.FLOAT()), DataTypes.FIELD("avg score", DataTypes.FLOAT()), DataTypes.FIELD("class", DataTypes.STRING())])

可以看到result_type(返回类型)和accumulator_type(累加器类型)是不一样的(也可以一样,主要看怎么计算规则)。前者比后者多了“学科年级平均分”(avg score),这就更加接近我们希望获得的最终结果。
这些字段和我们目标字段只差一个grade(年级)。因为原始表中有grade,且我们会通过grade聚类,所以最终我们可以获得这个信息,而不用在这儿定义。
需要注意的是,虽然表值类型函数返回的是一组数据(若干Row),但是这儿只是返回Row的具体定义,而不是ARRAY[Row]。

计算

    def emit_value(self, accumulator):rows = []for i in self._class_keys: total = 0.0student_count = 0for y in accumulator:# y[2] y[]"class"]if i == y[2]:# y[1] y["score"]total = total + y[1]student_count = student_count + 1avg_score = total / student_countfor y in accumulator:if i == y[2]:rows.append(Row(y[0], y[1], avg_score, y[2]))for x in rows:   yield x

这个函数会在最后执行,它会通过累加器中的数据计算“学科年级平均分”,然后构造和“返回类型”一直的Row到rows数组中。最后通过yeild关键字返回一个生成器,我们可以将其看成还是一组Row,即拆解后的结果。

最后我们看下结果

+----+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| op |                          grade |                           name |                          score |                      avg score |                          class |
+----+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| +I |                              1 |                           张三 |                           80.0 |                           77.5 |                        english |
| +I |                              1 |                           李四 |                           75.0 |                           77.5 |                        english |
| +I |                              1 |                           张三 |                           60.0 |                           77.5 |                           math |
| +I |                              1 |                           李四 |                           95.0 |                           77.5 |                           math |
| +I |                              2 |                           王五 |                           90.0 |                           87.5 |                        english |
| +I |                              2 |                           赵六 |                           85.0 |                           87.5 |                        english |
| +I |                              2 |                           王五 |                           90.0 |                           80.0 |                           math |
| +I |                              2 |                           赵六 |                           70.0 |                           80.0 |                           math |
| +I |                              3 |                           孙七 |                           60.0 |                           60.0 |                        english |
| +I |                              3 |                           孙七 |                            0.0 |                            0.0 |                           math |
+----+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
10 rows in set

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

完整代码

from pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment, Schema)
from pyflink.table.types import DataTypes
from pyflink.table.table_descriptor import TableDescriptor
from pyflink.table.expressions import lit, col
from pyflink.common import Row
from pyflink.table.udf import udf,udtf,udaf,udtaf,TableAggregateFunction
import pandas as pd
from pyflink.table.udf import UserDefinedFunction
from typing import Listclass SplitClass(TableAggregateFunction):_class_keys = ["english", "math"]def emit_value(self, accumulator):rows = []for i in self._class_keys: total = 0.0student_count = 0for y in accumulator:if i == y[2]:total = total + y[1]student_count = student_count + 1avg_score = total / student_countfor y in accumulator:if i == y[2]:rows.append(Row(y[0], y[1], avg_score, y[2]))return rowsdef create_accumulator(self):return []def accumulate(self, accumulator, row):for i in self._class_keys:accumulator.append(Row(row["name"], row[i], i))def get_accumulator_type(self):return DataTypes.ARRAY(DataTypes.ROW([DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("score", DataTypes.FLOAT()), DataTypes.FIELD("class", DataTypes.STRING())]))  def get_result_type(self):return DataTypes.ROW([DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("score", DataTypes.FLOAT()), DataTypes.FIELD("avg score", DataTypes.FLOAT()), DataTypes.FIELD("class", DataTypes.STRING())])def calc():config = Configuration()# write all the data to one fileconfig.set_string('parallelism.default', '1')env_settings = EnvironmentSettings \.new_instance() \.in_streaming_mode() \.with_configuration(config) \.build()t_env = TableEnvironment.create(env_settings)row_type_tab_source = DataTypes.ROW([DataTypes.FIELD('name', DataTypes.STRING()), DataTypes.FIELD('english', DataTypes.FLOAT()), DataTypes.FIELD('math', DataTypes.FLOAT()), DataTypes.FIELD('grade', DataTypes.STRING())])students_score = [("张三", 80.0, 60.0, "1"),("李四", 75.0, 95.0, "1"),("王五", 90.0, 90.0, "2"),("赵六", 85.0, 70.0, "2"),("孙七", 60.0, 0.0, "3"),]tab_source = t_env.from_elements(students_score, row_type_tab_source)split_class = udtaf(SplitClass())tab_source.group_by(col('grade')) \.flat_aggregate(split_class) \.select(col('*')) \.execute().print()if __name__ == '__main__':calc()

这篇关于0基础学习PyFlink——用户自定义函数之UDTAF的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/297740

相关文章

Python中help()和dir()函数的使用

《Python中help()和dir()函数的使用》我们经常需要查看某个对象(如模块、类、函数等)的属性和方法,Python提供了两个内置函数help()和dir(),它们可以帮助我们快速了解代... 目录1. 引言2. help() 函数2.1 作用2.2 使用方法2.3 示例(1) 查看内置函数的帮助(

C++ 函数 strftime 和时间格式示例详解

《C++函数strftime和时间格式示例详解》strftime是C/C++标准库中用于格式化日期和时间的函数,定义在ctime头文件中,它将tm结构体中的时间信息转换为指定格式的字符串,是处理... 目录C++ 函数 strftipythonme 详解一、函数原型二、功能描述三、格式字符串说明四、返回值五

从基础到进阶详解Pandas时间数据处理指南

《从基础到进阶详解Pandas时间数据处理指南》Pandas构建了完整的时间数据处理生态,核心由四个基础类构成,Timestamp,DatetimeIndex,Period和Timedelta,下面我... 目录1. 时间数据类型与基础操作1.1 核心时间对象体系1.2 时间数据生成技巧2. 时间索引与数据

Go学习记录之runtime包深入解析

《Go学习记录之runtime包深入解析》Go语言runtime包管理运行时环境,涵盖goroutine调度、内存分配、垃圾回收、类型信息等核心功能,:本文主要介绍Go学习记录之runtime包的... 目录前言:一、runtime包内容学习1、作用:① Goroutine和并发控制:② 垃圾回收:③ 栈和

如何自定义一个log适配器starter

《如何自定义一个log适配器starter》:本文主要介绍如何自定义一个log适配器starter的问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录需求Starter 项目目录结构pom.XML 配置LogInitializer实现MDCInterceptor

安装centos8设置基础软件仓库时出错的解决方案

《安装centos8设置基础软件仓库时出错的解决方案》:本文主要介绍安装centos8设置基础软件仓库时出错的解决方案,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐... 目录安装Centos8设置基础软件仓库时出错版本 8版本 8.2.200android4版本 javas

Python中bisect_left 函数实现高效插入与有序列表管理

《Python中bisect_left函数实现高效插入与有序列表管理》Python的bisect_left函数通过二分查找高效定位有序列表插入位置,与bisect_right的区别在于处理重复元素时... 目录一、bisect_left 基本介绍1.1 函数定义1.2 核心功能二、bisect_left 与

Android学习总结之Java和kotlin区别超详细分析

《Android学习总结之Java和kotlin区别超详细分析》Java和Kotlin都是用于Android开发的编程语言,它们各自具有独特的特点和优势,:本文主要介绍Android学习总结之Ja... 目录一、空安全机制真题 1:Kotlin 如何解决 Java 的 NullPointerExceptio

java中BigDecimal里面的subtract函数介绍及实现方法

《java中BigDecimal里面的subtract函数介绍及实现方法》在Java中实现减法操作需要根据数据类型选择不同方法,主要分为数值型减法和字符串减法两种场景,本文给大家介绍java中BigD... 目录Java中BigDecimal里面的subtract函数的意思?一、数值型减法(高精度计算)1.

C++/类与对象/默认成员函数@构造函数的用法

《C++/类与对象/默认成员函数@构造函数的用法》:本文主要介绍C++/类与对象/默认成员函数@构造函数的用法,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录名词概念默认成员函数构造函数概念函数特征显示构造函数隐式构造函数总结名词概念默认构造函数:不用传参就可以