基于 Spark 4.2,分支
branch-4.2阅读时长约 10分钟 · 入门到中级
引子
写下这一行, 你已经写过几百次的代码:
spark.sql("SELECT name FROM users WHERE age > 18").show()这句话背后,Spark 究竟做了哪些事?
如果你只能说出"解析、优化、生成 RDD",那么这篇文章想带你逐行走完这趟旅程——看清每个阶段的入口在哪里、产物是什么、什么时候发生、为什么这样设计。
读完之后,下次再看到一份执行计划,你应该能反过来推断它经过了哪些规则。
一、流程图:九个阶段,一根流水线
用户代码│▼┌──────────────────── Driver 端,单线程 ────────────────────┐│ ││ 1. SparkSession.sql ← 入口 ││ 2. Parser 字符串 → Unresolved LogicalPlan││ 3. Dataset.ofRows 创建 QueryExecution ││ 4. Analyzer 绑定 schema / 解析名字 ││ 5. Command 急执行 (只 DDL/DML 走这步) ││ 6. Cache 替换 + 规范化 命中 cache 子树替换 ││ 7. Optimizer 等价变换,求最优逻辑计划 ││ 8. Planner LogicalPlan → SparkPlan ││ 9. Prepare for Execution 插 Exchange/Sort/Codegen 等 ││ │└────────────────────────────────────────────────────────────┘│▼ executedPlan.execute()┌──────────────────── 分布式执行 ──────────────────────────┐│ RDD → DAGScheduler → TaskScheduler → Executor → Task │└──────────────────────────────────────────────────────────┘
两个最容易踩坑的认知:
前 8 步全部在 Driver 单线程跑。SQL "慢"如果慢在编译期,加 executor 数量不会有任何帮助。
DataFrame 并非完全 lazy。
spark.sql(...)返回的瞬间 Analyzer 已经跑完(要知道 schema 才能给你返回 DataFrame);Optimizer / Planner 才真正等到 action 触发。
二、入口:SparkSession.sql
打开 SparkSession.scala:563:
def sql(sqlText: String, args: Array[_]): DataFrame = {sql(sqlText, args, new QueryPlanningTracker)}
每次调用都新建一个 QueryPlanningTracker。这个小对象贯穿整条路径,每个阶段都用它打卡。后面你能看到为什么这件事很重要。
真正干活的重载在 SparkSession.scala:528:
private[sql] def sql(sqlText: String, args: Array[_], tracker: QueryPlanningTracker): DataFrame = withActive {val plan = tracker.measurePhase(QueryPlanningTracker.PARSING) {val parsed = sessionState.sqlParser.parsePlanWithParameters(sqlText, paramContext)// ... 参数绑定处理 ...parsed}Dataset.ofRows(self, plan, tracker)}
到这里就两件事:
measurePhase("parsing") { ... }包住 Parser,记录耗时把解析出来的
LogicalPlan交给Dataset.ofRows
干净利落。
三、Parser:字符串变成树
Parser 的本职是 ANTLR4 工作。入口在 SparkSqlParser.scala:85,再往下就是 parsers.scala:59:
protected def parse[T](command: String)(toResult: SqlBaseParser => T): T = {val lexer = new SqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))// ...val tokenStream = new CommonTokenStream(lexer)val parser = new SqlBaseParser(tokenStream)// ...}
注意 UpperCaseCharStream——这就是 Spark SQL 关键字大小写不敏感的实现方式:lexer 看到的字符流统一是大写,但原始 token 文本保留。
从 AST 到 LogicalPlan 走访问者模式,由 SparkSqlAstBuilder(继承自 AstBuilder)完成。
代码::
val parsed = spark.sessionState.sqlParser.parsePlan("SELECT name FROM users WHERE age > 18")println(parsed)
输出:
'Project ['name]
+- 'Filter ('age > 18)
+- 'UnresolvedRelation [users], [], false
注意:表不存在不会在这里报错。所有以 Unresolved 开头的节点都意味着"还没问过 catalog"。这是 Parser 的边界。
四、QueryExecution:阶段流水线的容器
接力棒在 SparkSession.scala:559 传给 Dataset.ofRows,看 Dataset.scala:111:
def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame =sparkSession.withActive {val qe = sparkSession.sessionState.executePlan(logicalPlan)if (!qe.isLazyAnalysis) qe.assertAnalyzed()new Dataset[Row](qe, () => RowEncoder.encoderFor(qe.analyzed.schema))}
两件事:
创建一个
QueryExecution,把未解析的逻辑计划塞进去立即调用
assertAnalyzed()——也就是上面说的"DataFrame 创建时 Analyzer 已经跑了"
为什么必须现在跑?因为接下来 RowEncoder.encoderFor(qe.analyzed.schema)(Dataset.scala:115)需要 schema,而 schema 只有在 Analyze 之后才有。
QueryExecution 本身是什么?它是后续所有阶段的懒加载容器。打开 QueryExecution.scala,看行192-394 那一长串你会发现一个清晰的模式:
private val lazyAnalyzed = LazyTry { ... } // 192private val lazyCommandExecuted = LazyTry { ... } // 215private val lazyNormalized = LazyTry { ... } // 280private val lazyWithCachedData = LazyTry { ... } // 289private val lazyOptimizedPlan = LazyTry { ... } // 313private val lazySparkPlan = LazyTry { ... } // 337private val lazyExecutedPlan = LazyTry { ... } // 355val lazyToRdd = LazyTry { ... } // 378
每一阶段对应一个 LazyTry,外面包一层 def:
def analyzed: LogicalPlan = lazyAnalyzed.getdef optimizedPlan: LogicalPlan = lazyOptimizedPlan.getdef sparkPlan: SparkPlan = lazySparkPlan.getdef executedPlan: SparkPlan = lazyExecutedPlan.getdef toRdd: RDD[InternalRow] = lazyToRdd.get
调试 SQL 编译过程的所有套路都建立在这上面:
df.queryExecution.analyzed // 看 Analyzer 产物df.queryExecution.optimizedPlan // 看 Optimizer 产物df.queryExecution.sparkPlan // 看 Planner 产物df.queryExecution.executedPlan // 看 Prepare 后的最终物理计划df.queryExecution.tracker.phases // 看每个阶段耗时
记住这五行。看任何 Spark SQL 性能问题,没有不是从这里入手的。
五、Analyzer:把名字绑定到东西
QueryExecution.scala:192:
private val lazyAnalyzed = LazyTry {val plan = executePhase(QueryPlanningTracker.ANALYSIS) {analyzer.executeAndCheck(sqlScriptExecuted, tracker)}tracker.setAnalyzed(plan)plan}
具体到 Analyzer.scala:331:
def executeAndCheck(plan: LogicalPlan, tracker: QueryPlanningTracker): LogicalPlan = {if (plan.analyzed) {plan} else {def runAnalysis(): LogicalPlan =HybridAnalyzer.fromLegacyAnalyzer(legacyAnalyzer = this, tracker = tracker).apply(plan)// ...}}
注意:4.x 引入了
HybridAnalyzer,老文章里直接讲的Analyzer extends RuleExecutor这条单链已经不准确。本文不展开 Hybrid 机制,只把它当成"Analyzer 的入口"。
Analyzer 的工作可以这样概括:一组规则按 fixed-point(不动点)反复跑,直到计划不再变化。每条规则负责消除一种 Unresolved:
| Unresolved | 谁来解析 | 解析成什么 |
|---|---|---|
UnresolvedRelation | ResolveRelations | 去 catalog 查表,变成 LocalRelation / HiveTableRelation / DataSourceV2Relation 等 |
UnresolvedAttribute | ResolveReferences | 列名变成有 ExprId 的 AttributeReference |
UnresolvedFunction | ResolveFunctions | 函数名变成具体的 Expression 实现 |
代码:
spark.range(100).toDF("id").createOrReplaceTempView("t")val df = spark.sql("SELECT id FROM t WHERE id > 10")println("=== Parsed ===")println(df.queryExecution.logical) // 未解析println("=== Analyzed ===")println(df.queryExecution.analyzed) // 已解析
输出:
=== Parsed ===
'Project ['id]
+- 'Filter ('id > 10)
+- 'UnresolvedRelation [t], [], false
=== Analyzed ===
Project [id#1L]
+- Filter (id#1L > cast(10 as bigint))
+- SubqueryAlias t
+- View (`t`, [id#1L])
+- Project [id#0L AS id#1L]
+- Range (0, 100, step=1, splits=Some(1))
注意几个变化:
'id(未解析的 Attribute)变成id#0L(已绑定 ExprId,类型是 long)UnresolvedRelation被替换成具体的SubqueryAlias + Project + Range10被自动cast(10 as bigint)——类型隐式转换也在 Analyzer 完成
如果有报错(找不到表、列名歧义、类型不匹配),抛出的 AnalysisException 也来自这一步。
六、Optimizer:等价变换求最优
QueryExecution.scala:313:
private val lazyOptimizedPlan = LazyTry {assertCommandExecuted()executePhase(QueryPlanningTracker.OPTIMIZATION) {val plan = sparkSession.sessionState.optimizer.executeAndTrack(withCachedData.clone(), tracker)plan.setAnalyzed()plan}}
Optimizer 也是 RuleExecutor,但规则更多更狠。它的工作精神可以一句话概括:结果集等价的前提下,找一棵开销最小的计划树。
最经典的几条:
| 规则 | 干什么 |
|---|---|
ConstantFolding | 1 + 1 → 2、1 = 1 → true |
BooleanSimplification | true AND x > 5 → x > 5 |
PushDownPredicates | 把 WHERE 推到 scan 节点附近,减少数据量 |
ColumnPruning | 只读取真正用到的列 |
CollapseProject | 合并相邻的 Project |
LimitPushDown | 把 LIMIT 下推到 union / join 各分支 |
代码:
val df = spark.sql("""SELECT id FROM tWHERE 1 = 1 AND id > 5 AND id IS NOT NULL""")println("=== Analyzed ===")println(df.queryExecution.analyzed)println("=== Optimized ===")println(df.queryExecution.optimizedPlan)
输出:
=== Analyzed ===
Project [id#13]
+- Filter (((1 = 1) AND (id#13 > 5)) AND isnotnull(id#13))
+- SubqueryAlias users
+- View (`users`, [id#13, name#14, age#15])
+- Project [_1#3 AS id#13, _2#4 AS name#14, _3#5 AS age#15]
+- LocalRelation [_1#3, _2#4, _3#5]
=== Optimized ===
Project [_1#3 AS id#13]
+- Filter (_1#3 > 5)
+- LocalRelation [_1#3, _2#4, _3#5]
1 = 1被ConstantFolding折叠true AND ...被BooleanSimplification消去id > 5已经隐含了id IS NOT NULL,多余的IS NOT NULL被BooleanSimplification/InferFiltersFromConstraints处理
关键提醒:Optimizer 的所有变换必须结果集等价。如果你看到 Optimizer 把你的 SQL 改成了一个"明显不一样"的形式但结果正确,不要慌——它就是干这个的。
七、Planner:从逻辑到物理
QueryExecution.scala:337:
private val lazySparkPlan = LazyTry {assertOptimized()val plan = executePhase(QueryPlanningTracker.PLANNING) {QueryExecution.createSparkPlan(planner, optimizedPlan.clone())}attachTransaction(plan)}
createSparkPlan 的实现非常诚实,只有一行有效代码(QueryExecution.scala:808):
def createSparkPlan(planner: SparkPlanner, plan: LogicalPlan): SparkPlan = {// TODO: We use next(), i.e. take the first plan returned by the planner, here for now,// but we will implement to choose the best plan.planner.plan(ReturnAnswer(plan)).next()}
源码里那条 TODO 揭示了一个真相:Spark 的 planner 理论上能产出多个候选物理计划(Iterator[SparkPlan]),但当前实现就是粗暴地 .next() 取第一个。所谓"基于代价的优化"(CBO)主要发生在 Optimizer 阶段,而不是这里。
Planner 的工作机制是 Strategy 模式。看 SparkPlanner.scala:38:
override def strategies: Seq[Strategy] =experimentalMethods.extraStrategies ++extraPlanningStrategies ++ (LogicalQueryStageStrategy ::PythonEvals ::new DataSourceV2Strategy(session) ::V2CommandStrategy ::FileSourceStrategy ::DataSourceStrategy ::SpecialLimits ::Aggregation ::Window ::WindowGroupLimit ::JoinSelection :: // ← BHJ / SMJ / SHJ 在这里决定InMemoryScans ::SparkScripts ::Pipelines ::BasicOperators ::EventTimeWatermarkStrategy :: Nil)
每个 Strategy 都是一个偏函数:匹配某种 LogicalPlan 形态,吐出对应的 SparkPlan。比如:
FileSourceStrategy把LogicalRelation(HadoopFsRelation)翻译成FileSourceScanExecJoinSelection看 join 两边的 size 和 hint,决定生成BroadcastHashJoinExec/SortMergeJoinExec/ShuffledHashJoinExecAggregation选择HashAggregateExec还是SortAggregateExec
Strategy 顺序很重要:靠前的 Strategy 优先匹配。所以 LogicalQueryStageStrategy(AQE 的)排在最前面。
八、Prepare for Execution:最后一公里
走到这里你已经有一棵 SparkPlan 了,但不能直接执行——它还缺:
shuffle / sort(如果上游没满足下游的分区或排序要求)
WholeStageCodegen 包装(合并能融合的算子)
AQE 包装(如果启用)
subquery 物化
……
这些都在 prepareForExecution 里加。看 QueryExecution.scala:752:
private[execution] def preparations(sparkSession: SparkSession,adaptiveExecutionRule: Option[InsertAdaptiveSparkPlan] = None,subquery: Boolean): Seq[Rule[SparkPlan]] = {adaptiveExecutionRule.toSeq ++Seq(CoalesceBucketsInJoin,PlanDynamicPruningFilters(sparkSession),PlanSubqueries(sparkSession),RemoveRedundantProjects,EnsureRequirements(),InsertSortForLimitAndOffset,ReplaceHashWithSortAgg,RemoveRedundantSorts,RemoveRedundantWindowGroupLimits,DisableUnnecessaryBucketedScan,ApplyColumnarRulesAndInsertTransitions(...),CollapseCodegenStages()) ++ (if (subquery) Nil else Seq(ReuseExchangeAndSubquery))}
要记住的就两条:
EnsureRequirements是 Exchange 的来源。所有的 shuffle 节点都不是 Planner 生成的,是这一步根据requiredChildDistribution推断出来的。CollapseCodegenStages是 WholeStageCodegen 的来源。看到*(N) HashAggregate这种带星号的算子,就是被它打包了。
关于 AQE 的提醒:如果 AQE 启用,
InsertAdaptiveSparkPlan会把整个计划包进AdaptiveSparkPlanExec——一个 leaf node。后面所有规则因此都变成 no-op(leaf node 没有子树可改)。真正的 prepare 推迟到 AQE 内部按 stage 逐段做。
九、Execute:进入分布式世界
QueryExecution.scala:378:
val lazyToRdd = LazyTry {new SQLExecutionRDD(executedPlan.execute(), sparkSession.sessionState.conf)}
executedPlan.execute() 触发整棵物理计划的递归 doExecute() 调用,从叶子(scan)到根,最终构建出一棵 RDD 血缘。
到这里 Catalyst 的工作就完成了。调用 df.show() / df.collect() / df.count() 等 action 时,才会触发 SparkContext.runJob(rdd, ...),由 DAGScheduler 把 RDD 切成 Stage 和 Task 推到 Executor。
那是另一篇文章的故事(系列文章 #2)。
十、实战:把九个阶段一次看个够
把所有工具组合起来:
spark.range(100).toDF("id").createOrReplaceTempView("t")val sql = "SELECT id FROM t WHERE id > 10 AND 1 = 1"val df = spark.sql(sql)println("\n=== 1. Parser 单独跑 ===")println(spark.sessionState.sqlParser.parsePlan(sql))println("\n=== 2. Analyzed ===")println(df.queryExecution.analyzed)println("\n=== 3. Optimized ===")println(df.queryExecution.optimizedPlan)println("\n=== 4. SparkPlan (未 prepare) ===")println(df.queryExecution.sparkPlan)println("\n=== 5. ExecutedPlan (已 prepare) ===")println(df.queryExecution.executedPlan)// 触发执行,让 tracker 数据完整df.collect()println("\n=== 6. 各阶段耗时 ===")df.queryExecution.tracker.phases.foreach { case (name, summary) =>println(f"$name%-20s ${summary.endTimeMs - summary.startTimeMs}%5d ms")}
把这段在你本地的 Spark 4.x 上跑一遍,对照前面九节的描述,你会看到:
第 1 步出
UnresolvedRelation、UnresolvedAttribute第 2 步所有
Unresolved消失,类型补齐第 3 步
1 = 1折叠掉第 4 步出现具体物理算子(
*Range、*Filter、*Project)第 5 步可能多了
WholeStageCodegen包装第 6 步打印出
parsing/analysis/optimization/planning四个阶段的耗时
十一、总结
回顾:
spark.sql(sqlText)└─ SparkSqlParser.parsePlan → Unresolved LogicalPlan└─ Dataset.ofRows└─ QueryExecution (lazy 容器)├─ analyzer.executeAndCheck → Analyzed LogicalPlan├─ cacheManager.useCachedData → Normalized├─ optimizer.executeAndTrack → Optimized LogicalPlan├─ planner.plan(...).next() → SparkPlan├─ prepareForExecution → ExecutedPlan└─ executedPlan.execute() → RDD[InternalRow]│▼action → DAGScheduler → ...
三个最值得带走的认知:
Spark SQL 的"编译期"很长。前 8 步全部串行跑在 Driver 上。如果你的 SQL 已经在
df.queryExecution.executedPlan卡了好几秒,加机器没用——需要看 Optimizer / Planner 是哪条规则慢。DataFrame 的 lazy 不彻底。Analyzer 在
spark.sql(...)返回前就跑完了。表不存在、列名错这类错误立刻抛;但 Optimizer/Planner 要等 action。df.queryExecution.{analyzed, optimizedPlan, sparkPlan, executedPlan, tracker}是性能调优的瑞士军刀。任何 Spark SQL 性能问题都从这里下手。
每天花费10分钟学习spark,让你技术之路走得更稳、更快。
喜欢的点个关注。
夜雨聆风