flink重温笔记(十九): flinkSQL 顶层 API ——FlinkSQL 窗口(解决动态累积数据业务需求)

本文主要是介绍flink重温笔记(十九): flinkSQL 顶层 API ——FlinkSQL 窗口(解决动态累积数据业务需求),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Flink学习笔记

前言:今天是学习 flink 的第 19 天啦!学习了 flinkSQL 中窗口的应用,包括滚动窗口,滑动窗口,会话窗口,累计窗口,学会了如何计算累计值(类似于中视频计划中的累计播放量业务需求),多维数据分析等大数据热点问题,总结了很多自己的理解和想法,希望和大家多多交流,希望对大家有帮助!

Tips:"分享是快乐的源泉💧,在我的博客里,不仅有知识的海洋🌊,还有满满的正能量加持💪,快来和我一起分享这份快乐吧😊!

喜欢我的博客的话,记得点个红心❤️和小关小注哦!您的支持是我创作的动力!"


文章目录

  • Flink学习笔记
    • 六、FlinkSQL 窗口
      • 1. 窗口表值函数(tvfs)
      • 2. 窗口分类函数及聚合操作
        • 2.1 滚动窗口(Tumble Windows)
        • 2.2 滑动窗口(Hop Windows)
        • 2.3 会话窗口(Session Windows,暂不支持 Window TVF)
        • 2.4 累计窗口(Comulate Windows flink1.13 版本新特性)
      • 3. 多维数据分析
        • 3.1 GROUPING SETS
        • 3.2 ROLLUP
        • 3.3 CUBE
        • 3.4 GROUPING 和 GROUPING_ID
          • 3.4.1 GROUPING 函数
          • 3.4.2 GROUPING_ID(兼容 Hive)
        • 3.5 Window Top-N
      • 4. Over Windows
        • 4.1 ROWS OVER WINDOW
        • 4.2 RANGE OVER WINDOW
      • 5. TableAPI 窗口的定义
        • 5.1.1 滚动窗口
        • 5.1.2 滑动窗口
        • 5.1.3 会话窗口

六、FlinkSQL 窗口

1. 窗口表值函数(tvfs)

将流变成特殊的“批”处理,常用的窗口:

  • 滑动窗口
  • 滚动窗口
  • 会话窗口(flink 1.14 版本支持)
  • 累计窗口(flink 1.13 版本新增)

在 flink 1.13 之前,是一个特殊的 GroupWindowFunction

SELECTTUMBLE_START( bidtime, INTERVAL '10' MINUTE),TUMBLE_END( bidtime, INTERVAL '10' MINUTE),TUMBLE_ROWTIME( bidtime, INTERVAL '10' MINUTE),SUM(price)
FROM MyTable
GROUP BY TUMBLE( bidtime, INTERVAL '10' MINUTE),

在 flink 1.13 之后,用 Table-Value Function 进行语法标准化

SELECT window_start, window_end, window_time, SUM(price)
FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)
)
GROUP BY window_start, window_end;

2. 窗口分类函数及聚合操作

2.1 滚动窗口(Tumble Windows)

语法:

TUMBLE(TABLE data, DESCRIPTOR(timecol), size)data:一个表名。
timecol:是一个列描述符,指示应将数据的哪个时间属性列映射到翻转窗口。
size:是指定滚动窗口宽度的持续时间。

数据:

2021-04-15 08:05:00,4.00,C
2021-04-15 08:07:00,2.00,A
2021-04-15 08:09:00,5.00,D
2021-04-15 08:11:00,3.00,B
2021-04-15 08:13:00,1.00,E
2021-04-15 08:17:00,6.00,F

需求:现在有一个实时数据看板,需要计算当前每10分钟GMV的总和

package cn.itcast.day02.Window;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;/*** @author lql* @time 2024-03-16 17:33:47* @description TODO*/
public class GroupWindowsSqlTumbleExample {public static void main(String[] args) throws Exception {//todo 1)构建flink流处理的运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//todo 2)设置并行度env.setParallelism(1);//todo 3)构建flink的表的运行环境EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, settings);String filePath = GroupWindowsSqlTumbleExample.class.getClassLoader().getResource("bid.csv").getPath();tabEnv.executeSql("create table Bid(" +"bidtime TIMESTAMP(3)," +"price DECIMAL(10, 2), " +"item string," +"watermark for bidtime as bidtime - interval '1' second) " +"with("+ "'connector' = 'filesystem',"+ "'path' = 'file:///"+filePath+"',"+ "'format' = 'csv'"+ ")");Table table = tabEnv.sqlQuery("" +"select window_start,window_end,sum(price) as sum_price " +" from table(" +"  tumble(table Bid, DESCRIPTOR(bidtime), interval '10' MINUTES))" +"  group by window_start,window_end");tabEnv.toAppendStream(table, Row.class).print();env.execute();}
}

结果:

+I[2021-04-15T08:00, 2021-04-15T08:10, 11.00]
+I[2021-04-15T08:10, 2021-04-15T08:20, 10.00]

2.2 滑动窗口(Hop Windows)

语法:

HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])data:是一个表名。
timecol:是一个列描述符,指示应将数据的哪个时间属性列映射到滑动窗口。
slide:是一个持续时间,指定了连续跳跃窗口开始之间的持续时间
size:是指定跳变窗口宽度的持续时间

需求:每隔 5 分钟,统计 10 分钟的数据

package cn.itcast.day02.Window;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;/*** @author lql* @time 2024-03-16 19:28:30* @description TODO*/
public class GroupWindowsSqlHopExample {public static void main(String[] args) throws Exception {//todo 1)构建flink流处理的运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//todo 2)设置并行度env.setParallelism(1);//todo 3)构建flink的表的运行环境EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, settings);String filePath = GroupWindowsSqlHopExample.class.getClassLoader().getResource("bid.csv").getPath();tabEnv.executeSql("create table Bid(" +"bidtime TIMESTAMP(3)," +"price DECIMAL(10, 2), " +"item string," +"watermark for bidtime as bidtime - interval '1' second) " +"with("+ "'connector' = 'filesystem',"+ "'path' = 'file:///"+filePath+"',"+ "'format' = 'csv'"+ ")");Table table = tabEnv.sqlQuery("" +"select window_start,window_end,sum(price) as sum_price " +" from table(" +"  hop(table Bid, DESCRIPTOR(bidtime), interval '5' MINUTES, interval '10' MINUTES))" +"  group by window_start,window_end");tabEnv.toAppendStream(table, Row.class).print();env.execute();}
}

结果:

+I[2021-04-15T08:00, 2021-04-15T08:10, 11.00]
+I[2021-04-15T08:05, 2021-04-15T08:15, 15.00]
+I[2021-04-15T08:10, 2021-04-15T08:20, 10.00]
+I[2021-04-15T08:15, 2021-04-15T08:25, 6.00]

2.3 会话窗口(Session Windows,暂不支持 Window TVF)

Flink1.13 版本中不支持 Window TVF,预计在 flink1.14 版本中支持;

需求:用老版本实现,定义 Session Gap 为3分钟,一个窗口最后一条数据之后的三分钟内没有新数据出现,则该窗口关闭,再之后的数据被归为下一个窗口

package cn.itcast.day02.Window;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;/*** @author lql* @time 2024-03-16 19:37:20* @description TODO*/
public class GroupWindowsSqlSessionExample {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);String filePath = GroupWindowsSqlSessionExample.class.getClassLoader().getResource("bid.csv").getPath();// 作为事件时间的字段必须是 timestamp 类型, 所以根据 long 类型的 ts 计算出来一个 ttEnv.executeSql("create table Bid(" +"bidtime TIMESTAMP(3)," +"price DECIMAL(10, 2), " +"item string," +"watermark for bidtime as bidtime - interval '1' second) " +"with("+ "'connector' = 'filesystem',"+ "'path' = 'file:///"+filePath+"',"+ "'format' = 'csv'"+ ")");tEnv.sqlQuery("SELECT " +"  SESSION_START(bidtime, INTERVAL '3' minute) as wStart,  " +"  SESSION_END(bidtime, INTERVAL '3' minute) as wEnd,  " +"  SUM(price) sum_price " +"FROM Bid " +"GROUP BY SESSION(bidtime, INTERVAL '3' minute)").execute().print();}
}

结果:

+----+-------------------------+-------------------------+-----------+
| op |                  wStart |                    wEnd | sum_price |
+----+-------------------------+-------------------------+-----------+
| +I | 2021-04-15 08:05:00.000 | 2021-04-15 08:16:00.000 |     15.00 |
| +I | 2021-04-15 08:17:00.000 | 2021-04-15 08:20:00.000 |      6.00 |
+----+-------------------------+-------------------------+-----------+
2 rows in set

2.4 累计窗口(Comulate Windows flink1.13 版本新特性)

语法:

CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)
TABLE 表名称
DESCRIPTOR 表中作为开窗的时间字段名称
step 大窗口的分割长度
size 指定最大的那个时间窗口

需求:10 分钟作为窗口,统计每隔两分钟的累计数(类似于中视频计划计算播放量完美累计曲线!)

package cn.itcast.day02.Window;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;/*** @author lql* @time 2024-03-16 19:45:02* @description TODO*/
public class GroupWindowsSqlCumulateExample {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);String filePath = GroupWindowsSqlCumulateExample.class.getClassLoader().getResource("bid.csv").getPath();// 作为事件时间的字段必须是 timestamp 类型, 所以根据 long 类型的 ts 计算出来一个 ttEnv.executeSql("create table Bid(" +"bidtime TIMESTAMP(3)," +"price DECIMAL(10, 2), " +"item string," +"watermark for bidtime as bidtime - interval '1' second) " +"with("+ "'connector' = 'filesystem',"+ "'path' = 'file:///"+filePath+"',"+ "'format' = 'csv'"+ ")");tEnv.sqlQuery("SELECT window_start, window_end, SUM(price) as sum_price\n" +"  FROM TABLE(\n" +"    CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))\n" +"  GROUP BY window_start, window_end").execute().print();}
}

结果:

+----+-------------------------+-------------------------+-----------+
| op |            window_start |              window_end | sum_price |
+----+-------------------------+-------------------------+-----------+
| +I | 2021-04-15 08:00:00.000 | 2021-04-15 08:06:00.000 |      4.00 |
| +I | 2021-04-15 08:00:00.000 | 2021-04-15 08:08:00.000 |      6.00 |
| +I | 2021-04-15 08:00:00.000 | 2021-04-15 08:10:00.000 |     11.00 |
| +I | 2021-04-15 08:10:00.000 | 2021-04-15 08:12:00.000 |      3.00 |
| +I | 2021-04-15 08:10:00.000 | 2021-04-15 08:14:00.000 |      4.00 |
| +I | 2021-04-15 08:10:00.000 | 2021-04-15 08:16:00.000 |      4.00 |
| +I | 2021-04-15 08:10:00.000 | 2021-04-15 08:18:00.000 |     10.00 |
| +I | 2021-04-15 08:10:00.000 | 2021-04-15 08:20:00.000 |     10.00 |
+----+-------------------------+-------------------------+-----------+
8 rows in set

3. 多维数据分析

3.1 GROUPING SETS

当前效果:

SELECT window_start, window_end,userId,category,sum(price) as sum_price
FROM TABLE(TUMBLE(TABLE orders, DESCRIPTOR(t), INTERVAL '5' SECONDS)) 
GROUP BY window_start, window_end, GROUPING SETS((userId, category), (userId), ()) 

以前效果:

// ()
SELECT window_start, window_end, 'NULL' as userId, 'NULL' as category, sum(price) as sum_price
FROM TABLE(
TUMBLE(TABLE orders, DESCRIPTOR(t), INTERVAL '5' SECONDS))
GROUP BY window_start, window_end
UNION ALL
// (userId)
SELECT window_start, window_end, userId as userId, 'NULL' as category, sum(price) as sum_price
FROM TABLE(
TUMBLE(TABLE orders, DESCRIPTOR(t), INTERVAL '5' SECONDS))
GROUP BY window_start, window_end, userId
UNION ALL
// (userId, category)
SELECT window_start, window_end,userId, category, sum(price) as sum_price
FROM TABLE(
TUMBLE(TABLE orders, DESCRIPTOR(t), INTERVAL '5' SECONDS))
GROUP BY window_start, window_end, userId, category

3.2 ROLLUP

速记:从右往左,全面到稀缺!

GROUP BY ROLLUP(a, b, c)
--等价于以下语句。
GROUPING SETS((a,b,c),(a,b),(a), ())GROUP BY ROLLUP ( a, (b, c), d )
--等价于以下语句。
GROUPING SETS (( a, b, c, d ),( a, b, c    ),( a          ),(            )
)

3.3 CUBE

速记:排列组合

GROUP BY CUBE(a, b, c)
--等价于以下语句。
GROUPING SETS((a,b,c),(a,b),(a,c),(b,c),(a),(b),(c),())GROUP BY CUBE ( (a, b), (c, d) )
--等价于以下语句。
GROUPING SETS (( a, b, c, d ),( a, b       ),(       c, d ),(            )
)// CUBE 和 GROUPING SETS 组合,相当于排列组合基础上加上元素
GROUP BY a, CUBE (b, c), GROUPING SETS ((d), (e))
--等价于以下语句。
GROUP BY GROUPING SETS ((a, b, c, d), (a, b, c, e),(a, b, d),    (a, b, e),(a, c, d),    (a, c, e),(a, d),       (a, e)
)

3.4 GROUPING 和 GROUPING_ID

背景:GROUPING SETS 结果中使用 NULL 充当占位符,导致无法区分占位符 NULL 与数据中真正的 NULL。

3.4.1 GROUPING 函数
  • 接受一个列名作为参数
  • 返回0,意味着 无NULL / 来自输入数据(原本存在的空值
  • 返回1,意味着 NULL 是 GROUPING SETS 的占位符。

实例:

SELECT  window_start, window_end, userId, category, GROUPING(category) as categoryFlag,sum(price) as sum_price,IF(GROUPING(category) = 0, category, 'ALL') as `all`
FROM TABLE(TUMBLE(TABLE orders, DESCRIPTOR(t), INTERVAL '5' SECONDS)) 
GROUP BY window_start, window_end, GROUPING SETS((userId, category), (userId))

结果:

window_startwindow_enduserIdcategorysum_priceflagall
2021-05-23 05:16:35.0002021-05-23 05:16:40.000NULLNULL10.11ALL
2021-05-23 05:16:40.0002021-05-23 05:16:45.000NULLNULL96.61ALL
2021-05-23 05:16:45.0002021-05-23 05:16:50.000NULLNULL15.61ALL
2021-05-23 05:16:35.0002021-05-23 05:16:40.000user_001电脑10.10电脑
2021-05-23 05:16:40.0002021-05-23 05:16:45.000user_001手机14.10手机
2021-05-23 05:16:40.0002021-05-23 05:16:45.000user_002手机82.50手机
2021-05-23 05:16:45.0002021-05-23 05:16:50.000user_001电脑15.60电脑
2021-05-23 05:16:35.0002021-05-23 05:16:40.000user_001NULL10.11ALL
2021-05-23 05:16:40.0002021-05-23 05:16:45.000user_001NULL14.11ALL
2021-05-23 05:16:40.0002021-05-23 05:16:45.000user_002NULL82.51ALL
2021-05-23 05:16:45.0002021-05-23 05:16:50.000user_001NULL15.61ALL

3.4.2 GROUPING_ID(兼容 Hive)

MaxCompute还提供了无参数的 GROUPING__ID 函数,用于兼容Hive查询。

结果是将参数列的GROUPING结果按照BitMap的方式组成整数

MaxCompute 和 Hive 2.3.0 及以上版本兼容该函数,在Hive 2.3.0以下版本中该函数输出不一致,因此并不推荐使用此函数

SELECT
a,b,c ,
COUNT(*),
GROUPING_ID
FROM VALUES (1,2,3) as t(a,b,c)
GROUP BY a, b, c GROUPING SETS ((a,b,c), (a));GROUPING_ID既无输入参数,也无括号。此表达方式在 MaxCompute 中等价于 GROUPING_ID(a,b,c),参数与 GROUP BY 的顺序一致。

3.5 Window Top-N

模板:计算每10分钟营业时间窗内销售额最高的前3名供应商。

SELECT *FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) as rownumFROM (SELECT window_start, window_end, supplier_id, SUM(price) as price, COUNT(*) as cntFROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))GROUP BY window_start, window_end, supplier_id)
) WHERE rownum <= 3;

思路:先算滚动时间 10 分钟,按照窗口时间,id 分组求和,再排序函数取前三。


4. Over Windows

4.1 ROWS OVER WINDOW

按照行进行划分:BETWEEN (UNBOUNDED | rowCount) PRECEDING AND CURRENT ROW

注解:如果不加 rowCount 相当于从以前到现在,加上 rowCount 相当于从前 n 行到现在!

数据源:

itemIDitemTypeonSellTimeprice
ITEM001Electronic2021-05-11 10:01:00.00020
ITEM002Electronic2021-05-11 10:02:00.00050
ITEM003Electronic2021-05-11 10:03:00.00030
ITEM004Electronic2021-05-11 10:03:00.00060
ITEM005Electronic2021-05-11 10:05:00.00040
ITEM006Electronic2021-05-11 10:06:00.00020
ITEM007Electronic2021-05-11 10:07:00.00070
ITEM008Clothes2021-05-11 10:08:00.00020
ITEM009Clothes2021-05-11 10:09:00.00040
ITEM010Clothes2021-05-11 10:11:00.00030

示例:按照 itemType 分组,onSellTime 升序,求从以前到现在总金额

selectitemID,itemType,onSellTime,price,sum(price) over w as sumPrice
from tmall_itemWINDOW w AS (PARTITION BY itemType ORDER BY onSellTime ROWS  BETWEEN UNBOUNDED preceding AND CURRENT ROW)

结果:

itemIDitemTypeonSellTimepricesumPrice
ITEM001Electronic2021-05-11 10:01:00.00020.020.0
ITEM002Electronic2021-05-11 10:02:00.00050.070.0
ITEM003Electronic2021-05-11 10:03:00.00030.0100.0
ITEM004Electronic2021-05-11 10:03:00.00060.0160.0
ITEM005Electronic2021-05-11 10:05:00.00040.0200.0
ITEM006Electronic2021-05-11 10:06:00.00020.0220.0
ITEM007Electronic2021-05-11 10:07:00.00070.0290.0
ITEM008Clothes2021-05-11 10:08:00.00020.020.0
ITEM009Clothes2021-05-11 10:09:00.00040.060.0
ITEM010Clothes2021-05-11 10:11:00.00030.090.0

4.2 RANGE OVER WINDOW

按照时间进行划分:ROWS BETWEEN ( UNBOUNDED | rowCount ) preceding AND CURRENT ROW

例子:实时统计两分钟内金额

selectitemID,itemType,onSellTime,price,sum(price) over w as sumPrice
from tmall_itemWINDOW w AS (PARTITION BY itemTypeORDER BY onSellTimeRANGE BETWEEN INTERVAL '2' MINUTE preceding AND CURRENT ROW)

5. TableAPI 窗口的定义

5.1.1 滚动窗口

Tumble 类方法:

  • over:定义窗口长度
  • on:用来分组(按时间间隔)或者排序(按行数)的时间字段
  • as:别名,必须出现在后面的groupBy中

例子:每隔5秒钟统计一次每个商品类型的销售总额

public class GroupWindowsTableApiTumbleExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<OrderInfo> dataStream = env.fromElements(new OrderInfo("电脑", 1000L, 100D),new OrderInfo("手机", 2000L, 200D),new OrderInfo("电脑", 3000L, 300D),new OrderInfo("手机", 4000L, 400D),new OrderInfo("手机", 5000L, 500D),new OrderInfo("电脑", 6000L, 600D)).assignTimestampsAndWatermarks(WatermarkStrategy.<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp()));StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);Table table = tableEnv.fromDataStream(dataStream, $("category"), $("timestamp").rowtime(), $("money"));table.window(Tumble.over(lit(5).second()).on($("timestamp")).as("w"))  // 定义滚动窗口并给窗口起一个别名.groupBy($("category"), $("w")) // 窗口必须出现的分组字段中.select($("category"), $("w").start().as("window_start"), $("w").end().as("window_end"), $("money").sum().as("total_money")).execute().print();env.execute();}@Data@AllArgsConstructor@NoArgsConstructorpublic static class OrderInfo {private String category;private Long timestamp;private Double money;}
}

5.1.2 滑动窗口

Slide 类方法:

  • over:定义窗口长度
  • every:定义滑动步长
  • on:用来分组(按时间间隔)或者排序(按行数)的时间字段
  • as:别名,必须出现在后面的groupBy中

例子:每隔5秒钟统计过去10秒钟每个商品类型的销售总额

public class GroupWindowsTableApiTumbleExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<OrderInfo> dataStream = env.fromElements(new OrderInfo("电脑", 1000L, 100D),new OrderInfo("手机", 2000L, 200D),new OrderInfo("电脑", 3000L, 300D),new OrderInfo("手机", 4000L, 400D),new OrderInfo("手机", 5000L, 500D),new OrderInfo("电脑", 6000L, 600D)).assignTimestampsAndWatermarks(WatermarkStrategy.<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp()));StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);Table table = tableEnv.fromDataStream(dataStream, $("category"), $("timestamp").rowtime(), $("money"));table.window(Slide.over(lit(10).second()).every(lit(5).second()).on($("timestamp")).as("w"))  // 定义滚动窗口并给窗口起一个别名.groupBy($("category"), $("w")) // 窗口必须出现的分组字段中.select($("category"), $("w").start().as("window_start"), $("w").end().as("window_end"), $("money").sum().as("total_money")).execute().print();env.execute();}@Data@AllArgsConstructor@NoArgsConstructorpublic static class OrderInfo {private String category;private Long timestamp;private Double money;}
}

5.1.3 会话窗口

Session 类方法:

  • withGap:会话时间间隔
  • on:用来分组(按时间间隔)或者排序(按行数)的时间字段
  • as:别名,必须出现在后面的groupBy中

例子:两次的时间间隔超过6秒的基础上,没有新的订单事件这个窗口就会关闭,然后处理这个窗口区间内所产生的订单数据计算

public class GroupWindowsTableApiTumbleExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<OrderInfo> dataStream = env.fromElements(new OrderInfo("电脑", 1000L, 100D),new OrderInfo("手机", 2000L, 200D),new OrderInfo("电脑", 3000L, 300D),new OrderInfo("手机", 4000L, 400D),new OrderInfo("手机", 5000L, 500D),new OrderInfo("电脑", 6000L, 600D)).assignTimestampsAndWatermarks(WatermarkStrategy.<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp()));StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);Table table = tableEnv.fromDataStream(dataStream, $("category"), $("timestamp").rowtime(), $("money"));table.window(Session.withGap(lit(6).second()).on($("timestamp")).as("w"))  // 定义滚动窗口并给窗口起一个别名.groupBy($("category"), $("w")) // 窗口必须出现的分组字段中.select($("category"), $("w").start().as("window_start"), $("w").end().as("window_end"), $("money").sum().as("total_money")).execute().print();env.execute();}@Data@AllArgsConstructor@NoArgsConstructorpublic static class OrderInfo {private String category;private Long timestamp;private Double money;}
}

这篇关于flink重温笔记(十九): flinkSQL 顶层 API ——FlinkSQL 窗口(解决动态累积数据业务需求)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/819262

相关文章

Python中pywin32 常用窗口操作的实现

《Python中pywin32常用窗口操作的实现》本文主要介绍了Python中pywin32常用窗口操作的实现,pywin32主要的作用是供Python开发者快速调用WindowsAPI的一个... 目录获取窗口句柄获取最前端窗口句柄获取指定坐标处的窗口根据窗口的完整标题匹配获取句柄根据窗口的类别匹配获取句

MyBatis模糊查询报错:ParserException: not supported.pos 问题解决

《MyBatis模糊查询报错:ParserException:notsupported.pos问题解决》本文主要介绍了MyBatis模糊查询报错:ParserException:notsuppo... 目录问题描述问题根源错误SQL解析逻辑深层原因分析三种解决方案方案一:使用CONCAT函数(推荐)方案二:

springboot使用Scheduling实现动态增删启停定时任务教程

《springboot使用Scheduling实现动态增删启停定时任务教程》:本文主要介绍springboot使用Scheduling实现动态增删启停定时任务教程,具有很好的参考价值,希望对大家有... 目录1、配置定时任务需要的线程池2、创建ScheduledFuture的包装类3、注册定时任务,增加、删

IntelliJ IDEA 中配置 Spring MVC 环境的详细步骤及问题解决

《IntelliJIDEA中配置SpringMVC环境的详细步骤及问题解决》:本文主要介绍IntelliJIDEA中配置SpringMVC环境的详细步骤及问题解决,本文分步骤结合实例给大... 目录步骤 1:创建 Maven Web 项目步骤 2:添加 Spring MVC 依赖1、保存后执行2、将新的依赖

Spring 中的循环引用问题解决方法

《Spring中的循环引用问题解决方法》:本文主要介绍Spring中的循环引用问题解决方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录什么是循环引用?循环依赖三级缓存解决循环依赖二级缓存三级缓存本章来聊聊Spring 中的循环引用问题该如何解决。这里聊

SpringBoot基于配置实现短信服务策略的动态切换

《SpringBoot基于配置实现短信服务策略的动态切换》这篇文章主要为大家详细介绍了SpringBoot在接入多个短信服务商(如阿里云、腾讯云、华为云)后,如何根据配置或环境切换使用不同的服务商,需... 目录目标功能示例配置(application.yml)配置类绑定短信发送策略接口示例:阿里云 & 腾

关于MongoDB图片URL存储异常问题以及解决

《关于MongoDB图片URL存储异常问题以及解决》:本文主要介绍关于MongoDB图片URL存储异常问题以及解决方案,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐... 目录MongoDB图片URL存储异常问题项目场景问题描述原因分析解决方案预防措施js总结MongoDB图

SpringBoot项目中报错The field screenShot exceeds its maximum permitted size of 1048576 bytes.的问题及解决

《SpringBoot项目中报错ThefieldscreenShotexceedsitsmaximumpermittedsizeof1048576bytes.的问题及解决》这篇文章... 目录项目场景问题描述原因分析解决方案总结项目场景javascript提示:项目相关背景:项目场景:基于Spring

解决Maven项目idea找不到本地仓库jar包问题以及使用mvn install:install-file

《解决Maven项目idea找不到本地仓库jar包问题以及使用mvninstall:install-file》:本文主要介绍解决Maven项目idea找不到本地仓库jar包问题以及使用mvnin... 目录Maven项目idea找不到本地仓库jar包以及使用mvn install:install-file基

最详细安装 PostgreSQL方法及常见问题解决

《最详细安装PostgreSQL方法及常见问题解决》:本文主要介绍最详细安装PostgreSQL方法及常见问题解决,介绍了在Windows系统上安装PostgreSQL及Linux系统上安装Po... 目录一、在 Windows 系统上安装 PostgreSQL1. 下载 PostgreSQL 安装包2.