详解 Flink Table API 和 Flink SQL 之表和 DataStream 的转换

2024-06-11 19:36

本文主要是介绍详解 Flink Table API 和 Flink SQL 之表和 DataStream 的转换,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、表转换为 DataStream

/**Table API 中表到 DataStream 有两种模式:1.追加模式(Append Mode):用于表只会被插入(Insert)操作更改的场景。2.撤回模式(Retract Mode):用于任何场景。有些类似于更新模式中 Retract 模式,它只有 Insert 和 Delete 两类操作。得到的数据会增加一个 Boolean 类型的标识位(返回的第一个字段),用它来表示到底是新增的数据(Insert),还是被删除的数据(老数据,Delete)
*/
public class TableTransformDataStream {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//注册表tableEnv.connect(new FileSystem().path("./sensor.txt")).withFormat(new Csv()).withSchema(new Schema().field("id", DataTypes.STRING()).field("timestamp", DataTypes.BIGINT()).field("temperature", DataTypes.DOUBLE())).createTemporaryTable("sensor");//获取表Table sensorTable = tableEnv.from("sensor");//简单查询Table resultTable = sensorTable.select("id, temperature").where("id = 'sensor_1'");//聚合查询统计Table aggTable = sensorTable.groupBy("id").select("id, id.count as cnt, temperature.avg as avgTemp");DataStream<Row> resultStream = tableEnv.toAppendStream(resultTable, Row.class);DataStream<Tuple2<Boolean, Row>> aggStream = tableEnv.toRetractStream(aggTable, Row.class);resultStream.print("result");aggStream.print("agg");env.execute();}
}

二、DataStream 转换为表

public class DataStreamTransformTable {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);DataStream<String> inputStream = env.readTextFile("./sensor.txt");DataStream<SensorReading> dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});//使用 fromDataStream 将 DataStream 转换为表Table table = tableEnv.fromDataStream(dataStream);//Table table = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature");env.execute();}
}

三、查看执行计划

/**Table API 提供了一种机制来解释(Explain)计算表的逻辑和优化查询计划。这是通过TableEnvironment.explain(table)方法或 TableEnvironment.explain() 方法完成的。explain 方法会返回一个字符串,描述三个计划:1.未优化的逻辑查询计划2.优化后的逻辑查询计划3.实际执行计划
*/
String explaination = tableEnv.explain(resultTable);
System.out.println(explaination);

这篇关于详解 Flink Table API 和 Flink SQL 之表和 DataStream 的转换的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1052090

相关文章

MySQL 中的 CAST 函数详解及常见用法

《MySQL中的CAST函数详解及常见用法》CAST函数是MySQL中用于数据类型转换的重要函数,它允许你将一个值从一种数据类型转换为另一种数据类型,本文给大家介绍MySQL中的CAST... 目录mysql 中的 CAST 函数详解一、基本语法二、支持的数据类型三、常见用法示例1. 字符串转数字2. 数字

SpringBoot中SM2公钥加密、私钥解密的实现示例详解

《SpringBoot中SM2公钥加密、私钥解密的实现示例详解》本文介绍了如何在SpringBoot项目中实现SM2公钥加密和私钥解密的功能,通过使用Hutool库和BouncyCastle依赖,简化... 目录一、前言1、加密信息(示例)2、加密结果(示例)二、实现代码1、yml文件配置2、创建SM2工具

Mysql实现范围分区表(新增、删除、重组、查看)

《Mysql实现范围分区表(新增、删除、重组、查看)》MySQL分区表的四种类型(范围、哈希、列表、键值),主要介绍了范围分区的创建、查询、添加、删除及重组织操作,具有一定的参考价值,感兴趣的可以了解... 目录一、mysql分区表分类二、范围分区(Range Partitioning1、新建分区表:2、分

MySQL 定时新增分区的实现示例

《MySQL定时新增分区的实现示例》本文主要介绍了通过存储过程和定时任务实现MySQL分区的自动创建,解决大数据量下手动维护的繁琐问题,具有一定的参考价值,感兴趣的可以了解一下... mysql创建好分区之后,有时候会需要自动创建分区。比如,一些表数据量非常大,有些数据是热点数据,按照日期分区MululbU

SQL Server配置管理器无法打开的四种解决方法

《SQLServer配置管理器无法打开的四种解决方法》本文总结了SQLServer配置管理器无法打开的四种解决方法,文中通过图文示例介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的... 目录方法一:桌面图标进入方法二:运行窗口进入检查版本号对照表php方法三:查找文件路径方法四:检查 S

MyBatis-Plus 中 nested() 与 and() 方法详解(最佳实践场景)

《MyBatis-Plus中nested()与and()方法详解(最佳实践场景)》在MyBatis-Plus的条件构造器中,nested()和and()都是用于构建复杂查询条件的关键方法,但... 目录MyBATis-Plus 中nested()与and()方法详解一、核心区别对比二、方法详解1.and()

Spring IoC 容器的使用详解(最新整理)

《SpringIoC容器的使用详解(最新整理)》文章介绍了Spring框架中的应用分层思想与IoC容器原理,通过分层解耦业务逻辑、数据访问等模块,IoC容器利用@Component注解管理Bean... 目录1. 应用分层2. IoC 的介绍3. IoC 容器的使用3.1. bean 的存储3.2. 方法注

MySQL 删除数据详解(最新整理)

《MySQL删除数据详解(最新整理)》:本文主要介绍MySQL删除数据的相关知识,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录一、前言二、mysql 中的三种删除方式1.DELETE语句✅ 基本语法: 示例:2.TRUNCATE语句✅ 基本语

MySQL中查找重复值的实现

《MySQL中查找重复值的实现》查找重复值是一项常见需求,比如在数据清理、数据分析、数据质量检查等场景下,我们常常需要找出表中某列或多列的重复值,具有一定的参考价值,感兴趣的可以了解一下... 目录技术背景实现步骤方法一:使用GROUP BY和HAVING子句方法二:仅返回重复值方法三:返回完整记录方法四:

Python内置函数之classmethod函数使用详解

《Python内置函数之classmethod函数使用详解》:本文主要介绍Python内置函数之classmethod函数使用方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地... 目录1. 类方法定义与基本语法2. 类方法 vs 实例方法 vs 静态方法3. 核心特性与用法(1编程客