【Spark】v3 AQE in Practice - Dynamically Coalescing Shuffle Partitions

We looked at the Spark 3 AQE features earlier; what do they look like in a real job? Let's try it out below.

Environment

Create the tables:

CREATE DATABASE IF NOT EXISTS hive_niko_test;
USE hive_niko_test;

DROP TABLE IF EXISTS items;
DROP TABLE IF EXISTS sales;

-- Create "items" table.

CREATE TABLE items
USING parquet
AS
SELECT id AS i_item_id,
CAST(rand() * 1000 AS INT) AS i_price
FROM RANGE(30000000);

-- Create "sales" table with skew.
-- Item with id 100 is in 80% of all sales.

CREATE TABLE sales
USING parquet
AS
SELECT CASE WHEN rand() < 0.8 THEN 100 ELSE CAST(rand() * 30000000 AS INT) END AS s_item_id,
CAST(rand() * 100 AS INT) AS s_quantity,
DATE_ADD(current_date(), - CAST(rand() * 360 AS INT)) AS s_date
FROM RANGE(1000000000);
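
To double-check that the generated data really is skewed, a quick query like the one below (my own addition, not part of the original demo) should show item 100 dominating the sales table:

-- Optional sanity check: item 100 should account for roughly 80% of all rows.
SELECT s_item_id, count(*) AS cnt
FROM sales
GROUP BY s_item_id
ORDER BY cnt DESC
LIMIT 5;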

Run

Set the parameters to enable AQE:

set spark.sql.adaptive.enabled = true;
-- For demo purpose only.
-- Not necessary in real-life usage.
set spark.sql.adaptive.coalescePartitions.minPartitionNum = 1;
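
Partition coalescing is governed by a couple more settings. If I remember the Spark 3 defaults correctly, coalescing itself is already on once AQE is enabled, and the target size of a coalesced partition is 64 MB; they are listed here only for reference:

-- Defaults (no need to set them explicitly; shown for reference only).
set spark.sql.adaptive.coalescePartitions.enabled = true;
set spark.sql.adaptive.advisoryPartitionSizeInBytes = 64MB;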

Submit the query:

SELECT s_date, sum(s_quantity) AS q
FROM sales
GROUP BY s_date
ORDER BY q DESC;
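
Besides the Spark UI, the adaptive plan can also be printed with EXPLAIN FORMATTED, which produces the same numbered-node layout shown in the next section. Note that before the query actually runs, EXPLAIN only shows the initial plan with isFinalPlan=false; the re-optimized final plan is what the Spark UI displays after execution:

EXPLAIN FORMATTED
SELECT s_date, sum(s_quantity) AS q
FROM sales
GROUP BY s_date
ORDER BY q DESC;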

Results

In the Spark UI, you can see the plan for this SQL:

== Physical Plan ==
AdaptiveSparkPlan (17)
+- == Final Plan ==
   * Sort (11)
   +- AQEShuffleRead (10)
      +- ShuffleQueryStage (9), Statistics(sizeInBytes=8.4 KiB, rowCount=360)
         +- Exchange (8)
            +- * HashAggregate (7)
               +- AQEShuffleRead (6)
                  +- ShuffleQueryStage (5), Statistics(sizeInBytes=219.4 KiB, rowCount=9.36E+3)
                     +- Exchange (4)
                        +- * HashAggregate (3)
                           +- * ColumnarToRow (2)
                              +- Scan parquet spark_catalog.hive_niko_test.sales (1)
+- == Initial Plan ==
   Sort (16)
   +- Exchange (15)
      +- HashAggregate (14)
         +- Exchange (13)
            +- HashAggregate (12)
               +- Scan parquet spark_catalog.hive_niko_test.sales (1)

First, look at the Exchange (4) part in the Spark UI:

In the optimized plan, an AQEShuffleRead node appears after the Exchange and coalesces the partitions into one.
From the Exchange's shuffle bytes written total (min, med, max) metric, you can see that the partitions produced by the partial aggregation are on average very small;
from the AQEShuffleRead's number of partitions and partition data size metrics, you can see that AQE merged these small partitions into a single partition.
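
A rough back-of-the-envelope calculation explains the single partition (assuming the default spark.sql.adaptive.advisoryPartitionSizeInBytes of 64 MB): the map stage wrote only about 219.4 KiB of shuffle data in total, far less than one advisory-sized partition, and the minPartitionNum = 1 setting above allows coalescing below the default parallelism, so all 200 shuffle partitions collapse into one:

-- Back-of-the-envelope: total shuffle bytes / advisory partition size, rounded up.
-- 219.4 KiB / 64 MiB ≈ 0.003, so one coalesced partition is enough.
SELECT CEIL(219.4 * 1024 / (64 * 1024 * 1024)) AS coalesced_partitions;  -- 1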

Notes (not the main point):
HashAggregate (3) performs partial_sum(s_quantity#104).
Exchange (4) is hashpartitioning(s_date#105, 200).
Exchange (8) is rangepartitioning on q#102L DESC NULLS LAST (where q#102L is sum(s_quantity#104)#106L AS q#102L).

The explanation of the other steps can be found in the plan details below; the next post will continue with AQE's other optimizations.

(1) Scan parquet spark_catalog.hive_niko_test.sales
Output [2]: [s_quantity#104, s_date#105]
Batched: true
Location: InMemoryFileIndex [hdfs://dkn1:8020/user/hive/warehouse/hive_niko_test.db/sales]
ReadSchema: struct<s_quantity:int,s_date:date>

(2) ColumnarToRow [codegen id : 1]
Input [2]: [s_quantity#104, s_date#105]

(3) HashAggregate [codegen id : 1]
Input [2]: [s_quantity#104, s_date#105]
Keys [1]: [s_date#105]
Functions [1]: [partial_sum(s_quantity#104)]
Aggregate Attributes [1]: [sum#110L]
Results [2]: [s_date#105, sum#111L]

(4) Exchange
Input [2]: [s_date#105, sum#111L]
Arguments: hashpartitioning(s_date#105, 200), ENSURE_REQUIREMENTS, [plan_id=256]

(5) ShuffleQueryStage
Output [2]: [s_date#105, sum#111L]
Arguments: 0

(6) AQEShuffleRead
Input [2]: [s_date#105, sum#111L]
Arguments: coalesced

(7) HashAggregate [codegen id : 2]
Input [2]: [s_date#105, sum#111L]
Keys [1]: [s_date#105]
Functions [1]: [sum(s_quantity#104)]
Aggregate Attributes [1]: [sum(s_quantity#104)#106L]
Results [2]: [s_date#105, sum(s_quantity#104)#106L AS q#102L]

(8) Exchange
Input [2]: [s_date#105, q#102L]
Arguments: rangepartitioning(q#102L DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [plan_id=284]

(9) ShuffleQueryStage
Output [2]: [s_date#105, q#102L]
Arguments: 1

(10) AQEShuffleRead
Input [2]: [s_date#105, q#102L]
Arguments: coalesced

(11) Sort [codegen id : 3]
Input [2]: [s_date#105, q#102L]
Arguments: [q#102L DESC NULLS LAST], true, 0

(17) AdaptiveSparkPlan
Output [2]: [s_date#105, q#102L]
Arguments: isFinalPlan=true

Initial Plan:

(12) HashAggregate
Input [2]: [s_quantity#104, s_date#105]
Keys [1]: [s_date#105]
Functions [1]: [partial_sum(s_quantity#104)]
Aggregate Attributes [1]: [sum#110L]
Results [2]: [s_date#105, sum#111L]

(13) Exchange
Input [2]: [s_date#105, sum#111L]
Arguments: hashpartitioning(s_date#105, 200), ENSURE_REQUIREMENTS, [plan_id=235]

(14) HashAggregate
Input [2]: [s_date#105, sum#111L]
Keys [1]: [s_date#105]
Functions [1]: [sum(s_quantity#104)]
Aggregate Attributes [1]: [sum(s_quantity#104)#106L]
Results [2]: [s_date#105, sum(s_quantity#104)#106L AS q#102L]

(15) Exchange
Input [2]: [s_date#105, q#102L]
Arguments: rangepartitioning(q#102L DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [plan_id=238]

(16) Sort
Input [2]: [s_date#105, q#102L]
Arguments: [q#102L DESC NULLS LAST], true, 0

References

https://docs.databricks.com/en/_extras/notebooks/source/aqe-demo.html