4.2.2 Flink-流处理框架-Table API 与 SQL-基本程序结构(创建表环境+在 Catalog 中注册表)

本文主要是介绍4.2.2 Flink-流处理框架-Table API 与 SQL-基本程序结构(创建表环境+在 Catalog 中注册表),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

1.基本程序结构

2.创建表环境

2.1 配置老版本 planner

2.2 配置 blink planner

3.在 Catalog 中注册表

3.1 表的概念

3.2 连接到文件系统(Csv 格式)

3.3 连接到 Kafka

4.代码示例


1.基本程序结构

        Table API 和 SQL 的程序结构,与流式处理的程序结构类似;也可以近似地认为有这么 几步:首先创建执行环境,然后定义 source、transform 和 sink。具体操作流程如下:

StreamTableEnvironment tableEnv = ... // 创建表的执行环境// 创建一张表,用于读取数据
tableEnv.connect(...).createTemporaryTable("inputTable");// 注册一张表,用于把计算结果输出
tableEnv.connect(...).createTemporaryTable("outputTable");// 通过 Table API 查询算子,得到一张结果表
Table result = tableEnv.from("inputTable").select(...);// 通过 SQL 查询语句,得到一张结果表
Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM inputTable ...");// 将结果表写入输出表中
result.insertInto("outputTable");

2.创建表环境

        创建表环境最简单的方式,就是基于流处理执行环境,调 create 方法直接创建:

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        表环境(TableEnvironment)是 flink 中集成 Table API & SQL 的核心概念。它负责:⚫ 注册 catalog在内部 catalog 中注册表执行 SQL 查询注册用户自定义函数UDF ⚫ 将 DataStream 或 DataSet 转换为表 ⚫ 保存对 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用

        在创建 TableEnv 的时候,可以多传入一个 EnvironmentSettings 或者 TableConfig 参数, 可以用来配置 TableEnvironment 的一些特性。

2.1 配置老版本 planner

2.2 配置 blink planner

3.在 Catalog 中注册表

3.1 表的概念

        TableEnvironment 可以注册目录 Catalog,并可以基于 Catalog 注册表。它会维护一个 Catalog-Table 表之间的 map。

        表(Table)是由一个“标识符”来指定的,由 3 部分组成:Catalog 名、数据库(database) 名和对象名(表名)。如果没有指定目录或数据库,就使用当前的默认值。

        表可以是常规的(Table,表),或者虚拟的(View,视图)。常规表(Table)一般可以用来描述外部数据,比如文件、数据库表或消息队列的数据,也可以直接从 DataStream 转 换而来。视图可以从现有的表中创建,通常是 table API 或者 SQL 查询的一个结果。

3.2 连接到文件系统(Csv 格式)

        连接外部系统在 Catalog 中注册表,直接调用 tableEnv.connect()就可以,里面参数要传入一个 ConnectorDescriptor,也就是 connector 描述器。对于文件系统的 connector 而言,flink 内部已经提供了,就叫做 FileSystem()。代码如下:

tableEnv.connect( new FileSystem().path("sensor.txt")) // 定义表数据来源,外部连接.withFormat(new OldCsv()) // 定义从外部系统读取数据之后的格式化方法 .withSchema( new Schema().field("id", DataTypes.STRING()).field("timestamp", DataTypes.BIGINT()).field("temperature", DataTypes.DOUBLE())) // 定义表结构.createTemporaryTable("inputTable"); // 创建临时表

        这是旧版本的 csv 格式描述器。由于它是非标的,跟外部系统对接并不通用,所以将被 弃用,以后会被一个符合 RFC-4180 标准的新 format 描述器取代。新的描述器就叫 Csv(),但 flink 没有直接提供,需要引入依赖 flink-csv:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>1.10.1</version>
</dependency>

        代码非常类似,只需要把 withFormat 里的 OldCsv 改成 Csv 就可以了。

3.3 连接到 Kafka

        kafka 的连接器 flink-kafka-connector 中,1.10 版本的已经提供了 Table API 的支持。我们 可以在 connect 方法中直接传入一个叫做 Kafka 的类,这就是 kafka 连接器的描述器 ConnectorDescriptor。

tableEnv.connect(new Kafka().version("0.11") // 定义 kafka 的版本.topic("sensor") // 定义主题 .property("zookeeper.connect", "localhost:2181").property("bootstrap.servers", "localhost:9092")).withFormat(new Csv()).withSchema(new Schema().field("id", DataTypes.STRING()).field("timestamp", DataTypes.BIGINT()).field("temperature", DataTypes.DOUBLE())
)
.createTemporaryTable("kafkaInputTable");

        当然也可以连接到 ElasticSearch、MySql、HBase、Hive 等外部系统,实现方式基本上是类似的。

4.代码示例

package com.atguigu.apitest.tableapi;import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;public class TableTest2_CommonApi {public static void main(String[] args) throws Exception{// 1. 创建环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);// 1.1 基于老版本planner的流处理EnvironmentSettings oldStreamSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();StreamTableEnvironment oldStreamTableEnv = StreamTableEnvironment.create(env, oldStreamSettings);// 1.2 基于老版本planner的批处理ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();BatchTableEnvironment oldBatchTableEnv = BatchTableEnvironment.create(batchEnv);// 1.3 基于Blink的流处理EnvironmentSettings blinkStreamSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment blinkStreamTableEnv = StreamTableEnvironment.create(env, blinkStreamSettings);// 1.4 基于Blink的批处理EnvironmentSettings blinkBatchSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();TableEnvironment blinkBatchTableEnv = TableEnvironment.create(blinkBatchSettings);// 2. 表的创建:连接外部系统,读取数据// 2.1 读取文件String filePath = "D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt";tableEnv.connect( new FileSystem().path(filePath)).withFormat( new Csv()).withSchema( new Schema().field("id", DataTypes.STRING()).field("timestamp", DataTypes.BIGINT()).field("temp", DataTypes.DOUBLE())).createTemporaryTable("inputTable");Table inputTable = tableEnv.from("inputTable");
//        inputTable.printSchema();
//        tableEnv.toAppendStream(inputTable, Row.class).print();// 3. 查询转换// 3.1 Table API// 简单转换Table resultTable = inputTable.select("id, temp").filter("id === 'sensor_6'");// 聚合统计Table aggTable = inputTable.groupBy("id").select("id, id.count as count, temp.avg as avgTemp");// 3.2 SQLtableEnv.sqlQuery("select id, temp from inputTable where id = 'senosr_6'");Table sqlAggTable = tableEnv.sqlQuery("select id, count(id) as cnt, avg(temp) as avgTemp from inputTable group by id");// 打印输出tableEnv.toAppendStream(resultTable, Row.class).print("result");tableEnv.toRetractStream(aggTable, Row.class).print("agg");tableEnv.toRetractStream(sqlAggTable, Row.class).print("sqlagg");env.execute();}
}

 

这篇关于4.2.2 Flink-流处理框架-Table API 与 SQL-基本程序结构(创建表环境+在 Catalog 中注册表)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/754485

相关文章

Mac电脑如何通过 IntelliJ IDEA 远程连接 MySQL

《Mac电脑如何通过IntelliJIDEA远程连接MySQL》本文详解Mac通过IntelliJIDEA远程连接MySQL的步骤,本文通过图文并茂的形式给大家介绍的非常详细,感兴趣的朋友跟... 目录MAC电脑通过 IntelliJ IDEA 远程连接 mysql 的详细教程一、前缀条件确认二、打开 ID

Python异步编程之await与asyncio基本用法详解

《Python异步编程之await与asyncio基本用法详解》在Python中,await和asyncio是异步编程的核心工具,用于高效处理I/O密集型任务(如网络请求、文件读写、数据库操作等),接... 目录一、核心概念二、使用场景三、基本用法1. 定义协程2. 运行协程3. 并发执行多个任务四、关键

Java利用@SneakyThrows注解提升异常处理效率详解

《Java利用@SneakyThrows注解提升异常处理效率详解》这篇文章将深度剖析@SneakyThrows的原理,用法,适用场景以及隐藏的陷阱,看看它如何让Java异常处理效率飙升50%,感兴趣的... 目录前言一、检查型异常的“诅咒”:为什么Java开发者讨厌它1.1 检查型异常的痛点1.2 为什么说

MySQL的配置文件详解及实例代码

《MySQL的配置文件详解及实例代码》MySQL的配置文件是服务器运行的重要组成部分,用于设置服务器操作的各种参数,下面:本文主要介绍MySQL配置文件的相关资料,文中通过代码介绍的非常详细,需要... 目录前言一、配置文件结构1.[mysqld]2.[client]3.[mysql]4.[mysqldum

MySQL中查询和展示LONGBLOB类型数据的技巧总结

《MySQL中查询和展示LONGBLOB类型数据的技巧总结》在MySQL中LONGBLOB是一种二进制大对象(BLOB)数据类型,用于存储大量的二进制数据,:本文主要介绍MySQL中查询和展示LO... 目录前言1. 查询 LONGBLOB 数据的大小2. 查询并展示 LONGBLOB 数据2.1 转换为十

Python利用PySpark和Kafka实现流处理引擎构建指南

《Python利用PySpark和Kafka实现流处理引擎构建指南》本文将深入解剖基于Python的实时处理黄金组合:Kafka(分布式消息队列)与PySpark(分布式计算引擎)的化学反应,并构建一... 目录引言:数据洪流时代的生存法则第一章 Kafka:数据世界的中央神经系统消息引擎核心设计哲学高吞吐

Go语言连接MySQL数据库执行基本的增删改查

《Go语言连接MySQL数据库执行基本的增删改查》在后端开发中,MySQL是最常用的关系型数据库之一,本文主要为大家详细介绍了如何使用Go连接MySQL数据库并执行基本的增删改查吧... 目录Go语言连接mysql数据库准备工作安装 MySQL 驱动代码实现运行结果注意事项Go语言执行基本的增删改查准备工作

MySQL按时间维度对亿级数据表进行平滑分表

《MySQL按时间维度对亿级数据表进行平滑分表》本文将以一个真实的4亿数据表分表案例为基础,详细介绍如何在不影响线上业务的情况下,完成按时间维度分表的完整过程,感兴趣的小伙伴可以了解一下... 目录引言一、为什么我们需要分表1.1 单表数据量过大的问题1.2 分表方案选型二、分表前的准备工作2.1 数据评估

SQL Server 查询数据库及数据文件大小的方法

《SQLServer查询数据库及数据文件大小的方法》文章介绍了查询数据库大小的SQL方法及存储过程实现,涵盖当前数据库、所有数据库的总大小及文件明细,本文结合实例代码给大家介绍的非常详细,感兴趣的... 目录1. 直接使用SQL1.1 查询当前数据库大小1.2 查询所有数据库的大小1.3 查询每个数据库的详

MySQL中REPLACE函数与语句举例详解

《MySQL中REPLACE函数与语句举例详解》在MySQL中REPLACE函数是一个用于处理字符串的强大工具,它的主要功能是替换字符串中的某些子字符串,:本文主要介绍MySQL中REPLACE函... 目录一、REPLACE()函数语法:参数说明:功能说明:示例:二、REPLACE INTO语句语法:参数