Flink流批一体计算(21):Flink SQL之Flink DDL

2023-11-29 10:30

本文主要是介绍Flink流批一体计算(21):Flink SQL之Flink DDL,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

执行 CREATE 语句

Python脚本

Java代码

SQL语句

列定义

物理/常规列

元数据列

计算列

WATERMARK

PRIMARY KEY

PARTITIONED BY

AS select_statement


Flink SQL是为了简化计算模型、降低您使用Flink门槛而设计的一套符合标准SQL语义的开发语言。

执行 CREATE 语句

Python脚本
table_env = TableEnvironment.create(...)# 对已经注册的表进行 SQL 查询# 注册名为 “Orders” 的表table_env.execute_sql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");# 在表上执行 SQL 查询,并把得到的结果作为一个新的表result = table_env.sql_query("SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");# 对已注册的表进行 INSERT 操作# 注册 TableSinktable_env.execute_sql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)")# 在表上执行 INSERT 语句并向 TableSink 发出结果table_env \.execute_sql("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")

Java代码
Environmentsettings settings = Environmentsettings.newInstance()...TableEnvironment tableEnv = TableEnvironment.create(settings);// 对已注册的表进行 SQL 查询// 注册名为 “Orders” 的表tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");// 在表上执行 SQL 查询,并把得到的结果作为一个新的表Table result = tableEnv.sqlQuery("SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");// 对已注册的表进行 INSERT 操作// 注册 TableSinktableEnv.executeSql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)");// 在表上执行 INSERT 语句并向 TableSink 发出结果tableEnv.executeSql("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");

SQL语句
Flink SQL> CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...);[INFO] Table has been created.Flink SQL> CREATE TABLE RubberOrders (product STRING, amount INT) WITH (...);[INFO] Table has been created.Flink SQL> INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%';[INFO] Submitting SQL update statement to the cluster...

列定义

物理/常规列

物理列是数据库中已知的常规列。它们定义物理数据中字段的名称、类型和顺序。因此,物理列表示从外部系统读取和写入的有效数据。其他类型的列可以在物理列之间声明,但不会影响最终的物理模式。

以下语句创建一个仅包含常规列的表:

CREATE TABLE MyTable (

  `user_id` BIGINT,

  `name` STRING

) WITH (

  ...

);

元数据列

元数据列是SQL标准的扩展,元数据列由metadata关键字指示。例如,可以使用元数据列从Kafka记录读取时间戳,并将时间戳写入Kafka,以进行基于时间的操作。连接器和格式文档列出了每个组件的可用元数据字段。但是,在表的模式中声明元数据列是可选的。

以下语句创建一个表,其中包含引用元数据字段时间戳的附加元数据列:

CREATE TABLE MyTable (

  `user_id` BIGINT,

  `name` STRING,

  `record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'    -- reads and writes a Kafka record's timestamp

) WITH (

  'connector' = 'kafka'

  ...

);

每个元数据字段都由基于字符串的键标识,并具有文档化的数据类型。例如,Kafka连接器公开了一个元数据字段,该字段具有键时间戳和数据类型timestamp_LTZ3),可用于读取和写入记录。

在上面的示例中,元数据列record_time成为表模式的一部分,可以像常规列一样进行转换和存储,为方便起见,如果列名应用作标识元数据键,则可以省略FROM子句。

CREATE TABLE MyTable (

  `user_id` BIGINT,

  `name` STRING,

  `timestamp` TIMESTAMP_LTZ(3) METADATA    -- use column name as metadata key

) WITH (

  'connector' = 'kafka'

  ...

);

计算列

计算列是使用语法column_name AS Computed_column_expression生成的虚拟列。

计算列计算可以引用同一表中声明的其他列的表达式。可以访问物理列和元数据列。列本身不物理存储在表中。列的数据类型是从给定表达式自动派生的,不必手动声明。

例如,计算列可以定义为

CREATE TABLE MyTable (

  `user_id` BIGINT,

  `price` DOUBLE,

  `quantity` DOUBLE,

  `cost` AS price * quanitity,  -- evaluate expression and supply the result to queries

) WITH (

  'connector' = 'kafka'

  ...

);

Flink中通常使用计算列来定义CREATETABLE语句中的时间属性。

使用系统的PROCTIME()函数,可以通过proc AS PROCIME()轻松定义processing time属性。

Event time属性时间戳可以在WATERMARK声明之前预处理。例如,如果原始字段不是TIMESTAMP3)类型或嵌套在JSON字符串中,则可以使用计算列。

WATERMARK

WATERMARK 定义了表的Event time属性,其形式为 WATERMARK FOR rowtime_column_name AS watermark_strategy_expression 

rowtime_column_name 把一个现有的列定义为一个为表标记event time的属性。该列的类型必须为 TIMESTAMP(3),且是 schema 中的顶层列,它也可以是一个计算列。

watermark_strategy_expression 定义了 watermark 的生成策略。它允许使用包括计算列在内的任意非查询表达式来计算 watermark ;表达式的返回类型必须是 TIMESTAMP(3),表示了从 Epoch 以来的经过的时间。 返回的 watermark 只有当其不为空且其值大于之前发出的本地 watermark 时才会被发出(以保证 watermark 递增)。每条记录的 watermark 生成表达式计算都会由框架完成。 框架会定期发出所生成的最大的 watermark ,如果当前 watermark 仍然与前一个 watermark 相同、为空、或返回的 watermark 的值小于最后一个发出的 watermark ,则新的 watermark 不会被发出。 Watermark 根据 pipeline.auto-watermark-interval 中所配置的间隔发出。 watermark 的间隔是 0ms ,那么每条记录都会产生一个 watermark,且 watermark 会在不为空并大于上一个发出的 watermark 时发出。

使用事件时间语义时,表必须包含事件时间属性和 watermark 策略。

Flink 提供了几种常用的 watermark 策略。

  • 严格递增时间戳: WATERMARK FOR rowtime_column AS rowtime_column

发出到目前为止已观察到的最大时间戳的 watermark ,时间戳大于最大时间戳的行被认为没有迟到。

  • 递增时间戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND

发出到目前为止已观察到的最大时间戳减 1 watermark ,时间戳大于或等于最大时间戳的行被认为没有迟到。

  • 有界乱序时间戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit

发出到目前为止已观察到的最大时间戳减去指定延迟的 watermark ,例如, WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND 是一个 5 秒延迟的 watermark 策略。

CREATE TABLE Orders (

    `user` BIGINT,

    product STRING,

    order_time TIMESTAMP(3),

    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND

) WITH (......);

PRIMARY KEY

主键用作 Flink 优化的一种提示信息。主键限制表明一张表或视图的某个(些)列是唯一的并且不包含 Null 值。 主键声明的列都是非 nullable 的。因此主键可以被用作表行级别的唯一标识。

主键可以和列的定义一起声明,也可以独立声明为表的限制属性,不管是哪种方式,主键都不可以重复定义,否则 Flink 会报错。

有效性检查

SQL 标准主键限制可以有两种模式:ENFORCED 或者 NOT ENFORCED 它申明了是否输入/出数据会做合法性检查(是否唯一)。Flink 不存储数据因此只支持 NOT ENFORCED 模式,即不做检查,用户需要自己保证唯一性。

Flink 假设声明了主键的列都是不包含 Null 值的,Connector 在处理数据时需要自己保证语义正确。

Notes:  CREATE TABLE 语句中,创建主键会修改列的 nullable 属性,主键声明的列默认都是非 Nullable 的。

PARTITIONED BY

根据指定的列对已经创建的表进行分区。若表使用 filesystem sink ,则将会为每个分区创建一个目录。

AS select_statement

表也可以通过一个 CTAS 语句中的查询结果来创建并填充数据,CTAS 是一种简单、快捷的创建表并插入数据的方法。

CTAS 有两个部分,SELECT 部分可以是 Flink SQL 支持的任何 SELECT 查询 CREATE 部分从 SELECT 查询中获取列信息,并创建目标表。  CREATE TABLE 类似,CTAS 要求必须在目标表的 WITH 子句中指定必填的表属性。

CTAS 的建表操作需要依赖目标 Catalog。比如,Hive Catalog 会自动在 Hive 中创建物理表。但是基于内存的 Catalog 只会将表的元信息注册在执行 SQL Client 的内存中。

CREATE TABLE my_ctas_table

WITH (

    'connector' = 'kafka',

    ...

)

AS SELECT id, name, age FROM source_table WHERE mod(id, 10) = 0;

注意 CTAS 有如下约束:

  • 暂不支持创建临时表。
  • 暂不支持指定列信息。
  • 暂不支持指定 Watermark。
  • 暂不支持创建分区表。
  • 暂不支持主键约束。

注意 目前,CTAS 创建的目标表是非原子性的,如果在向表中插入数据时发生错误,该表不会被自动删除。

这篇关于Flink流批一体计算(21):Flink SQL之Flink DDL的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/432479

相关文章

SQL server数据库如何下载和安装

《SQLserver数据库如何下载和安装》本文指导如何下载安装SQLServer2022评估版及SSMS工具,涵盖安装配置、连接字符串设置、C#连接数据库方法和安全注意事项,如混合验证、参数化查... 目录第一步:打开官网下载对应文件第二步:程序安装配置第三部:安装工具SQL Server Manageme

C#连接SQL server数据库命令的基本步骤

《C#连接SQLserver数据库命令的基本步骤》文章讲解了连接SQLServer数据库的步骤,包括引入命名空间、构建连接字符串、使用SqlConnection和SqlCommand执行SQL操作,... 目录建议配合使用:如何下载和安装SQL server数据库-CSDN博客1. 引入必要的命名空间2.

全面掌握 SQL 中的 DATEDIFF函数及用法最佳实践

《全面掌握SQL中的DATEDIFF函数及用法最佳实践》本文解析DATEDIFF在不同数据库中的差异,强调其边界计算原理,探讨应用场景及陷阱,推荐根据需求选择TIMESTAMPDIFF或inte... 目录1. 核心概念:DATEDIFF 究竟在计算什么?2. 主流数据库中的 DATEDIFF 实现2.1

MySQL 多列 IN 查询之语法、性能与实战技巧(最新整理)

《MySQL多列IN查询之语法、性能与实战技巧(最新整理)》本文详解MySQL多列IN查询,对比传统OR写法,强调其简洁高效,适合批量匹配复合键,通过联合索引、分批次优化提升性能,兼容多种数据库... 目录一、基础语法:多列 IN 的两种写法1. 直接值列表2. 子查询二、对比传统 OR 的写法三、性能分析

MySQL中的LENGTH()函数用法详解与实例分析

《MySQL中的LENGTH()函数用法详解与实例分析》MySQLLENGTH()函数用于计算字符串的字节长度,区别于CHAR_LENGTH()的字符长度,适用于多字节字符集(如UTF-8)的数据验证... 目录1. LENGTH()函数的基本语法2. LENGTH()函数的返回值2.1 示例1:计算字符串

浅谈mysql的not exists走不走索引

《浅谈mysql的notexists走不走索引》在MySQL中,​NOTEXISTS子句是否使用索引取决于子查询中关联字段是否建立了合适的索引,下面就来介绍一下mysql的notexists走不走索... 在mysql中,​NOT EXISTS子句是否使用索引取决于子查询中关联字段是否建立了合适的索引。以下

Java通过驱动包(jar包)连接MySQL数据库的步骤总结及验证方式

《Java通过驱动包(jar包)连接MySQL数据库的步骤总结及验证方式》本文详细介绍如何使用Java通过JDBC连接MySQL数据库,包括下载驱动、配置Eclipse环境、检测数据库连接等关键步骤,... 目录一、下载驱动包二、放jar包三、检测数据库连接JavaJava 如何使用 JDBC 连接 mys

SQL中如何添加数据(常见方法及示例)

《SQL中如何添加数据(常见方法及示例)》SQL全称为StructuredQueryLanguage,是一种用于管理关系数据库的标准编程语言,下面给大家介绍SQL中如何添加数据,感兴趣的朋友一起看看吧... 目录在mysql中,有多种方法可以添加数据。以下是一些常见的方法及其示例。1. 使用INSERT I

Qt使用QSqlDatabase连接MySQL实现增删改查功能

《Qt使用QSqlDatabase连接MySQL实现增删改查功能》这篇文章主要为大家详细介绍了Qt如何使用QSqlDatabase连接MySQL实现增删改查功能,文中的示例代码讲解详细,感兴趣的小伙伴... 目录一、创建数据表二、连接mysql数据库三、封装成一个完整的轻量级 ORM 风格类3.1 表结构

MySQL 中的 CAST 函数详解及常见用法

《MySQL中的CAST函数详解及常见用法》CAST函数是MySQL中用于数据类型转换的重要函数,它允许你将一个值从一种数据类型转换为另一种数据类型,本文给大家介绍MySQL中的CAST... 目录mysql 中的 CAST 函数详解一、基本语法二、支持的数据类型三、常见用法示例1. 字符串转数字2. 数字