前面学习了 Spark 3 AQE 特性, 而在实际任务中是怎样的呢, 下面就来试试.
环境
建表:
CREATE DATABASE IF NOT EXISTS hive_niko_test; |
运行
设置参数, 开启 AQE:
set spark.sql.adaptive.enabled = true; |
提交查询:
SELECT s_date, sum(s_quantity) AS q |
结果
在 Spark UI 中, 可以看到 SQL 对应的计划:
== Physical Plan == |
先看 Exchange (4)
部分, 如下图:
可以看到优化后的 Plan: Exchange 后多了 AQEShuffleRead, 可以把分区合并成一个.
从 Exchange 的 shuffle bytes written total (min, med, max )
可以看到 aggregation 之后的分区的平均值非常小,
从 AQEShuffleRead 的 number of partitions
和 partition data size
可以看到 AQE 把这些小分区合并到了一个分区.
备注 (不是重点) :
HashAggregate (3) 会进行 partial_sum(s_quantity#104)
Exchange (4) 是 hashpartitioning(s_date#105, 200)
Exchange (8) 是 rangepartitioning
, 用的是 q#102L DESC NULLS LAST
(sum(s_quantity#104)#106L AS q#102L
) .
其他步骤的说明可以看下面的日志, 下一篇继续看 AQE 的其他优化特性.
(1) Scan parquet spark_catalog.hive_niko_test.sales |
Initial Plan:
(12) HashAggregate |
参考
https://docs.databricks.com/en/_extras/notebooks/source/aqe-demo.html