Spark SQL Printing Plan Contents (2): The Remaining Plans and CodeGen

The previous post printed the Parsed Logical Plan from a test case in the Spark source.
A more convenient way is to print it directly through the API, so the two outputs can be cross-checked against each other.

The API is org.apache.spark.sql.Dataset.explain(mode: String),
where the accepted values of mode are defined as follows:

object ExplainMode {
  /**
   * Returns the explain mode from the given string.
   */
  def fromString(mode: String): ExplainMode = mode.toLowerCase(Locale.ROOT) match {
    case SimpleMode.name => SimpleMode
    case ExtendedMode.name => ExtendedMode
    case CodegenMode.name => CodegenMode
    case CostMode.name => CostMode
    case FormattedMode.name => FormattedMode
    case _ => throw new IllegalArgumentException(s"Unknown explain mode: $mode. Accepted " +
      "explain modes are 'simple', 'extended', 'codegen', 'cost', 'formatted'.")
  }
}
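
All five modes can be passed straight to Dataset.explain. A minimal sketch that exercises each of them (assuming a spark-shell session where spark is already in scope; the query is the same one used below):

val df = spark.sql("WITH t(x) AS (SELECT 1) SELECT * FROM t WHERE x = 1")

// Print the plan once for each mode accepted by ExplainMode.fromString.
Seq("simple", "extended", "codegen", "cost", "formatted").foreach { mode =>
  println(s"===== mode: $mode =====")
  df.explain(mode)
}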

Print it directly (here sql is the spark.sql shorthand available in spark-shell and the Spark test suites):

val df = sql("WITH t(x) AS (SELECT 1) SELECT * FROM t WHERE x = 1")
df.explain("extended")

Result:

== Parsed Logical Plan ==
CTE [t]
:  +- 'SubqueryAlias t
:     +- 'UnresolvedSubqueryColumnAliases [x]
:        +- 'Project [unresolvedalias(1, None)]
:           +- OneRowRelation
+- 'Project [*]
   +- 'Filter ('x = 1)
      +- 'UnresolvedRelation [t], [], false

== Analyzed Logical Plan ==
x: int
WithCTE
:- CTERelationDef 0, false
:  +- SubqueryAlias t
:     +- Project [1#218 AS x#219]
:        +- Project [1 AS 1#218]
:           +- OneRowRelation
+- Project [x#219]
   +- Filter (x#219 = 1)
      +- SubqueryAlias t
         +- CTERelationRef 0, true, [x#219]

== Optimized Logical Plan ==
Project [1 AS x#219]
+- OneRowRelation

== Physical Plan ==
*(1) Project [1 AS x#219]
+- *(1) Scan OneRowRelation[]

As shown, the printed Parsed Logical Plan matches the result from the earlier test case.

The output also contains the Analyzed Logical Plan, the Optimized Logical Plan, and the Physical Plan.
Each of these can likewise be fetched individually from df.queryExecution, as shown below.
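
For completeness, the parsed plan itself is also exposed there, as QueryExecution.logical (for a DataFrame created with sql(...), this is the unresolved parsed plan):

val parsedPlan = df.queryExecution.logical
println(parsedPlan)

It prints the same tree as the == Parsed Logical Plan == section above.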

Analyzed Logical Plan

val analyzedPlan = df.queryExecution.analyzed
println(analyzedPlan)

Result:

WithCTE
:- CTERelationDef 0, false
:  +- SubqueryAlias t
:     +- Project [1#218 AS x#219]
:        +- Project [1 AS 1#218]
:           +- OneRowRelation
+- Project [x#219]
   +- Filter (x#219 = 1)
      +- SubqueryAlias t
         +- CTERelationRef 0, true, [x#219]

Optimized Logical Plan

val optimizedPlan = df.queryExecution.optimizedPlan
println(optimizedPlan)

Result:

Project [1 AS x#219]
+- OneRowRelation
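
Note how much the optimizer shrinks the plan here: the single-use CTE is inlined and the filter x = 1 is folded away against the constant (presumably via rules such as InlineCTE and constant folding), leaving just a Project over OneRowRelation.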

Physical Plan

val physicalPlan = df.queryExecution.sparkPlan
println(physicalPlan)

Result:

Project [1 AS x#219]
+- Scan OneRowRelation[]
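
Note that sparkPlan is the physical plan before the preparation rules run (e.g. CollapseCodegenStages, which wraps operators in WholeStageCodegen), which is why the *(1) markers seen in the explain output are missing here. The fully prepared plan is exposed as executedPlan:

val executedPlan = df.queryExecution.executedPlan
println(executedPlan)

This prints the same tree as the == Physical Plan == section above, *(1) markers included.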

Finally, inspect the generated code:

df.queryExecution.debug.codegen()

Result:

=== Codegen() ===

Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 (maxMethodCodeSize:55; maxConstantPoolSize:92(0.14% used); numInnerClasses:0) ==
*(1) Project [1 AS x#219]
+- *(1) Scan OneRowRelation[]

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private scala.collection.Iterator rdd_input_0;
/* 010 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] project_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1];
/* 011 */
/* 012 */   public GeneratedIteratorForCodegenStage1(Object[] references) {
/* 013 */     this.references = references;
/* 014 */   }
/* 015 */
/* 016 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 017 */     partitionIndex = index;
/* 018 */     this.inputs = inputs;
/* 019 */     rdd_input_0 = inputs[0];
/* 020 */     project_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 021 */
/* 022 */   }
/* 023 */
/* 024 */   private void project_doConsume_0() throws java.io.IOException {
/* 025 */     // common sub-expressions
/* 026 */
/* 027 */     project_mutableStateArray_0[0].reset();
/* 028 */
/* 029 */     project_mutableStateArray_0[0].write(0, 1);
/* 030 */     append((project_mutableStateArray_0[0].getRow()));
/* 031 */
/* 032 */   }
/* 033 */
/* 034 */   protected void processNext() throws java.io.IOException {
/* 035 */     while ( rdd_input_0.hasNext()) {
/* 036 */       InternalRow rdd_row_0 = (InternalRow) rdd_input_0.next();
/* 037 */       ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
/* 038 */       project_doConsume_0();
/* 039 */       if (shouldStop()) return;
/* 040 */     }
/* 041 */   }
/* 042 */
/* 043 */ }
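
The *(1) prefix marks the operators fused into whole-stage-codegen stage 1, and the generated GeneratedIteratorForCodegenStage1 class above is the compiled form of exactly that stage (note the // codegenStageId=1 comment). The same output can also be produced with df.explain("codegen"), i.e. the CodegenMode from the list at the top.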