Claude Code源码系列:2、核心业务逻辑-Agent运行主流程
想象这样一个场景:你在终端里输入一条命令,让 Claude帮你”修复那个bug”。几秒钟后,它开始读取文件、运行测试、修改代码、提交变更——一整套复杂操作链条自动完成。这背后发生了什么?是什么机制驱动了这个看似”智能”的自动化流程?
答案藏在一个精心设计的 Agent 运行主流程 中。这是一个基于状态机的无限循环系统,融合了流式 API 调用、并发工具执行、动态上下文管理、子Agent隔离等多个复杂子系统。本文将带你深入拆解这个系统的每一层设计。
目录 (Table of Contents)
-
架构概览 (Architecture Overview) -
核心主流程拆解 (Core Flow / Lifecycle) -
关键机制与底层设计 (Deep Dive into Mechanisms) -
状态管理与数据流转 (State & Data Management) -
高级模式与实战分析 (Advanced Patterns) -
总结 (Summary)
1. 架构概览 (Architecture Overview)
1.1 系统全景图
Claude Code 的 Agent 系统采用分层架构设计,从 CLI 入口到工具执行形成清晰的调用链。以下是完整的架构全景图:
┌───────────────────────────────────────────────────────────────────────┐│ CLI Entry (cli.tsx) ││ [Bootstrap & Fast Paths] ││ ↓ ││ Main Application (main.tsx) ││ [Commander.js CLI, Auth, Session Init] ││ ↓ ││ ┌─────────────────────────────────────────────────────────────────────┐ ││ │ REPL Screen (REPL.tsx) │ ││ │ ┌────────────────────┐ ┌───────────────────────────────────┐ │ ││ │ │ User Input │───→│ query() Loop │ │ ││ │ │ (PromptInput) │ │ ┌───────────────────────────┐ │ │ ││ │ │ │ │ │ State Machine (while) │ │ │ ││ │ │ - Text Input │ │ │ ┌───────────────────┐ │ │ │ ││ │ │ - Slash Commands │ │ │ │ 1. Context Prep │ │ │ │ ││ │ │ - Image Paste │ │ │ │ 2. Model Call │ │ │ │ ││ │ │ - File Attach │ │ │ │ 3. ToolUse Detect │ │ │ │ ││ │ └────────────────────┘ │ │ │ 4. Tool Execute │ │ │ │ ││ │ │ │ │ 5. Stop Hooks │ │ │ │ ││ │ │ │ │ 6. Continue/Exit │ │ │ │ ││ │ │ │ └───────────────────┘ │ │ │ ││ │ │ └───────────────────────────┘ │ │ ││ │ └───────────────────────────────────┘ │ ││ │ │ ││ │ ┌────────────────────┐ ┌───────────────────────────────────┐ │ ││ │ │ AppState Store │←──→│ Tool Permission Context │ │ ││ │ │ (React State) │ │ (Mode, Rules, Allow/Deny) │ │ ││ │ └────────────────────┘ └───────────────────────────────────┘ │ ││ └─────────────────────────────────────────────────────────────────────┘ ││ ↓ ││ ┌─────────────────────────────────────────────────────────────────────┐ ││ │ StreamingToolExecutor (Concurrent Execution) │ ││ │ │ ││ │ ┌──────────────────┐ ┌──────────────────┐ ┌─────────────────┐ │ ││ │ │ Tool Queue │ │ Concurrency │ │ Progress │ │ ││ │ │ Management │ │ Controller │ │ Stream │ │ ││ │ │ │ │ │ │ │ │ ││ │ │ - TrackedTool[] │ │ - Safe: Parallel│ │ - yield msg │ │ ││ │ │ - Status Track │ │ - NonSafe: Serial│ │ - immediate │ │ ││ │ │ - Promise Track │ │ - Bash Error │ │ - pendingQueue │ │ ││ │ └──────────────────┘ └──────────────────┘ └─────────────────┘ │ ││ │ │ ││ │ ┌──────────────────────────────────────────────────────────────┐ │ ││ │ │ AbortController Chain │ │ ││ │ │ parentAbort → siblingAbort → toolAbort (per-tool) │ │ ││ │ └──────────────────────────────────────────────────────────────┘ │ ││ └─────────────────────────────────────────────────────────────────────┘ ││ ↓ ││ ┌─────────────────────────────────────────────────────────────────────┐ ││ │ Agent Tool (Subagent Spawning) │ ││ │ │ ││ │ ┌──────────────────────────────────────────────────────────────┐ │ ││ │ │ runAgent() Flow │ │ ││ │ │ │ │ ││ │ │ 1. Resolve Agent Model & ID │ │ ││ │ │ 2. Initialize MCP Servers (Agent-specific) │ │ ││ │ │ 3. Execute SubagentStart Hooks │ │ ││ │ │ 4. Register Frontmatter Hooks │ │ ││ │ │ 5. Preload Skills from Frontmatter │ │ ││ │ │ 6. Build Agent System Prompt │ │ ││ │ │ 7. Create Isolated ToolUseContext │ │ ││ │ │ 8. Call query() (Recursive Loop) │ │ ││ │ │ 9. Cleanup (MCP, Hooks, FileState, Tasks) │ │ ││ │ └──────────────────────────────────────────────────────────────┘ │ ││ │ │ ││ │ ┌──────────────────┐ ┌──────────────────┐ │ ││ │ │ Sync Subagent │ │ Async Subagent │ │ ││ │ │ (Shared Abort) │ │ (Independent) │ │ ││ │ └──────────────────┘ └──────────────────┘ │ ││ └─────────────────────────────────────────────────────────────────────┘ ││ ↓ ││ ┌─────────────────────────────────────────────────────────────────────┐ ││ │ API Layer (claude.ts) │ ││ │ │ ││ │ ┌──────────────────┐ ┌──────────────────────────────────┐ │ ││ │ │ Streaming API │ │ Error Recovery Chain │ │ ││ │ │ │ │ │ │ ││ │ │ - Beta.messages │ │ - 529 Retry (withRetry) │ │ ││ │ │ - AsyncIterable │ │ - Model Fallback │ │ ││ │ │ - VCR Recording │ │ - Prompt-Too-Long Recovery │ │ ││ │ │ │ │ - Max-Output-Tokens Recovery │ │ ││ │ └──────────────────┘ └──────────────────────────────────┘ │ ││ └─────────────────────────────────────────────────────────────────────┘ │└───────────────────────────────────────────────────────────────────────┘
1.2 核心文件与模块位置追踪
以下是系统核心模块的详细定位表:
|
|
|
|
|
|
|---|---|---|---|---|
| 主查询循环 | src/query.ts |
|
|
query()
queryLoop() |
| 子Agent执行 | src/tools/AgentTool/runAgent.ts |
|
|
runAgent() |
| 流式工具执行器 | src/services/tools/StreamingToolExecutor.ts |
|
|
StreamingToolExecutor
|
| 单工具执行 | src/services/tools/toolExecution.ts |
|
|
runToolUse() |
| 工具类型定义 | src/Tool.ts |
|
|
Tool
ToolUseContext, buildTool() |
| API 客户端 | src/services/api/claude.ts |
|
|
queryModelWithStreaming() |
| REPL界面 | src/screens/REPL.tsx |
|
|
REPL
|
| 工具注册 | src/tools.ts |
|
|
getAllBaseTools()
assembleToolPool() |
| Hook系统 | src/utils/hooks.ts |
|
|
executePreToolUseHooks()
executePostToolUseHooks() |
| 消息工具 | src/utils/messages.ts |
|
|
createUserMessage()
normalizeMessagesForAPI() |
1.3 数据流向概览
用户输入 (PromptInput) │ ├─→ Slash Command 解析 (/commit, /review, etc.) │ └─→ Command 执行 → 结果消息 │ └─→ 普通文本/附件 └─→ createUserMessage() └─→ query() 调用 │ ├─→ [Turn 1] API Streaming │ ├─→ text blocks → 直接 yield │ ├─→ tool_use blocks → StreamingToolExecutor │ │ ├─→ 权限检查 (canUseTool) │ │ ├─→ PreToolUse Hooks │ │ ├─→ tool.call() 执行 │ │ ├─→ PostToolUse Hooks │ │ └─→ tool_result 消息 │ └─→ 继续判断 (needsFollowUp) │ ├─→ [Turn 2] 带 tool_result 的 API 调用 │ └─→ 循环继续... │ └─→ [Terminal] 无 tool_use 或 stop_hook 阻止 └─→ 返回 { reason: 'completed' }
2. 核心主流程拆解 (Core Flow / Lifecycle)
本节将从代码层面逐步拆解 Agent 运行的完整生命周期。我们将从 query.ts 的 query() 函数入口开始,追踪每一个关键步骤。
2.1 入口点:query() 函数
query() 是整个 Agent 运行的主入口,定义在 src/query.ts:219-239:
// src/query.ts:219-239exportasyncfunction* query( params: QueryParams,): AsyncGenerator< | StreamEvent | RequestStartEvent | Message | TombstoneMessage | ToolUseSummaryMessage,Terminal> {const consumedCommandUuids: string[] = [];// 调用核心循环const terminal = yield* queryLoop(params, consumedCommandUuids);// 正常完成时通知命令生命周期for (const uuid of consumedCommandUuids) { notifyCommandLifecycle(uuid, "completed"); }return terminal;}
关键设计点:
-
使用 AsyncGenerator实现流式输出,允许 UI 实时渲染消息 -
consumedCommandUuids用于追踪已消费的斜杠命令,完成后发送通知 -
返回类型 Terminal是一个包含结束原因的对象,如{ reason: 'completed' }
2.2 核心循环:queryLoop() 状态机
真正的执行逻辑在 queryLoop() 中,这是一个经典的 while(true) 状态机:
// src/query.ts:241-251asyncfunction* queryLoop( params: QueryParams, consumedCommandUuids: string[],): AsyncGenerator<...> {// ... 初始化 ...// eslint-disable-next-line no-constant-conditionwhile (true) {// 每次迭代的状态解构let { toolUseContext } = stateconst { messages, autoCompactTracking, maxOutputTokensRecoveryCount,// ... 更多状态字段 } = state// ... 核心逻辑 ...// Continue 或 Return 决策点if (needsFollowUp) { state = { ...nextState }continue// 继续下一轮迭代 }return { reason: 'completed' } // 终止循环 }}
2.3 State 类型定义
循环状态通过 State 类型管理,定义在 src/query.ts:204-217:
// src/query.ts:204-217type State = { messages: Message[]; // 对话消息数组 toolUseContext: ToolUseContext; // 工具执行上下文 autoCompactTracking: AutoCompactTrackingState | undefined; // 自动压缩追踪 maxOutputTokensRecoveryCount: number; // 输出token恢复计数 hasAttemptedReactiveCompact: boolean; // 是否尝试过响应式压缩 maxOutputTokensOverride: number | undefined; // 输出token覆盖值 pendingToolUseSummary: Promise<ToolUseSummaryMessage | null> | undefined; stopHookActive: boolean | undefined; // Stop Hook 是否激活 turnCount: number; // 当前轮次计数 transition: Continue | undefined; // 上次迭代继续的原因};
状态字段详解:
|
|
|
|
|
|---|---|---|---|
messages |
Message[] |
|
|
toolUseContext |
ToolUseContext |
|
|
turnCount |
number |
|
|
transition |
Continue |
|
|
2.4 Step-by-Step 流程拆解
以下是每一轮迭代(一个 “Turn”)的完整步骤:
Step 1: 请求开始信号与状态初始化
// src/query.ts:337-364yield { type: 'stream_request_start' } // UI收到此事件后开始显示spinner// 初始化或递增 query chain tracking(用于分析链追踪)const queryTracking = toolUseContext.queryTracking ? { chainId: toolUseContext.queryTracking.chainId, depth: toolUseContext.queryTracking.depth + 1, } : { chainId: deps.uuid(), depth: 0, }// 更新 toolUseContext 带上追踪信息toolUseContext = { ...toolUseContext, queryTracking,}
Step 2: 消息预处理链
在调用 API 前,系统会执行一系列预处理操作:
// src/query.ts:365-467// 1. 获取 compact boundary 后的消息(避免重复发送已压缩内容)let messagesForQuery = [...getMessagesAfterCompactBoundary(messages)]// 2. 应用工具结果预算(控制大结果截断)messagesForQuery = await applyToolResultBudget( messagesForQuery, toolUseContext.contentReplacementState,// ...)// 3. 应用 snip compact(如果启用)if (feature('HISTORY_SNIP')) {const snipResult = snipModule!.snipCompactIfNeeded(messagesForQuery) messagesForQuery = snipResult.messages snipTokensFreed = snipResult.tokensFreed}// 4. 应用 microcompact(缓存编辑优化)const microcompactResult = await deps.microcompact( messagesForQuery, toolUseContext, querySource,)messagesForQuery = microcompactResult.messages// 5. 应用 context collapse(如果启用)if (feature('CONTEXT_COLLAPSE') && contextCollapse) {const collapseResult = await contextCollapse.applyCollapsesIfNeeded( messagesForQuery, toolUseContext, querySource, ) messagesForQuery = collapseResult.messages}// 6. 自动压缩检查(超出阈值时触发)const { compactionResult, consecutiveFailures } = await deps.autocompact( messagesForQuery, toolUseContext, { systemPrompt, userContext, systemContext, ... }, querySource, tracking, snipTokensFreed,)
预处理链的作用:
原始消息数组 (messages) │ ├─→ getMessagesAfterCompactBoundary() │ └─→ 过滤掉 compact_boundary 之前的消息 │ ├─→ applyToolResultBudget() │ └─→ 截断超大 tool_result,保持上下文稳定 │ ├─→ snipCompact (可选) │ └─→ 移除过长的历史 thinking blocks │ ├─→ microcompact │ └─→ 基于缓存编辑的增量压缩 │ ├─→ contextCollapse (可选) │ └─→ 投影已提交的 collapse 视图 │ └─→ autocompact └─→ 超阈值触发完整压缩 ├─→ 成功:生成 summary messages └─→ 失败:记录 consecutiveFailures最终消息数组 (messagesForQuery) → 发送给 API
Step 3: 创建 StreamingToolExecutor
// src/query.ts:560-568const useStreamingToolExecution = config.gates.streamingToolExecution;let streamingToolExecutor = useStreamingToolExecution ? new StreamingToolExecutor( toolUseContext.options.tools, canUseTool, toolUseContext, ) : null;
StreamingToolExecutor 是并发工具执行的核心组件,后续章节会深入分析。
Step 4: API 流式调用循环
// src/query.ts:654-863while (attemptWithFallback) { attemptWithFallback = falsetry {let streamingFallbackOccured = false// 核心 API 调用forawait (const message of deps.callModel({ messages: prependUserContext(messagesForQuery, userContext), systemPrompt: fullSystemPrompt, thinkingConfig: toolUseContext.options.thinkingConfig, tools: toolUseContext.options.tools, signal: toolUseContext.abortController.signal, options: {async getToolPermissionContext() {const appState = toolUseContext.getAppState()return appState.toolPermissionContext }, model: currentModel,// ... 更多选项 }, })) {// 处理流式消息...if (message.type === 'assistant') { assistantMessages.push(message)// 检测 tool_use blocksconst msgToolUseBlocks = message.message.content.filter(content => content.type === 'tool_use', ) as ToolUseBlock[]if (msgToolUseBlocks.length > 0) { toolUseBlocks.push(...msgToolUseBlocks) needsFollowUp = true// 标记需要继续循环 }// 将 tool_use 加入 StreamingToolExecutorif ( streamingToolExecutor && !toolUseContext.abortController.signal.aborted ) {for (const toolBlock of msgToolUseBlocks) { streamingToolExecutor.addTool(toolBlock, message) } } }// yield 完成的工具结果for (const result of streamingToolExecutor.getCompletedResults()) {if (result.message) {yield result.message toolResults.push(...) } } } } catch (innerError) {// Fallback 处理... }}
流式处理的关键点:
-
边接收边执行:当 API 流式返回 tool_useblocks 时,立即加入执行队列 -
Progress 立即 yield:工具的进度消息(如 Bash 的 stdout)立即发送给 UI -
结果收集: getCompletedResults()返回已完成的结果,不阻塞未完成的工具
Step 5: 中断与错误处理
// src/query.ts:1011-1052if (toolUseContext.abortController.signal.aborted) {if (streamingToolExecutor) {// 消费剩余结果 - executor 为中止的工具生成合成 tool_resultsforawait (const update of streamingToolExecutor.getRemainingResults()) {if (update.message) {yield update.message; } } } else {yield * yieldMissingToolResultBlocks(assistantMessages, "Interrupted by user"); }// 跳过 submit-interrupts 的中断消息if (toolUseContext.abortController.signal.reason !== "interrupt") {yield createUserInterruptionMessage({ toolUse: false }); }return { reason: "aborted_streaming" };}
Step 6: Stop Hooks 处理
当模型返回的响应没有 tool_use(即 needsFollowUp === false)时,系统进入 Stop Hooks 评估阶段:
// src/query.ts:1267-1306const stopHookResult =yield * handleStopHooks( messagesForQuery, assistantMessages, systemPrompt, userContext, systemContext, toolUseContext, querySource, stopHookActive, );if (stopHookResult.preventContinuation) {return { reason: "stop_hook_prevented" };}if (stopHookResult.blockingErrors.length > 0) {// Stop Hook 返回了阻塞错误,需要继续循环处理const next: State = { messages: [ ...messagesForQuery, ...assistantMessages, ...stopHookResult.blockingErrors, ],// ... transition: { reason: "stop_hook_blocking" }, }; state = next;continue;}
Step 7: 工具执行与结果收集
// src/query.ts:1363-1409const toolUpdates = streamingToolExecutor ? streamingToolExecutor.getRemainingResults() : runTools(toolUseBlocks, assistantMessages, canUseTool, toolUseContext);forawait (const update of toolUpdates) {if (update.message) {yield update.message;// 检测 hook_stopped_continuation 信号if ( update.message.type === "attachment" && update.message.attachment.type === "hook_stopped_continuation" ) { shouldPreventContinuation = true; } toolResults.push( ...normalizeMessagesForAPI( [update.message], toolUseContext.options.tools, ).filter((_) => _.type === "user"), ); }if (update.newContext) { updatedToolUseContext = { ...update.newContext, queryTracking }; }}
Step 8: Continue 决策与状态更新
// src/query.ts:1500-1550 (简化版)const next: State = { messages: [...messagesForQuery, ...assistantMessages, ...toolResults], toolUseContext: toolUseContextWithQueryTracking, autoCompactTracking: undefined, maxOutputTokensRecoveryCount: 0, hasAttemptedReactiveCompact: false, maxOutputTokensOverride: undefined, pendingToolUseSummary: nextPendingToolUseSummary, stopHookActive: undefined, turnCount: nextTurnCount, transition: undefined,};state = next;// while(true) 继续下一轮迭代
3. 关键机制与底层设计 (Deep Dive into Mechanisms)
本节深入分析系统中最关键的技术难点。
3.1 无限循环控制机制
3.1.1 Continue vs Return 决策树
API 响应处理完成 │ ├─→ abortController.signal.aborted? │ YES → return { reason: 'aborted_streaming' } │ ├─→ needsFollowUp === false? │ │ │ ├─→ lastMessage.isApiErrorMessage? │ │ YES → return { reason: 'completed' } │ │ │ ├─→ Stop Hook preventContinuation? │ │ YES → return { reason: 'stop_hook_prevented' } │ │ │ ├─→ Stop Hook blockingErrors? │ │ YES → state = {..., transition: 'stop_hook_blocking'} │ │ continue │ │ │ └─→ Token Budget exceeded? │ YES → return { reason: 'completed' } │ NO → return { reason: 'completed' } │ └─→ needsFollowUp === true (有 tool_use) │ ├─→ shouldPreventContinuation (from hook)? │ YES → return { reason: 'hook_stopped' } │ ├─→ abortController.signal.aborted (during tools)? │ YES → return { reason: 'aborted_during_tools' } │ └─→ 正常情况 state = { messages: [...+toolResults], ... }continue
3.1.2 状态转换图
┌─────────────────────────────────────────────────────────────────────┐│ Turn N (Start) ││ ┌─────────────────────────────────────────────────────────────┐ ││ │ State Initialization │ ││ │ messages, toolUseContext, turnCount, tracking, etc. │ ││ └─────────────────────────────────────────────────────────────┘ ││ ↓ ││ ┌─────────────────────────────────────────────────────────────┐ ││ │ Preprocessing Chain │ ││ │ compactBoundary → toolResultBudget → snip → microcompact │ ││ │ → contextCollapse → autocompact │ ││ └─────────────────────────────────────────────────────────────┘ ││ ↓ ││ ┌─────────────────────────────────────────────────────────────┐ ││ │ API Streaming Call │ ││ │ queryModelWithStreaming() → yields StreamEvent/Assistant │ ││ └─────────────────────────────────────────────────────────────┘ ││ ↓ ││ ┌─────────────────────────────────────────┐ ││ │ tool_use blocks detected? │ ││ └─────────────────────────────────────────┘ ││ │ │ ││ YES NO ││ │ │ ││ ↓ ↓ ││ ┌─────────────────────────┐ ┌─────────────────────────────┐ ││ │ StreamingToolExecutor │ │ Stop Hooks Evaluation │ ││ │ .addTool() │ │ │ ││ │ .getCompletedResults() │ │ handleStopHooks() │ ││ │ .getRemainingResults() │ │ │ ││ └─────────────────────────┘ └─────────────────────────────┘ ││ │ │ ││ ↓ ↓ ││ ┌─────────────────────────┐ ┌─────────────────────────────┐ ││ │ Tool Results │ │ preventContinuation? │ ││ │ Collected │ │ blockingErrors? │ ││ └─────────────────────────┘ └─────────────────────────────┘ ││ │ │ ││ │ ┌───────┴───────┐ ││ │ YES NO ││ │ │ │ ││ │ ↓ ↓ ││ │ ┌──────────────┐ ┌──────────────────────┐ ││ │ │ Return │ │ Return {completed} │ ││ │ │ {prevented} │ │ │ ││ │ └──────────────┘ └──────────────────────┘ ││ │ ││ ↓ ││ ┌─────────────────────────────────────────────────────────────┐ ││ │ State Update for Continue │ ││ │ state = { │ ││ │ messages: [...prev, ...assistant, ...toolResults], │ ││ │ turnCount: prev.turnCount + 1, │ ││ │ ... │ ││ │ } │ ││ └─────────────────────────────────────────────────────────────┘ ││ ↓ ││ ┌─────────────────┐ ││ │ continue │ ││ │ (while loop) │ ││ └─────────────────┘ ││ ↓ ││ Turn N+1 │└─────────────────────────────────────────────────────────────────────┘
3.2 StreamingToolExecutor 并发机制
3.2.1 TrackedTool 类型与状态管理
// src/services/tools/StreamingToolExecutor.ts:19-32type ToolStatus = "queued" | "executing" | "completed" | "yielded";type TrackedTool = { id: string; // tool_use block ID block: ToolUseBlock; // 原始 tool_use block assistantMessage: AssistantMessage; // 关联的 assistant 消息 status: ToolStatus; // 执行状态 isConcurrencySafe: boolean; // 是否可并发执行 promise?: Promise<void>; // 执行 Promise(用于等待) results?: Message[]; // 执行结果 pendingProgress: Message[]; // 待 yield 的进度消息 contextModifiers?: Array<(context: ToolUseContext) => ToolUseContext>;};
状态流转:
'queued' → 工具刚加入队列,等待执行条件满足 │ ↓ canExecuteTool() 返回 true'executing' → 工具正在执行,Promise 进行中 │ ↓ Promise resolve'completed' → 执行完成,结果已收集 │ ↓ getCompletedResults() 处理'yielded' → 结果已 yield 给调用方
3.2.2 并发控制逻辑
// src/services/tools/StreamingToolExecutor.ts:129-135private canExecuteTool(isConcurrencySafe: boolean): boolean {const executingTools = this.tools.filter(t => t.status === 'executing')return (// 情况1:没有正在执行的工具 → 可以执行 executingTools.length === 0 ||// 情况2:当前工具是并发安全的,且所有正在执行的工具也是并发安全的 → 可以并发 (isConcurrencySafe && executingTools.every(t => t.isConcurrencySafe)) )}
并发规则表格:
|
|
|
|
|
|---|---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
3.2.3 Bash 错误级联取消机制
// src/services/tools/StreamingToolExecutor.ts:354-364if (isErrorResult) { thisToolErrored = true;// 只有 Bash 错误会取消兄弟进程// Bash 命令通常有隐式依赖链(如 mkdir 失败 → 后续命令无意义)if (tool.block.name === BASH_TOOL_NAME) {this.hasErrored = true;this.erroredToolDescription = this.getToolDescription(tool);this.siblingAbortController.abort("sibling_error"); }}
设计哲学:
-
Bash 工具的错误会触发 siblingAbortController.abort(),取消所有正在执行的兄弟工具 -
Read/WebFetch 等独立工具的错误不会触发级联取消 -
这是因为 Bash 命令之间存在隐式依赖(如 mkdir→cd),而 Read 等工具相互独立
3.2.4 AbortController 链式结构
┌─────────────────────────────────────────────────────────────────┐│ toolUseContext.abortController ││ (Parent / Session Level) ││ [用户 Ctrl+C 绑定] ││ ↓ ││ siblingAbortController ││ [Bash Error 触发,取消兄弟进程] ││ ↓ ││ ┌──────────────────┐ ┌──────────────────┐ ┌───────────────┐ ││ │ toolAbortCtrl 1 │ │ toolAbortCtrl 2 │ │ toolAbortCtrl │ ││ │ (Tool A) │ │ (Tool B) │ │ (Tool C) │ ││ │ │ │ │ │ │ ││ │ - 权限拒绝也会 │ │ │ │ │ ││ │ bubble up │ │ │ │ │ ││ └──────────────────┘ └──────────────────┘ └───────────────┘ │└─────────────────────────────────────────────────────────────────┘
3.3 子Agent隔离机制
3.3.1 runAgent() 核心流程
// src/tools/AgentTool/runAgent.ts:248-860 (简化版)exportasyncfunction* runAgent({ agentDefinition, promptMessages, toolUseContext, canUseTool, isAsync,// ... 更多参数}): AsyncGenerator<Message, void> {// 1. 解析 Agent 模型const resolvedAgentModel = getAgentModel( agentDefinition.model, toolUseContext.options.mainLoopModel, model, permissionMode, )// 2. 创建 Agent IDconst agentId = override?.agentId ?? createAgentId()// 3. 初始化 Agent 专属 MCP 服务器const { clients: mergedMcpClients, tools: agentMcpTools, cleanup: mcpCleanup, } = await initializeAgentMcpServers(agentDefinition, toolUseContext.options.mcpClients)// 4. 执行 SubagentStart Hooksforawait (const hookResult of executeSubagentStartHooks( agentId, agentDefinition.agentType, agentAbortController.signal, )) {if (hookResult.additionalContexts) { additionalContexts.push(...hookResult.additionalContexts) } }// 5. 注册 Agent 的 Frontmatter Hooksif (agentDefinition.hooks && hooksAllowedForThisAgent) { registerFrontmatterHooks( rootSetAppState, agentId, agentDefinition.hooks,`agent '${agentDefinition.agentType}'`,true, // isAgent - 将 Stop 转换为 SubagentStop ) }// 6. 预加载 Skillsconst skillsToPreload = agentDefinition.skills ?? []// ... 加载逻辑 ...// 7. 构建 Agent 系统提示词const agentSystemPrompt = override?.systemPrompt ? override.systemPrompt : asSystemPrompt(await getAgentSystemPrompt(agentDefinition, ...))// 8. 创建隔离的 ToolUseContextconst agentToolUseContext = createSubagentContext(toolUseContext, { options: agentOptions, agentId, agentType: agentDefinition.agentType, messages: initialMessages, readFileState: agentReadFileState, abortController: agentAbortController, getAppState: agentGetAppState, shareSetAppState: !isAsync, // Sync Agent 共享,Async Agent 隔离 shareSetResponseLength: true, })// 9. 调用 query()(递归进入主循环)try {forawait (const message of query({ messages: initialMessages, systemPrompt: agentSystemPrompt, userContext: resolvedUserContext, systemContext: resolvedSystemContext, canUseTool, toolUseContext: agentToolUseContext, querySource, maxTurns: maxTurns ?? agentDefinition.maxTurns, })) {yield message // 将消息传回父 Agent } } finally {// 10. 清理await mcpCleanup() clearSessionHooks(rootSetAppState, agentId) agentToolUseContext.readFileState.clear() unregisterPerfettoAgent(agentId) killShellTasksForAgent(agentId, ...) }}
3.3.2 Sync vs Async Agent 的 AbortController 策略
// src/tools/AgentTool/runAgent.ts:524-528const agentAbortController = override?.abortController ? override.abortController : isAsync ? new AbortController() // Async: 独立控制器 : toolUseContext.abortController; // Sync: 共享父控制器
策略对比:
|
|
|
|
|
|---|---|---|---|
| Sync Agent |
|
|
|
| Async Agent |
|
|
|
| Background Agent |
|
|
|
3.3.3 子Agent上下文隔离图
┌─────────────────────────────────────────────────────────────────────┐│ Parent Agent Context ││ ┌─────────────────────────────────────────────────────────────┐ ││ │ toolUseContext │ ││ │ ├─ options.tools (parent's tool pool) │ ││ │ ├─ options.mcpClients (parent's MCP servers) │ ││ │ ├─ abortController (parent's signal) │ ││ │ ├─ messages (parent's conversation) │ ││ │ └─ readFileState (parent's file cache) │ ││ └─────────────────────────────────────────────────────────────┘ ││ ↓ ││ createSubagentContext() ││ ↓ ││ ┌─────────────────────────────────────────────────────────────┐ ││ │ agentToolUseContext (Isolated) │ ││ │ ├─ options.tools (resolved agent tools) │ ││ │ ├─ options.mcpClients (merged: parent + agent-specific) │ ││ │ ├─ abortController (independent OR shared) │ ││ │ ├─ messages (initialMessages: forkContext + prompt) │ ││ │ ├─ readFileState (cloned from parent OR fresh) │ ││ │ ├─ agentId (unique identifier) │ ││ │ └─ agentType (e.g., "Explore", "Plan") │ ││ └─────────────────────────────────────────────────────────────┘ ││ ↓ ││ query() call ││ ↓ ││ ┌─────────────────────────────────────────────────────────────┐ ││ │ Agent's own while(true) loop │ ││ │ ├─ Independent state machine │ ││ │ ├─ Own turnCount │ ││ │ └─ Own tool execution queue │ ││ └─────────────────────────────────────────────────────────────┘ ││ ↓ ││ yield messages back to parent │└─────────────────────────────────────────────────────────────────────┘
4. 状态管理与数据流转 (State & Data Management)
4.1 核心数据结构定义
4.1.1 ToolUseContext 类型
// src/Tool.ts:158-300exporttype ToolUseContext = { options: { commands: Command[] debug: boolean mainLoopModel: string tools: Tools // 当前可用的工具池 verbose: boolean thinkingConfig: ThinkingConfig mcpClients: MCPServerConnection[] // MCP 服务器连接 mcpResources: Record<string, ServerResource[]> isNonInteractiveSession: boolean agentDefinitions: AgentDefinitionsResult maxBudgetUsd?: number customSystemPrompt?: string appendSystemPrompt?: string querySource?: QuerySource refreshTools?: () => Tools } abortController: AbortController readFileState: FileStateCache // 文件读取状态缓存 getAppState(): AppState // 获取全局状态 setAppState(f: (prev: AppState) => AppState): void// 更新全局状态 setAppStateForTasks?: (...) // 用于 async agents 的任务状态更新 handleElicitation?: (...) // MCP URL elicitation 处理 setToolJSX?: SetToolJSXFn // UI JSX 渲染回调 addNotification?: (notif: Notification) =>void appendSystemMessage?: (...) // 系统消息追加 sendOSNotification?: (...) // OS 级通知 nestedMemoryAttachmentTriggers?: Set<string> loadedNestedMemoryPaths?: Set<string> dynamicSkillDirTriggers?: Set<string> discoveredSkillNames?: Set<string> userModified?: boolean setInProgressToolUseIDs: (f: (prev: Set<string>) => Set<string>) =>void setHasInterruptibleToolInProgress?: (v: boolean) =>void setResponseLength: (f: (prev: number) => number) =>void pushApiMetricsEntry?: (ttftMs: number) =>void setStreamMode?: (mode: SpinnerMode) =>void onCompactProgress?: (event: CompactProgressEvent) =>void setSDKStatus?: (status: SDKStatus) =>void openMessageSelector?: () =>void updateFileHistoryState: (...) // 文件历史追踪 updateAttributionState: (...) // Git attribution setConversationId?: (id: UUID) =>void agentId?: AgentId // 仅子Agent设置 agentType?: string// 子Agent类型名 requireCanUseTool?: boolean messages: Message[] fileReadingLimits?: { maxTokens?: number, maxSizeBytes?: number } globLimits?: { maxResults?: number } toolDecisions?: Map<string, { source: string, decision: 'accept' | 'reject', timestamp: number }> queryTracking?: QueryChainTracking requestPrompt?: (...) // 交互式 prompt 回调 toolUseId?: string criticalSystemReminder_EXPERIMENTAL?: string preserveToolUseResults?: boolean localDenialTracking?: DenialTrackingState contentReplacementState?: ContentReplacementState renderedSystemPrompt?: SystemPrompt}
4.1.2 Message 类型族
// 基于 src/types/message.js 的使用模式推断type Message = | UserMessage | AssistantMessage | ProgressMessage | AttachmentMessage | SystemMessage | TombstoneMessage | ToolUseSummaryMessage;type UserMessage = {type: "user"; uuid: UUID; timestamp: string; message: { role: "user"; content: string | ContentBlockParam[] }; toolUseResult?: unknown; // 工具执行结果 isMeta?: true; // 标记为元消息(不发送给API) imagePasteIds?: number[];};type AssistantMessage = {type: "assistant"; uuid: UUID; timestamp: string; requestId?: string; // API request ID message: BetaMessage; // Anthropic SDK 消息类型 apiError?: string; isApiErrorMessage?: boolean;};type ProgressMessage<P = ToolProgressData> = {type: "progress"; toolUseID: string; parentToolUseID?: string; data: P;};
4.2 状态可变性分析
|
|
|
|
|
|
|---|---|---|---|---|
messages |
|
~/.claude/session/*.json) |
|
state = { messages: [...] } |
toolUseContext |
|
|
|
|
turnCount |
|
|
|
turnCount + 1 |
autoCompactTracking |
|
|
|
undefined 或新对象 |
readFileState |
|
|
|
|
toolPermissionContext |
|
|
|
setAppState(prev => { ...toolPermissionContext, ... }) |
inProgressToolUseIDs |
|
|
|
setInProgressToolUseIDs(prev => ...) |
4.3 数据流转关键节点
用户输入 │ ├─→ createUserMessage() │ └─→ UserMessage { │ type: 'user', │ uuid: randomUUID(), │ message: { role: 'user', content: [...] } │ } │ └─→ query() 接收 └─→ messages 数组添加 │ ├─→ normalizeMessagesForAPI() │ └─→ 转换为 BetaMessageParam[] │ - 过滤 meta 消息 │ - 截断超大 tool_result │ - 添加 cache_control │ └─→ API 调用 └─→ AssistantMessage {type: 'assistant', message: BetaMessage (from SDK), requestId: response.id } │ ├─→ tool_use blocks │ └─→ StreamingToolExecutor │ └─→ runToolUse() │ └─→ tool.call() │ └─→ ToolResult │ │ │ └─→ createUserMessage() │ └─→ UserMessage { │ content: [{ type: 'tool_result', ... }] │ } │ └─→ text blocks └─→ 直接 yield 给 UI
5. 高级模式与实战分析 (Advanced Patterns)
5.1 设计模式提炼
5.1.1 ReAct Pattern (Reasoning + Acting)
Claude Code 的核心循环本质上是 ReAct 模式的实现:
┌─────────────────────────────────────────────────────────────────┐│ ReAct Loop ││ ││ ┌─────────────────┐ ││ │ Observation │ ← 前一轮的 tool_result ││ │ (Input State) │ ││ └─────────────────┘ ││ ↓ ││ ┌─────────────────┐ ││ │ Thought │ ← Model 生成 reasoning (thinking blocks) ││ │ (API Response) │ ││ └─────────────────┘ ││ ↓ ││ ┌─────────────────┐ ││ │ Action │ ← tool_use blocks ││ │ (Tool Calls) │ ││ └─────────────────┘ ││ ↓ ││ ┌─────────────────┐ ││ │ Execution │ ← StreamingToolExecutor ││ │ (Tool.call) │ ││ └─────────────────┘ ││ ↓ ││ ┌─────────────────┐ ││ │ Result │ ← tool_result content ││ │ (Observation) │ ││ └─────────────────┘ ││ ↓ ││ Continue Loop ││ ↓ ││ ┌─────────────────┐ ││ │ Next Thought │ ← 新一轮 API 调用 ││ │ (with context) │ ││ └─────────────────┘ │└─────────────────────────────────────────────────────────────────┘
5.1.2 责任链模式 (Hook Chain)
工具执行前后的 Hook 形成一条处理链:
// Hook 执行顺序PreToolUse Hooks │ ├─→ Hook 1: 日志记录 │ └─→ { behavior: 'allow' } │ ├─→ Hook 2: 安全检查 │ └─→ { behavior: 'allow' } │ ├─→ Hook 3: 自定义验证 │ └─→ { behavior: 'deny', message: '...' } → 阻止执行 │ └─→ Permission Check (canUseTool) └─→ { behavior: 'allow' } → 继续执行tool.call() 执行 │ └─→ PostToolUse Hooks │ ├─→ Hook 1: 结果格式化 │ ├─→ Hook 2: 通知发送 │ └─→ Hook 3: 状态更新
5.1.3 生成器模式 (AsyncGenerator Streaming)
整个系统基于 AsyncGenerator 实现流式输出:
// query() 是一个 AsyncGeneratorexportasyncfunction* query(params: QueryParams): AsyncGenerator<Message, Terminal>// StreamingToolExecutor 也使用 generatorasync *getRemainingResults(): AsyncGenerator<MessageUpdate, void>// 单个工具执行也是 generatorexportasyncfunction* runToolUse(...): AsyncGenerator<MessageUpdateLazy, void>
优势:
-
UI 可以实时渲染消息(spinner → text → tool_use → progress → result) -
支持中途取消( generator.return()) -
内存效率高(不需要缓存所有消息再一次性返回)
5.1.4 观察者模式 (Progress Subscription)
进度消息通过 yield 立即传递给观察者(UI):
// 工具执行时onProgress?.({ toolUseID: toolUse.id, data: { type: 'bash_stdout', output: '...' }})// Progress 消息被 yieldyield createProgressMessage({ toolUseID: toolBlock.id, data: progressData,})// UI 立即接收并渲染forawait (const message of query(...)) {if (message.type === 'progress') {// 更新 spinner/进度条 }}
5.1.5 隔离模式 (Subagent Context Isolation)
子 Agent 通过 createSubagentContext() 实现上下文隔离:
// src/utils/forkedAgent.tsexportfunctioncreateSubagentContext( parentContext: ToolUseContext, params: { options: ToolUseContext["options"]; agentId: AgentId; agentType: string; messages: Message[]; readFileState: FileStateCache; abortController: AbortController; getAppState: () => AppState; shareSetAppState: boolean; shareSetResponseLength: boolean;// ... },): ToolUseContext{return { ...parentContext, // 继承基础字段 ...params, // 覆盖隔离字段// 关键:setAppState 的隔离处理 setAppState: params.shareSetAppState ? parentContext.setAppState // 共享父级 : () => {}, // Async Agent: no-op };}
5.2 实战案例:复杂请求的完整流转
假设用户请求:”修复那个bug并提交代码”
Step 1: 用户输入处理┌─────────────────────────────────────────────────────────────────┐│ User: "修复那个bug并提交代码" ││ ││ REPL.tsx onQuery() ││ ├─→ handlePromptSubmit() ││ │ └─→ createUserMessage({ content: "..." }) ││ │ ││ └─→ query({ messages: [...newMessage], ... }) │└─────────────────────────────────────────────────────────────────┘Step 2: Turn 1 - Model 分析请求┌─────────────────────────────────────────────────────────────────┐│ queryLoop() while(true) iteration 1 ││ ││ API Response: ││ ├─ thinking: "用户想修复bug,我需要先找到问题..." ││ ├─ text: "我来帮你修复这个bug。首先让我查看最近的提交..." ││ └─ tool_use: GrepTool({ pattern: "error", path: "./src" }) ││ ││ StreamingToolExecutor.addTool(GrepTool) │└─────────────────────────────────────────────────────────────────┘Step 3: Tool 1 - Grep 执行┌─────────────────────────────────────────────────────────────────┐│ runToolUse(GrepTool) ││ ├─→ canUseTool() → allow ││ ├─→ tool.call() ││ │ └─→ GrepTool 执行 grep -r "error" ./src ││ │ └─→ onProgress({ stdout: "...", ... }) ││ │ ││ └─→ ToolResult { ││ data: "src/auth.ts:45: throw new Error('...')" ││ } ││ └─→ createUserMessage({ ││ content: [{ type: 'tool_result', content: "..." }]││ }) │└─────────────────────────────────────────────────────────────────┘Step 4: Turn 2 - Model 分析 grep 结果┌─────────────────────────────────────────────────────────────────┐│ state.messages = [..., grep_result] ││ continue → while(true) iteration 2 ││ ││ API Response: ││ ├─ thinking: "找到了错误位置,需要查看完整代码..." ││ ├─ tool_use: FileReadTool({ file_path: "./src/auth.ts" }) │└─────────────────────────────────────────────────────────────────┘Step 5-7: FileRead → FileEdit → Bash (git diff)... 类似流程 ...Step 8: Turn N - 最终提交┌─────────────────────────────────────────────────────────────────┐│ API Response: ││ ├─ text: "我已经修复了bug,现在提交代码" ││ ├─ tool_use: BashTool({ ││ command: "git add . && git commit -m 'fix bug'", ││ timeout: 30000 ││ }) ││ ││ BashTool 执行 ││ ├─→ Permission Dialog (用户确认) ││ ├─→ git 执行 ││ └─→ tool_result: "committed successfully" │└─────────────────────────────────────────────────────────────────┘Step 9: Stop Hook 评估与终止┌─────────────────────────────────────────────────────────────────┐│ Turn N+1 ││ API Response: ││ ├─ text: "Bug已修复并提交。有什么其他需要帮助的吗?" ││ └─ (no tool_use) ││ ││ needsFollowUp = false ││ └─→ handleStopHooks() ││ └─→ { preventContinuation: false } ││ └─→ return { reason: 'completed' } │└─────────────────────────────────────────────────────────────────┘
6. 总结 (Summary)
核心技术结论
-
状态机驱动架构:Agent 运行基于
while(true)状态机,通过State类型管理跨迭代状态,continue/return控制循环流向。 -
流式优先设计:从 API 调用到工具执行,全链路使用
AsyncGenerator,实现 UI 实时渲染与内存效率优化。 -
并发工具执行:
StreamingToolExecutor实现了智能并发控制——isConcurrencySafe工具并行执行,非安全工具串行执行,Bash 错误触发兄弟进程级联取消。 -
预处理链分层:API 调用前执行
compactBoundary → toolResultBudget → snip → microcompact → contextCollapse → autocompact六层预处理,优化上下文大小。 -
AbortController 链式结构:
parentAbort → siblingAbort → toolAbort三级结构实现用户取消、错误级联、工具独立控制的多层中断机制。 -
子Agent完全隔离:通过
createSubagentContext()创建独立的ToolUseContext,包括独立工具池、MCP 服务器、消息历史、文件缓存;Sync Agent 共享 abort,Async Agent 完全独立。 -
Hook 责任链:
PreToolUse → Permission → tool.call → PostToolUse形成 full lifecycle hook 链,支持用户自定义扩展。 -
ReAct 本质:整个系统是 ReAct 模式的工业级实现——Thought(API reasoning)、Action(tool_use)、Observation(tool_result)循环推进。
-
动态上下文管理:
ToolUseContext包含 30+ 字段,涵盖工具池、权限、MCP、缓存、追踪、通知等全维度运行时状态。 -
错误恢复机制完备:包含 529 重试、模型 fallback、prompt-too-long 响应式压缩、max-output-tokens 多轮恢复等多层保障。
夜雨聆风