4.2.2 Flink-流处理框架-Table API 与 SQL-基本程序结构(创建表环境+在 Catalog 中注册表)

本文主要是介绍4.2.2 Flink-流处理框架-Table API 与 SQL-基本程序结构(创建表环境+在 Catalog 中注册表),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

1.基本程序结构

2.创建表环境

2.1 配置老版本 planner

2.2 配置 blink planner

3.在 Catalog 中注册表

3.1 表的概念

3.2 连接到文件系统(Csv 格式)

3.3 连接到 Kafka

4.代码示例


1.基本程序结构

        Table API 和 SQL 的程序结构,与流式处理的程序结构类似;也可以近似地认为有这么 几步:首先创建执行环境,然后定义 source、transform 和 sink。具体操作流程如下:

StreamTableEnvironment tableEnv = ... // 创建表的执行环境// 创建一张表,用于读取数据
tableEnv.connect(...).createTemporaryTable("inputTable");// 注册一张表,用于把计算结果输出
tableEnv.connect(...).createTemporaryTable("outputTable");// 通过 Table API 查询算子,得到一张结果表
Table result = tableEnv.from("inputTable").select(...);// 通过 SQL 查询语句,得到一张结果表
Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM inputTable ...");// 将结果表写入输出表中
result.insertInto("outputTable");

2.创建表环境

        创建表环境最简单的方式,就是基于流处理执行环境,调 create 方法直接创建:

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        表环境(TableEnvironment)是 flink 中集成 Table API & SQL 的核心概念。它负责:⚫ 注册 catalog在内部 catalog 中注册表执行 SQL 查询注册用户自定义函数UDF ⚫ 将 DataStream 或 DataSet 转换为表 ⚫ 保存对 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用

        在创建 TableEnv 的时候,可以多传入一个 EnvironmentSettings 或者 TableConfig 参数, 可以用来配置 TableEnvironment 的一些特性。

2.1 配置老版本 planner

2.2 配置 blink planner

3.在 Catalog 中注册表

3.1 表的概念

        TableEnvironment 可以注册目录 Catalog,并可以基于 Catalog 注册表。它会维护一个 Catalog-Table 表之间的 map。

        表(Table)是由一个“标识符”来指定的,由 3 部分组成:Catalog 名、数据库(database) 名和对象名(表名)。如果没有指定目录或数据库,就使用当前的默认值。

        表可以是常规的(Table,表),或者虚拟的(View,视图)。常规表(Table)一般可以用来描述外部数据,比如文件、数据库表或消息队列的数据,也可以直接从 DataStream 转 换而来。视图可以从现有的表中创建,通常是 table API 或者 SQL 查询的一个结果。

3.2 连接到文件系统(Csv 格式)

        连接外部系统在 Catalog 中注册表,直接调用 tableEnv.connect()就可以,里面参数要传入一个 ConnectorDescriptor,也就是 connector 描述器。对于文件系统的 connector 而言,flink 内部已经提供了,就叫做 FileSystem()。代码如下:

tableEnv.connect( new FileSystem().path("sensor.txt")) // 定义表数据来源,外部连接.withFormat(new OldCsv()) // 定义从外部系统读取数据之后的格式化方法 .withSchema( new Schema().field("id", DataTypes.STRING()).field("timestamp", DataTypes.BIGINT()).field("temperature", DataTypes.DOUBLE())) // 定义表结构.createTemporaryTable("inputTable"); // 创建临时表

        这是旧版本的 csv 格式描述器。由于它是非标的,跟外部系统对接并不通用,所以将被 弃用,以后会被一个符合 RFC-4180 标准的新 format 描述器取代。新的描述器就叫 Csv(),但 flink 没有直接提供,需要引入依赖 flink-csv:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>1.10.1</version>
</dependency>

        代码非常类似,只需要把 withFormat 里的 OldCsv 改成 Csv 就可以了。

3.3 连接到 Kafka

        kafka 的连接器 flink-kafka-connector 中,1.10 版本的已经提供了 Table API 的支持。我们 可以在 connect 方法中直接传入一个叫做 Kafka 的类,这就是 kafka 连接器的描述器 ConnectorDescriptor。

tableEnv.connect(new Kafka().version("0.11") // 定义 kafka 的版本.topic("sensor") // 定义主题 .property("zookeeper.connect", "localhost:2181").property("bootstrap.servers", "localhost:9092")).withFormat(new Csv()).withSchema(new Schema().field("id", DataTypes.STRING()).field("timestamp", DataTypes.BIGINT()).field("temperature", DataTypes.DOUBLE())
)
.createTemporaryTable("kafkaInputTable");

        当然也可以连接到 ElasticSearch、MySql、HBase、Hive 等外部系统,实现方式基本上是类似的。

4.代码示例

package com.atguigu.apitest.tableapi;import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;public class TableTest2_CommonApi {public static void main(String[] args) throws Exception{// 1. 创建环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);// 1.1 基于老版本planner的流处理EnvironmentSettings oldStreamSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();StreamTableEnvironment oldStreamTableEnv = StreamTableEnvironment.create(env, oldStreamSettings);// 1.2 基于老版本planner的批处理ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();BatchTableEnvironment oldBatchTableEnv = BatchTableEnvironment.create(batchEnv);// 1.3 基于Blink的流处理EnvironmentSettings blinkStreamSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment blinkStreamTableEnv = StreamTableEnvironment.create(env, blinkStreamSettings);// 1.4 基于Blink的批处理EnvironmentSettings blinkBatchSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();TableEnvironment blinkBatchTableEnv = TableEnvironment.create(blinkBatchSettings);// 2. 表的创建:连接外部系统,读取数据// 2.1 读取文件String filePath = "D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt";tableEnv.connect( new FileSystem().path(filePath)).withFormat( new Csv()).withSchema( new Schema().field("id", DataTypes.STRING()).field("timestamp", DataTypes.BIGINT()).field("temp", DataTypes.DOUBLE())).createTemporaryTable("inputTable");Table inputTable = tableEnv.from("inputTable");
//        inputTable.printSchema();
//        tableEnv.toAppendStream(inputTable, Row.class).print();// 3. 查询转换// 3.1 Table API// 简单转换Table resultTable = inputTable.select("id, temp").filter("id === 'sensor_6'");// 聚合统计Table aggTable = inputTable.groupBy("id").select("id, id.count as count, temp.avg as avgTemp");// 3.2 SQLtableEnv.sqlQuery("select id, temp from inputTable where id = 'senosr_6'");Table sqlAggTable = tableEnv.sqlQuery("select id, count(id) as cnt, avg(temp) as avgTemp from inputTable group by id");// 打印输出tableEnv.toAppendStream(resultTable, Row.class).print("result");tableEnv.toRetractStream(aggTable, Row.class).print("agg");tableEnv.toRetractStream(sqlAggTable, Row.class).print("sqlagg");env.execute();}
}

 

这篇关于4.2.2 Flink-流处理框架-Table API 与 SQL-基本程序结构(创建表环境+在 Catalog 中注册表)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/754485

相关文章

Knife4j+Axios+Redis前后端分离架构下的 API 管理与会话方案(最新推荐)

《Knife4j+Axios+Redis前后端分离架构下的API管理与会话方案(最新推荐)》本文主要介绍了Swagger与Knife4j的配置要点、前后端对接方法以及分布式Session实现原理,... 目录一、Swagger 与 Knife4j 的深度理解及配置要点Knife4j 配置关键要点1.Spri

MySQL 中的 CAST 函数详解及常见用法

《MySQL中的CAST函数详解及常见用法》CAST函数是MySQL中用于数据类型转换的重要函数,它允许你将一个值从一种数据类型转换为另一种数据类型,本文给大家介绍MySQL中的CAST... 目录mysql 中的 CAST 函数详解一、基本语法二、支持的数据类型三、常见用法示例1. 字符串转数字2. 数字

Mysql实现范围分区表(新增、删除、重组、查看)

《Mysql实现范围分区表(新增、删除、重组、查看)》MySQL分区表的四种类型(范围、哈希、列表、键值),主要介绍了范围分区的创建、查询、添加、删除及重组织操作,具有一定的参考价值,感兴趣的可以了解... 目录一、mysql分区表分类二、范围分区(Range Partitioning1、新建分区表:2、分

MySQL 定时新增分区的实现示例

《MySQL定时新增分区的实现示例》本文主要介绍了通过存储过程和定时任务实现MySQL分区的自动创建,解决大数据量下手动维护的繁琐问题,具有一定的参考价值,感兴趣的可以了解一下... mysql创建好分区之后,有时候会需要自动创建分区。比如,一些表数据量非常大,有些数据是热点数据,按照日期分区MululbU

SQL Server配置管理器无法打开的四种解决方法

《SQLServer配置管理器无法打开的四种解决方法》本文总结了SQLServer配置管理器无法打开的四种解决方法,文中通过图文示例介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的... 目录方法一:桌面图标进入方法二:运行窗口进入检查版本号对照表php方法三:查找文件路径方法四:检查 S

Spring Boot @RestControllerAdvice全局异常处理最佳实践

《SpringBoot@RestControllerAdvice全局异常处理最佳实践》本文详解SpringBoot中通过@RestControllerAdvice实现全局异常处理,强调代码复用、统... 目录前言一、为什么要使用全局异常处理?二、核心注解解析1. @RestControllerAdvice2

MySQL 删除数据详解(最新整理)

《MySQL删除数据详解(最新整理)》:本文主要介绍MySQL删除数据的相关知识,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录一、前言二、mysql 中的三种删除方式1.DELETE语句✅ 基本语法: 示例:2.TRUNCATE语句✅ 基本语

MySQL中查找重复值的实现

《MySQL中查找重复值的实现》查找重复值是一项常见需求,比如在数据清理、数据分析、数据质量检查等场景下,我们常常需要找出表中某列或多列的重复值,具有一定的参考价值,感兴趣的可以了解一下... 目录技术背景实现步骤方法一:使用GROUP BY和HAVING子句方法二:仅返回重复值方法三:返回完整记录方法四:

从入门到精通MySQL联合查询

《从入门到精通MySQL联合查询》:本文主要介绍从入门到精通MySQL联合查询,本文通过实例代码给大家介绍的非常详细,需要的朋友可以参考下... 目录摘要1. 多表联合查询时mysql内部原理2. 内连接3. 外连接4. 自连接5. 子查询6. 合并查询7. 插入查询结果摘要前面我们学习了数据库设计时要满

MySQL查询JSON数组字段包含特定字符串的方法

《MySQL查询JSON数组字段包含特定字符串的方法》在MySQL数据库中,当某个字段存储的是JSON数组,需要查询数组中包含特定字符串的记录时传统的LIKE语句无法直接使用,下面小编就为大家介绍两种... 目录问题背景解决方案对比1. 精确匹配方案(推荐)2. 模糊匹配方案参数化查询示例使用场景建议性能优