Spark 3.0 AQE 专治各种数据倾斜

2023-12-14 21:40
文章标签 数据 spark 倾斜 3.0 专治 aqe

本文主要是介绍Spark 3.0 AQE 专治各种数据倾斜,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1、前言

近些年来,在对Spark SQL优化上,CBO是最成功的一个特性之一。

CBO会计算一些和业务数据相关的统计数据,来优化查询,例如行数、去重后的行数、空值、最大最小值等。
Spark根据这些数据,自动选择BHJ或者SMJ,对于多Join场景下的Cost-based Join Reorder,来达到优化执行计划的目的。
但是,由于这些统计数据是需要预先处理的,会过时,所以我们在用过时的数据进行判断,在某些情况下反而会变成负面效果,拉低了SQL执行效率。
AQE在执行过程中统计数据,并动态地调节执行计划,从而解决了这个问题。

2、框架

对于AQE而言,最重要的问题就是什么时候去重新计算优化执行计划。Spark任务的算子如果管道排列,依次并行执行。然而,shuffle或者broadcast exchange会打断算子的排列执行,我们称其为物化点(Materialization Points),并且用"Query Stages"来代表那些被物化点所分割的小片段。每个Query Stage会产出中间结果,当且仅当该stage及其并行的所有stage都执行完成后,下游的Query Stage才能被执行。所以当上游部分stage执行完成,partitions的统计数据也获取到了,并且下游还未开始执行,这就给AQE提供了reoptimization的机会。

在查询开始时,生成完了执行计划,AQE框架首先会找到并执行那些不存在上游的stages。一旦这些stage有一个或多个完成,AQE框架就会将其在physical plan中标记为完成,并根据已完成的stages提供的执行数据来更新整个logical plan。基于这些新产出的统计数据,AQE框架会执行optimizer,根据一系列的优化规则来进行优化;AQE框架还会执行生成普通physical plan的optimizer以及自适应执行专属的优化规则,例如分区合并、数据倾斜处理等。于是,我们就获得了最新优化过的执行计划和一些已经执行完成的stages,至此为一次循环。接着我们只需要继续重复上面的步骤,直到整个query都跑完。

在Spark 3.0中,AQE框架拥有三大特征:

  • 动态折叠shuffle过程中的partition

  • 动态选择join策略

  • 动态优化存在数据倾斜的join

接下来我们就具体来看看这三大特征。

① 动态合并shuffle partitions

在我们处理的数据量级非常大时,shuffle通常来说是最影响性能的。因为shuffle是一个非常耗时的算子,它需要通过网络移动数据,分发给下游算子。

在shuffle中,partition的数量十分关键。partition的最佳数量取决于数据,而数据大小在不同的query不同stage都会有很大的差异,所以很难去确定一个具体的数目:

  • 如果partition过少,每个partition数据量就会过多,可能就会导致大量数据要落到磁盘上,从而拖慢了查询。

  • 如果partition过多,每个partition数据量就会很少,就会产生很多额外的网络开销,并且影响Spark task scheduler,从而拖慢查询。

为了解决该问题,我们在最开始设置相对较大的shuffle partition个数,通过执行过程中shuffle文件的数据来合并相邻的小partitions。
例如,假设我们执行SELECT max(i) FROM tbl GROUP BY j,表tbl只有2个partition并且数据量非常小。我们将初始shuffle partition设为5,因此在分组后会出现5个partitions。若不进行AQE优化,会产生5个tasks来做聚合结果,事实上有3个partitions数据量是非常小的。
然而在这种情况下,AQE只会生成3个reduce task。

② 动态切换join策略

在Spark所支持的众多join中,broadcast hash join性能是最好的。因此,如果需要广播的表的预估大小小于了广播限制阈值,那么我们就应该将其设为BHJ。但是,对于表的大小估计不当会导致决策错误,比如join表有很多的filter(容易把表估大)或者join表有很多其他算子(容易把表估小),而不仅仅是全量扫描一张表。

由于AQE拥有精确的上游统计数据,因此可以解决该问题。比如下面这个例子,右表的实际大小为15M,而在该场景下,经过filter过滤后,实际参与join的数据大小为8M,小于了默认broadcast阈值10M,应该被广播。


在我们执行过程中转化为BHJ的同时,我们甚至可以将传统shuffle优化为本地shuffle(例如shuffle读在mapper而不是基于reducer)来减小网络开销。

③ 动态优化数据倾斜

数据倾斜是由于集群上数据在分区之间分布不均匀所导致的,它会拉慢join场景下整个查询。AQE根据shuffle文件统计数据自动检测倾斜数据,将那些倾斜的分区打散成小的子分区,然后各自进行join。

我们可以看下这个场景,Table A join Table B,其中Table A的partition A0数据远大于其他分区。

AQE会将partition A0切分成2个子分区,并且让他们独自和Table B的partition B0进行join。


如果不做这个优化,SMJ将会产生4个tasks并且其中一个执行时间远大于其他。经优化,这个join将会有5个tasks,但每个task执行耗时差不多相同,因此个整个查询带来了更好的性能。

3、使用

我们可以设置参数spark.sql.adaptive.enabled为true来开启AQE,在Spark 3.0中默认是false,并满足以下条件:

  • 非流式查询

  • 包含至少一个exchange(如join、聚合、窗口算子)或者一个子查询

AQE通过减少了对静态统计数据的依赖,成功解决了Spark CBO的一个难以处理的trade off(生成统计数据的开销和查询耗时)以及数据精度问题。相比之前具有局限性的CBO,现在就显得非常灵活 - 我们再也不需要提前去分析数据了!

这篇关于Spark 3.0 AQE 专治各种数据倾斜的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/493976

相关文章

Linux下利用select实现串口数据读取过程

《Linux下利用select实现串口数据读取过程》文章介绍Linux中使用select、poll或epoll实现串口数据读取,通过I/O多路复用机制在数据到达时触发读取,避免持续轮询,示例代码展示设... 目录示例代码(使用select实现)代码解释总结在 linux 系统里,我们可以借助 select、

C#使用iText获取PDF的trailer数据的代码示例

《C#使用iText获取PDF的trailer数据的代码示例》开发程序debug的时候,看到了PDF有个trailer数据,挺有意思,于是考虑用代码把它读出来,那么就用到我们常用的iText框架了,所... 目录引言iText 核心概念C# 代码示例步骤 1: 确保已安装 iText步骤 2: C# 代码程

Pandas处理缺失数据的方式汇总

《Pandas处理缺失数据的方式汇总》许多教程中的数据与现实世界中的数据有很大不同,现实世界中的数据很少是干净且同质的,本文我们将讨论处理缺失数据的一些常规注意事项,了解Pandas如何表示缺失数据,... 目录缺失数据约定的权衡Pandas 中的缺失数据None 作为哨兵值NaN:缺失的数值数据Panda

C++中处理文本数据char与string的终极对比指南

《C++中处理文本数据char与string的终极对比指南》在C++编程中char和string是两种用于处理字符数据的类型,但它们在使用方式和功能上有显著的不同,:本文主要介绍C++中处理文本数... 目录1. 基本定义与本质2. 内存管理3. 操作与功能4. 性能特点5. 使用场景6. 相互转换核心区别

python库pydantic数据验证和设置管理库的用途

《python库pydantic数据验证和设置管理库的用途》pydantic是一个用于数据验证和设置管理的Python库,它主要利用Python类型注解来定义数据模型的结构和验证规则,本文给大家介绍p... 目录主要特点和用途:Field数值验证参数总结pydantic 是一个让你能够 confidentl

JAVA实现亿级千万级数据顺序导出的示例代码

《JAVA实现亿级千万级数据顺序导出的示例代码》本文主要介绍了JAVA实现亿级千万级数据顺序导出的示例代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面... 前提:主要考虑控制内存占用空间,避免出现同时导出,导致主程序OOM问题。实现思路:A.启用线程池

SpringBoot分段处理List集合多线程批量插入数据方式

《SpringBoot分段处理List集合多线程批量插入数据方式》文章介绍如何处理大数据量List批量插入数据库的优化方案:通过拆分List并分配独立线程处理,结合Spring线程池与异步方法提升效率... 目录项目场景解决方案1.实体类2.Mapper3.spring容器注入线程池bejsan对象4.创建

PHP轻松处理千万行数据的方法详解

《PHP轻松处理千万行数据的方法详解》说到处理大数据集,PHP通常不是第一个想到的语言,但如果你曾经需要处理数百万行数据而不让服务器崩溃或内存耗尽,你就会知道PHP用对了工具有多强大,下面小编就... 目录问题的本质php 中的数据流处理:为什么必不可少生成器:内存高效的迭代方式流量控制:避免系统过载一次性

C#实现千万数据秒级导入的代码

《C#实现千万数据秒级导入的代码》在实际开发中excel导入很常见,现代社会中很容易遇到大数据处理业务,所以本文我就给大家分享一下千万数据秒级导入怎么实现,文中有详细的代码示例供大家参考,需要的朋友可... 目录前言一、数据存储二、处理逻辑优化前代码处理逻辑优化后的代码总结前言在实际开发中excel导入很

MyBatis-plus处理存储json数据过程

《MyBatis-plus处理存储json数据过程》文章介绍MyBatis-Plus3.4.21处理对象与集合的差异:对象可用内置Handler配合autoResultMap,集合需自定义处理器继承F... 目录1、如果是对象2、如果需要转换的是List集合总结对象和集合分两种情况处理,目前我用的MP的版本