【Spark】v3 - Adaptive Query Execution

之前工作中 Spark 2 用的较多, 但新版本的 Spark 3 加入了 AQE 优化
(全称 Adaptive Query Execution, 自适应查询执行), 这个特性还是很实用的.

在以前, cost-based optimization (CBO) 是 Spark SQL 优化的重要特性之一,
它通过收集和利用统计数据 (例如 row count, number of distinct values, NULL values, max/min values, etc.)
来帮助 Spark 生成高质量的执行计划.
比如用来选择正确的 join type (broadcast hash join vs. sort merge join), 选择 hash-join 的 build side,
以及调整 multi-way join 的 join order.
但是, 这些可能过时的统计数据和不准确的基数估算可能生成不理想的执行计划.
因此, Spark 3 引入了 AQE, 将基于 query 执行中收集到的 runtime statistics 进行 reoptimize, 并调整执行计划.

when to reoptimize

引入AQE, 首先面临的一个重要问题就是 什么时候进行 reoptimize.
我们知道, Spark 的 parallel pipeline 并行处理 会被 shuffle 或 broadcast exchange breaks 打断,
这里可以称为 materialization points (物化点), 被物化点切割后的片段称为 query stages,
每个 query stage 会物化它产出的中间结果, 其下游的 Stage 只有它依赖的 Stages 执行完了, 才能被运行,
因此当这里所有 partitions 的统计数据已经可用, 但下游的处理还未开始时, 可以进行 reoptimize.

1, 当查询开始时, AQE 首先启动那些 leaf stages (没有依赖其他 stages 的 stages),
这些 stages 一旦有一个或多个完成, AQE 就会在 physical plan 把它们标记为 complete,
并根据 completed stages 的 runtime statistics 统计数据, 相应地更新 logical query plan.
2, 基于这些新的 statistics, AQE 将运行 optimizer (with a selected list of logical optimization rules) 以及
physical planner 以及 physical optimization rules (包括 regular physical rules 和 adaptive-execution-specific rules, 比如分区合并、数据倾斜处理等等).
3, 现在, 基于 completed stages, 我们已经获得了一个新的优化过的 query plan, AQE 框架将搜索新的 stages (那些 child stages 已经物化了的 stages),
然后重复上面的 execute-reoptimize-execute 流程, 知道整个 query 完成.

三大特性

AQE 有三大特性:

  • 动态合并 shuffle partitions (Dynamically coalescing shuffle partitions)
  • 动态切换 join strategies (Dynamically switching join strategies)
  • 动态优化 数据倾斜的 join (Dynamically optimizing skew joins)

1. 动态合并 shuffle partitions

Spark 处理海量数据时, shuffle 通常是比较影响性能的.
而 shuffle 的一个重要属性是 partitions 的数目, 而最佳数目是和数据相关的,
数据大小在不同的 query 和 stage 之间可能会有很大差异, 所以很难去设置一个数值.

1, 如果设置太小, 单个 partition 数据量就会过大, 导致大量数据落盘, 拖慢查询.
2, 如果设置太大, 单个 partition 数据量就会过小, 导致大量的网络小数据请求和IO, 拖慢查询,
大量的 tasks 将会加大 Spark task scheduler 的负担.

为了解决这个问题, 我们可以一开始设置一个相对较大的 shuffle partitions 数目, 然后合并相邻的小 partitions.

假设我们运行 SELECT max(i)FROM tbl GROUP BY j, 输入数据 tbl 数据量比较小, 而且只有两个分区,
如果初始 partitions 数目设置为 5 , 那么 grouped 数据将会 shuffle 到 5 个分区.
如果没有 AQE, Spark 会启动 5 个 tasks 来做 final aggregation, 其中 3 个可能数据量很小,
为它们每一个启动独立的 tasks 来处理, 将会是巨大的浪费.

如果启用 AQE, 这 3 个小 partitions 将会进行合并, 而且只会启动一个 task 来做合并后的分区的 final aggregation.

2. 动态切换 join strategies

如果 join 的某一侧数据能够放进内存, 那么 broadcast hash join 是 Spark join 策略中效果最好的.
因此如果一个表预估小于 broadcast-size 阈值, 那么应该选择 broadcast hash join 策略.
但是, 在过滤性很强的 filter 或者 一系列复杂的算子之后, 这个估算是很容易出错.
为了解决这个问题, AQE 可以基于精确的上游统计数据重新计划 join strategy.

比如下面的例子, 红色 join 那侧的数据估计是有 15MB, 但是实际只有 8MB, 小于阈值,
因此经过 AQE 优化后, sort merge join 被转为 broadcast hash join.

所以 regular shuffle 可以优化为 localized shuffle (在每个 mapper 上读,
而不是在每个 reducer 上), 从而减少网络传输和开销.

3. 动态优化 数据倾斜的 join

数据倾斜是集群中分区的数据没有均衡分布, 严重的数据倾斜会显著地降低查询性能, 尤其是 join.
AQE 基于 shuffle file 的数据统计, 能自动检测这些倾斜,
然后将倾斜的 partitions 分裂成更小的 subpartitions, 分别和另一侧的对应分区进行 join.

比如, table A join table B, 表 A 的 A0 分区显著大于其他分区.

如果没有 AQE, sort merge join 将会有 4 个 tasks, 其中一个将会花费明显更长的时间.

AQE 自动优化后, 将会有 5 个 tasks, 每个 task 将会花费差不多的时间, 发挥整体更好的性能.

启用

AQE 的开启可以通过设置spark.sql.adaptive.enabledtrue来开启,
更多设置可以参考文档: https://spark.apache.org/docs/latest/sql-performance-tuning.html

参考


https://www.databricks.com/blog/2020/05/29/adaptive-query-execution-speeding-up-spark-sql-at-runtime.html
https://docs.databricks.com/en/_extras/notebooks/source/aqe-demo.html