自适应查询执行AQE:在运行时加速SparkSQL

2023-12-14 21:40

本文主要是介绍自适应查询执行AQE:在运行时加速SparkSQL,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

演讲嘉宾简介:王道远,阿里巴巴技术专家

以下内容根据演讲视频以及PPT整理而成。

点击链接观看精彩回放:

https://developer.aliyun.com/live/43188

自适应查询执行AQE简介

关于自适应查询执行,在数据库领域早有充分研究。在Spark社区,最早在Spark 1.6版本就已经提出发展自适应执行(Adaptive Query Execution,下文简称AQE);到了Spark 2.x时代,Intel大数据团队进行了相应的原型开发和实践;到了Spark 3.0时代,Databricks和Intel一起为社区贡献了新的AQE。

什么是AQE呢?简单来说就是根据在运行时统计信息(runtime statistics)在查询执行的过程中进行动态(Dynamic)的查询优化。那么我们为什么需要AQE呢?在Spark 2.x时代,为了选择最佳执行计划,我们引入了CBO(Cost-based optimization),但是在一些场景下,效果非常不好,缺点明显,比如:

  • 统计信息过期或者缺失导致估计错误;

  • 收集统计信息代价较大(比如column histograms);;

  • 某些谓词使用自定义UDF导致无法预估;

  • 手动指定执行hint跟不上数据变化。

而在Spark 3.0时代,AQE完全基于精确的运行时统计信息进行优化,引入了一个基本的概念Query Stages,并且以Query Stage为粒度,进行运行时的优化,其工作原理如下所示:

整个AQE的工作原理以及流程为:

  1. 运行没有依赖的stage;

  2. 在一个stage完成时再依据新的统计信息优化剩余部分;

  3. 执行其他已经满足依赖的stage;

  4. 重复步骤(2)(3)直至所有stage执行完成。

    Spark 3.0中主要的AQE特性

Spark 3.0中主要的AQE特性包括:

  • 动态合并shuffle分区;

  • 动态转换join策略;

  • 动态优化join中的数据倾斜。

动态合并shuffle分区

Shuffle分区数量和大小对查询性能很关键。在Spark 3.0以前,Shuffle分区是一个固定值,存在着明显的缺点,如果分区过小会导致I/O低效、调度开销和任务启动开销,但是如果分区过大又会带来GC压力和溢写硬盘等问题。另一方面,在Spark 3.0之前,整个查询执行过程中使用统一的分区数,而在查询执行的不同阶段,数据规模会发生明显变化,如果保持统一的分区数,则大大降低了效率。基于以上,动态合并Shuffle分区是非常必要的。

AQE解决上面问题的具体做法是设置较大的初始分区数来满足整个查询执行过程中最大的分区数,并且在每个Query stage结束的时候按需自动合并分区,其具体的流程如下图所示:

具体来说,动态合并Shuffle分区的原理如下:

对于普通的Shuffle来说,没有自动合并的过程,每个MAP读取Shuffle后,会根据指定分区数进行分区,比如下图为5:

进行上图所示的分区后发现,REDUCE1和REDUCE5要处理的数据量明显高于其余三个REDUCE,而我们理想的情况下是每个REDUCE处理的数据量是相当的,所以AQE进行了动态合并分区,将相邻的小分区2,3,4进行合并,输出三个REDUCE,大大提高了后续的效率,如下图所示:

动态转换join策略

在Spark中,我们希望当Join的某一边可以完全放入内存时,Spark选择Broadcast Hash Join,但是实际上会出现预估可能不够准确,导致本来可以优化为BHJ的没有被优化的情况,原因也很多,比如;

  • 统计信息不够准确;

  • 子查询太复杂;

  • 黑盒的谓词,比如自定义UDF。

对于以上问题,AQE的解决方法就是使用运行时数据大小重新选择执行计划,其整个流程与原理如下图所示:

动态优化join中的数据倾斜

在Join中的数据倾斜会导致一系列的问题,比如性能下降、某一个task影响整个stage的运行等,处理数据量比较大的partitions时候还可能会出现溢写磁盘的情况。AQE针对上述问题使用运行时的统计信息自动优化查询执行,动态的发现倾斜数据的数量,并且把倾斜的分区分成更小的子分区来处理。其做法如下图所示:

具体来说其原理如下:
对于普通的sort merge join来说,没有倾斜优化,可能会造成某个Shuffle分区的数据数量明显高于其他分区,如下图中的PART.A0,这种情况会造成A0和B0的这个Join执行速度明显慢于其他的Join。

有了AQE之后,根据数据倾斜优化后的sort merge join,使用skew Shuffle reader,如下图所示将A0分成三个子分区,并将对应的B0复制三份,整个Join任务的运行效率大大提升。

上述的几个特性可以在Demo中查看https://docs.databricks.com/_static/notebooks/aqe-demo.html 。

TPC-DS性能测试

进行TPC-DS性能测试的集群配置如下图所示:

测试结果显示,2条Query获得了1.5倍的性能提升,37条Query获得了1.1倍的性能提升。

下面两张图是关于分区合并和Join策略的性能测试结果,可以看出AQE对于性能的提升还是非常明显的。


除了在TPC-DS的测试中AQE表现优秀,在实际生产环境中AQE对于性能的提升也非常优秀,比如某电商公司分享在某些典型的倾斜查询中使用了AQE之后获得了十几倍的性能提升,某互联网巨头使用了AQE之后发现在2个典型的查询中性能分别有了5倍和1.38倍的提升等等。

QA

Q1:Shuffle是如何对大量小文件进行优化的?
A1:AQE 支持的动态分区合并可以减少 shuffle 后的分区数,如果是 ETL 作业写动态分区表,建议手动添加distribute by partkey 等子句来减少输出文件数量。

Q2:AQE是否支持外部的Shuffle Service?
A2:支持,需要 shuffle service 提供基本的统计信息

Q3:如果join的两边的part都比较大,是不是都会拆分?还会broadcast 么?
A3:都比较大的话优化就没啥用了,需要从业务出发进行优化。

猜你喜欢

1、Spark 背后的商业公司收购的 Redash 是个啥?

2、马铁大神的 Apache Spark 十年回顾

3、基于Apache Iceberg打造T+0实时数仓

4、Presto on Spark:扩展 Presto 以支持大规模 ETL

这篇关于自适应查询执行AQE:在运行时加速SparkSQL的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/493969

相关文章

MyBatis分页查询实战案例完整流程

《MyBatis分页查询实战案例完整流程》MyBatis是一个强大的Java持久层框架,支持自定义SQL和高级映射,本案例以员工工资信息管理为例,详细讲解如何在IDEA中使用MyBatis结合Page... 目录1. MyBATis框架简介2. 分页查询原理与应用场景2.1 分页查询的基本原理2.1.1 分

C++统计函数执行时间的最佳实践

《C++统计函数执行时间的最佳实践》在软件开发过程中,性能分析是优化程序的重要环节,了解函数的执行时间分布对于识别性能瓶颈至关重要,本文将分享一个C++函数执行时间统计工具,希望对大家有所帮助... 目录前言工具特性核心设计1. 数据结构设计2. 单例模式管理器3. RAII自动计时使用方法基本用法高级用法

Java实现远程执行Shell指令

《Java实现远程执行Shell指令》文章介绍使用JSch在SpringBoot项目中实现远程Shell操作,涵盖环境配置、依赖引入及工具类编写,详解分号和双与号执行多指令的区别... 目录软硬件环境说明编写执行Shell指令的工具类总结jsch(Java Secure Channel)是SSH2的一个纯J

Java实现复杂查询优化的7个技巧小结

《Java实现复杂查询优化的7个技巧小结》在Java项目中,复杂查询是开发者面临的“硬骨头”,本文将通过7个实战技巧,结合代码示例和性能对比,手把手教你如何让复杂查询变得优雅,大家可以根据需求进行选择... 目录一、复杂查询的痛点:为何你的代码“又臭又长”1.1冗余变量与中间状态1.2重复查询与性能陷阱1.

python 线程池顺序执行的方法实现

《python线程池顺序执行的方法实现》在Python中,线程池默认是并发执行任务的,但若需要实现任务的顺序执行,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋... 目录方案一:强制单线程(伪顺序执行)方案二:按提交顺序获取结果方案三:任务间依赖控制方案四:队列顺序消

MySQL中查询和展示LONGBLOB类型数据的技巧总结

《MySQL中查询和展示LONGBLOB类型数据的技巧总结》在MySQL中LONGBLOB是一种二进制大对象(BLOB)数据类型,用于存储大量的二进制数据,:本文主要介绍MySQL中查询和展示LO... 目录前言1. 查询 LONGBLOB 数据的大小2. 查询并展示 LONGBLOB 数据2.1 转换为十

使用SpringBoot+InfluxDB实现高效数据存储与查询

《使用SpringBoot+InfluxDB实现高效数据存储与查询》InfluxDB是一个开源的时间序列数据库,特别适合处理带有时间戳的监控数据、指标数据等,下面详细介绍如何在SpringBoot项目... 目录1、项目介绍2、 InfluxDB 介绍3、Spring Boot 配置 InfluxDB4、I

Go语言连接MySQL数据库执行基本的增删改查

《Go语言连接MySQL数据库执行基本的增删改查》在后端开发中,MySQL是最常用的关系型数据库之一,本文主要为大家详细介绍了如何使用Go连接MySQL数据库并执行基本的增删改查吧... 目录Go语言连接mysql数据库准备工作安装 MySQL 驱动代码实现运行结果注意事项Go语言执行基本的增删改查准备工作

Go语言使用Gin处理路由参数和查询参数

《Go语言使用Gin处理路由参数和查询参数》在WebAPI开发中,处理路由参数(PathParameter)和查询参数(QueryParameter)是非常常见的需求,下面我们就来看看Go语言... 目录一、路由参数 vs 查询参数二、Gin 获取路由参数和查询参数三、示例代码四、运行与测试1. 测试编程路

MySQL 数据库表与查询操作实战案例

《MySQL数据库表与查询操作实战案例》本文将通过实际案例,详细介绍MySQL中数据库表的设计、数据插入以及常用的查询操作,帮助初学者快速上手,感兴趣的朋友跟随小编一起看看吧... 目录mysql 数据库表操作与查询实战案例项目一:产品相关数据库设计与创建一、数据库及表结构设计二、数据库与表的创建项目二:员