Flink sql 之 TopN 与 StreamPhysicalRankRule (源码解析)

2024-05-02 07:18

本文主要是介绍Flink sql 之 TopN 与 StreamPhysicalRankRule (源码解析),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

基于flink1.14的源码做解析

公司内有很多业务方都在使用我们Flink sql平台做TopN的计算,今天同事突然问到我,Flink sql 是怎么实现topN的 ?

蒙圈了,这块源码没看过啊 ,业务要问起来怎么办,赶快打开源码补一下

拿到这个问题先冷静分析一下范围

首先肯定属于Flink sql模块,源码里面肯定是在flink-table-planner包里面,接着topN那不就是ROW_NUMBER嘛,是个函数呀

既然如此那就从flink源码的系统函数作为线索开始找起来,来到 org.apache.calcite.sql.fun.SqlStdOperatorTable类

62cbd27f6d6aeba1a0cdb8652fafd6ff.png

果然找到了,那calcite的某个rule肯定有个地方判断了它,继续查调用链

4a1a0811ae7d09f8432fc4ab6a0699f2.png

不出所料,FlinkLogicalRankRuleBase这个calcite的rule里面果然根据这个function的类型来确定rank的类型了

看下这个rule的匹配条件

8ba4b04a31435316c51a05d1cb564de5.png

 这里也好理解,overAgg的时候会判断这个rank以及对应的类型

ebd8f6c371d7d957d272e34ac347b706.png

这是只是做了一下简单的提取了rank的字段啊,提取谓语啊,提取表达式啊这一些拿信息的操作

然后直接生成新的relNode叫FlinkLogicalRank通过transformTo直接返回了这个等价节点

既然是relNode那肯定又会有calcite的rule去处理它,来找一找

c4fc899e183119a6565b03e2cf6cbda3.png

批处理的就不管了,从名字就可以看出来我们要找的类了

看个不带window的吧

7635ecfcdaaca214f6b1cdfcdc264277.png

 返回StreamPhysicalRank

这个类是一个FlinkPhysicalRel是可以转换成execNode的

这里在多说一句,

这里将partitionKey传入了,就是sql里面的partition by后面的,后面会用这个来创建transformation的keySelecter用来分流数据

6ad46b7648b447a2ed9aa7abd64818ce.png

返回的这个StreamExecRank就是可以转换成具体的Flink的算子了,具体逻辑就在里面了

接下来看下row_number的具体逻辑,找到方法translateToPlanInternal

根据策略主要分为三种类型

AppendFastStrategy  (输入仅包含插入时)

RetractStrategy   (输入包含update和delete)

UpdateFastStrategy     (输入不应包含删除且输入有给定的primaryKeys且按字段排序时)

来看个retractStrategy的吧

c69e6575ec8aad4dca5167a7eac95989.png

先通过sort的字段获取一个用于排序RowData的比较器 ComparableRecordComparator

根据比较器创建 RetractableTopNFunction

这个类还有两个主要的状态数据结构

17f3912aafbb8b203fd36bdff225378d.png

dataState这个map用来存放当key相同的所有数据会放在同一个list里面

treeMap这个可排序的map就是通过上面我们sql里面定义的sort by 来排序数据的,Long是指这个相同的key有多少个record

!!!!!!!!!!!  那就是用java的treeMap排序呗

继续往下看

a8a541f6abc24463e027b40c08c34aa6.png

 主逻辑就是这个了

每进入一条数据,会根据这条数据的类型划分

当数据是Insert , UPDATE_AFTER类型是会走 emitRecordsWithRowNumber()方法

当数据是UPDATE_BEFORE,DELETE类型会走 retractRecordWithRowNumber ()方法

来看下具体逻辑先看INSERT的

452086b9d3bf9bf74d4c612f0325e65d.png

 遍历treeMap

165b92d3bab16cd4c3c67ff0889cd052.png

解读一下,当数据是insert数据的时候

INSERT数据会先放到treeMap里面去,delete则不会

按顺序遍历treeMap

当遍历过程中发现遍历的key与当前数据的key相同时,和当前数据key相同的所有数据数据(dataState中的LIST),全部撤回并且更新他们的rowNumber+1

继续遍历treeMap

之后的数据全部撤回UpdateBefore,并且向下游发送UpdateAfter使rowNumber+1,遍历直到已经到第TopN个数据循环结束

当数据是DELETE类型的时候,会和Insert反过来,当前key之后的数据全部撤回,然后rowNumber-1

整个处理流程差不多就结束了,可以看到rowNumber当N较大且排序变化频繁的时候,性能消耗还是非常大的,极端情况下游的数据会翻很多倍

这个还需要注意在其他两个策略中还有一个参数,table.exec.topn.cache-size

43befe3eb8dbc3270fc476a887adf90b.png

 影响下面这个本地lruCache的大小

70d960ad16f949f02b4f716748f2c5ca.png

 调大可以减少状态的访问,可以按需要添加

本文地址:https://www.cnblogs.com/ljygz/p/15428840.html

end

Flink 从入门到精通 系列文章基于 Apache Flink 的实时监控告警系统
关于数据中台的深度思考与总结(干干货)
日志收集Agent,阴暗潮湿的地底世界

2b3a7dd27d2e596e5596fea658707508.png

701b350afc1517a778ca25978c60c0d5.png

公众号(zhisheng)里回复 面经、ClickHouse、ES、Flink、 Spring、Java、Kafka、监控 等关键字可以查看更多关键字对应的文章。
点个赞+在看,少个 bug 👇

这篇关于Flink sql 之 TopN 与 StreamPhysicalRankRule (源码解析)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/953681

相关文章

Spring Boot 实现 IP 限流的原理、实践与利弊解析

《SpringBoot实现IP限流的原理、实践与利弊解析》在SpringBoot中实现IP限流是一种简单而有效的方式来保障系统的稳定性和可用性,本文给大家介绍SpringBoot实现IP限... 目录一、引言二、IP 限流原理2.1 令牌桶算法2.2 漏桶算法三、使用场景3.1 防止恶意攻击3.2 控制资源

SQL BETWEEN 语句的基本用法详解

《SQLBETWEEN语句的基本用法详解》SQLBETWEEN语句是一个用于在SQL查询中指定查询条件的重要工具,它允许用户指定一个范围,用于筛选符合特定条件的记录,本文将详细介绍BETWEEN语... 目录概述BETWEEN 语句的基本用法BETWEEN 语句的示例示例 1:查询年龄在 20 到 30 岁

MySQL DQL从入门到精通

《MySQLDQL从入门到精通》通过DQL,我们可以从数据库中检索出所需的数据,进行各种复杂的数据分析和处理,本文将深入探讨MySQLDQL的各个方面,帮助你全面掌握这一重要技能,感兴趣的朋友跟随小... 目录一、DQL 基础:SELECT 语句入门二、数据过滤:WHERE 子句的使用三、结果排序:ORDE

Java Spring ApplicationEvent 代码示例解析

《JavaSpringApplicationEvent代码示例解析》本文解析了Spring事件机制,涵盖核心概念(发布-订阅/观察者模式)、代码实现(事件定义、发布、监听)及高级应用(异步处理、... 目录一、Spring 事件机制核心概念1. 事件驱动架构模型2. 核心组件二、代码示例解析1. 事件定义

CSS place-items: center解析与用法详解

《CSSplace-items:center解析与用法详解》place-items:center;是一个强大的CSS简写属性,用于同时控制网格(Grid)和弹性盒(Flexbox)... place-items: center; 是一个强大的 css 简写属性,用于同时控制 网格(Grid) 和 弹性盒(F

MySQL MCP 服务器安装配置最佳实践

《MySQLMCP服务器安装配置最佳实践》本文介绍MySQLMCP服务器的安装配置方法,本文结合实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下... 目录mysql MCP 服务器安装配置指南简介功能特点安装方法数据库配置使用MCP Inspector进行调试开发指

mysql中insert into的基本用法和一些示例

《mysql中insertinto的基本用法和一些示例》INSERTINTO用于向MySQL表插入新行,支持单行/多行及部分列插入,下面给大家介绍mysql中insertinto的基本用法和一些示例... 目录基本语法插入单行数据插入多行数据插入部分列的数据插入默认值注意事项在mysql中,INSERT I

一文详解MySQL如何设置自动备份任务

《一文详解MySQL如何设置自动备份任务》设置自动备份任务可以确保你的数据库定期备份,防止数据丢失,下面我们就来详细介绍一下如何使用Bash脚本和Cron任务在Linux系统上设置MySQL数据库的自... 目录1. 编写备份脚本1.1 创建并编辑备份脚本1.2 给予脚本执行权限2. 设置 Cron 任务2

python常见环境管理工具超全解析

《python常见环境管理工具超全解析》在Python开发中,管理多个项目及其依赖项通常是一个挑战,下面:本文主要介绍python常见环境管理工具的相关资料,文中通过代码介绍的非常详细,需要的朋友... 目录1. conda2. pip3. uvuv 工具自动创建和管理环境的特点4. setup.py5.

SQL Server修改数据库名及物理数据文件名操作步骤

《SQLServer修改数据库名及物理数据文件名操作步骤》在SQLServer中重命名数据库是一个常见的操作,但需要确保用户具有足够的权限来执行此操作,:本文主要介绍SQLServer修改数据... 目录一、背景介绍二、操作步骤2.1 设置为单用户模式(断开连接)2.2 修改数据库名称2.3 查找逻辑文件名