Spark SQL 的 AQE 机制

2023-12-14 21:40
文章标签 sql 机制 database spark aqe

本文主要是介绍Spark SQL 的 AQE 机制,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

原文

本文翻译自 Spark SQL AQE 机制的原始 JIRA 和官方设计文档 《New Adaptive Query Execution in Spark SQL》

背景

SPARK-9850 在 Spark 中提出了自适应执行的基本思想。

DAGScheduler中,添加了一个新的 API 来支持提交单个 Map Stage。

DAGScheduler请参考我的这篇博客——DAGScheduler 是什么?有什么作用?

目前 Spark SQL 中自适应执行的实现支持在运行时更改 Reducer 的数量。

如果一个阶段(Stage)需要从一个或多个阶段中获取 Shuffle 数据,则 Exchange Coordinator可以帮助它确定 Shuffle 后分区的数量。

在我们添加 Exchange 时,当前的实现增加了Exchange Coordinator

然而,有一些局限性。

首先,它可能会导致额外的 Shuffle,从而降低性能。

当它添加Exchange Coordinator时,我们可以从EnsureRequirements规则中看到这一点。

其次,在我们添加Exchange时添加Exchange Coordinator不是一个好主意,因为我们没有 Shuffle 后 阶段所有 Shuffle 依赖项的全局视图。

比如,对于单个阶段中 3 个表的 JOIN,应在三个Exchange中使用相同的Exchange Coordinator,但是目前会添加两个单独的Exchange Coordinator

第三,在当前的框架下,在自适应执行中灵活实现其他功能并不容易,例如更改执行计划和在运行时处理数据倾斜的 JOIN。

我们想介绍一种在 Spark SQL 中执行自适应执行的新方法,并解决其局限性。

这个想法在Adaptive Execution Enhancement in Spark SQL中进行了描述。

目标

我们的目标是实现一个灵活的框架来在 Spark SQL 中进行自适应执行,并支持在运行时更改 Reducer 的数量。

新的实现应该解决前面讨论的所有限制。

更改 JOIN 策略和处理数据倾斜的 JOIN 等其他功能将作为单独的规则实现,并在以后轻松插入。

规划

Spark 确定物理执行计划后,根据每个算子的定义生成一个 RDD 的 DAG。

关于 RDD 请参考我的博客——Spark RDD 论文详解(一)摘要和介绍

Spark 调度程序通过在 shuffle 边界处破坏 RDD 图来创建阶段(Stage)并提交阶段以供执行。

一旦确定了 SQL 执行计划,就无法在执行期间对其进行更新。

新的自适应执行的想法是基于 SQL 执行计划而不是 RDD 图来划分阶段。

我们将介绍称为 QueryStageQueryStageInput 的新节点。

在自适应执行模式下,一个执行计划被分成多个QueryStages

每个 QueryStage 都是在单个阶段中运行的子树。

QueryStageInputQueryStage 的叶节点,用来隐藏其子阶段。

它获取其子阶段的结果并将其作为 QueryStage 的输入。

QueryStage 通过收集 QueryStageInputs 了解其所有子阶段,因此它具有所有 shuffle 依赖项的全局视图。

我们添加了 QueryStageQueryStageInputs,从而在计划中查找 Exchange

下面是在一个阶段 中 JOIN 3 个表的示例。

在这里插入图片描述

我们将计划分为四个子树。

最后一个是具有三个 QueryStageInputsResult StageQueryStage4

QueryStageInput 是一个叶节点,但它指向一个子阶段,例如 QueryStageInput1 指向 QueryStage1

除了最后一个 QueryStageQueryStage 的子级始终是 ExchangeBroadcastExchangeExec

我们添加一个规则PlanQueryStage 来添加QueryStageQueryStageInput

仅当启用自适应执行时才会应用该规则。

在这里插入图片描述

执行和调度

我们从最后一个 QueryStage 开始执行。

在这个查询阶段执行计划之前,我们执行所有子阶段(Stage)并收集它们的输出统计信息。

线程池用于提交子阶段。

如果一个子阶段也有它的子阶段,它将首先提交自己的子阶段,这会递归地发生。

所以实际上 QueryStages 没有依赖关系将首先提交,其他 QueryStages 将等待其子阶段完成。

子阶段完成后,我们可以优化这个阶段的计划,根据子阶段的统计数据确定Reducer 的数量。

最后,我们为此查询阶段进行代码生成(CodeGen),并使用新计划更新 UI。

只要我们在 QueryStage 中优化计划时不添加任何 Exchange,就不会发生额外的 shuffle。

自动设置 Reducer 的数量

将使用三种配置来控制Reducer的数量。

spark.sql.adaptive.shuffle.targetPostShuffleInputSize用于控制任务Shuffle后的目标输入大小(以字节为单位)。

spark.sql.adaptive.minNumPostShufflePartitions用于控制自适应执行中使用的shuffle后最小的分区数,可用于控制最小并行度。

这两种配置在 Spark 中已经存在。

我们添加了一个新的配置:spark.sql.adaptive.maxNumPostShufflePartitions来控制Shuffle后分区的最大数量。

最终用户可以设置 Shuffle 后分区的最小和最大数量以及 Shuffle 后输入的目标大小。

在运行时,自适应执行会自动在 min 和 max 之间设置 reducer 的数量。

对于每个 QueryStage,我们使用以下方法在运行时自动设置分区的数量。

  1. 我们首先提交其所有子阶段,并收集 Map 输出统计信息。
  2. 我们创建一个exchange coordinator,将子阶段的 Map 输出统计信息传递给它,并调用estimatePartitionStartIndices方法来确定Shuffle后分区的数量。 (将来我们可能会删除类 ExchangeCoordinator,因为在更改之后只使用其中的一种方法)
  3. 每个子阶段获取相同的partitionStartIndex,并以此为基础构造一个新的ShuffledRowRDD。这些 ShuffledRowRDD 是当前阶段的输入 RDD。

在内部,我们使用最大数量(max)作为初始 shuffle 分区数量。

假设 max 配置为 5,min 配置为 1。

map 阶段完成后,我们知道每个分区的大小为 70MB、30MB、20MB、10MB 和 50MB。

如果我们将每个 reducer 的目标数据大小设置为 64MB,我们可以在运行时使用 3 个 reducer。

第一个 reducer 处理分区 0 (70MB)。

第二个 reducer 处理 3 个连续的分区(分区 1 到 3,总共 60MB)。

第三个 reducer 处理分区 4 (50MB)。

Spark SQL UI

执行计划可能会在运行时发生变化,因此 SQL UI 也应该反映这些变化。

在自适应执行模式下,SQL UI 会在开头显示原始的执行计划。

当自适应执行开始时,每个 QueryStage 都会提交子阶段,并且可能会更改其中的执行计划。

我们将发布一个事件 SparkListenerSQLAdaptiveExecutionUpdate(executionId, physicalPlanDescription, sparkPlanInfo) 来更新 UI 上的执行计划。

优化执行计划和处理倾斜 JOIN

通过上面讨论的更改,我们可以在运行时轻松优化 QueryStage 中的执行计划,即当我们发现一个表大小小于广播阈值时,将 SortMergeJoin 更改为 BroadcastHashJoin

我们还可以在执行子阶段后检测 JOIN 中的倾斜分区并自动处理。

这些策略可以作为单独的规则添加到自适应执行中并单独启用。

这篇关于Spark SQL 的 AQE 机制的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/493979

相关文章

MySQL中EXISTS与IN用法使用与对比分析

《MySQL中EXISTS与IN用法使用与对比分析》在MySQL中,EXISTS和IN都用于子查询中根据另一个查询的结果来过滤主查询的记录,本文将基于工作原理、效率和应用场景进行全面对比... 目录一、基本用法详解1. IN 运算符2. EXISTS 运算符二、EXISTS 与 IN 的选择策略三、性能对比

MySQL常用字符串函数示例和场景介绍

《MySQL常用字符串函数示例和场景介绍》MySQL提供了丰富的字符串函数帮助我们高效地对字符串进行处理、转换和分析,本文我将全面且深入地介绍MySQL常用的字符串函数,并结合具体示例和场景,帮你熟练... 目录一、字符串函数概述1.1 字符串函数的作用1.2 字符串函数分类二、字符串长度与统计函数2.1

Redis客户端连接机制的实现方案

《Redis客户端连接机制的实现方案》本文主要介绍了Redis客户端连接机制的实现方案,包括事件驱动模型、非阻塞I/O处理、连接池应用及配置优化,具有一定的参考价值,感兴趣的可以了解一下... 目录1. Redis连接模型概述2. 连接建立过程详解2.1 连php接初始化流程2.2 关键配置参数3. 最大连

SQL Server跟踪自动统计信息更新实战指南

《SQLServer跟踪自动统计信息更新实战指南》本文详解SQLServer自动统计信息更新的跟踪方法,推荐使用扩展事件实时捕获更新操作及详细信息,同时结合系统视图快速检查统计信息状态,重点强调修... 目录SQL Server 如何跟踪自动统计信息更新:深入解析与实战指南 核心跟踪方法1️⃣ 利用系统目录

MySQL 内存使用率常用分析语句

《MySQL内存使用率常用分析语句》用户整理了MySQL内存占用过高的分析方法,涵盖操作系统层确认及数据库层bufferpool、内存模块差值、线程状态、performance_schema性能数据... 目录一、 OS层二、 DB层1. 全局情况2. 内存占js用详情最近连续遇到mysql内存占用过高导致

Mysql中设计数据表的过程解析

《Mysql中设计数据表的过程解析》数据库约束通过NOTNULL、UNIQUE、DEFAULT、主键和外键等规则保障数据完整性,自动校验数据,减少人工错误,提升数据一致性和业务逻辑严谨性,本文介绍My... 目录1.引言2.NOT NULL——制定某列不可以存储NULL值2.UNIQUE——保证某一列的每一

解密SQL查询语句执行的过程

《解密SQL查询语句执行的过程》文章讲解了SQL语句的执行流程,涵盖解析、优化、执行三个核心阶段,并介绍执行计划查看方法EXPLAIN,同时提出性能优化技巧如合理使用索引、避免SELECT*、JOIN... 目录1. SQL语句的基本结构2. SQL语句的执行过程3. SQL语句的执行计划4. 常见的性能优

Spring Security 单点登录与自动登录机制的实现原理

《SpringSecurity单点登录与自动登录机制的实现原理》本文探讨SpringSecurity实现单点登录(SSO)与自动登录机制,涵盖JWT跨系统认证、RememberMe持久化Token... 目录一、核心概念解析1.1 单点登录(SSO)1.2 自动登录(Remember Me)二、代码分析三、

SQL Server 中的 WITH (NOLOCK) 示例详解

《SQLServer中的WITH(NOLOCK)示例详解》SQLServer中的WITH(NOLOCK)是一种表提示,等同于READUNCOMMITTED隔离级别,允许查询在不获取共享锁的情... 目录SQL Server 中的 WITH (NOLOCK) 详解一、WITH (NOLOCK) 的本质二、工作

MySQL 强制使用特定索引的操作

《MySQL强制使用特定索引的操作》MySQL可通过FORCEINDEX、USEINDEX等语法强制查询使用特定索引,但优化器可能不采纳,需结合EXPLAIN分析执行计划,避免性能下降,注意版本差异... 目录1. 使用FORCE INDEX语法2. 使用USE INDEX语法3. 使用IGNORE IND