Spark SQL 的 AQE 机制

2023-12-14 21:40
文章标签 sql 机制 database spark aqe

本文主要是介绍Spark SQL 的 AQE 机制,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

原文

本文翻译自 Spark SQL AQE 机制的原始 JIRA 和官方设计文档 《New Adaptive Query Execution in Spark SQL》

背景

SPARK-9850 在 Spark 中提出了自适应执行的基本思想。

DAGScheduler中,添加了一个新的 API 来支持提交单个 Map Stage。

DAGScheduler请参考我的这篇博客——DAGScheduler 是什么?有什么作用?

目前 Spark SQL 中自适应执行的实现支持在运行时更改 Reducer 的数量。

如果一个阶段(Stage)需要从一个或多个阶段中获取 Shuffle 数据,则 Exchange Coordinator可以帮助它确定 Shuffle 后分区的数量。

在我们添加 Exchange 时,当前的实现增加了Exchange Coordinator

然而,有一些局限性。

首先,它可能会导致额外的 Shuffle,从而降低性能。

当它添加Exchange Coordinator时,我们可以从EnsureRequirements规则中看到这一点。

其次,在我们添加Exchange时添加Exchange Coordinator不是一个好主意,因为我们没有 Shuffle 后 阶段所有 Shuffle 依赖项的全局视图。

比如,对于单个阶段中 3 个表的 JOIN,应在三个Exchange中使用相同的Exchange Coordinator,但是目前会添加两个单独的Exchange Coordinator

第三,在当前的框架下,在自适应执行中灵活实现其他功能并不容易,例如更改执行计划和在运行时处理数据倾斜的 JOIN。

我们想介绍一种在 Spark SQL 中执行自适应执行的新方法,并解决其局限性。

这个想法在Adaptive Execution Enhancement in Spark SQL中进行了描述。

目标

我们的目标是实现一个灵活的框架来在 Spark SQL 中进行自适应执行,并支持在运行时更改 Reducer 的数量。

新的实现应该解决前面讨论的所有限制。

更改 JOIN 策略和处理数据倾斜的 JOIN 等其他功能将作为单独的规则实现,并在以后轻松插入。

规划

Spark 确定物理执行计划后,根据每个算子的定义生成一个 RDD 的 DAG。

关于 RDD 请参考我的博客——Spark RDD 论文详解(一)摘要和介绍

Spark 调度程序通过在 shuffle 边界处破坏 RDD 图来创建阶段(Stage)并提交阶段以供执行。

一旦确定了 SQL 执行计划,就无法在执行期间对其进行更新。

新的自适应执行的想法是基于 SQL 执行计划而不是 RDD 图来划分阶段。

我们将介绍称为 QueryStageQueryStageInput 的新节点。

在自适应执行模式下,一个执行计划被分成多个QueryStages

每个 QueryStage 都是在单个阶段中运行的子树。

QueryStageInputQueryStage 的叶节点,用来隐藏其子阶段。

它获取其子阶段的结果并将其作为 QueryStage 的输入。

QueryStage 通过收集 QueryStageInputs 了解其所有子阶段,因此它具有所有 shuffle 依赖项的全局视图。

我们添加了 QueryStageQueryStageInputs,从而在计划中查找 Exchange

下面是在一个阶段 中 JOIN 3 个表的示例。

在这里插入图片描述

我们将计划分为四个子树。

最后一个是具有三个 QueryStageInputsResult StageQueryStage4

QueryStageInput 是一个叶节点,但它指向一个子阶段,例如 QueryStageInput1 指向 QueryStage1

除了最后一个 QueryStageQueryStage 的子级始终是 ExchangeBroadcastExchangeExec

我们添加一个规则PlanQueryStage 来添加QueryStageQueryStageInput

仅当启用自适应执行时才会应用该规则。

在这里插入图片描述

执行和调度

我们从最后一个 QueryStage 开始执行。

在这个查询阶段执行计划之前,我们执行所有子阶段(Stage)并收集它们的输出统计信息。

线程池用于提交子阶段。

如果一个子阶段也有它的子阶段,它将首先提交自己的子阶段,这会递归地发生。

所以实际上 QueryStages 没有依赖关系将首先提交,其他 QueryStages 将等待其子阶段完成。

子阶段完成后,我们可以优化这个阶段的计划,根据子阶段的统计数据确定Reducer 的数量。

最后,我们为此查询阶段进行代码生成(CodeGen),并使用新计划更新 UI。

只要我们在 QueryStage 中优化计划时不添加任何 Exchange,就不会发生额外的 shuffle。

自动设置 Reducer 的数量

将使用三种配置来控制Reducer的数量。

spark.sql.adaptive.shuffle.targetPostShuffleInputSize用于控制任务Shuffle后的目标输入大小(以字节为单位)。

spark.sql.adaptive.minNumPostShufflePartitions用于控制自适应执行中使用的shuffle后最小的分区数,可用于控制最小并行度。

这两种配置在 Spark 中已经存在。

我们添加了一个新的配置:spark.sql.adaptive.maxNumPostShufflePartitions来控制Shuffle后分区的最大数量。

最终用户可以设置 Shuffle 后分区的最小和最大数量以及 Shuffle 后输入的目标大小。

在运行时,自适应执行会自动在 min 和 max 之间设置 reducer 的数量。

对于每个 QueryStage,我们使用以下方法在运行时自动设置分区的数量。

  1. 我们首先提交其所有子阶段,并收集 Map 输出统计信息。
  2. 我们创建一个exchange coordinator,将子阶段的 Map 输出统计信息传递给它,并调用estimatePartitionStartIndices方法来确定Shuffle后分区的数量。 (将来我们可能会删除类 ExchangeCoordinator,因为在更改之后只使用其中的一种方法)
  3. 每个子阶段获取相同的partitionStartIndex,并以此为基础构造一个新的ShuffledRowRDD。这些 ShuffledRowRDD 是当前阶段的输入 RDD。

在内部,我们使用最大数量(max)作为初始 shuffle 分区数量。

假设 max 配置为 5,min 配置为 1。

map 阶段完成后,我们知道每个分区的大小为 70MB、30MB、20MB、10MB 和 50MB。

如果我们将每个 reducer 的目标数据大小设置为 64MB,我们可以在运行时使用 3 个 reducer。

第一个 reducer 处理分区 0 (70MB)。

第二个 reducer 处理 3 个连续的分区(分区 1 到 3,总共 60MB)。

第三个 reducer 处理分区 4 (50MB)。

Spark SQL UI

执行计划可能会在运行时发生变化,因此 SQL UI 也应该反映这些变化。

在自适应执行模式下,SQL UI 会在开头显示原始的执行计划。

当自适应执行开始时,每个 QueryStage 都会提交子阶段,并且可能会更改其中的执行计划。

我们将发布一个事件 SparkListenerSQLAdaptiveExecutionUpdate(executionId, physicalPlanDescription, sparkPlanInfo) 来更新 UI 上的执行计划。

优化执行计划和处理倾斜 JOIN

通过上面讨论的更改,我们可以在运行时轻松优化 QueryStage 中的执行计划,即当我们发现一个表大小小于广播阈值时,将 SortMergeJoin 更改为 BroadcastHashJoin

我们还可以在执行子阶段后检测 JOIN 中的倾斜分区并自动处理。

这些策略可以作为单独的规则添加到自适应执行中并单独启用。

这篇关于Spark SQL 的 AQE 机制的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/493979

相关文章

详解MySQL中DISTINCT去重的核心注意事项

《详解MySQL中DISTINCT去重的核心注意事项》为了实现查询不重复的数据,MySQL提供了DISTINCT关键字,它的主要作用就是对数据表中一个或多个字段重复的数据进行过滤,只返回其中的一条数据... 目录DISTINCT 六大注意事项1. 作用范围:所有 SELECT 字段2. NULL 值的特殊处

MySQL 用户创建与授权最佳实践

《MySQL用户创建与授权最佳实践》在MySQL中,用户管理和权限控制是数据库安全的重要组成部分,下面详细介绍如何在MySQL中创建用户并授予适当的权限,感兴趣的朋友跟随小编一起看看吧... 目录mysql 用户创建与授权详解一、MySQL用户管理基础1. 用户账户组成2. 查看现有用户二、创建用户1. 基

MySQL 打开binlog日志的方法及注意事项

《MySQL打开binlog日志的方法及注意事项》本文给大家介绍MySQL打开binlog日志的方法及注意事项,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要... 目录一、默认状态二、如何检查 binlog 状态三、如何开启 binlog3.1 临时开启(重启后失效)

SQL BETWEEN 语句的基本用法详解

《SQLBETWEEN语句的基本用法详解》SQLBETWEEN语句是一个用于在SQL查询中指定查询条件的重要工具,它允许用户指定一个范围,用于筛选符合特定条件的记录,本文将详细介绍BETWEEN语... 目录概述BETWEEN 语句的基本用法BETWEEN 语句的示例示例 1:查询年龄在 20 到 30 岁

MySQL DQL从入门到精通

《MySQLDQL从入门到精通》通过DQL,我们可以从数据库中检索出所需的数据,进行各种复杂的数据分析和处理,本文将深入探讨MySQLDQL的各个方面,帮助你全面掌握这一重要技能,感兴趣的朋友跟随小... 目录一、DQL 基础:SELECT 语句入门二、数据过滤:WHERE 子句的使用三、结果排序:ORDE

MySQL MCP 服务器安装配置最佳实践

《MySQLMCP服务器安装配置最佳实践》本文介绍MySQLMCP服务器的安装配置方法,本文结合实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下... 目录mysql MCP 服务器安装配置指南简介功能特点安装方法数据库配置使用MCP Inspector进行调试开发指

mysql中insert into的基本用法和一些示例

《mysql中insertinto的基本用法和一些示例》INSERTINTO用于向MySQL表插入新行,支持单行/多行及部分列插入,下面给大家介绍mysql中insertinto的基本用法和一些示例... 目录基本语法插入单行数据插入多行数据插入部分列的数据插入默认值注意事项在mysql中,INSERT I

PostgreSQL中MVCC 机制的实现

《PostgreSQL中MVCC机制的实现》本文主要介绍了PostgreSQL中MVCC机制的实现,通过多版本数据存储、快照隔离和事务ID管理实现高并发读写,具有一定的参考价值,感兴趣的可以了解一下... 目录一 MVCC 基本原理python1.1 MVCC 核心概念1.2 与传统锁机制对比二 Postg

一文详解MySQL如何设置自动备份任务

《一文详解MySQL如何设置自动备份任务》设置自动备份任务可以确保你的数据库定期备份,防止数据丢失,下面我们就来详细介绍一下如何使用Bash脚本和Cron任务在Linux系统上设置MySQL数据库的自... 目录1. 编写备份脚本1.1 创建并编辑备份脚本1.2 给予脚本执行权限2. 设置 Cron 任务2

SQL Server修改数据库名及物理数据文件名操作步骤

《SQLServer修改数据库名及物理数据文件名操作步骤》在SQLServer中重命名数据库是一个常见的操作,但需要确保用户具有足够的权限来执行此操作,:本文主要介绍SQLServer修改数据... 目录一、背景介绍二、操作步骤2.1 设置为单用户模式(断开连接)2.2 修改数据库名称2.3 查找逻辑文件名