3.数据湖deltalake之时间旅行及版本管理

2023-10-09 02:08

本文主要是介绍3.数据湖deltalake之时间旅行及版本管理,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

浪尖在deltalake第一讲的时候说过,它支持数据版本管理和时间旅行:提供了数据快照,使开发人员能够访问和还原早期版本的数据以进行审核、回滚或重新计算。

1.场景

delta lake的时间旅行,实际上就是利用多版本管理机制,查询历史的delta 表快照。时间旅行有以下使用案例:

1).可以重复创建数据分析,报告或者一些输出(比如,机器学习模型)。这主要是有利于调试和安全审查,尤其是在受管制的行业里。

2).编写复杂的基于时间的查询。

3).修正数据中的错误信息。

4).为一组查询提供快照隔离,以快速变更表。

2.配置

DataframeTable支持创建dataframe的时候指定一个delta lake表的版本信息:

val df1 = spark.read.format("delta").option("timestampAsOf", timestamp_string).load("/delta/events")val df2 = spark.read.format("delta").option("versionAsOf", version).load("/delta/events")

对于版本号,直接传入一个版本数值即可,如下:

val df2 = spark.read.format("delta").option("versionAsOf", 0).table(tableName)

对于timestamp字符串,必须要是date格式或者timestamp格式。例如:

val df1 = spark.read.format("delta").option("timestampAsOf", "2020-06-28").load("/delta/events")val df1 = spark.read.format("delta").option("timestampAsOf", "2020-06-28T00:00:00.000Z").load("/delta/events")

由于delta lake的表是存在更新的情况,所以多次读取数据生成的dataframe之间会有差异,因为两次读取数据可能是一次是数据更新前,另一次是数据更新后。使用时间旅行你就可以在多次调用之间修复数据。

val latest_version = spark.sql("SELECT max(version) FROM (DESCRIBE HISTORY delta.`/delta/events`)").collect()val df = spark.read.format("delta").option("versionAsOf", latest_version[0][0]).load("/delta/events")

3.数据保存时间

默认情况下,deltalake保存最近30天的提交历史。这就意味着可以指定30天之前的版本来读取数据,但是有些注意事项:

3.1 没对delta 表调用VACUUM函数。VACUUM函数是用来删除不在引用的delta表和一些超过保留时间的表,支持sql和API形式。

slq表达式:

VACUUM eventsTable   -- vacuum files not required by versions older than the default retention periodVACUUM '/data/events' -- vacuum files in path-based tableVACUUM delta.`/data/events/`VACUUM delta.`/data/events/` RETAIN 100 HOURS  -- vacuum files not required by versions more than 100 hours oldVACUUM eventsTable DRY RUN    -- do dry run to get the list of files to be deleted

scala API 表达式

import io.delta.tables._val deltaTable = DeltaTable.forPath(spark, pathToTable)deltaTable.vacuum()        // vacuum files not required by versions older than the default retention perioddeltaTable.vacuum(100)     // vacuum files not required by versions more than 100 hours old

可以通过下面两个delta 表属性配置来

  • delta.logRetentionDuration =“ interval <interval>”:控制将表的历史记录保留多长时间。每次写入checkpoint时,都会自动清除早于保留间隔的日志。如果将此配置设置为足够大的值,则会保留许多日志。这不会影响性能,因为针对日志的操作是常量时间。历史记录的操作是并行的(但是随着日志大小的增加,它将变得更加耗时)。默认值为 interval 30 days。

  • delta.deletedFileRetentionDuration =“ interval <interval>”:在这个时间范围内的数据是不会被VACUUM命令删除。默认值为间隔7天。要访问30天的历史数据,请设置delta.deletedFileRetentionDuration = "interval 30 days"。此设置可能会导致您的存储成本上升。

注意:VACUUM命令是不会删除日志文件的,日志文件是在checkpoint之后自动删除的。

为了读取之前版本的数据,必须要保留该版本的日志文件和数据文件。

4.案例

修复意外删除的用户111的数据。

INSERT INTO my_table  SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)  WHERE userId = 111

修复错误更新的数据

MERGE INTO my_table target  USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source  ON source.userId = target.userId  WHEN MATCHED THEN UPDATE SET *

查询过去七天新增的消费者数:

  SELECT count(distinct userId)  FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))

推荐阅读:

1.数据湖deltalake初识

2.数据湖DeltaLake之DDL操作

这篇关于3.数据湖deltalake之时间旅行及版本管理的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/169577

相关文章

Oracle 数据库数据操作如何精通 INSERT, UPDATE, DELETE

《Oracle数据库数据操作如何精通INSERT,UPDATE,DELETE》在Oracle数据库中,对表内数据进行增加、修改和删除操作是通过数据操作语言来完成的,下面给大家介绍Oracle数... 目录思维导图一、插入数据 (INSERT)1.1 插入单行数据,指定所有列的值语法:1.2 插入单行数据,指

SQL Server修改数据库名及物理数据文件名操作步骤

《SQLServer修改数据库名及物理数据文件名操作步骤》在SQLServer中重命名数据库是一个常见的操作,但需要确保用户具有足够的权限来执行此操作,:本文主要介绍SQLServer修改数据... 目录一、背景介绍二、操作步骤2.1 设置为单用户模式(断开连接)2.2 修改数据库名称2.3 查找逻辑文件名

C++ 函数 strftime 和时间格式示例详解

《C++函数strftime和时间格式示例详解》strftime是C/C++标准库中用于格式化日期和时间的函数,定义在ctime头文件中,它将tm结构体中的时间信息转换为指定格式的字符串,是处理... 目录C++ 函数 strftipythonme 详解一、函数原型二、功能描述三、格式字符串说明四、返回值五

canal实现mysql数据同步的详细过程

《canal实现mysql数据同步的详细过程》:本文主要介绍canal实现mysql数据同步的详细过程,本文通过实例图文相结合给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的... 目录1、canal下载2、mysql同步用户创建和授权3、canal admin安装和启动4、canal

使用jenv工具管理多个JDK版本的方法步骤

《使用jenv工具管理多个JDK版本的方法步骤》jenv是一个开源的Java环境管理工具,旨在帮助开发者在同一台机器上轻松管理和切换多个Java版本,:本文主要介绍使用jenv工具管理多个JD... 目录一、jenv到底是干啥的?二、jenv的核心功能(一)管理多个Java版本(二)支持插件扩展(三)环境隔

从基础到进阶详解Pandas时间数据处理指南

《从基础到进阶详解Pandas时间数据处理指南》Pandas构建了完整的时间数据处理生态,核心由四个基础类构成,Timestamp,DatetimeIndex,Period和Timedelta,下面我... 目录1. 时间数据类型与基础操作1.1 核心时间对象体系1.2 时间数据生成技巧2. 时间索引与数据

使用SpringBoot整合Sharding Sphere实现数据脱敏的示例

《使用SpringBoot整合ShardingSphere实现数据脱敏的示例》ApacheShardingSphere数据脱敏模块,通过SQL拦截与改写实现敏感信息加密存储,解决手动处理繁琐及系统改... 目录痛点一:痛点二:脱敏配置Quick Start——Spring 显示配置:1.引入依赖2.创建脱敏

MySQL版本问题导致项目无法启动问题的解决方案

《MySQL版本问题导致项目无法启动问题的解决方案》本文记录了一次因MySQL版本不一致导致项目启动失败的经历,详细解析了连接错误的原因,并提供了两种解决方案:调整连接字符串禁用SSL或统一MySQL... 目录本地项目启动报错报错原因:解决方案第一个:第二种:容器启动mysql的坑两种修改时区的方法:本地

详解如何使用Python构建从数据到文档的自动化工作流

《详解如何使用Python构建从数据到文档的自动化工作流》这篇文章将通过真实工作场景拆解,为大家展示如何用Python构建自动化工作流,让工具代替人力完成这些数字苦力活,感兴趣的小伙伴可以跟随小编一起... 目录一、Excel处理:从数据搬运工到智能分析师二、PDF处理:文档工厂的智能生产线三、邮件自动化:

Python数据分析与可视化的全面指南(从数据清洗到图表呈现)

《Python数据分析与可视化的全面指南(从数据清洗到图表呈现)》Python是数据分析与可视化领域中最受欢迎的编程语言之一,凭借其丰富的库和工具,Python能够帮助我们快速处理、分析数据并生成高质... 目录一、数据采集与初步探索二、数据清洗的七种武器1. 缺失值处理策略2. 异常值检测与修正3. 数据