袋鼠云的FlinkSQL插件开发

2023-10-25 07:36
文章标签 开发 插件 flinksql 袋鼠

本文主要是介绍袋鼠云的FlinkSQL插件开发,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

袋鼠云是什么

袋鼠云是一家大数据产品供应商。他开发了一个产品叫做 flinkStreamSQL。这东西是以 Flink 为基础开发的使用 SQL 来写流式计算逻辑的产品。

FlinkStreamSQL 的开源地址

什么是插件

这里所说的插件是可以理解为自定义的语法。例如下面的 SQL:

select fact.shop_id,shop.shop_name
from fact_stream as fact
left join dim_shop as shop
on fact.shop_id = shop.shopid

dim_shop 可能是一个 redis 为实体的 Table ,这袋鼠已经为我们实现了,现在我们可能从 HTTP 的接口拿到数据,此时的话,我们可以自定义一个 HTTP Table ,然后上面的代码不用修改。

整体的流程

编写、执行 FlinkStreamSQL 的流程如下所示:


CREATE TABLE source(colName colType,...function(colNameX) AS aliasName,WATERMARK FOR colName AS withOffset( colName , delayTime ))WITH(type ='kafka09',kafka.bootstrap.servers ='ip:port,ip:port...',kafka.zookeeper.quorum ='ip:port,ip:port/zkparent',kafka.auto.offset.reset ='latest',kafka.topic ='topicName',parallelism ='parllNum',--timezone='America/Los_Angeles',timezone='Asia/Shanghai',sourcedatatype ='json' #可不设置);CREATE TABLE sink(colName colType,...function(colNameX) AS aliasName,WATERMARK FOR colName AS withOffset( colName , delayTime ))WITH(type ='kafka09',kafka.bootstrap.servers ='ip:port,ip:port...',kafka.zookeeper.quorum ='ip:port,ip:port/zkparent',kafka.auto.offset.reset ='latest',kafka.topic ='topicName',parallelism ='parllNum',--timezone='America/Los_Angeles',timezone='Asia/Shanghai',sourcedatatype ='json' #可不设置);CREATE TABLE dim (columnFamily:columnName type as alias,...PRIMARY KEY(keyInfo),PERIOD FOR SYSTEM_TIME)WITH(type ='hbase',zookeeperQuorum ='ip:port',zookeeperParent ='/hbase',tableName ='tableNamae',cache ='LRU',cacheSize ='10000',cacheTTLMs ='60000',parallelism ='1',partitionedJoin='false');insert into sink(
...
)
select source.*,dim.*
from source 
left join dim 
where 

流程图
如上面画的,会将 DDL 变成 source、sink、side-put 的算子。简单的讲,执行的逻辑是
; 为分割符号,分割开 sql 语句,然后使用正则表达式识别 DDL 语句、DML 语句。
其中 DDL 语句中符合(?i)^PERIOD\s+FOR\s+SYSTEM_TIME$ 则认为是 side-input ,side-input 会别解析为异步I/O
算子。如果 DDL 语句中没有则解析为 source 算子,如果 DDL 表在 DML 中在 insert into 后面,则为 sink 表。

知道了这些之后,我们可以自己定义一种 DDL 语句,如下:

 CREATE TABLE dim (columnFamily:columnName type as alias,...PRIMARY KEY(keyInfo),PERIOD FOR SYSTEM_TIME)WITH(type ='http',url ='http://....',...);insert into sink(
...
)
select source.*,dim.*
from source 
left join dim 
where 

其他的都不变,我现在的认为是实现一些接口,让 FlinkStreamSql 能通过 SQL 找到对应算子的实现。

关键的接口

DDL 语句解析的相关接口

AbstractTableInfo|--AbstractSideTableInfo|--AbstractTargetTableInfo|--AbstractSourceTableInfoAbstractTableParser|--AbstractSideTableParser|--ClickhouseSideParser|--AbstractSourceParser

袋鼠平台是如何找到对应的 AbstractSideTableInfo 的呢?其实靠 class 的命名规则,例如,hbase side table
AbstractSideTableInfo 的实现类是 HbaseSideParser。with 中的属性 type = hbase ,然后 table 的 DDL 中有 side table 的关键配置,然后所以贫出来的 class 文件文字是 HbaseSideParser ,namespace 是 com.dtstack.flink.sql.{类型}.{type}.table, 所以全程也出来了。

转化为算子的接口

BaseAsyncReqRow 此接口是继承了 RichAsyncFunction ,重要的方法有 handleAsyncInvoke 里面实现了异步调用外表接口的东西。

我实现的是的 Http side table ,使用的是 http 异步的请求接口:

<dependency><groupId>org.asynchttpclient</groupId><artifactId>async-http-client</artifactId><version>2.12.3</version>
</dependency>

使用的接口是:

AsyncHttpClient client = Dsl.asyncHttpClient();
BoundRequestBuilder builder = client.preparePost(url)
.setHeader("Content-type","application/json")
.setBody(json.getBytes());Request r = builder.build();
ListenableFuture whenResponse = client.executeRequest(build);
whenResponse.addListener(new Runnable(){public void run(){// 编写异步的回调函数}}
)

这篇关于袋鼠云的FlinkSQL插件开发的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/281002

相关文章

Spring Boot集成/输出/日志级别控制/持久化开发实践

《SpringBoot集成/输出/日志级别控制/持久化开发实践》SpringBoot默认集成Logback,支持灵活日志级别配置(INFO/DEBUG等),输出包含时间戳、级别、类名等信息,并可通过... 目录一、日志概述1.1、Spring Boot日志简介1.2、日志框架与默认配置1.3、日志的核心作用

PyQt5 GUI 开发的基础知识

《PyQt5GUI开发的基础知识》Qt是一个跨平台的C++图形用户界面开发框架,支持GUI和非GUI程序开发,本文介绍了使用PyQt5进行界面开发的基础知识,包括创建简单窗口、常用控件、窗口属性设... 目录简介第一个PyQt程序最常用的三个功能模块控件QPushButton(按钮)控件QLable(纯文本

Spring Boot Maven 插件如何构建可执行 JAR 的核心配置

《SpringBootMaven插件如何构建可执行JAR的核心配置》SpringBoot核心Maven插件,用于生成可执行JAR/WAR,内置服务器简化部署,支持热部署、多环境配置及依赖管理... 目录前言一、插件的核心功能与目标1.1 插件的定位1.2 插件的 Goals(目标)1.3 插件定位1.4 核

基于Python开发一个图像水印批量添加工具

《基于Python开发一个图像水印批量添加工具》在当今数字化内容爆炸式增长的时代,图像版权保护已成为创作者和企业的核心需求,本方案将详细介绍一个基于PythonPIL库的工业级图像水印解决方案,有需要... 目录一、系统架构设计1.1 整体处理流程1.2 类结构设计(扩展版本)二、核心算法深入解析2.1 自

java使用protobuf-maven-plugin的插件编译proto文件详解

《java使用protobuf-maven-plugin的插件编译proto文件详解》:本文主要介绍java使用protobuf-maven-plugin的插件编译proto文件,具有很好的参考价... 目录protobuf文件作为数据传输和存储的协议主要介绍在Java使用maven编译proto文件的插件

浏览器插件cursor实现自动注册、续杯的详细过程

《浏览器插件cursor实现自动注册、续杯的详细过程》Cursor简易注册助手脚本通过自动化邮箱填写和验证码获取流程,大大简化了Cursor的注册过程,它不仅提高了注册效率,还通过友好的用户界面和详细... 目录前言功能概述使用方法安装脚本使用流程邮箱输入页面验证码页面实战演示技术实现核心功能实现1. 随机

SpringBoot开发中十大常见陷阱深度解析与避坑指南

《SpringBoot开发中十大常见陷阱深度解析与避坑指南》在SpringBoot的开发过程中,即使是经验丰富的开发者也难免会遇到各种棘手的问题,本文将针对SpringBoot开发中十大常见的“坑... 目录引言一、配置总出错?是不是同时用了.properties和.yml?二、换个位置配置就失效?搞清楚加

Python中对FFmpeg封装开发库FFmpy详解

《Python中对FFmpeg封装开发库FFmpy详解》:本文主要介绍Python中对FFmpeg封装开发库FFmpy,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐... 目录一、FFmpy简介与安装1.1 FFmpy概述1.2 安装方法二、FFmpy核心类与方法2.1 FF

基于Python开发Windows屏幕控制工具

《基于Python开发Windows屏幕控制工具》在数字化办公时代,屏幕管理已成为提升工作效率和保护眼睛健康的重要环节,本文将分享一个基于Python和PySide6开发的Windows屏幕控制工具,... 目录概述功能亮点界面展示实现步骤详解1. 环境准备2. 亮度控制模块3. 息屏功能实现4. 息屏时间

Python实例题之pygame开发打飞机游戏实例代码

《Python实例题之pygame开发打飞机游戏实例代码》对于python的学习者,能够写出一个飞机大战的程序代码,是不是感觉到非常的开心,:本文主要介绍Python实例题之pygame开发打飞机... 目录题目pygame-aircraft-game使用 Pygame 开发的打飞机游戏脚本代码解释初始化部