5.数据湖deltalake流表的读写

2023-10-09 02:08
文章标签 数据 读写 流表 deltalake

本文主要是介绍5.数据湖deltalake流表的读写,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

delta lake和 spark structured streaming可以深度整合。delta lake克服了很多常见的与流系统和文件整合带来的相关限制,如下:

  • 保证了多个流(或并发批处理作业)的仅一次处理。

  • 当使用文件作为流源时,可以有效地发现哪些文件是新文件。

1. 作为stream source

1.1 案例讲解

当你的structured streaming使用delta lake作为stream source的时候,应用会处理delta 表中已有的数据,以及delta 表新增的数据。

spark.readStream.format("delta").load("/delta/events")

也可以做一些优化,如下:

a.通过maxFilesPerTrigger配置控制structured streaming从delta lake加载的微批文件数。要知道Structured streaming也是微批的概念。该参数就是控制每次trigger计算的最大新增文件数,默认是1000,实际情况要根据数据量和资源数量进行控制。

b.通过maxBytesPerTrigger控制每次trigger处理的最大数据量。这是设置一个“ soft max”,这意味着一个批处理大约可以处理此数量的数据,并且可能处理的数量超出这个限制。如果使用的是Trigger.Once,则 此配置无效。如果将此配置与maxFilesPerTrigger结合使用,两个参数任意一个达到临届条件,都会生效。

1.2 忽略更新和删除

structured streaming不处理不是追加的输入数据,并且如果对作为source的delta table的表进行了任何修改,则structured streaming会抛出异常。 对于变更常见的企业场景,提供了两种策略,来处理对delta 表变更给structured streaming 任务造成的影响:

  • 可以删除输出和checkpoint,并重新启动structured streaming对数据计算,也即是重新计算一次。

  • 可以设置以下两个选项之一:

    • ignoreDeletes:忽略在分区表中删除数据的事务。

    • ignoreChanges:如果由于诸如UPDATE,MERGE INTO,DELETE(在分区内)或OVERWRITE之类的数据更改操作而不得不在源表中重写文件,则重新处理更新的文件。因此未更改的行仍可能会处理并向下游传输,因此structured streaming的下游应该能够处理重复数据。删除不会传输到下游。ignoreChanges包含ignoreDeletes。因此,如果使用ignoreChanges,则流不会因源表的删除或更新而中断。

1.3 案例

假设有一张表叫做user_events,有三个字段:date,user_email,action,而且该表以date字段进行分区。structured streaming区处理这张表,且还有其程序会对该delta 表进行插入和删除操作。

假设仅仅是删除操作,可以这么配置stream:

events.readStream  .format("delta")  .option("ignoreDeletes", "true")  .load("/delta/user_events")

假设对delta表修改操作,可以这么配置stream:

events.readStream  .format("delta")  .option("ignoreChanges", "true")  .load("/delta/user_events")

如果使用UPDATE语句更新了user_email字段某个值,则包含相关user_email的文件将被重写,这个是delta lake更改操作实现机制后面会讲。使用ignoreChanges时,新记录将与同一文件中的所有其他未更改记录一起向下游传输。 所以下游程序应该能够处理这些传入的重复记录。

2.delta 表作为sink

delta table可以作为Structured Streaming的sink使用。delta lake的事务日志确保了其能实现仅一次处理。

2.1 append mode

默认是append 模式,仅仅是追加数据到delta 表:

events.writeStream.format("delta").outputMode("append").option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json").start("/delta/events") // as a path

2.2 complete mode

也可以使用Structured Streaming每个批次覆盖一次整张表。在某些聚合场景下会用到该模式:

  .format("delta").load("/delta/events").groupBy("customerId").count().writeStream.format("delta").outputMode("complete").option("checkpointLocation", "/delta/eventsByCustomer/_checkpoints/streaming-agg").start("/delta/eventsByCustomer")

对于延迟要求更宽松的应用程序,可以使用Trigger.Once来节省计算资源。once trigger每次处理从开始到最新的数据,典型的kappa模型,很适合这种场景了。

推荐阅读:

1.数据湖deltalake初识

2.数据湖DeltaLake之DDL操作

3.数据湖deltalake之时间旅行及版本管理

4.数据湖之schema校验

这篇关于5.数据湖deltalake流表的读写的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/169580

相关文章

SQL Server修改数据库名及物理数据文件名操作步骤

《SQLServer修改数据库名及物理数据文件名操作步骤》在SQLServer中重命名数据库是一个常见的操作,但需要确保用户具有足够的权限来执行此操作,:本文主要介绍SQLServer修改数据... 目录一、背景介绍二、操作步骤2.1 设置为单用户模式(断开连接)2.2 修改数据库名称2.3 查找逻辑文件名

canal实现mysql数据同步的详细过程

《canal实现mysql数据同步的详细过程》:本文主要介绍canal实现mysql数据同步的详细过程,本文通过实例图文相结合给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的... 目录1、canal下载2、mysql同步用户创建和授权3、canal admin安装和启动4、canal

使用SpringBoot整合Sharding Sphere实现数据脱敏的示例

《使用SpringBoot整合ShardingSphere实现数据脱敏的示例》ApacheShardingSphere数据脱敏模块,通过SQL拦截与改写实现敏感信息加密存储,解决手动处理繁琐及系统改... 目录痛点一:痛点二:脱敏配置Quick Start——Spring 显示配置:1.引入依赖2.创建脱敏

详解如何使用Python构建从数据到文档的自动化工作流

《详解如何使用Python构建从数据到文档的自动化工作流》这篇文章将通过真实工作场景拆解,为大家展示如何用Python构建自动化工作流,让工具代替人力完成这些数字苦力活,感兴趣的小伙伴可以跟随小编一起... 目录一、Excel处理:从数据搬运工到智能分析师二、PDF处理:文档工厂的智能生产线三、邮件自动化:

Python数据分析与可视化的全面指南(从数据清洗到图表呈现)

《Python数据分析与可视化的全面指南(从数据清洗到图表呈现)》Python是数据分析与可视化领域中最受欢迎的编程语言之一,凭借其丰富的库和工具,Python能够帮助我们快速处理、分析数据并生成高质... 目录一、数据采集与初步探索二、数据清洗的七种武器1. 缺失值处理策略2. 异常值检测与修正3. 数据

pandas实现数据concat拼接的示例代码

《pandas实现数据concat拼接的示例代码》pandas.concat用于合并DataFrame或Series,本文主要介绍了pandas实现数据concat拼接的示例代码,具有一定的参考价值,... 目录语法示例:使用pandas.concat合并数据默认的concat:参数axis=0,join=

C#代码实现解析WTGPS和BD数据

《C#代码实现解析WTGPS和BD数据》在现代的导航与定位应用中,准确解析GPS和北斗(BD)等卫星定位数据至关重要,本文将使用C#语言实现解析WTGPS和BD数据,需要的可以了解下... 目录一、代码结构概览1. 核心解析方法2. 位置信息解析3. 经纬度转换方法4. 日期和时间戳解析5. 辅助方法二、L

使用Python和Matplotlib实现可视化字体轮廓(从路径数据到矢量图形)

《使用Python和Matplotlib实现可视化字体轮廓(从路径数据到矢量图形)》字体设计和矢量图形处理是编程中一个有趣且实用的领域,通过Python的matplotlib库,我们可以轻松将字体轮廓... 目录背景知识字体轮廓的表示实现步骤1. 安装依赖库2. 准备数据3. 解析路径指令4. 绘制图形关键

解决mysql插入数据锁等待超时报错:Lock wait timeout exceeded;try restarting transaction

《解决mysql插入数据锁等待超时报错:Lockwaittimeoutexceeded;tryrestartingtransaction》:本文主要介绍解决mysql插入数据锁等待超时报... 目录报错信息解决办法1、数据库中执行如下sql2、再到 INNODB_TRX 事务表中查看总结报错信息Lock

使用C#删除Excel表格中的重复行数据的代码详解

《使用C#删除Excel表格中的重复行数据的代码详解》重复行是指在Excel表格中完全相同的多行数据,删除这些重复行至关重要,因为它们不仅会干扰数据分析,还可能导致错误的决策和结论,所以本文给大家介绍... 目录简介使用工具C# 删除Excel工作表中的重复行语法工作原理实现代码C# 删除指定Excel单元