如何通过 AWS Managed Apache Flink 实现 Iceberg 的实时同步

2024-05-14 06:36

本文主要是介绍如何通过 AWS Managed Apache Flink 实现 Iceberg 的实时同步,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

AWS Managed Apache Flink (以下以 MAF 代指)是 AWS 提供的一款 Serverless 的 Flink 服务。

1. 问题

大家在使用 MAF 的时候,可能遇到最大的一个问题就是 MAF 的依赖管理,很多时候在 Flink 上运行的代码,托管到 MAF 上之后发现有很多依赖问题需要解决,大体上感觉就是 MAF 一定需要一个纯洁的环境,纯洁的 Flink 代码包。
而我们在使用 MAF 向 Iceberg 表写入数据时候更是如此。在使用 MAF 向 Iceberg 写入数据时,使用 Glue Data Catalog,会遇到如下报错:

Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
at org.apache.iceberg.flink.FlinkCatalogFactory.clusterHadoopConf(FlinkCatalogFactory.java:211)
at org.apache.iceberg.flink.FlinkCatalogFactory.createCatalog(FlinkCatalogFactory.java:139)
at org.apache.flink.table.factories.FactoryUtil.createCatalog(FactoryUtil.java:406)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.createCatalog(TableEnvironmentImpl.java:1356)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1111)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:701)

分析上面的错误,发现是在执行 Craete catalog 的时候,调用了 clusterHadoopConf 方法。我们在继续分析源码,在Iceberg 的源码 FlinkCatalogFactory 中,找到报错的代码位置,如下:

public static Configuration clusterHadoopConf() {return HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration());
}

而 HadoopUtils 这个类是来自于 org.apache.flink.runtime.util.HadoopUtils,我怀疑可能是 MAF 的环境是依赖于 EKS,因此镜像中并没有包含和 hadoop 相关的依赖,导致这里方法加载默认配置的时候,找不到 org/apache/hadoop/conf/Configuration 类,但是当我尝试在 maven 中加入 hadoop-client 依赖后,仍然存在这个问题。

2. 解决方案

通过上面的分析,我们知道了问题是出在了 org.apache.flink.runtime.util.HadoopUtils这个类,查找了很多资料,终于在 github 的 issue 中发现也有人遇到过这样的问题【#3044】,并且给出了一个绕行的方法,就是在自己的代码工程中重写 org.apache.flink.runtime.util.HadoopUtils这个类,不得不承认这是一个高明的方法。

重写HadoopUtils
在我们的代码工程中创建一个 package,并且添加一个名为 HadoopUtils 的 class,填入如下代码:

package org.apache.flink.runtime.util;import org.apache.hadoop.conf.Configuration;public class HadoopUtils {public static Configuration getHadoopConfiguration(org.apache.flink.configuration.Configuration flinkConfiguration) {return new Configuration(false);}
}

然后重新打包代码。
也可以参考 github 上的代码,链接🔗 github code
然后我们就可以编译打包代码。

3. Demo

下面我们通过一个完整的 Demo 来了解如何在 MAF 上实现 Iceberg 表的实时摄入。Demo 中会使用一个数据生成工具 Datafaker ,生成数据并且写入 MSK(kafka)中。

3.1 编译代码

获取 Demo代码,直接编译打包。

3.2 创建 MAF Application

  1. 将打包的 jar 上传至S3
  2. 进入 MAF 控制台,创建 Application,版本选择 Flink 1.18。
  3. 在 Application code location 部份填写在第1步上传的 jar 位置。
  4. MAF 会自动创建一个 IAM Role,在完成 Application 创建之后,请记得给这个 IAM Role 添加 Glue 读和写 Data Catalog 的权限,因为 Demo 代码工程会使用 Glue data catalog 作为 Iceberg catalog。
  5. 创建完 Application 就可以直接点击 Run 运行了。

3.3 生成数据

export MYBROKERS=<kafka-server>
export KAFKA_HOME=/home/ec2-user/environment/kafka_2.12-2.8.1
export TOPIC=datafaker_user_order_list_01
export IMPORT_ROWS=100000
#写入一条记录的间隔时间,也可以不设置
export INTERVAL=0.01
datafaker kafka $MYBROKERS $TOPIC $IMPORT_ROWS --meta dataformat_01.txt --interval $INTERVAL

这里就不详细介绍 datafaker 的使用了,如果想了解 datafaker 的参数配置可以从这个 github datafaker 获取。

3.4 在 Athena 中查询数据写入的结果

注意,如果 Athena 开启了 Reuse query results,可能会导致 count(*) 查询的不是最新的结果。
在这里插入图片描述

  1. 运维监控
    4.1 Metrics
    由于写入 Iceberg 表,不会在 Flink UI 看到 Records Recevied 以及 Records Send 等指标,因此如果想查看 Iceberg Sink 写入的数据量,需要进入Flink UI Sink 算子中,查看 Metrics 的 committedDataFilesRecordCount 指标。
    在这里插入图片描述

这篇关于如何通过 AWS Managed Apache Flink 实现 Iceberg 的实时同步的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/988013

相关文章

C++中unordered_set哈希集合的实现

《C++中unordered_set哈希集合的实现》std::unordered_set是C++标准库中的无序关联容器,基于哈希表实现,具有元素唯一性和无序性特点,本文就来详细的介绍一下unorder... 目录一、概述二、头文件与命名空间三、常用方法与示例1. 构造与析构2. 迭代器与遍历3. 容量相关4

C++中悬垂引用(Dangling Reference) 的实现

《C++中悬垂引用(DanglingReference)的实现》C++中的悬垂引用指引用绑定的对象被销毁后引用仍存在的情况,会导致访问无效内存,下面就来详细的介绍一下产生的原因以及如何避免,感兴趣... 目录悬垂引用的产生原因1. 引用绑定到局部变量,变量超出作用域后销毁2. 引用绑定到动态分配的对象,对象

SpringBoot基于注解实现数据库字段回填的完整方案

《SpringBoot基于注解实现数据库字段回填的完整方案》这篇文章主要为大家详细介绍了SpringBoot如何基于注解实现数据库字段回填的相关方法,文中的示例代码讲解详细,感兴趣的小伙伴可以了解... 目录数据库表pom.XMLRelationFieldRelationFieldMapping基础的一些代

Java HashMap的底层实现原理深度解析

《JavaHashMap的底层实现原理深度解析》HashMap基于数组+链表+红黑树结构,通过哈希算法和扩容机制优化性能,负载因子与树化阈值平衡效率,是Java开发必备的高效数据结构,本文给大家介绍... 目录一、概述:HashMap的宏观结构二、核心数据结构解析1. 数组(桶数组)2. 链表节点(Node

Java AOP面向切面编程的概念和实现方式

《JavaAOP面向切面编程的概念和实现方式》AOP是面向切面编程,通过动态代理将横切关注点(如日志、事务)与核心业务逻辑分离,提升代码复用性和可维护性,本文给大家介绍JavaAOP面向切面编程的概... 目录一、AOP 是什么?二、AOP 的核心概念与实现方式核心概念实现方式三、Spring AOP 的关

Python实现字典转字符串的五种方法

《Python实现字典转字符串的五种方法》本文介绍了在Python中如何将字典数据结构转换为字符串格式的多种方法,首先可以通过内置的str()函数进行简单转换;其次利用ison.dumps()函数能够... 目录1、使用json模块的dumps方法:2、使用str方法:3、使用循环和字符串拼接:4、使用字符

Linux下利用select实现串口数据读取过程

《Linux下利用select实现串口数据读取过程》文章介绍Linux中使用select、poll或epoll实现串口数据读取,通过I/O多路复用机制在数据到达时触发读取,避免持续轮询,示例代码展示设... 目录示例代码(使用select实现)代码解释总结在 linux 系统里,我们可以借助 select、

Linux挂载linux/Windows共享目录实现方式

《Linux挂载linux/Windows共享目录实现方式》:本文主要介绍Linux挂载linux/Windows共享目录实现方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地... 目录文件共享协议linux环境作为服务端(NFS)在服务器端安装 NFS创建要共享的目录修改 NFS 配

通过React实现页面的无限滚动效果

《通过React实现页面的无限滚动效果》今天我们来聊聊无限滚动这个现代Web开发中不可或缺的技术,无论你是刷微博、逛知乎还是看脚本,无限滚动都已经渗透到我们日常的浏览体验中,那么,如何优雅地实现它呢?... 目录1. 早期的解决方案2. 交叉观察者:IntersectionObserver2.1 Inter

Spring Gateway动态路由实现方案

《SpringGateway动态路由实现方案》本文主要介绍了SpringGateway动态路由实现方案,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随... 目录前沿何为路由RouteDefinitionRouteLocator工作流程动态路由实现尾巴前沿S