【Spark】Spark 3 AQE in Practice - Dynamically Switching Join Strategies

Continuing the experiments with AQE features, this post looks at an example of dynamically switching the join strategy.

Run

The items and sales tables are the ones created in the previous post. Enable AQE and submit the following query:

set spark.sql.adaptive.enabled = true;

SELECT s_date, sum(s_quantity * i_price) AS total_sales
FROM sales
JOIN items ON s_item_id = i_item_id
WHERE i_price < 10
GROUP BY s_date
ORDER BY total_sales DESC;
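
(On Spark 3.2 and later, spark.sql.adaptive.enabled already defaults to true, so the SET statement above is only needed on 3.0/3.1.) If you want to inspect the plan from the SQL client rather than the Spark UI, a minimal sketch is to wrap the same query in EXPLAIN FORMATTED. Note that EXPLAIN does not execute the query, so the AdaptiveSparkPlan is still printed with isFinalPlan=false; the re-optimized Final Plan shown later in this post is only visible after the query has actually run, for example on the Spark UI SQL page.

-- Print the numbered physical plan from the SQL client; the output format
-- matches the notes sections reproduced below. EXPLAIN does not run the query,
-- so at this point the AdaptiveSparkPlan is still marked isFinalPlan=false.
EXPLAIN FORMATTED
SELECT s_date, sum(s_quantity * i_price) AS total_sales
FROM sales
JOIN items ON s_item_id = i_item_id
WHERE i_price < 10
GROUP BY s_date
ORDER BY total_sales DESC;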

Results

On the Spark UI the Initial Plan looks as follows. After filtering items with i_price < 10 the data is quite small, but the static plan does not know this, so it chooses a SortMergeJoin even though a broadcast join could be used instead.

+- == Initial Plan ==
   Sort (38)
   +- Exchange (37)
      +- HashAggregate (36)
         +- Exchange (35)
            +- HashAggregate (34)
               +- Project (33)
                  +- SortMergeJoin Inner (32)
                     :- Sort (28)
                     :  +- Exchange (27)
                     :     +- Filter (26)
                     :        +- Scan parquet spark_catalog.hive_niko_test.sales (1)
                     +- Sort (31)
                        +- Exchange (30)
                           +- Filter (29)
                              +- Scan parquet spark_catalog.hive_niko_test.items (7)
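
For comparison, this SortMergeJoin can be made to actually run by turning AQE off for the session. This is just a verification sketch, not part of the original experiment:

-- With AQE disabled, Spark has to commit to the static plan above, which only
-- sees the pre-filter size estimate of items, so the SortMergeJoin is executed.
SET spark.sql.adaptive.enabled = false;
SELECT s_date, sum(s_quantity * i_price) AS total_sales
FROM sales
JOIN items ON s_item_id = i_item_id
WHERE i_price < 10
GROUP BY s_date
ORDER BY total_sales DESC;
-- Switch AQE back on afterwards.
SET spark.sql.adaptive.enabled = true;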

With AQE enabled, however, the dynamically re-optimized physical plan below replaces it with a BroadcastHashJoin.

== Physical Plan ==
AdaptiveSparkPlan (39)
+- == Final Plan ==
   * Sort (25)
   +- AQEShuffleRead (24)
      +- ShuffleQueryStage (23), Statistics(sizeInBytes=8.4 KiB, rowCount=360)
         +- Exchange (22)
            +- * HashAggregate (21)
               +- AQEShuffleRead (20)
                  +- ShuffleQueryStage (19), Statistics(sizeInBytes=438.8 KiB, rowCount=1.87E+4)
                     +- Exchange (18)
                        +- * HashAggregate (17)
                           +- * Project (16)
                              +- * BroadcastHashJoin Inner BuildRight (15)
                                 :- AQEShuffleRead (6)
                                 :  +- ShuffleQueryStage (5), Statistics(sizeInBytes=29.8 GiB, rowCount=1.00E+9)
                                 :     +- Exchange (4)
                                 :        +- * Filter (3)
                                 :           +- * ColumnarToRow (2)
                                 :              +- Scan parquet spark_catalog.hive_niko_test.sales (1)
                                 +- BroadcastQueryStage (14), Statistics(sizeInBytes=32.0 MiB, rowCount=3.00E+5)
                                    +- BroadcastExchange (13)
                                       +- AQEShuffleRead (12)
                                          +- ShuffleQueryStage (11), Statistics(sizeInBytes=6.9 MiB, rowCount=3.00E+5)
                                             +- Exchange (10)
                                                +- * Filter (9)
                                                   +- * ColumnarToRow (8)
                                                      +- Scan parquet spark_catalog.hive_niko_test.items (7)

Looking at the BroadcastHashJoin on the Spark UI, the left side is sales and the right side is items; the data size total of the BroadcastExchange on the right is only 6.9 MiB.
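
Whether AQE makes this switch is governed by the broadcast size threshold: when the runtime shuffle statistics of one join side (here the 6.9 MiB items stage) fall below the threshold, the SortMergeJoin is rewritten into a BroadcastHashJoin. A sketch for checking or tuning the relevant settings follows; the 64MB value is only an illustration, not a recommendation:

-- Current thresholds; spark.sql.autoBroadcastJoinThreshold defaults to 10 MB.
SET spark.sql.autoBroadcastJoinThreshold;
-- Spark 3.2+ also has a separate runtime threshold for AQE; when it is unset,
-- AQE falls back to spark.sql.autoBroadcastJoinThreshold.
SET spark.sql.adaptive.autoBroadcastJoinThreshold;
-- Example: allow AQE to broadcast join sides of up to 64 MB at runtime.
SET spark.sql.adaptive.autoBroadcastJoinThreshold = 64MB;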

Notes (dynamic plan):

(1) Scan parquet spark_catalog.hive_niko_test.sales
Output [3]: [s_item_id#12, s_quantity#13, s_date#14]
Batched: true
Location: InMemoryFileIndex [hdfs://dkn1:8020/user/hive/warehouse/hive_niko_test.db/sales]
PushedFilters: [IsNotNull(s_item_id)]
ReadSchema: struct<s_item_id:int,s_quantity:int,s_date:date>

(2) ColumnarToRow [codegen id : 1]
Input [3]: [s_item_id#12, s_quantity#13, s_date#14]

(3) Filter [codegen id : 1]
Input [3]: [s_item_id#12, s_quantity#13, s_date#14]
Condition : isnotnull(s_item_id#12)

(4) Exchange
Input [3]: [s_item_id#12, s_quantity#13, s_date#14]
Arguments: hashpartitioning(cast(s_item_id#12 as bigint), 200), ENSURE_REQUIREMENTS, [plan_id=159]

(5) ShuffleQueryStage
Output [3]: [s_item_id#12, s_quantity#13, s_date#14]
Arguments: 0

(6) AQEShuffleRead
Input [3]: [s_item_id#12, s_quantity#13, s_date#14]
Arguments: local

(7) Scan parquet spark_catalog.hive_niko_test.items
Output [2]: [i_item_id#15L, i_price#16]
Batched: true
Location: InMemoryFileIndex [hdfs://dkn1:8020/user/hive/warehouse/hive_niko_test.db/items]
PushedFilters: [IsNotNull(i_price), LessThan(i_price,10), IsNotNull(i_item_id)]
ReadSchema: struct<i_item_id:bigint,i_price:int>

(8) ColumnarToRow [codegen id : 2]
Input [2]: [i_item_id#15L, i_price#16]

(9) Filter [codegen id : 2]
Input [2]: [i_item_id#15L, i_price#16]
Condition : ((isnotnull(i_price#16) AND (i_price#16 < 10)) AND isnotnull(i_item_id#15L))

(10) Exchange
Input [2]: [i_item_id#15L, i_price#16]
Arguments: hashpartitioning(i_item_id#15L, 200), ENSURE_REQUIREMENTS, [plan_id=176]

(11) ShuffleQueryStage
Output [2]: [i_item_id#15L, i_price#16]
Arguments: 1

(12) AQEShuffleRead
Input [2]: [i_item_id#15L, i_price#16]
Arguments: local

(13) BroadcastExchange
Input [2]: [i_item_id#15L, i_price#16]
Arguments: HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=235]

(14) BroadcastQueryStage
Output [2]: [i_item_id#15L, i_price#16]
Arguments: 2

(15) BroadcastHashJoin [codegen id : 3]
Left keys [1]: [cast(s_item_id#12 as bigint)]
Right keys [1]: [i_item_id#15L]
Join type: Inner
Join condition: None

(16) Project [codegen id : 3]
Output [3]: [s_quantity#13, s_date#14, i_price#16]
Input [5]: [s_item_id#12, s_quantity#13, s_date#14, i_item_id#15L, i_price#16]

(17) HashAggregate [codegen id : 3]
Input [3]: [s_quantity#13, s_date#14, i_price#16]
Keys [1]: [s_date#14]
Functions [1]: [partial_sum((s_quantity#13 * i_price#16))]
Aggregate Attributes [1]: [sum#37L]
Results [2]: [s_date#14, sum#38L]

(18) Exchange
Input [2]: [s_date#14, sum#38L]
Arguments: hashpartitioning(s_date#14, 200), ENSURE_REQUIREMENTS, [plan_id=317]

(19) ShuffleQueryStage
Output [2]: [s_date#14, sum#38L]
Arguments: 3

(20) AQEShuffleRead
Input [2]: [s_date#14, sum#38L]
Arguments: coalesced

(21) HashAggregate [codegen id : 4]
Input [2]: [s_date#14, sum#38L]
Keys [1]: [s_date#14]
Functions [1]: [sum((s_quantity#13 * i_price#16))]
Aggregate Attributes [1]: [sum((s_quantity#13 * i_price#16))#34L]
Results [2]: [s_date#14, sum((s_quantity#13 * i_price#16))#34L AS total_sales#33L]

(22) Exchange
Input [2]: [s_date#14, total_sales#33L]
Arguments: rangepartitioning(total_sales#33L DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [plan_id=350]

(23) ShuffleQueryStage
Output [2]: [s_date#14, total_sales#33L]
Arguments: 4

(24) AQEShuffleRead
Input [2]: [s_date#14, total_sales#33L]
Arguments: coalesced

(25) Sort [codegen id : 5]
Input [2]: [s_date#14, total_sales#33L]
Arguments: [total_sales#33L DESC NULLS LAST], true, 0

Notes (static plan):

(26) Filter
Input [3]: [s_item_id#12, s_quantity#13, s_date#14]
Condition : isnotnull(s_item_id#12)

(27) Exchange
Input [3]: [s_item_id#12, s_quantity#13, s_date#14]
Arguments: hashpartitioning(cast(s_item_id#12 as bigint), 200), ENSURE_REQUIREMENTS, [plan_id=113]

(28) Sort
Input [3]: [s_item_id#12, s_quantity#13, s_date#14]
Arguments: [cast(s_item_id#12 as bigint) ASC NULLS FIRST], false, 0

(29) Filter
Input [2]: [i_item_id#15L, i_price#16]
Condition : ((isnotnull(i_price#16) AND (i_price#16 < 10)) AND isnotnull(i_item_id#15L))

(30) Exchange
Input [2]: [i_item_id#15L, i_price#16]
Arguments: hashpartitioning(i_item_id#15L, 200), ENSURE_REQUIREMENTS, [plan_id=114]

(31) Sort
Input [2]: [i_item_id#15L, i_price#16]
Arguments: [i_item_id#15L ASC NULLS FIRST], false, 0

(32) SortMergeJoin
Left keys [1]: [cast(s_item_id#12 as bigint)]
Right keys [1]: [i_item_id#15L]
Join type: Inner
Join condition: None

(33) Project
Output [3]: [s_quantity#13, s_date#14, i_price#16]
Input [5]: [s_item_id#12, s_quantity#13, s_date#14, i_item_id#15L, i_price#16]

(34) HashAggregate
Input [3]: [s_quantity#13, s_date#14, i_price#16]
Keys [1]: [s_date#14]
Functions [1]: [partial_sum((s_quantity#13 * i_price#16))]
Aggregate Attributes [1]: [sum#37L]
Results [2]: [s_date#14, sum#38L]

(35) Exchange
Input [2]: [s_date#14, sum#38L]
Arguments: hashpartitioning(s_date#14, 200), ENSURE_REQUIREMENTS, [plan_id=121]

(36) HashAggregate
Input [2]: [s_date#14, sum#38L]
Keys [1]: [s_date#14]
Functions [1]: [sum((s_quantity#13 * i_price#16))]
Aggregate Attributes [1]: [sum((s_quantity#13 * i_price#16))#34L]
Results [2]: [s_date#14, sum((s_quantity#13 * i_price#16))#34L AS total_sales#33L]

(37) Exchange
Input [2]: [s_date#14, total_sales#33L]
Arguments: rangepartitioning(total_sales#33L DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [plan_id=124]

(38) Sort
Input [2]: [s_date#14, total_sales#33L]
Arguments: [total_sales#33L DESC NULLS LAST], true, 0

(39) AdaptiveSparkPlan
Output [2]: [s_date#14, total_sales#33L]
Arguments: isFinalPlan=true

References

https://docs.databricks.com/en/_extras/notebooks/source/aqe-demo.html