数据湖解决方案关键一环,IceBerg会不会脱颖而出?

2024-09-06 19:32

本文主要是介绍数据湖解决方案关键一环,IceBerg会不会脱颖而出?,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

点击上方蓝色字体,选择“设为星标

回复”资源“获取更多资源

小编在之前的详细讲解过关于数据湖的发展历程和现状,《我看好数据湖的未来,但不看好数据湖的现在》 ,在最后一部分中提到了当前数据湖的解决方案中,目前跳的最凶的三巨头包括:Delta、Apache Iceberg 和 Apache Hudi。

本文中将详细的介绍一下其中的IceBerg,看一下IceBerg会不会最终脱颖而出。

发展历程

首先,大家要明白为什么出现了类似Iceberg这样的数据技术。

大数据领域发展至今已经经历了相当长时间的发展和探索,虽然大数据技术的出现和迭代降低了用户处理海量数据的门槛,但是有一个问题不能忽视,数据格式对不同引擎适配的对接。

这句话是什么意思呢?

我们在使用不同的引擎进行计算时,需要将数据根据引擎进行适配。这是相当棘手的问题,为此出现了一种新的解决方案:介于上层计算引擎和底层存储格式之间的一个中间层。这个中间层不是数据存储的方式,只是定义了数据的元数据组织方式,并且向引擎层面提供统一的类似传统数据库中"表"的语义。它的底层仍然是Parquet、ORC等存储格式。

基于此,Netflix开发了Iceberg,目前已经是Apache的顶级项目。

IceBerg的特性

我么直接引用官网的介绍:

Apache Iceberg is an open table format for huge analytic datasets. Iceberg adds tables to Trino and Spark that use a high-performance format that works just like a SQL table.
Iceberg是一个为大规模数据集设计的通用的表格形式。并且适配Trino(原PrestoSQL)和Spark适,提供SQL化解决方案。

IceBerg有一系列特性如下:

  • 模式演化,支持添加,删除,更新或重命名,并且没有副作用

  • 隐藏分区,可以防止导致错误提示或非常慢查询的用户错误

  • 分区布局演变,可以随着数据量或查询模式的变化而更新表的布局

  • 快照控制,可实现使用完全相同的表快照的可重复查询,或者使用户轻松检查更改

  • 版本回滚,使用户可以通过将表重置为良好状态来快速纠正问题

  • 快速扫描数据,无需使用分布式SQL引擎即可读取表或查找文件

  • 数据修剪优化,使用表元数据使用分区和列级统计信息修剪数据文件

  • 兼容性好 ,可以存储在任意的云存储系统和HDFS中

  • 支持事务,序列化隔离 表更改是原子性的,读者永远不会看到部分更改或未提交的更改

  • 高并发,高并发写入器使用乐观并发,即使写入冲突,也会重试以确保兼容更新成功

其中的几个特性精准的命中了用户的痛点,包括:

  • ACID和多版本支持

  • 支持批/流读写

  • 多种分析引擎的支持

其中更为重要的一点,IceBerg积极拥抱以Flink为核心的实时计算体系,提供了非常友好的与Flink结合的能力。

IceBerg初体验

目前IceBerg在Github上的分支已经更新到了0.11.0版本,小编本地搭建了单机版本的Spark和Flink环境,我们先来看Spark+IceBerg的入门案例:

我们可以用简单的像下面这样创建表:

import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.spark.SparkSchemaUtil
val catalog = new HiveCatalog(spark.sparkContext.hadoopConfiguration)
val data = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "data")
val schema = SparkSchemaUtil.convert(data.schema)
val name = TableIdentifier.of("default", "test_table")
val table = catalog.createTable(name, schema)

读写操作:

// write the dataset to the table
data.write.format("iceberg").mode("append").save("default.test_table")// read the table
spark.read.format("iceberg").load("default.test_table")

当然也可以通过Sql来读写:

spark.read.format("iceberg").load("default.test_table").createOrReplaceTempView("test_table")
spark.sql("""SELECT count(1) FROM test_table""")

另外,特别值得一提的是,IceBerg社区上 https://github.com/apache/incubator-iceberg/pull/856 提供了可以试用的Flink Iceberg sink原型代码。下载该patch放入master分支,编译并构建即可。我们来试用一下:

// Configurate catalog
org.apache.hadoop.conf.Configuration hadoopConf =new org.apache.hadoop.conf.Configuration();
hadoopConf.set(org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS.varname,META_STORE_URIS);
hadoopConf.set(org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE.varname,META_STORE_WAREHOUSE);Catalog icebergCatalog = new HiveCatalog(hadoopConf);// Create Iceberg table
Schema schema = new Schema(...
);
PartitionSpec partitionSpec = builderFor(schema)...
TableIdentifier tableIdentifier =TableIdentifier.of(DATABASE_NAME, TABLE_NAME);
// If needed, check the existence of table by loadTable() and drop it
// before creating it
icebergCatalog.createTable(tableIdentifier, schema, partitionSpec);// Obtain an execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// Enable checkpointing
env.enableCheckpointing(...);// Add Source
DataStream<Map<String, Object>> dataStream =env.addSource(source, typeInformation);// Configure Ieberg sink
Configuration conf = new Configuration();
conf.setString(org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE.varname,META_STORE_URIS);
conf.setString(IcebergConnectorConstant.DATABASE, DATABASE_NAME);
conf.setString(IcebergConnectorConstant.TABLE, TABLE_NAME);// Append Iceberg sink to data stream
IcebergSinkAppender<Map<String, Object>> appender =new IcebergSinkAppender<Map<String, Object>>(conf, "test").withSerializer(MapAvroSerializer.getInstance()).withWriterParallelism(1);
appender.append(dataStream);// Trigger the execution
env.execute("Sink Test");

你还可以在这里看到这个组件的设计:

https://docs.google.com/document/d/19M-sP6FlTVm7BV7MM4Om1n_MVo1xCy7GyDl_9ZAjVNQ/edit?usp=sharing

目前IceBerg对Flink的特性支持如下:

IceBerg还提供非常详细的接入文档:https://github.com/apache/iceberg/blob/master/site/docs/flink.md

在大厂的典型应用

目前一些公开的文章和资料中,我们可以找到一些Flink+IceBerg构建数据湖原型的案例。我们对其中的典型案例进行详细拆解。

阿里在Flink + Iceberg数据湖的探索

上图和下面的方案是阿里巴巴在业务实践中进行的探索之一,采用Iceberg全量数据和Kafka的增量数据来驱动新的Flink作业。如果需要过去很长时间例如一年的数据,可以采用常见的 lambda 架构,离线链路通过 kafka->flink->iceberg 同步写入到数据湖,由于 Kafka 成本较高,保留最近 7 天数据即可,Iceberg 存储成本较低,可以存储全量的历史数据,启动新 Flink 作业的时候,只需要去拉 Iceberg 的数据,跑完之后平滑地对接到 kafka 数据即可。

同样是在 lambda 架构下,实时链路由于事件丢失或者到达顺序的问题,可能导致流计算端结果不一定完全准确,这时候一般都需要全量的历史数据来订正实时计算的结果。而我们的 Iceberg 可以很好地充当这个角色,因为它可以高性价比地管理好历史数据。

腾讯数据平台部Flink + Iceberg 全场景实时数仓

在腾讯数据平台部高级工程师苏舒的分享中,基于 Iceberg snapshot 的 Streaming reader 功能,在传统的Kappa架构基础上,将 Kafka 替换成 Iceberg。

在中间处理层,用 presto 进行一些简单的查询,因为 Iceberg 支持 Streaming read,所以在系统的中间层也可以直接接入 Flink,直接在中间层用 Flink 做一些批处理或者流式计算的任务,把中间结果做进一步计算后输出到下游。

这样把离线任务天级别到小时级别的延迟大大的降低,改造成了一个近实时的数据湖分析系统。

未来期待

目前Apache Iceberg坚定不移在向一个通用的 Table Format方向前进,与下游的引擎和存储解耦,未来是有非常可能成为 Table Format 层的事实标准。

另外正如阿里巴巴的胡争前辈所述:

Apache Iceberg 正在朝着流批一体的数据湖存储层发展,manifest 和snapshot 的设计,有效地隔离不同 transaction 的变更,非常方便批处理和增量计算。而我们知道 Apache Flink 已经是一个流批一体的计算引擎,可以说这二者的长远规划完美匹配,未来二者将合力打造流批一体的数据湖架构。

我看好数据湖的未来,但不看好数据湖的现在

数据湖VS数据仓库?湖仓一体了解一下

欢迎点赞+收藏+转发朋友圈素质三连

文章不错?点个【在看】吧

这篇关于数据湖解决方案关键一环,IceBerg会不会脱颖而出?的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1142912

相关文章

SQL Server修改数据库名及物理数据文件名操作步骤

《SQLServer修改数据库名及物理数据文件名操作步骤》在SQLServer中重命名数据库是一个常见的操作,但需要确保用户具有足够的权限来执行此操作,:本文主要介绍SQLServer修改数据... 目录一、背景介绍二、操作步骤2.1 设置为单用户模式(断开连接)2.2 修改数据库名称2.3 查找逻辑文件名

C++高效内存池实现减少动态分配开销的解决方案

《C++高效内存池实现减少动态分配开销的解决方案》C++动态内存分配存在系统调用开销、碎片化和锁竞争等性能问题,内存池通过预分配、分块管理和缓存复用解决这些问题,下面就来了解一下... 目录一、C++内存分配的性能挑战二、内存池技术的核心原理三、主流内存池实现:TCMalloc与Jemalloc1. TCM

canal实现mysql数据同步的详细过程

《canal实现mysql数据同步的详细过程》:本文主要介绍canal实现mysql数据同步的详细过程,本文通过实例图文相结合给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的... 目录1、canal下载2、mysql同步用户创建和授权3、canal admin安装和启动4、canal

MyBatis Plus 中 update_time 字段自动填充失效的原因分析及解决方案(最新整理)

《MyBatisPlus中update_time字段自动填充失效的原因分析及解决方案(最新整理)》在使用MyBatisPlus时,通常我们会在数据库表中设置create_time和update... 目录前言一、问题现象二、原因分析三、总结:常见原因与解决方法对照表四、推荐写法前言在使用 MyBATis

使用SpringBoot整合Sharding Sphere实现数据脱敏的示例

《使用SpringBoot整合ShardingSphere实现数据脱敏的示例》ApacheShardingSphere数据脱敏模块,通过SQL拦截与改写实现敏感信息加密存储,解决手动处理繁琐及系统改... 目录痛点一:痛点二:脱敏配置Quick Start——Spring 显示配置:1.引入依赖2.创建脱敏

Java死锁问题解决方案及示例详解

《Java死锁问题解决方案及示例详解》死锁是指两个或多个线程因争夺资源而相互等待,导致所有线程都无法继续执行的一种状态,本文给大家详细介绍了Java死锁问题解决方案详解及实践样例,需要的朋友可以参考下... 目录1、简述死锁的四个必要条件:2、死锁示例代码3、如何检测死锁?3.1 使用 jstack3.2

html 滚动条滚动过快会留下边框线的解决方案

《html滚动条滚动过快会留下边框线的解决方案》:本文主要介绍了html滚动条滚动过快会留下边框线的解决方案,解决方法很简单,详细内容请阅读本文,希望能对你有所帮助... 滚动条滚动过快时,会留下边框线但其实大部分时候是这样的,没有多出边框线的滚动条滚动过快时留下边框线的问题通常与滚动条样式和滚动行

Oracle修改端口号之后无法启动的解决方案

《Oracle修改端口号之后无法启动的解决方案》Oracle数据库更改端口后出现监听器无法启动的问题确实较为常见,但并非必然发生,这一问题通常源于​​配置错误或环境冲突​​,而非端口修改本身,以下是系... 目录一、问题根源分析​​​二、保姆级解决方案​​​​步骤1:修正监听器配置文件 (listener.

MySQL版本问题导致项目无法启动问题的解决方案

《MySQL版本问题导致项目无法启动问题的解决方案》本文记录了一次因MySQL版本不一致导致项目启动失败的经历,详细解析了连接错误的原因,并提供了两种解决方案:调整连接字符串禁用SSL或统一MySQL... 目录本地项目启动报错报错原因:解决方案第一个:第二种:容器启动mysql的坑两种修改时区的方法:本地

安装centos8设置基础软件仓库时出错的解决方案

《安装centos8设置基础软件仓库时出错的解决方案》:本文主要介绍安装centos8设置基础软件仓库时出错的解决方案,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐... 目录安装Centos8设置基础软件仓库时出错版本 8版本 8.2.200android4版本 javas