揭秘Iceberg:数据湖新版本的高级特性全面解析

2024-08-21 04:12

本文主要是介绍揭秘Iceberg:数据湖新版本的高级特性全面解析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

1前言

2 特性说明

2.1 Branch and Tag

2.2 Puffin format

2.3 Statistics 

2.4View

创建物化视图

查询物化视图

3应用案例

3.1 CDC数据入湖

3.2 多流数据拼接

3.3 异步索引和Z-order聚簇优化

3.4 A/B测试

3.5 多租户访问控制


1 前言

以下的讨论都基于iceberg1.1.0版本

2 特性说明

2.1 Branch and Tag

支持对表创建分支和标签,方便数据管理和访问。可以在特定快照上创建分支或标签,并设置保留时间。读写可以指定分支。

2.2 Puffin format

Puffin( ['pʌfɪn])  format是Iceberg引入的一种新的文件格式,主要用于存储那些无法直接存储在Iceberg清单(manifest)文件中的数据,如表的索引和统计信息。 

Puffin format的设计目标是:

  1. 独立性:Puffin文件可以独立于数据文件和清单文件而存在,方便单独管理和访问。

  2. 灵活性:Puffin文件的内容和结构可以根据不同的需求而定制,如不同类型的索引(Bloom filter、B+树等)或不同粒度的统计信息(表级、分区级等)。

  3. 可扩展性:Puffin format支持动态添加新的内容类型,以支持未来的新特性和需求。

 创建表时启用Puffin format

Schema schema = new Schema(required(1, "id", Types.LongType.get()),required(2, "data", Types.StringType.get())
);PartitionSpec spec = PartitionSpec.unpartitioned();Map<String, String> properties = new HashMap<>();
properties.put(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name());
properties.put(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, "134217728");// 启用Puffin format
properties.put(TableProperties.WRITE_NEW_FORMAT_ENABLED, "true");Table table = catalog.createTable(schema, spec, properties, "my_table");
Puffin Statics社区说明

假设我们有一个很大的Iceberg表,记录了某电商平台的用户订单信息,包括用户ID、商品ID、购买时间、金额等字段。现在我们要为这个表建立索引和统计信息,以优化查询性能。

  • 索引:我们决定为用户ID和商品ID分别建立Bloom filter索引,以加速查询。我们可以在数据写入时动态生成这些索引,并将其保存在Puffin文件中。Puffin文件可能包含以下内容:

Puffin File: user_id_index.puffin
Magic Number: PUFFIN
Version: 1
Index Type: BLOOM_FILTER
Column: user_id
Chunk 1: (offset: 0, length: 1024)
Chunk 2: (offset: 1024, length: 1024)
...
Bloom Filter Data: [...bytes...]

  • 统计信息:我们还想为表的每个分区(如按天分区)记录一些统计信息,如行数、总金额等。这些统计信息可以帮助查询优化器生成更好的执行计划。我们可以在数据写入后异步生成这些统计信息,并将其保存在Puffin文件中。Puffin文件可能包含以下内容:

Puffin File: partition_stats.puffin
Magic Number: PUFFIN
Version: 1
Stats Type: PARTITION_STATS
Partition 1: (name: "2022-01-01", rows: 1000000, total_amount: 1234567.89)
Partition 2: (name: "2022-01-02", rows: 1500000, total_amount: 2345678.90)
...

有了这些索引和统计信息,我们就可以更高效地查询数据。例如,如果我们要查询某个用户在某个时间段内的订单,可以先通过Bloom filter索引快速过滤出可能的数据文件,然后再根据分区统计信息选择最优的查询路径

2.3 Statistics 

Statistics如果帮助提高CBO(Cost-Based Optimizer)

  1. 基数估计:

    • 诸如行数、不同值计数和空值计数等统计信息有助于估计表和中间结果的基数。
    • 基数估计对于预测查询结果的大小以及优化连接顺序、连接算法和数据访问路径至关重要。
    • 准确的基数估计可以得出更精确的成本计算结果和更好的执行计划。
  2. 选择性估计:

    • 列值的统计信息(如最小值、最大值和直方图)可以估计谓词的选择性。
    • 选择性是指满足给定谓词或条件的行的比例。
    • CBO使用选择性估计来确定过滤器、索引和连接条件的有效性。
    • 更准确的选择性估计有助于CBO选择最有效的过滤器和访问路径。
  3. 数据倾斜检测

  4. 索引选择

  5. 分区裁剪

  6. 连接算法选择

  7. 资源估计

写入数据时生成索引和统计信息

DataFile dataFile = DataFiles.builder(spec).withPath("/path/to/data-1.parquet").withFileSizeInBytes(1234).withRecordCount(100).build();// 生成Bloom filter索引
BloomFilter bloomFilter = new BloomFilter(1000, 0.01);
bloomFilter.add(1);
bloomFilter.add(2);
// ...// 生成统计信息
Map<Integer, Long> columnSizes = new HashMap<>();
columnSizes.put(1, 100L);
columnSizes.put(2, 200L);
// ...StatisticsFile statisticsFile = StatisticsFiles.builder().withPath("/path/to/stats-1.puffin").withBloomFilter(1, bloomFilter).withColumnSizes(columnSizes).build();table.newAppend().appendFile(dataFile).appendStatistics(statisticsFile).commit();

查询数据时使用索引和统计信息 

Table table = catalog.loadTable("my_table");// 读取索引和统计信息
StatisticsFile statisticsFile = table.currentSnapshot().statisticsFile();
BloomFilter bloomFilter = statisticsFile.bloomFilter(1);
Map<Integer, Long> columnSizes = statisticsFile.columnSizes();// 使用索引和统计信息进行查询优化
CloseableIterable<Record> reader = IcebergGenerics.read(table).where(Expressions.equal("id", 1)).createReaderFunc(fileSchema -> ORC.read(fileSchema).createReaderFunc(readOrcColumns(schema())).filter(Expressions.equal("id", 1)).caseSensitive(true).useBloomFilter(1, bloomFilter).build()).build();

 在查询数据时,可以通过table.currentSnapshot().statisticsFile()读取当前快照对应的索引和统计信息文件,然后在创建CloseableIterable读取器时,通过useBloomFilter等方法应用这些索引和统计信息,以优化查询性能。

以上就是在Java中使用Iceberg的Puffin format和Statistics的基本流程。当然,具体的实现还涉及到更多的细节和优化,如索引和统计信息的增量更新、并发控制等,需要根据实际的应用场景来设计。

2.4View

view可以很好地支持物化视图场景,通过预先计算和存储数据来加速查询。

创建物化视图

// 定义基表
Schema schema = new Schema(required(1, "id", Types.LongType.get()),required(2, "data", Types.StringType.get())
);
PartitionSpec spec = PartitionSpec.unpartitioned();
Table baseTable = catalog.createTable(schema, spec, "base_table");// 定义物化视图
String viewQuery = "SELECT id, count(*) as cnt FROM base_table GROUP BY id";
Map<String, String> properties = new HashMap<>();
properties.put("format-version", "2");
properties.put("location", "/path/to/view");//将刷新模式设置为periodic(定期刷新),刷新间隔设置为1h(1小时)。Iceberg会在后台自动管理物化视图的刷新,无需手动调用refresh方法。
properties.put("iceberg.view.refresh.mode", "periodic");
properties.put("iceberg.view.refresh.period", "1h");
Table materializedView = catalog.createView(viewQuery, Collections.emptyList(), properties, "my_view");

这个例子中,我们首先创建了一个基表base_table,然后定义了一个聚合查询语句作为物化视图my_view的定义。注意,在创建视图时,我们需要设置一些属性,如format-versionlocation,以指定视图的存储格式和位置。

查询物化视图

// 查询物化视图
try (CloseableIterable<Record> reader = IcebergGenerics.read(materializedView).createReaderFunc(fileSchema -> ORC.read(fileSchema).createReaderFunc(readOrcColumns(schema())).caseSensitive(true).build()).build()) {for (Record record : reader) {System.out.println(record);}
}

查询物化视图与查询普通表的方式类似,Iceberg会自动从视图的存储位置读取预先计算好的数据,从而提供更快的查询速度。

3应用案例

3.1 CDC数据入湖

// 创建主分支和变更分支
Branch mainBranch = table.mainBranch();
Branch changeBranch = mainBranch.createBranch("change_branch_" + System.currentTimeMillis());// 将CDC事件写入变更分支
List<Record> cdcEvents = Lists.newArrayList(new Record(1L, "update", "a", "a1"),new Record(2L, "insert", "b", null),new Record(3L, "delete", "c", null)
);try (CloseableIterable<Record> writer = IcebergGenerics.write(changeBranch).createWriterFunc(GenericParquetWriter::buildWriter).build()) {writer.forEach(cdcEvents::add);
}// 生成变更日志
Table changeTable = changeBranch.table();
Table changeLogTable = changeTable.newAppend().appendFile(changeTable.currentSnapshot().addedFiles()).commit();// 将变更分支快照合并到主分支
mainBranch.cherryPick(changeBranch.snapshot()).commit();

在这个例子中,我们首先创建了一个变更分支change_branch,用于存储CDC事件。然后,我们将CDC事件写入变更分支,并基于变更分支生成了一个变更日志表changeLogTable。最后,我们使用cherryPick方法将变更分支的快照合并到主分支,从而完成了CDC数据入湖的过程。 

3.2 多流数据拼接

-- 定义基表
CREATE TABLE my_table (key1 INTEGER,key2 INTEGER,b0_new INTEGER,c0_new INTEGER,d0_new INTEGER,e0_new INTEGER,c1_new INTEGER,d1_new INTEGER,c2_new INTEGER,d2_new INTEGER
);-- 创建主分支和两个增量分支
CREATE BRANCH my_table_main;
CREATE BRANCH my_table_inc1;
CREATE BRANCH my_table_inc2;-- 给增量分支1写入部分数据
MERGE INTO my_table my_table_inc1USING (SELECT 'A' AS key1, 'B' AS key2, 'b0_new' AS b0_new, 'c0_new' AS c0_new) AS inputON my_table_inc1.key1 = input.key1 AND my_table_inc1.key2 = input.key2  WHEN MATCHED THENUPDATE SET b0_new = input.b0_new, c0_new = input.c0_newWHEN NOT MATCHED THEN  INSERT (key1, key2, b0_new, c0_new)VALUES (input.key1, input.key2, input.b0_new, input.c0_new);-- 给增量分支2写入部分数据  
MERGE INTO my_table my_table_inc2USING (SELECT 'A' AS key1, 'D' AS key2, 'e0_new' AS e0_new) AS inputON my_table_inc2.key1 = input.key1 AND my_table_inc2.key2 = input.key2WHEN MATCHED THENUPDATE SET e0_new = input.e0_new  WHEN NOT MATCHED THENINSERT (key1, key2, e0_new)VALUES (input.key1, input.key2, input.e0_new);-- 把增量数据合并到主分支,分区键和聚簇键保持不变
MERGE INTO my_table my_table_mainUSING my_table_inc1 ON my_table_main.key1 = my_table_inc1.key1 AND my_table_main.key2 = my_table_inc1.key2WHEN MATCHED AND my_table_inc1.b0_new IS NOT NULL THEN  UPDATE SET b0_new = my_table_inc1.b0_new, c0_new = my_table_inc1.c0_newWHEN NOT MATCHED THENINSERT *;MERGE INTO my_table my_table_main  USING my_table_inc2ON my_table_main.key1 = my_table_inc2.key1 AND my_table_main.key2 = my_table_inc2.key2WHEN MATCHED AND my_table_inc2.e0_new IS NOT NULL THENUPDATE SET e0_new = my_table_inc2.e0_new  WHEN NOT MATCHED THENINSERT *;
  1. 首先定义了一个基表my_table,包含了所有需要的列。

  2. 然后创建了一个主分支和两个增量分支。

  3. 分别向两个增量分支写入部分变更数据。

  4. 最后使用MERGE INTO语句将两个增量分支的数据合并到主分支,通过ON子句匹配主键,通过WHEN MATCHED THEN UPDATE更新存在的行,通过WHEN NOT MATCHED THEN INSERT插入新行。

  5. 首先定义了一个基表my_table,包含了所有需要的列。

  6. 然后创建了一个主分支和两个增量分支。

  7. 分别向两个增量分支写入部分变更数据。

  8. 最后使用MERGE INTO语句将两个增量分支的数据合并到主分支,通过ON子句匹配主键,通过WHEN MATCHED THEN UPDATE更新存在的行,通过WHEN NOT MATCHED THEN INSERT插入新行。

3.3 异步索引和Z-order聚簇优化

使用异步索引和Z-order聚簇优化,在分支上验证效果,支持A/B测试

// 创建优化分支
Branch optimizedBranch = mainBranch.createBranch("optimized_branch_" + System.currentTimeMillis());// 异步构建Bloom filter索引
BloomFilterIndex bloomFilterIndex = BloomFilterIndex.builderFor(optimizedBranch).withColumnNames("id").withExpectedEntries(1000000).withFalsePositiveProbability(0.01).build();// 异步构建Z-order索引
ZOrderIndex zOrderIndex = ZOrderIndex.builderFor(optimizedBranch).withColumnNames("ts", "id").build();IndexTasks.Builder indexBuilder = IndexTasks.builderFor(optimizedBranch).addIndex(bloomFilterIndex).addIndex(zOrderIndex).withMaxDurationInMs(30 * 60 * 1000); // 30 minutes timeoutIndexTasks indexTasks = indexBuilder.build();
indexTasks.executeAsync();// 使用Z-order聚簇优化数据布局
ClusteringOrder clusteringOrder = ClusteringOrder.builderFor(optimizedBranch).asc("ts").asc("id").build();optimizedBranch.replaceSortOrder(clusteringOrder).commit();

在这个例子中,我们首先创建了一个优化分支optimized_branch,用于尝试不同的优化策略。然后,我们异步构建了Bloom filter索引和Z-order索引,以加速查询。同时,我们还使用了Z-order聚簇来优化数据布局,使得相关数据在存储上更加紧凑。

3.4 A/B测试

// 在优化分支上执行查询
try (CloseableIterable<Record> reader = IcebergGenerics.read(optimizedBranch).where(Expressions.equal("id", 1)).createReaderFunc(fileSchema -> ORC.read(fileSchema).createReaderFunc(readOrcColumns(schema())).caseSensitive(true).build()).planWithFilter(true).planWithColumnStats(true).planWithBloomFilter(true).planWithZOrder(true).build()) {// ...
}// 在主分支上执行查询
try (CloseableIterable<Record> reader = IcebergGenerics.read(mainBranch).where(Expressions.equal("id", 1)).createReaderFunc(fileSchema -> ORC.read(fileSchema).createReaderFunc(readOrcColumns(schema())).caseSensitive(true).build()).planWithFilter(true).planWithColumnStats(true).build()) {// ...
}

为了验证优化策略的效果,我们可以在优化分支和主分支上分别执行相同的查询,并比较它们的性能指标(如查询延迟、资源消耗等)。这就是一种简单的A/B测试方法。如果优化分支的性能明显优于主分支,我们就可以将优化分支的变更合并到主分支。

3.5 多租户访问控制

// 创建部门视图
SQL.execute("CREATE VIEW dept_view_1 AS SELECT id, data FROM my_table WHERE dept = 'dept1'");
SQL.execute("CREATE VIEW dept_view_2 AS SELECT id, ts FROM my_table WHERE dept = 'dept2'");// 为不同部门授权视图
SQL.execute("GRANT SELECT ON dept_view_1 TO ROLE dept1_role");
SQL.execute("GRANT SELECT ON dept_view_2 TO ROLE dept2_role");

对于多租户表,不同部门可能关注不同的列。为了避免直接访问全表,我们可以为每个部门创建一个授权视图,只暴露部门关注的列。然后,我们可以使用Iceberg的SQL扩展来管理视图的权限,确保每个部门只能访问自己被授权的视图。 

这篇关于揭秘Iceberg:数据湖新版本的高级特性全面解析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1092085

相关文章

Golang HashMap实现原理解析

《GolangHashMap实现原理解析》HashMap是一种基于哈希表实现的键值对存储结构,它通过哈希函数将键映射到数组的索引位置,支持高效的插入、查找和删除操作,:本文主要介绍GolangH... 目录HashMap是一种基于哈希表实现的键值对存储结构,它通过哈希函数将键映射到数组的索引位置,支持

Pandas统计每行数据中的空值的方法示例

《Pandas统计每行数据中的空值的方法示例》处理缺失数据(NaN值)是一个非常常见的问题,本文主要介绍了Pandas统计每行数据中的空值的方法示例,具有一定的参考价值,感兴趣的可以了解一下... 目录什么是空值?为什么要统计空值?准备工作创建示例数据统计每行空值数量进一步分析www.chinasem.cn处

如何使用 Python 读取 Excel 数据

《如何使用Python读取Excel数据》:本文主要介绍使用Python读取Excel数据的详细教程,通过pandas和openpyxl,你可以轻松读取Excel文件,并进行各种数据处理操... 目录使用 python 读取 Excel 数据的详细教程1. 安装必要的依赖2. 读取 Excel 文件3. 读

Spring Boot 整合 SSE的高级实践(Server-Sent Events)

《SpringBoot整合SSE的高级实践(Server-SentEvents)》SSE(Server-SentEvents)是一种基于HTTP协议的单向通信机制,允许服务器向浏览器持续发送实... 目录1、简述2、Spring Boot 中的SSE实现2.1 添加依赖2.2 实现后端接口2.3 配置超时时

Spring 请求之传递 JSON 数据的操作方法

《Spring请求之传递JSON数据的操作方法》JSON就是一种数据格式,有自己的格式和语法,使用文本表示一个对象或数组的信息,因此JSON本质是字符串,主要负责在不同的语言中数据传递和交换,这... 目录jsON 概念JSON 语法JSON 的语法JSON 的两种结构JSON 字符串和 Java 对象互转

Python使用getopt处理命令行参数示例解析(最佳实践)

《Python使用getopt处理命令行参数示例解析(最佳实践)》getopt模块是Python标准库中一个简单但强大的命令行参数处理工具,它特别适合那些需要快速实现基本命令行参数解析的场景,或者需要... 目录为什么需要处理命令行参数?getopt模块基础实际应用示例与其他参数处理方式的比较常见问http

Python利用ElementTree实现快速解析XML文件

《Python利用ElementTree实现快速解析XML文件》ElementTree是Python标准库的一部分,而且是Python标准库中用于解析和操作XML数据的模块,下面小编就来和大家详细讲讲... 目录一、XML文件解析到底有多重要二、ElementTree快速入门1. 加载XML的两种方式2.

mysql中的group by高级用法

《mysql中的groupby高级用法》MySQL中的GROUPBY是数据聚合分析的核心功能,主要用于将结果集按指定列分组,并结合聚合函数进行统计计算,下面给大家介绍mysql中的groupby用法... 目录一、基本语法与核心功能二、基础用法示例1. 单列分组统计2. 多列组合分组3. 与WHERE结合使

Java的栈与队列实现代码解析

《Java的栈与队列实现代码解析》栈是常见的线性数据结构,栈的特点是以先进后出的形式,后进先出,先进后出,分为栈底和栈顶,栈应用于内存的分配,表达式求值,存储临时的数据和方法的调用等,本文给大家介绍J... 目录栈的概念(Stack)栈的实现代码队列(Queue)模拟实现队列(双链表实现)循环队列(循环数组

C++如何通过Qt反射机制实现数据类序列化

《C++如何通过Qt反射机制实现数据类序列化》在C++工程中经常需要使用数据类,并对数据类进行存储、打印、调试等操作,所以本文就来聊聊C++如何通过Qt反射机制实现数据类序列化吧... 目录设计预期设计思路代码实现使用方法在 C++ 工程中经常需要使用数据类,并对数据类进行存储、打印、调试等操作。由于数据类