大数据Flink(一百一十二):Flink SQL作业快速入门

2024-09-04 06:28

本文主要是介绍大数据Flink(一百一十二):Flink SQL作业快速入门,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

Flink SQL作业快速入门

一、进入Flink开发平台

二、​​​​​​​创建作业

三、​​​​​​​​​​​​​​编写作业代码

四、​​​​​​​​​​​​​​进行更多配置

五、​​​​​​​​​​​​​​进行深度检查

六、​​​​​​​​​​​​​​进行作业调试

1、​​​​​​​创建Session集群

2、​​​​​​​​​​​​​​调试

七、​​​​​​​​​​​​​​作业部署

八、​​​​​​​​​​​​​​启动并查看Flink计算结果

九、​​​​​​​​​​​​​​停止作业


Flink SQL作业快速入门

一、进入Flink开发平台

在阿里云官网首页,点击右上角控制台,进入工作台。

直接点击我的资源下的Flink,或者搜索Flink,进入Flink控制台。

点击实例id,进入Flink项目空间。 

二、​​​​​​​​​​​​​​创建作业

在左侧导航栏,单击SQL开发。

在作业草稿下,新建文件夹:阿里云Flink。

在此文件夹下,创建文件夹:快速入门。

在快速入门文件夹下,单击新建作业草稿。 

单击空白的流作业草稿。

单击下一步。

在新建文件草稿对话框,填写作业信息。 

作业参数

说明

示例

文件名称

作业的名称。

说明 作业名称在当前项目中必须保持唯一。

flink-sql-test

存储位置

指定该作业的代码文件所属的文件夹。

还可以在现有文件夹右侧,单击

图标,新建子文件夹。

快速入门

引擎版本

当前作业使用的Flink的引擎版本。

vvr-6.0.7-flink-1.15

单击创建。

三、​​​​​​​​​​​​​​编写作业代码

拷贝如下代码到SQL编辑器上。

--创建一个datagen_source临时表。
CREATE TEMPORARY TABLE datagen_source(randstr VARCHAR
) WITH ('connector' = 'datagen'
);--创建一个print_table临时表。
CREATE TEMPORARY TABLE print_table(randstr  VARCHAR
) WITH ('connector' = 'print','logger' = 'true'
);--将randstr字段的数据打印出来。
INSERT INTO print_table
SELECT SUBSTRING(randstr,0,8) from datagen_source;

说明: 在生产作业中,建议尽量减少临时表的使用,直接使用元数据管理中已经注册的表。 

 

四、​​​​​​​​​​​​​​进行更多配置

在作业开发页面右侧,单击更多配置后,我们可以填写以下参数信息:

  • 引擎版本:建议使用推荐版本或稳定版本,引擎版本标记含义详情如下:
    • 推荐版本(Recommend):当前最新大版本下的最新小版本。
    • 稳定版本(Stable):还在产品服务期内的大版本下最新的小版本,已修复历史版本缺陷。
    • 普通版本(Normal):还在产品服务期内的其他小版本。
    • EOS版本(Eos):超过产品服务期限的版本。
  • 附加依赖文件:作业中需要使用到的附加依赖,例如临时函数等。

 

五、​​​​​​​​​​​​​​进行深度检查

在作业开发页面顶部,单击深度检查,进行语法检查。

六、​​​​​​​​​​​​​​进行作业调试

1、​​​​​​​创建Session集群

调试之前,首先需要创建Session集群并且启动:

在左侧导航栏,单击Session管理。

点击创建Session集群。

按照下图进行配置:

 

点击创建Session集群,等待一会,可以看到集群状态变为运行中。 

2、​​​​​​​​​​​​​​调试

在作业开发页面顶部,单击调试。选择刚才创建的集群,点击下一步。

可以看到调试结果。

 

我们可以使用作业调试功能模拟作业运行、检查输出结果,验证SELECT或INSERT业务逻辑的正确性,提升开发效率,降低数据质量风险。 

七、​​​​​​​​​​​​​​作业部署

在作业开发页面顶部,单击部署,在部署新版本对话框,可根据需要填写或选中相关内容,单击确定。

说明:Session集群适用于非生产环境的开发测试环境,可以使用Session集群模式部署或调试作业,提高作业JM(Job Manager)资源利用率和提高作业启动速度。但不推荐将作业提交至Session集群中,因为会存在业务稳定性问题。 

八、​​​​​​​​​​​​​​启动并查看Flink计算结果

在左侧导航栏,单击作业运维

单击目标作业名称操作列中的启动。

选择无状态启动后,单击启动。当您看到作业状态变为运行中,则代表作业运行正常。

在作业运维详情页面,查看Flink计算结果。

在作业运维页面,单击目标作业名称。

在作业探查页签,在下拉列表中选择运行日志。

单击运行Task Managers页签下的Path,ID。

 单击日志,单击下方页面,ctrl+f,在页面搜索PrintSinkOutputWriter相关的日志信息。

可以看到randstr字段的数据已经打印出来。

点击Stdout,向下拉取页面,同样也能看到结果

 

九、​​​​​​​​​​​​​​停止作业

在作业运维页面单击对应作业右侧的停止,即可停止作业。

如果我们对作业进行了修改(例如更改SQL代码、增删改WITH参数、更改作业版本等),且希望修改生效,则需要先上线,然后停止再启动。另外,如果作业无法复用State,希望作业全新启动时,也需要停止后再启动作业。


  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

这篇关于大数据Flink(一百一十二):Flink SQL作业快速入门的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1135253

相关文章

Linux下MySQL数据库定时备份脚本与Crontab配置教学

《Linux下MySQL数据库定时备份脚本与Crontab配置教学》在生产环境中,数据库是核心资产之一,定期备份数据库可以有效防止意外数据丢失,本文将分享一份MySQL定时备份脚本,并讲解如何通过cr... 目录备份脚本详解脚本功能说明授权与可执行权限使用 Crontab 定时执行编辑 Crontab添加定

MyBatis-plus处理存储json数据过程

《MyBatis-plus处理存储json数据过程》文章介绍MyBatis-Plus3.4.21处理对象与集合的差异:对象可用内置Handler配合autoResultMap,集合需自定义处理器继承F... 目录1、如果是对象2、如果需要转换的是List集合总结对象和集合分两种情况处理,目前我用的MP的版本

从入门到精通详解Python虚拟环境完全指南

《从入门到精通详解Python虚拟环境完全指南》Python虚拟环境是一个独立的Python运行环境,它允许你为不同的项目创建隔离的Python环境,下面小编就来和大家详细介绍一下吧... 目录什么是python虚拟环境一、使用venv创建和管理虚拟环境1.1 创建虚拟环境1.2 激活虚拟环境1.3 验证虚

MySQL中On duplicate key update的实现示例

《MySQL中Onduplicatekeyupdate的实现示例》ONDUPLICATEKEYUPDATE是一种MySQL的语法,它在插入新数据时,如果遇到唯一键冲突,则会执行更新操作,而不是抛... 目录1/ ON DUPLICATE KEY UPDATE的简介2/ ON DUPLICATE KEY UP

MySQL分库分表的实践示例

《MySQL分库分表的实践示例》MySQL分库分表适用于数据量大或并发压力高的场景,核心技术包括水平/垂直分片和分库,需应对分布式事务、跨库查询等挑战,通过中间件和解决方案实现,最佳实践为合理策略、备... 目录一、分库分表的触发条件1.1 数据量阈值1.2 并发压力二、分库分表的核心技术模块2.1 水平分

GSON框架下将百度天气JSON数据转JavaBean

《GSON框架下将百度天气JSON数据转JavaBean》这篇文章主要为大家详细介绍了如何在GSON框架下实现将百度天气JSON数据转JavaBean,文中的示例代码讲解详细,感兴趣的小伙伴可以了解下... 目录前言一、百度天气jsON1、请求参数2、返回参数3、属性映射二、GSON属性映射实战1、类对象映

Python与MySQL实现数据库实时同步的详细步骤

《Python与MySQL实现数据库实时同步的详细步骤》在日常开发中,数据同步是一项常见的需求,本篇文章将使用Python和MySQL来实现数据库实时同步,我们将围绕数据变更捕获、数据处理和数据写入这... 目录前言摘要概述:数据同步方案1. 基本思路2. mysql Binlog 简介实现步骤与代码示例1

C# LiteDB处理时间序列数据的高性能解决方案

《C#LiteDB处理时间序列数据的高性能解决方案》LiteDB作为.NET生态下的轻量级嵌入式NoSQL数据库,一直是时间序列处理的优选方案,本文将为大家大家简单介绍一下LiteDB处理时间序列数... 目录为什么选择LiteDB处理时间序列数据第一章:LiteDB时间序列数据模型设计1.1 核心设计原则

使用shardingsphere实现mysql数据库分片方式

《使用shardingsphere实现mysql数据库分片方式》本文介绍如何使用ShardingSphere-JDBC在SpringBoot中实现MySQL水平分库,涵盖分片策略、路由算法及零侵入配置... 目录一、ShardingSphere 简介1.1 对比1.2 核心概念1.3 Sharding-Sp

Java+AI驱动实现PDF文件数据提取与解析

《Java+AI驱动实现PDF文件数据提取与解析》本文将和大家分享一套基于AI的体检报告智能评估方案,详细介绍从PDF上传、内容提取到AI分析、数据存储的全流程自动化实现方法,感兴趣的可以了解下... 目录一、核心流程:从上传到评估的完整链路二、第一步:解析 PDF,提取体检报告内容1. 引入依赖2. 封装