如何通过 AWS Managed Apache Flink 实现 Iceberg 的实时同步

2024-05-14 06:36

本文主要是介绍如何通过 AWS Managed Apache Flink 实现 Iceberg 的实时同步,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

AWS Managed Apache Flink (以下以 MAF 代指)是 AWS 提供的一款 Serverless 的 Flink 服务。

1. 问题

大家在使用 MAF 的时候,可能遇到最大的一个问题就是 MAF 的依赖管理,很多时候在 Flink 上运行的代码,托管到 MAF 上之后发现有很多依赖问题需要解决,大体上感觉就是 MAF 一定需要一个纯洁的环境,纯洁的 Flink 代码包。
而我们在使用 MAF 向 Iceberg 表写入数据时候更是如此。在使用 MAF 向 Iceberg 写入数据时,使用 Glue Data Catalog,会遇到如下报错:

Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
at org.apache.iceberg.flink.FlinkCatalogFactory.clusterHadoopConf(FlinkCatalogFactory.java:211)
at org.apache.iceberg.flink.FlinkCatalogFactory.createCatalog(FlinkCatalogFactory.java:139)
at org.apache.flink.table.factories.FactoryUtil.createCatalog(FactoryUtil.java:406)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.createCatalog(TableEnvironmentImpl.java:1356)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1111)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:701)

分析上面的错误,发现是在执行 Craete catalog 的时候,调用了 clusterHadoopConf 方法。我们在继续分析源码,在Iceberg 的源码 FlinkCatalogFactory 中,找到报错的代码位置,如下:

public static Configuration clusterHadoopConf() {return HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration());
}

而 HadoopUtils 这个类是来自于 org.apache.flink.runtime.util.HadoopUtils,我怀疑可能是 MAF 的环境是依赖于 EKS,因此镜像中并没有包含和 hadoop 相关的依赖,导致这里方法加载默认配置的时候,找不到 org/apache/hadoop/conf/Configuration 类,但是当我尝试在 maven 中加入 hadoop-client 依赖后,仍然存在这个问题。

2. 解决方案

通过上面的分析,我们知道了问题是出在了 org.apache.flink.runtime.util.HadoopUtils这个类,查找了很多资料,终于在 github 的 issue 中发现也有人遇到过这样的问题【#3044】,并且给出了一个绕行的方法,就是在自己的代码工程中重写 org.apache.flink.runtime.util.HadoopUtils这个类,不得不承认这是一个高明的方法。

重写HadoopUtils
在我们的代码工程中创建一个 package,并且添加一个名为 HadoopUtils 的 class,填入如下代码:

package org.apache.flink.runtime.util;import org.apache.hadoop.conf.Configuration;public class HadoopUtils {public static Configuration getHadoopConfiguration(org.apache.flink.configuration.Configuration flinkConfiguration) {return new Configuration(false);}
}

然后重新打包代码。
也可以参考 github 上的代码,链接🔗 github code
然后我们就可以编译打包代码。

3. Demo

下面我们通过一个完整的 Demo 来了解如何在 MAF 上实现 Iceberg 表的实时摄入。Demo 中会使用一个数据生成工具 Datafaker ,生成数据并且写入 MSK(kafka)中。

3.1 编译代码

获取 Demo代码,直接编译打包。

3.2 创建 MAF Application

  1. 将打包的 jar 上传至S3
  2. 进入 MAF 控制台,创建 Application,版本选择 Flink 1.18。
  3. 在 Application code location 部份填写在第1步上传的 jar 位置。
  4. MAF 会自动创建一个 IAM Role,在完成 Application 创建之后,请记得给这个 IAM Role 添加 Glue 读和写 Data Catalog 的权限,因为 Demo 代码工程会使用 Glue data catalog 作为 Iceberg catalog。
  5. 创建完 Application 就可以直接点击 Run 运行了。

3.3 生成数据

export MYBROKERS=<kafka-server>
export KAFKA_HOME=/home/ec2-user/environment/kafka_2.12-2.8.1
export TOPIC=datafaker_user_order_list_01
export IMPORT_ROWS=100000
#写入一条记录的间隔时间,也可以不设置
export INTERVAL=0.01
datafaker kafka $MYBROKERS $TOPIC $IMPORT_ROWS --meta dataformat_01.txt --interval $INTERVAL

这里就不详细介绍 datafaker 的使用了,如果想了解 datafaker 的参数配置可以从这个 github datafaker 获取。

3.4 在 Athena 中查询数据写入的结果

注意,如果 Athena 开启了 Reuse query results,可能会导致 count(*) 查询的不是最新的结果。
在这里插入图片描述

  1. 运维监控
    4.1 Metrics
    由于写入 Iceberg 表,不会在 Flink UI 看到 Records Recevied 以及 Records Send 等指标,因此如果想查看 Iceberg Sink 写入的数据量,需要进入Flink UI Sink 算子中,查看 Metrics 的 committedDataFilesRecordCount 指标。
    在这里插入图片描述

这篇关于如何通过 AWS Managed Apache Flink 实现 Iceberg 的实时同步的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/988013

相关文章

Python中pywin32 常用窗口操作的实现

《Python中pywin32常用窗口操作的实现》本文主要介绍了Python中pywin32常用窗口操作的实现,pywin32主要的作用是供Python开发者快速调用WindowsAPI的一个... 目录获取窗口句柄获取最前端窗口句柄获取指定坐标处的窗口根据窗口的完整标题匹配获取句柄根据窗口的类别匹配获取句

在 Spring Boot 中实现异常处理最佳实践

《在SpringBoot中实现异常处理最佳实践》本文介绍如何在SpringBoot中实现异常处理,涵盖核心概念、实现方法、与先前查询的集成、性能分析、常见问题和最佳实践,感兴趣的朋友一起看看吧... 目录一、Spring Boot 异常处理的背景与核心概念1.1 为什么需要异常处理?1.2 Spring B

Python位移操作和位运算的实现示例

《Python位移操作和位运算的实现示例》本文主要介绍了Python位移操作和位运算的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录1. 位移操作1.1 左移操作 (<<)1.2 右移操作 (>>)注意事项:2. 位运算2.1

如何在 Spring Boot 中实现 FreeMarker 模板

《如何在SpringBoot中实现FreeMarker模板》FreeMarker是一种功能强大、轻量级的模板引擎,用于在Java应用中生成动态文本输出(如HTML、XML、邮件内容等),本文... 目录什么是 FreeMarker 模板?在 Spring Boot 中实现 FreeMarker 模板1. 环

Qt实现网络数据解析的方法总结

《Qt实现网络数据解析的方法总结》在Qt中解析网络数据通常涉及接收原始字节流,并将其转换为有意义的应用层数据,这篇文章为大家介绍了详细步骤和示例,感兴趣的小伙伴可以了解下... 目录1. 网络数据接收2. 缓冲区管理(处理粘包/拆包)3. 常见数据格式解析3.1 jsON解析3.2 XML解析3.3 自定义

SpringMVC 通过ajax 前后端数据交互的实现方法

《SpringMVC通过ajax前后端数据交互的实现方法》:本文主要介绍SpringMVC通过ajax前后端数据交互的实现方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价... 在前端的开发过程中,经常在html页面通过AJAX进行前后端数据的交互,SpringMVC的controll

Spring Security自定义身份认证的实现方法

《SpringSecurity自定义身份认证的实现方法》:本文主要介绍SpringSecurity自定义身份认证的实现方法,下面对SpringSecurity的这三种自定义身份认证进行详细讲解,... 目录1.内存身份认证(1)创建配置类(2)验证内存身份认证2.JDBC身份认证(1)数据准备 (2)配置依

利用python实现对excel文件进行加密

《利用python实现对excel文件进行加密》由于文件内容的私密性,需要对Excel文件进行加密,保护文件以免给第三方看到,本文将以Python语言为例,和大家讲讲如何对Excel文件进行加密,感兴... 目录前言方法一:使用pywin32库(仅限Windows)方法二:使用msoffcrypto-too

C#使用StackExchange.Redis实现分布式锁的两种方式介绍

《C#使用StackExchange.Redis实现分布式锁的两种方式介绍》分布式锁在集群的架构中发挥着重要的作用,:本文主要介绍C#使用StackExchange.Redis实现分布式锁的... 目录自定义分布式锁获取锁释放锁自动续期StackExchange.Redis分布式锁获取锁释放锁自动续期分布式

springboot使用Scheduling实现动态增删启停定时任务教程

《springboot使用Scheduling实现动态增删启停定时任务教程》:本文主要介绍springboot使用Scheduling实现动态增删启停定时任务教程,具有很好的参考价值,希望对大家有... 目录1、配置定时任务需要的线程池2、创建ScheduledFuture的包装类3、注册定时任务,增加、删