袋鼠云的FlinkSQL插件开发

2023-10-25 07:36
文章标签 开发 插件 flinksql 袋鼠

本文主要是介绍袋鼠云的FlinkSQL插件开发,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

袋鼠云是什么

袋鼠云是一家大数据产品供应商。他开发了一个产品叫做 flinkStreamSQL。这东西是以 Flink 为基础开发的使用 SQL 来写流式计算逻辑的产品。

FlinkStreamSQL 的开源地址

什么是插件

这里所说的插件是可以理解为自定义的语法。例如下面的 SQL:

select fact.shop_id,shop.shop_name
from fact_stream as fact
left join dim_shop as shop
on fact.shop_id = shop.shopid

dim_shop 可能是一个 redis 为实体的 Table ,这袋鼠已经为我们实现了,现在我们可能从 HTTP 的接口拿到数据,此时的话,我们可以自定义一个 HTTP Table ,然后上面的代码不用修改。

整体的流程

编写、执行 FlinkStreamSQL 的流程如下所示:


CREATE TABLE source(colName colType,...function(colNameX) AS aliasName,WATERMARK FOR colName AS withOffset( colName , delayTime ))WITH(type ='kafka09',kafka.bootstrap.servers ='ip:port,ip:port...',kafka.zookeeper.quorum ='ip:port,ip:port/zkparent',kafka.auto.offset.reset ='latest',kafka.topic ='topicName',parallelism ='parllNum',--timezone='America/Los_Angeles',timezone='Asia/Shanghai',sourcedatatype ='json' #可不设置);CREATE TABLE sink(colName colType,...function(colNameX) AS aliasName,WATERMARK FOR colName AS withOffset( colName , delayTime ))WITH(type ='kafka09',kafka.bootstrap.servers ='ip:port,ip:port...',kafka.zookeeper.quorum ='ip:port,ip:port/zkparent',kafka.auto.offset.reset ='latest',kafka.topic ='topicName',parallelism ='parllNum',--timezone='America/Los_Angeles',timezone='Asia/Shanghai',sourcedatatype ='json' #可不设置);CREATE TABLE dim (columnFamily:columnName type as alias,...PRIMARY KEY(keyInfo),PERIOD FOR SYSTEM_TIME)WITH(type ='hbase',zookeeperQuorum ='ip:port',zookeeperParent ='/hbase',tableName ='tableNamae',cache ='LRU',cacheSize ='10000',cacheTTLMs ='60000',parallelism ='1',partitionedJoin='false');insert into sink(
...
)
select source.*,dim.*
from source 
left join dim 
where 

流程图
如上面画的,会将 DDL 变成 source、sink、side-put 的算子。简单的讲,执行的逻辑是
; 为分割符号,分割开 sql 语句,然后使用正则表达式识别 DDL 语句、DML 语句。
其中 DDL 语句中符合(?i)^PERIOD\s+FOR\s+SYSTEM_TIME$ 则认为是 side-input ,side-input 会别解析为异步I/O
算子。如果 DDL 语句中没有则解析为 source 算子,如果 DDL 表在 DML 中在 insert into 后面,则为 sink 表。

知道了这些之后,我们可以自己定义一种 DDL 语句,如下:

 CREATE TABLE dim (columnFamily:columnName type as alias,...PRIMARY KEY(keyInfo),PERIOD FOR SYSTEM_TIME)WITH(type ='http',url ='http://....',...);insert into sink(
...
)
select source.*,dim.*
from source 
left join dim 
where 

其他的都不变,我现在的认为是实现一些接口,让 FlinkStreamSql 能通过 SQL 找到对应算子的实现。

关键的接口

DDL 语句解析的相关接口

AbstractTableInfo|--AbstractSideTableInfo|--AbstractTargetTableInfo|--AbstractSourceTableInfoAbstractTableParser|--AbstractSideTableParser|--ClickhouseSideParser|--AbstractSourceParser

袋鼠平台是如何找到对应的 AbstractSideTableInfo 的呢?其实靠 class 的命名规则,例如,hbase side table
AbstractSideTableInfo 的实现类是 HbaseSideParser。with 中的属性 type = hbase ,然后 table 的 DDL 中有 side table 的关键配置,然后所以贫出来的 class 文件文字是 HbaseSideParser ,namespace 是 com.dtstack.flink.sql.{类型}.{type}.table, 所以全程也出来了。

转化为算子的接口

BaseAsyncReqRow 此接口是继承了 RichAsyncFunction ,重要的方法有 handleAsyncInvoke 里面实现了异步调用外表接口的东西。

我实现的是的 Http side table ,使用的是 http 异步的请求接口:

<dependency><groupId>org.asynchttpclient</groupId><artifactId>async-http-client</artifactId><version>2.12.3</version>
</dependency>

使用的接口是:

AsyncHttpClient client = Dsl.asyncHttpClient();
BoundRequestBuilder builder = client.preparePost(url)
.setHeader("Content-type","application/json")
.setBody(json.getBytes());Request r = builder.build();
ListenableFuture whenResponse = client.executeRequest(build);
whenResponse.addListener(new Runnable(){public void run(){// 编写异步的回调函数}}
)

这篇关于袋鼠云的FlinkSQL插件开发的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/281002

相关文章

基于Python开发Windows屏幕控制工具

《基于Python开发Windows屏幕控制工具》在数字化办公时代,屏幕管理已成为提升工作效率和保护眼睛健康的重要环节,本文将分享一个基于Python和PySide6开发的Windows屏幕控制工具,... 目录概述功能亮点界面展示实现步骤详解1. 环境准备2. 亮度控制模块3. 息屏功能实现4. 息屏时间

Python实例题之pygame开发打飞机游戏实例代码

《Python实例题之pygame开发打飞机游戏实例代码》对于python的学习者,能够写出一个飞机大战的程序代码,是不是感觉到非常的开心,:本文主要介绍Python实例题之pygame开发打飞机... 目录题目pygame-aircraft-game使用 Pygame 开发的打飞机游戏脚本代码解释初始化部

使用Python开发一个现代化屏幕取色器

《使用Python开发一个现代化屏幕取色器》在UI设计、网页开发等场景中,颜色拾取是高频需求,:本文主要介绍如何使用Python开发一个现代化屏幕取色器,有需要的小伙伴可以参考一下... 目录一、项目概述二、核心功能解析2.1 实时颜色追踪2.2 智能颜色显示三、效果展示四、实现步骤详解4.1 环境配置4.

Python使用smtplib库开发一个邮件自动发送工具

《Python使用smtplib库开发一个邮件自动发送工具》在现代软件开发中,自动化邮件发送是一个非常实用的功能,无论是系统通知、营销邮件、还是日常工作报告,Python的smtplib库都能帮助我们... 目录代码实现与知识点解析1. 导入必要的库2. 配置邮件服务器参数3. 创建邮件发送类4. 实现邮件

CnPlugin是PL/SQL Developer工具插件使用教程

《CnPlugin是PL/SQLDeveloper工具插件使用教程》:本文主要介绍CnPlugin是PL/SQLDeveloper工具插件使用教程,具有很好的参考价值,希望对大家有所帮助,如有错... 目录PL/SQL Developer工具插件使用安装拷贝文件配置总结PL/SQL Developer工具插

基于Python开发一个有趣的工作时长计算器

《基于Python开发一个有趣的工作时长计算器》随着远程办公和弹性工作制的兴起,个人及团队对于工作时长的准确统计需求日益增长,本文将使用Python和PyQt5打造一个工作时长计算器,感兴趣的小伙伴可... 目录概述功能介绍界面展示php软件使用步骤说明代码详解1.窗口初始化与布局2.工作时长计算核心逻辑3

maven中的maven-antrun-plugin插件示例详解

《maven中的maven-antrun-plugin插件示例详解》maven-antrun-plugin是Maven生态中一个强大的工具,尤其适合需要复用Ant脚本或实现复杂构建逻辑的场景... 目录1. 核心功能2. 典型使用场景3. 配置示例4. 关键配置项5. 优缺点分析6. 最佳实践7. 常见问题

python web 开发之Flask中间件与请求处理钩子的最佳实践

《pythonweb开发之Flask中间件与请求处理钩子的最佳实践》Flask作为轻量级Web框架,提供了灵活的请求处理机制,中间件和请求钩子允许开发者在请求处理的不同阶段插入自定义逻辑,实现诸如... 目录Flask中间件与请求处理钩子完全指南1. 引言2. 请求处理生命周期概述3. 请求钩子详解3.1

如何基于Python开发一个微信自动化工具

《如何基于Python开发一个微信自动化工具》在当今数字化办公场景中,自动化工具已成为提升工作效率的利器,本文将深入剖析一个基于Python的微信自动化工具开发全过程,有需要的小伙伴可以了解下... 目录概述功能全景1. 核心功能模块2. 特色功能效果展示1. 主界面概览2. 定时任务配置3. 操作日志演示

JavaScript实战:智能密码生成器开发指南

本文通过JavaScript实战开发智能密码生成器,详解如何运用crypto.getRandomValues实现加密级随机密码生成,包含多字符组合、安全强度可视化、易混淆字符排除等企业级功能。学习密码强度检测算法与信息熵计算原理,获取可直接嵌入项目的完整代码,提升Web应用的安全开发能力 目录