尝鲜!Flink1.12.2+Hudi0.9.0集成开发

2024-09-06 19:18

本文主要是介绍尝鲜!Flink1.12.2+Hudi0.9.0集成开发,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

点击上方蓝色字体,选择“设为星标

回复”资源“获取更多资源

Hudi社区最近发生了一些有趣的变化,Hudi集成Flink的方案也已经发布,我个人在官网根据文档试验了一把,整体感觉还不错。我们目前并没有在生产环境中使用,但是随着社区发展和功能越来越完善,相信会有更多的业务开始尝试使用Hudi。本文在此做一个Flink和Hudi集成的分享,作者明喆sama。

一、组件下载

1.1、Flink1.12.2编译包下载:https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.12.2/flink-1.12.2-bin-scala_2.11.tgz

1.2、Hudi编译:https://github.com/apache/hudi

git clone https://github.com/apache/hudi.git && cd hudi
mvn clean package -DskipTests
注意:默认是用scala-2.11编译的
如果我们用的是flink1.12.2-2.12版本,可以自己编译成scala-2.12版本的
mvn clean package -DskipTests -Dscala-2.12
包的路径在packaging/hudi-flink-bundle/target/hudi-flink-bundle_2.12-*.*.*-SNAPSHOT.jar

1.3、其他实时步骤可以参考官网的步骤了:https://hudi.apache.org/docs/flink-quick-start-guide.html

但是官网有个坑就是使用的是flink-1.11.x版本,但是我自己测试了是不行的,会报下面的错误:

1.4、后面从flink社区同学建议用flink1.12.2+hudi0.9.0(master),亲测可以。

二、Batch模式具体实施步骤:

2.1、 启动flink-sql客户端,可以提前把hudi-flink-bundle_2.12-0.9.0-SNAPSHOT.jar(我用的flink是scala2.12版本,如果是scala2.11版本需要编译成hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar)拷贝到 $FLINK_HOME/lib目录下

export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
./bin/sql-client.sh embedded

2.2、 创建表结构

CREATE TABLE t1(uuid VARCHAR(20),name VARCHAR(10),age INT,ts TIMESTAMP(3),`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH ('connector' = 'hudi','path' = 'hdfs://localhost:9000/hudi/t1','table.type' = 'MERGE_ON_READ'
);

2.3、 插入数据

INSERT INTO t1 VALUES('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');

2.4、 效果图

2.5、 查询表数据,设置一下查询模式为tableau

-- sets up the result mode to tableau to show the results directly in the CLI
set execution.result-mode=tableau;

2.6、 根据主键更新数据

INSERT INTO t1 VALUES ('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01','par1');

id1的数据age由23变为了24

三、支持stream读模式:

3.1、 创建表

CREATE TABLE t1(uuid VARCHAR(20),name VARCHAR(10),age INT,ts TIMESTAMP(3),`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH ('connector' = 'hudi','path' = 'hdfs://localhost:9000/hudi/t1','table.type' = 'MERGE_ON_READ','read.streaming.enabled' = 'true',  'read.streaming.start-commit' = '20210401134557' ,'read.streaming.check-interval' = '4'
);这里将 table option read.streaming.enabled 设置为 true,表明通过 streaming 的方式读取表数据;
opiton read.streaming.check-interval 指定了 source 监控新的 commits 的间隔为 4s;
option table.type 设置表类型为 MERGE_ON_READ,目前只有 MERGE_ON_READ 表支持 streaming 读。

3.2、 查询流模式的表t1,这里的数据就是刚刚批模式写入的数据

3.3、 从批模式写入一条数据

insert into t1 values ('id9','test',27,TIMESTAMP '1970-01-01 00:00:01','par5');

3.4、 隔几秒后在流模式可以读取到一条新增的数据

参考

1、https://hudi.apache.org/docs/flink-quick-start-guide.html

2、https://github.com/MyLanPangzi/flink-demo/blob/main/docs/%E5%A2%9E%E9%87%8F%E5%9E%8B%E6%95%B0%E4%BB%93%E6%8E%A2%E7%B4%A2%EF%BC%9AFlink%20+%20Hudi.md

实操 | Flink1.12.1通过Table API / Flink SQL读取HBase2.4.0

数据湖架构、战略和分析的8大错误认知

一致性哈希及其在Greenplum中的应用

一万五千字详解HTTP协议

这篇关于尝鲜!Flink1.12.2+Hudi0.9.0集成开发的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1142881

相关文章

Spring Boot集成/输出/日志级别控制/持久化开发实践

《SpringBoot集成/输出/日志级别控制/持久化开发实践》SpringBoot默认集成Logback,支持灵活日志级别配置(INFO/DEBUG等),输出包含时间戳、级别、类名等信息,并可通过... 目录一、日志概述1.1、Spring Boot日志简介1.2、日志框架与默认配置1.3、日志的核心作用

Apache Ignite 与 Spring Boot 集成详细指南

《ApacheIgnite与SpringBoot集成详细指南》ApacheIgnite官方指南详解如何通过SpringBootStarter扩展实现自动配置,支持厚/轻客户端模式,简化Ign... 目录 一、背景:为什么需要这个集成? 二、两种集成方式(对应两种客户端模型) 三、方式一:自动配置 Thick

PyQt5 GUI 开发的基础知识

《PyQt5GUI开发的基础知识》Qt是一个跨平台的C++图形用户界面开发框架,支持GUI和非GUI程序开发,本文介绍了使用PyQt5进行界面开发的基础知识,包括创建简单窗口、常用控件、窗口属性设... 目录简介第一个PyQt程序最常用的三个功能模块控件QPushButton(按钮)控件QLable(纯文本

OpenCV在Java中的完整集成指南分享

《OpenCV在Java中的完整集成指南分享》本文详解了在Java中集成OpenCV的方法,涵盖jar包导入、dll配置、JNI路径设置及跨平台兼容性处理,提供了图像处理、特征检测、实时视频分析等应用... 目录1. OpenCV简介与应用领域1.1 OpenCV的诞生与发展1.2 OpenCV的应用领域2

SpringBoot集成MyBatis实现SQL拦截器的实战指南

《SpringBoot集成MyBatis实现SQL拦截器的实战指南》这篇文章主要为大家详细介绍了SpringBoot集成MyBatis实现SQL拦截器的相关知识,文中的示例代码讲解详细,有需要的小伙伴... 目录一、为什么需要SQL拦截器?二、MyBATis拦截器基础2.1 核心接口:Interceptor

SpringBoot集成EasyPoi实现Excel模板导出成PDF文件

《SpringBoot集成EasyPoi实现Excel模板导出成PDF文件》在日常工作中,我们经常需要将数据导出成Excel表格或PDF文件,本文将介绍如何在SpringBoot项目中集成EasyPo... 目录前言摘要简介源代码解析应用场景案例优缺点分析类代码方法介绍测试用例小结前言在日常工作中,我们经

基于Python开发一个图像水印批量添加工具

《基于Python开发一个图像水印批量添加工具》在当今数字化内容爆炸式增长的时代,图像版权保护已成为创作者和企业的核心需求,本方案将详细介绍一个基于PythonPIL库的工业级图像水印解决方案,有需要... 目录一、系统架构设计1.1 整体处理流程1.2 类结构设计(扩展版本)二、核心算法深入解析2.1 自

Spring Boot集成Druid实现数据源管理与监控的详细步骤

《SpringBoot集成Druid实现数据源管理与监控的详细步骤》本文介绍如何在SpringBoot项目中集成Druid数据库连接池,包括环境搭建、Maven依赖配置、SpringBoot配置文件... 目录1. 引言1.1 环境准备1.2 Druid介绍2. 配置Druid连接池3. 查看Druid监控

在Spring Boot中集成RabbitMQ的实战记录

《在SpringBoot中集成RabbitMQ的实战记录》本文介绍SpringBoot集成RabbitMQ的步骤,涵盖配置连接、消息发送与接收,并对比两种定义Exchange与队列的方式:手动声明(... 目录前言准备工作1. 安装 RabbitMQ2. 消息发送者(Producer)配置1. 创建 Spr

如何在Spring Boot项目中集成MQTT协议

《如何在SpringBoot项目中集成MQTT协议》本文介绍在SpringBoot中集成MQTT的步骤,包括安装Broker、添加EclipsePaho依赖、配置连接参数、实现消息发布订阅、测试接口... 目录1. 准备工作2. 引入依赖3. 配置MQTT连接4. 创建MQTT配置类5. 实现消息发布与订阅