文档版本:2026.4.9 最后更新:2026-04-09
源码验证: ✅ 本文档基于实际源码分析
1. 模块概述

Agent 运行时模块是 OpenClaw 的核心执行引擎,负责运行 AI Agent、管理模型调用、处理工具执行和流式响应。基于源码验证,该模块采用嵌入式 Pi Agent 架构,支持多模型、故障转移、身份验证轮换和工具执行管道。
核心职责
• Agent 执行: 运行嵌入式 Pi Agent,处理用户提示 • 模型管理: 模型选择、身份验证、上下文窗口验证 • 故障转移: 自动切换认证配置文件、模型降级 • 工具执行: 管理编码工具(bash、文件操作等) • 流式处理: 流式响应、分块回复、推理流 • 会话管理: SessionManager 集成、转录文件操作
关键文件位置
src/agents/├── pi-embedded-runner/│ ├── run.ts # 主入口(runEmbeddedPiAgent)│ ├── run/│ │ ├── attempt.ts # 单次运行尝试(runEmbeddedAttempt)│ │ ├── payloads.ts # 响应载荷构建│ │ └── params.ts # 参数类型定义(RunEmbeddedPiAgentParams)│ ├── runs.ts # 运行状态管理(队列、中止)│ ├── auth/│ │ └── controller.ts # Auth Controller(createEmbeddedRunAuthController)│ ├── context-engine/ # 上下文引擎(替代直接压缩)│ │ ├── index.ts│ │ └── lifecycle.ts│ ├── compact.ts # 传统会话压缩│ ├── session-lock.ts # 会话写锁(acquireSessionWriteLock)│ ├── types.ts # 核心类型定义│ ├── session-manager-cache.ts # SessionManager 缓存│ └── ...(其他辅助模块)├── pi-embedded-subscribe.ts # 流式事件订阅├── pi-tools.ts # 工具创建(createOpenClawCodingTools)├── pi-embedded-helpers.ts # 辅助函数(错误分类等)└── bundle-mcp/ # Bundle MCP 工具运行时 └── runtime.ts2. 核心组件

2.1 主运行入口(runEmbeddedPiAgent)
文件: src/agents/pi-embedded-runner/run.ts
函数签名
export async function runEmbeddedPiAgent( params: RunEmbeddedPiAgentParams): Promise<EmbeddedPiRunResult>核心流程
0. SessionKey 回填(早期规范化) backfillSessionKey({ config, sessionId, sessionKey, agentId }) → 确保所有下游(Hook、LCM、压缩等)都接收到非空 sessionKey1. 队列管理 enqueueSession() → enqueueGlobal() → 会话级队列:防止同一会话并发运行 → 全局级队列:控制全局并发2. 工作空间解析 & 插件加载 resolveRunWorkspaceDir({ workspaceDir, sessionKey, agentId, config }) → 支持 fallback 路径(workspaceResolution.usedFallback) ensureRuntimePluginsLoaded({ config, workspaceDir, allowGatewaySubagentBinding }) → 延迟绑定运行时插件3. Hook 模型选择(新增) resolveHookModelSelection({ prompt, provider, modelId, hookRunner, hookContext }) → 允许插件 hook 在运行前替换 provider/modelId → 结果: { provider, modelId, legacyBeforeAgentStartResult }4. 模型解析(异步) resolveModelAsync(provider, modelId, agentDir, config) → 从模型注册表查找模型定义 resolveEffectiveRuntimeModel({ cfg, provider, modelId, runtimeModel }) → 处理运行时模型覆盖(如动态切换)5. Context Engine 初始化(新增) ensureContextEnginesInitialized() contextEngine = await resolveContextEngine(params.config) → 新增的上下文引擎抽象层,替代直接调用 compactEmbeddedPiSessionDirect → contextEngine.compact() / contextEngine.dispose() → contextEngine.info.ownsCompaction 决定谁负责触发压缩6. Auth Controller(重构) createEmbeddedRunAuthController({ ...状态引用... }) → 封装所有认证配置文件管理逻辑 → 提供: advanceAuthProfile / initializeAuthProfile / maybeRefreshRuntimeAuthForAuthError / stopRuntimeAuthRefreshTimer7. 循环尝试(while(true),上限 MAX_RUN_LOOP_ITERATIONS) runEmbeddedAttempt(...) ── 成功路径 ── → buildEmbeddedRunPayloads() → markAuthProfileGood / markAuthProfileUsed → 返回 EmbeddedPiRunResult ── 新增错误路径 ── a) 超时触发压缩(timeout + 高 token 占用率 > 65%) contextEngine.compact({ trigger: "timeout_recovery", force: true }) → 压缩成功 → continue(重试) → 失败 → 转普通超时错误处理 b) 上下文溢出(改用 contextEngine) contextEngine.compact({ trigger: "overflow", force: true }) → 支持 MAX_OVERFLOW_COMPACTION_ATTEMPTS=3 次尝试 → 工具结果截断: truncateOversizedToolResultsInSession() → 完全失败 → 返回 context_overflow 错误 c) 速率限制 → 配置文件轮换限制升级 maybeEscalateRateLimitProfileFallback() → 超过 rateLimitProfileRotationLimit → 升级为 FailoverError(模型降级) d) 规划仅执行(Planning-Only)检测(新增) resolvePlanningOnlyRetryInstruction() → 检测到 Agent 只规划但未执行 → 注入 "act-now" 指令重试(最多 1 次) → emitAgentPlanEvent() 通知上层 e) 不完整轮次(Incomplete Turn)检测(新增) resolveIncompleteTurnPayloadText() → prompt() 提前解析但无 payload → 返回用户可见错误提示 f) 实时模型切换(Live Model Switch)(新增) shouldSwitchToLiveModel() → 检测到待处理的模型切换请求 → 抛出 LiveSessionModelSwitchError 触发上层切换8. 结果构建 buildEmbeddedRunPayloads() → 整合 assistantTexts、toolMetas、lastAssistant → 返回 EmbeddedPiRunResult(含 successfulCronAdds、messagingToolSentMediaUrls)关键特性
• 双层队列: 会话级 + 全局级,防止并发冲突 • Auth Controller: 封装认证管理,支持运行时刷新 • Context Engine: 新的上下文引擎抽象,统一管理压缩/维护 • 超时触发压缩: LLM 超时时若 prompt token 占用率 > 65% 自动压缩 • 规划重试: 检测 planning-only 轮次并注入执行指令 • 速率限制升级: N 次轮换后自动升级为模型降级 • 思考级别降级: 不支持时自动降级(high → low → off) • GitHub Copilot 特殊处理: 动态 token 交换
2.2 运行尝试(runEmbeddedAttempt)
文件: src/agents/pi-embedded-runner/run/attempt.ts
函数签名
export async function runEmbeddedAttempt( params: EmbeddedRunAttemptParams): Promise<EmbeddedRunAttemptResult>核心流程
1. 工作空间初始化 fs.mkdir(resolvedWorkspace) → 支持沙盒隔离(sandbox.enabled) → effectiveWorkspace 根据沙盒访问级别选择路径 acquireSessionWriteLock({ sessionFile, maxHoldMs }) ← 新增会话写锁2. 技能加载 resolveEmbeddedRunSkillEntries(workspaceDir, config, agentId, skillsSnapshot) applySkillEnvOverridesFromSnapshot() | applySkillEnvOverrides() → 从 skillsSnapshot 或 .skills/ 目录加载 → 应用环境变量覆盖3. Bootstrap 文件处理 resolveAttemptBootstrapContext({ contextInjectionMode, bootstrapContextMode, ... }) → 加载工作空间引导文件(如 AGENTS.md/CLAUDE.md) → analyzeBootstrapBudget() 分析 token 预算 → 支持 lightweight/full 模式(bootstrapContextMode) → buildBootstrapPromptWarning() 生成截断警告4. 工具创建(含 Bundle MCP/LSP) createOpenClawCodingTools() ← 内置编码工具 normalizeProviderToolSchemas() ← provider 规范化 getOrCreateSessionMcpRuntime() + materializeBundleMcpToolsForRun() ← Bundle MCP(新增) createBundleLspToolRuntime() ← Bundle LSP(新增) effectiveTools = [ ...tools, ...bundleMcpTools, ...bundleLspTools ] → toolsAllow 过滤(最小化模式)5. 系统提示构建 buildEmbeddedSystemPrompt() ← 主体系统提示 resolveSystemPromptOverride() ← 允许配置完全替换 resolveProviderSystemPromptContribution() ← provider 特有贡献(新增) → 含 heartbeatPrompt、ttsHint、memoryCitationsMode → effectivePromptMode: "full" | "minimal"(toolsAllow 时用 minimal)6. SessionManager 初始化 prewarmSessionFile(sessionFile) SessionManager.open(sessionFile) + guardSessionManager() runAttemptContextEngineBootstrap() ← Context Engine 引导(新增) buildEmbeddedExtensionFactories() ← 扩展工厂(新增) DefaultResourceLoader({ extensionFactories }) ← 资源加载器(新增) applyPiAutoCompactionGuard() ← auto-compaction 守卫7. 历史消息处理 sanitizeSessionHistory() → validateReplayTurns() ← 新增 validateReplayTurns filterHeartbeatPairs() ← 心跳消息过滤(新增) limitHistoryTurns() → sanitizeToolUseResultPairing() pruneProcessedHistoryImages() ← 历史图像裁剪(新增)8. Context Engine 消息组装(新增) assembleAttemptContextEngine({ contextEngine, messages, tokenBudget, ... }) → 可注入 systemPromptAddition → 替换 messages(memory、增强等)9. Stream 传输策略(重构) resolveEmbeddedAgentBaseStreamFn() ← 基础流函数 registerProviderStreamForModel() ← provider 特有流(新增) shouldUseOpenAIWebSocketTransport() ← WebSocket 传输(新增) → streamFn 链式包装: - dropThinkingBlocks / sanitizeToolCallIds(transcriptPolicy) - wrapStreamFnSanitizeMalformedToolCalls - wrapStreamFnTrimToolCallNames - wrapStreamFnRepairMalformedToolCallArguments(Anthropic) - wrapStreamFnDecodeXaiToolCallArguments(xAI) - wrapStreamFnHandleSensitiveStopReason - streamWithIdleTimeout(LLM 空闲超时,新增) - GooglePromptCacheStreamFn(Google 提示缓存,新增) - sessions_yield 中止检测10. 预检溢出检测(新增) shouldPreemptivelyCompactBeforePrompt() → route: "truncate_tool_results_only" → 提前截断工具结果 → route: "compact_only" / "compact_then_truncate" → 触发 preflightRecovery11. 图像注入(增强) detectAndLoadPromptImages({ sandbox, workspaceOnly, ... }) → 支持沙盒路径限制 → imageOrder 控制注入顺序12. Agent 运行 createAgentSession({ builtInTools, customTools: [...customTools, ...clientToolDefs], ... }) → Client Tools(OpenResponses 托管工具,新增) → sessions_yield 工具(新增) await activeSession.prompt(effectivePrompt, { images? }) → 超时管理:scheduleAbortTimer 含 compactionGrace 扩展13. 订阅流式事件 subscribeEmbeddedPiSession() → 详见序列图 0314. Context Engine 轮次收尾(新增) finalizeAttemptContextEngineTurn({ contextEngine, ... }) → 触发 after_compaction hooks15. 返回结果 { assistantTexts, toolMetas, lastAssistant, sessionIdUsed, aborted, timedOut, idleTimedOut, timedOutDuringCompaction, ← 新增 preflightRecovery, ← 新增 yieldDetected, ← 新增 clientToolCall, ← 新增 successfulCronAdds, ← 新增 messagingToolSentMediaUrls, ← 新增 compactionCount, attemptUsage, promptCache ← 新增 }关键特性
• 会话写锁: 防止同一会话文件并发写入 • Bundle MCP/LSP: 动态加载插件提供的额外工具 • 预检溢出: 在提交前估算 token,主动触发截断/压缩 • Context Engine 生命周期: bootstrap → assemble → finalize 三段式 • LLM 空闲超时: 独立于全局超时的 LLM 无响应检测 • sessions_yield 工具: Agent 主动让出控制权 • Client Tools: 支持 OpenResponses 托管工具(外部调用回调) • Stream 变换链: 多层 transcriptPolicy 流包装,处理各 provider 差异
2.3 流式订阅(subscribeEmbeddedPiSession)
文件: src/agents/pi-embedded-subscribe.ts
函数签名
export function subscribeEmbeddedPiSession( params: SubscribeEmbeddedPiSessionParams)核心状态
const state: EmbeddedPiSubscribeState = { assistantTexts: [], // 助手文本累积 toolMetas: [], // 工具元数据 toolMetaById: Map<string, ToolMeta>, lastToolError: undefined, blockReplyBreak: 'text_end', // 分块回复触发点 reasoningMode: 'off' | 'on' | 'stream', includeReasoning: boolean, shouldEmitPartialReplies: boolean, streamReasoning: boolean, deltaBuffer: "", // 增量文本缓冲 blockBuffer: "", // 块缓冲 blockState: { // 状态跟踪 thinking: boolean, // 是否在 <think> 块内 final: boolean, // 是否在 <final> 块内 inlineCode: InlineCodeState // 内联代码状态 }, messagingToolSentTexts: [], // 消息工具已发送文本 compactionInFlight: boolean, // 压缩进行中 pendingCompactionRetry: number // 待处理压缩重试}事件处理
// 文本增量(流式)on text_delta: deltaBuffer += delta → 检测 <think>/<final> 标签 → 更新 blockState(thinking, final) → 推送到 blockChunker(分块器) → 触发 onBlockReply(如果配置)// 消息开始on message_start: resetAssistantMessageState() → 清空缓冲区、重置状态 → assistantMessageIndex += 1// 消息结束on message_end: finalizeAssistantTexts() → 合并最终文本 → 推送到 assistantTexts// 工具调用on tool_calls: → 存储工具元数据到 toolMetaById → 等待工具结果// 工具结果on tool_result: → 更新 toolMetas → 检测消息工具(telegram, whatsapp) → 去重已发送文本// 压缩请求on compaction_start: compactionInFlight = true → 暂停正常流处理on compaction_complete: compactionInFlight = false → 恢复流处理分块回复(BlockReplyChunking)
blockReplyBreak: 'text_end' | 'tool_execution_start'blockReplyChunking: { paragraphPreference?: boolean, // 段落优先 softChunkSize?: number, // 软分块大小 hardChunkMaxSize?: number // 硬分块最大值}// 分块器逻辑EmbeddedBlockChunker: → 检测段落边界(\n\n) → 重新打开围栏代码块(```) → 软分块 vs 硬分块 → 触发 onBlockReply(chunk)2.4 工具系统(createOpenClawCodingTools)
文件: src/agents/pi-tools.ts
核心工具类型
// 编码工具(来自 pi-coding-agent)- read_file // 读取文件- write_file // 写入文件- edit_file // 编辑文件(查找替换)- apply_patch // 应用补丁(OpenClaw 扩展)- exec // 执行命令- process_send_keys // 向后台进程发送按键// 消息工具(OpenClaw 特有)- telegram_send // 发送 Telegram 消息- whatsapp_send // 发送 WhatsApp 消息- discord_send // 发送 Discord 消息- slack_send // 发送 Slack 消息- sessions_send // 向其他会话发送消息// OpenClaw 管理工具- sessions_list // 列出会话- sessions_spawn // 生成子代理- camera_capture // 捕获摄像头- ...(其他工具)工具策略过滤
resolveEffectiveToolPolicy(config, sessionKey) → 基础策略: config.tools?.policy → 群组策略: resolveGroupToolPolicy() → 子代理策略: resolveSubagentToolPolicy() → 插件策略: expandPolicyWithPluginGroups()filterToolsByPolicy(tools, policy) → 应用允许列表/阻止列表 → 移除仅限插件工具(如果未安装)沙盒工具包装
// 沙盒启用时:- read_file → createSandboxedReadTool()- write_file → createSandboxedWriteTool()- edit_file → createSandboxedEditTool()// 限制:- 无法访问沙盒外的文件- 路径自动重新映射到沙盒目录2.5 运行状态管理(runs.ts)
文件: src/agents/pi-embedded-runner/runs.ts
活动运行跟踪
const activeEmbeddedRuns = new Map<string, EmbeddedPiQueueHandle>();setActiveEmbeddedRun(runId, handle) → 存储运行句柄 → 支持中止操作clearActiveEmbeddedRun(runId) → 移除运行句柄isEmbeddedPiRunActive(runId) → 检查运行是否活动abortEmbeddedPiRun(runId) → 中止运行(通过 AbortController)运行队列句柄
type EmbeddedPiQueueHandle = { promise: Promise<T>; abort: () => void; isStreaming: boolean;}3. 数据结构(实际定义)
3.1 RunEmbeddedPiAgentParams
文件: src/agents/pi-embedded-runner/run/params.ts
type RunEmbeddedPiAgentParams = { // 会话标识 sessionId: string; sessionKey?: string; agentId?: string; // ← 新增 // 触发类型(新增) /** "user" | "heartbeat" | "cron" | "memory" | "overflow" | "manual" */ trigger?: EmbeddedRunTrigger; // 提示和输入 prompt: string; images?: ImageContent[]; // ← 类型变更(原 string[]) imageOrder?: PromptImageOrderEntry[]; // ← 新增 /** OpenResponses 托管工具(新增)*/ clientTools?: ClientToolDefinition[]; // 模型配置 provider?: string; model?: string; thinkLevel?: ThinkLevel; fastMode?: boolean; // ← 新增 reasoningLevel?: ReasoningLevel; // 认证 authProfileId?: string; authProfileIdSource?: 'auto' | 'user'; allowTransientCooldownProbe?: boolean; // ← 新增 // 工作空间 sessionFile: string; workspaceDir: string; agentDir?: string; // 消息上下文 messageChannel?: string; messageProvider?: string; agentAccountId?: string; messageTo?: string; messageThreadId?: string | number; memoryFlushWritePath?: string; // ← 新增 // 群组上下文(与先前相同) groupId?: string | null; groupChannel?: string | null; groupSpace?: string | null; spawnedBy?: string | null; // 发送者信息 senderId?: string | null; senderName?: string | null; senderUsername?: string | null; senderE164?: string | null; senderIsOwner?: boolean; // ← 新增 // 频道/线程细节(新增) currentChannelId?: string; currentThreadTs?: string; currentMessageId?: string | number; replyToMode?: 'off' | 'first' | 'all' | 'batched'; hasRepliedRef?: { value: boolean }; requireExplicitMessageTarget?: boolean; // ← 新增 disableMessageTool?: boolean; // ← 新增 allowGatewaySubagentBinding?: boolean; // ← 新增 // 工具配置 disableTools?: boolean; toolsAllow?: string[]; // ← 新增(最小化模式) toolResultFormat?: 'markdown' | 'plain'; suppressToolErrorWarnings?: boolean; // ← 新增 execOverrides?: ExecToolDefaults; bashElevated?: ExecElevatedDefaults; // Bootstrap 模式(新增) bootstrapContextMode?: 'full' | 'lightweight'; bootstrapContextRunKind?: 'default' | 'heartbeat' | 'cron'; bootstrapPromptWarningSignaturesSeen?: string[]; bootstrapPromptWarningSignature?: string; // 流式回调(签名有变化) onPartialReply?: (payload: { text?: string; mediaUrls?: string[] }) => void | Promise<void>; onAssistantMessageStart?: () => void | Promise<void>; // ← 新增 onBlockReply?: (payload: BlockReplyPayload) => void | Promise<void>; onBlockReplyFlush?: () => void | Promise<void>; blockReplyBreak?: 'text_end' | 'message_end'; // ← 枚举值变更 blockReplyChunking?: BlockReplyChunking; onReasoningStream?: (payload: { text?: string; mediaUrls?: string[] }) => void | Promise<void>; onReasoningEnd?: () => void | Promise<void>; // ← 新增 onToolResult?: (payload: ReplyPayload) => void | Promise<void>; onAgentEvent?: (evt: { stream: string; data: Record<string, unknown> }) => void; // 运行控制 runId: string; timeoutMs: number; abortSignal?: AbortSignal; lane?: string; enqueue?: typeof enqueueCommand; replyOperation?: ReplyOperation; // ← 新增 shouldEmitToolResult?: () => boolean; // ← 新增 shouldEmitToolOutput?: () => boolean; // ← 新增 // 其他 config?: OpenClawConfig; skillsSnapshot?: SkillSnapshot; verboseLevel?: VerboseLevel; extraSystemPrompt?: string; ownerNumbers?: string[]; enforceFinalTag?: boolean; silentExpected?: boolean; // ← 新增 inputProvenance?: InputProvenance; // ← 新增 streamParams?: AgentStreamParams; // ← 新增 internalEvents?: AgentInternalEvent[]; // ← 新增 cleanupBundleMcpOnRunEnd?: boolean; // ← 新增}3.2 EmbeddedPiRunResult
文件: src/agents/pi-embedded-runner/types.ts
type EmbeddedPiRunResult = { // 响应载荷 payloads?: Array<{ text?: string; mediaUrl?: string; mediaUrls?: string[]; replyToId?: string; isError?: boolean; }>; // 元数据 meta: { durationMs: number; agentMeta?: { sessionId: string; provider: string; model: string; usage?: { input?; output?; cacheRead?; cacheWrite?; total? }; lastCallUsage?: { ... }; // ← 新增:最近一次调用 usage promptTokens?: number; // ← 新增 compactionCount?: number; // ← 新增:本轮自动压缩次数 }; aborted?: boolean; systemPromptReport?: SessionSystemPromptReport; error?: { kind: 'context_overflow' | 'compaction_failure' | 'role_ordering' | 'image_size'; message: string; }; // 停止原因(新增语义) stopReason?: string; // 'tool_calls'(clientToolCall)| 'end_turn'(yield)| ... // 待处理 Client Tool 调用(新增) pendingToolCalls?: Array<{ id: string; name: string; arguments: string; }>; }; // 消息工具标志 didSendViaMessagingTool?: boolean; didSendDeterministicApprovalPrompt?: boolean; // ← 新增 messagingToolSentTexts?: string[]; messagingToolSentMediaUrls?: string[]; // ← 新增 messagingToolSentTargets?: MessagingToolSend[]; successfulCronAdds?: SuccessfulCronAdd[]; // ← 新增}3.3 EmbeddedSandboxInfo
文件: src/agents/pi-embedded-runner/types.ts
type EmbeddedSandboxInfo = { enabled: boolean; workspaceDir?: string; // 沙盒工作空间目录 workspaceAccess?: 'none' | 'ro' | 'rw'; agentWorkspaceMount?: string; // Agent 内的挂载路径 browserBridgeUrl?: string; // 浏览器桥接 URL browserNoVncUrl?: string; // noVNC URL hostBrowserAllowed?: boolean; // 允许主机浏览器 elevated?: { allowed: boolean; defaultLevel: 'on' | 'off' | 'ask' | 'full'; };}4. 核心流程(实际实现)

4.1 完整运行流程
用户消息 ↓Gateway WebSocket Server ↓runEmbeddedPiAgent(params) ↓┌─────────────────────────────────────┐│ 0. SessionKey 回填 ││ backfillSessionKey() │└─────────────────────────────────────┘ ↓┌─────────────────────────────────────┐│ 1. 队列管理 ││ - enqueueSession (会话级) ││ - enqueueGlobal (全局级) │└─────────────────────────────────────┘ ↓┌─────────────────────────────────────┐│ 2. 工作空间解析 & 插件加载 ││ - resolveRunWorkspaceDir() ││ - ensureRuntimePluginsLoaded() │└─────────────────────────────────────┘ ↓┌─────────────────────────────────────┐│ 3. Hook 模型选择(新增) ││ - resolveHookModelSelection() ││ - 插件可替换 provider/modelId │└─────────────────────────────────────┘ ↓┌─────────────────────────────────────┐│ 4. 模型解析 & 验证 ││ - resolveModelAsync() ││ - resolveEffectiveRuntimeModel() │└─────────────────────────────────────┘ ↓┌─────────────────────────────────────┐│ 5. Context Engine 初始化(新增) ││ - resolveContextEngine() ││ - 统一管理压缩/维护生命周期 │└─────────────────────────────────────┘ ↓┌─────────────────────────────────────┐│ 6. Auth Controller 准备 ││ - createEmbeddedRunAuthController ││ - initializeAuthProfile() │└─────────────────────────────────────┘ ↓┌─────────────────────────────────────┐│ 7. 循环尝试(新增多个退出路径) ││ while(true) 上限 MAX_LOOP_ITERS │└─────────────────────────────────────┘ ↓┌─────────────────────────────────────┐│ 8. 单次运行尝试 ││ runEmbeddedAttempt(params) ││ ├─ 会话写锁 ││ ├─ Bundle MCP/LSP 加载 ││ ├─ 预检溢出检测 ││ ├─ 流式变换链 ││ ├─ sessions_yield 支持 ││ └─ Context Engine 轮次生命周期 │└─────────────────────────────────────┘ ↓┌─────────────────────────────────────┐│ 9. 错误处理 & 故障转移 ││ - timeout + 高 token → 超时压缩 ││ - context_overflow → CE 压缩 ││ - rate_limit → 轮换/升级降级 ││ - planning_only → act-now 重试 ││ - incomplete_turn → 错误提示 ││ - live_switch → 模型切换信号 ││ - auth_error → 刷新/轮换 ││ - unsupported_thinking → 降级 │└─────────────────────────────────────┘ ↓┌─────────────────────────────────────┐│ 10. 结果构建 ││ buildEmbeddedRunPayloads() │└─────────────────────────────────────┘ ↓返回 EmbeddedPiRunResult ↓Gateway 响应用户
4.2 流式事件处理流程
createAgentSession() ↓subscribeEmbeddedPiSession(params) ↓┌─────────────────────────────────────┐│ 事件监听器注册 ││ - text_delta ││ - message_start ││ - message_end ││ - tool_calls ││ - tool_result ││ - compaction_start/complete │└─────────────────────────────────────┘ ↓Agent 开始流式输出 ↓┌─────────────────────────────────────┐│ text_delta 事件 ││ deltaBuffer += delta ││ ↓ ││ 检测 <think>/<final> 标签 ││ ↓ ││ 更新 blockState ││ ↓ ││ 推送到 blockChunker ││ ↓ ││ 触发 onBlockReply(chunk) │└─────────────────────────────────────┘ ↓┌─────────────────────────────────────┐│ tool_calls 事件 ││ 存储 toolMetas ││ ↓ ││ 执行工具(如 exec、read_file) ││ ↓ ││ 返回工具结果 │└─────────────────────────────────────┘ ↓┌─────────────────────────────────────┐│ tool_result 事件 ││ 更新 toolMetas ││ ↓ ││ 检测消息工具 ││ (telegram_send, whatsapp_send) ││ ↓ ││ 去重已发送文本 │└─────────────────────────────────────┘ ↓┌─────────────────────────────────────┐│ message_end 事件 ││ finalizeAssistantTexts() ││ ↓ ││ 合并最终文本 ││ ↓ ││ 推送到 assistantTexts │└─────────────────────────────────────┘ ↓返回完整结果
4.3 认证配置文件故障转移
初始配置文件列表 profileCandidates = [profile1, profile2, profile3] ↓┌─────────────────────────────────────┐│ 尝试 profile1 ││ applyApiKeyInfo(profile1) ││ ↓ ││ runEmbeddedAttempt(...) │└─────────────────────────────────────┘ ↓ (失败:速率限制)┌─────────────────────────────────────┐│ 标记失败 ││ markAuthProfileFailure(profile1) ││ → 设置冷却期(30分钟) │└─────────────────────────────────────┘ ↓┌─────────────────────────────────────┐│ 前进到下一个 ││ advanceAuthProfile() ││ → 跳过冷却中的配置文件 ││ → profileIndex++ │└─────────────────────────────────────┘ ↓┌─────────────────────────────────────┐│ 尝试 profile2 ││ applyApiKeyInfo(profile2) ││ ↓ ││ runEmbeddedAttempt(...) │└─────────────────────────────────────┘ ↓ (成功)┌─────────────────────────────────────┐│ 标记成功 ││ markAuthProfileGood(profile2) ││ markAuthProfileUsed(profile2) ││ → 清除失败计数器 ││ → 更新最后使用时间 │└─────────────────────────────────────┘ ↓返回成功结果4.4 自动压缩流程(Context Engine 版)
runEmbeddedAttempt(...) ↓ (失败:context_overflow 或 timeout + 高 token 占用率)┌─────────────────────────────────────┐│ 超时触发压缩(新路径) ││ tokenUsedRatio > 0.65? ││ → contextEngine.compact({ ││ trigger: "timeout_recovery", ││ force: true ││ }) ││ → 成功 → continue(重试) ││ → 失败 → 转普通超时处理 │└─────────────────────────────────────┘ ↓(上下文溢出)┌─────────────────────────────────────┐│ 溢出压缩(最多 3 次) ││ hadAttemptLevelCompaction? ││ → 是:不再压缩,continue ││ → 否: ││ contextEngine.compact({ ││ trigger: "overflow" ││ }) │└─────────────────────────────────────┘ ↓ (失败)┌─────────────────────────────────────┐│ 工具结果截断 ││ truncateOversizedToolResultsIn ││ Session() ││ → 成功 → continue ││ → 失败 → 返回 context_overflow │└─────────────────────────────────────┘
5. 依赖关系(实际引用)
5.1 外部依赖
核心依赖
• - @mariozechner/pi-coding-agent
(v0.49.3+) • createAgentSession()- 创建 Agent 会话• SessionManager- 会话管理• SettingsManager- 设置管理• DefaultResourceLoader- 含扩展工厂的资源加载器(新增)• codingTools- 基础编码工具• - @mariozechner/pi-ai
• streamSimple()- 流式 API 调用
• - @mariozechner/pi-agent-core
• AgentMessage- 消息类型• AssistantMessage- 助手消息类型
内部依赖
pi-embedded-runner/run.ts ↓ depends onrun/attempt.ts # 单次尝试 ↓ depends onpi-embedded-subscribe.ts # 流式订阅pi-tools.ts # 工具创建pi-embedded-helpers.ts # 辅助函数auth/controller.ts # Auth Controller(新增)context-engine/ # 上下文引擎(新增)session-lock.ts # 会话写锁(新增)bundle-mcp/runtime.ts # Bundle MCP 运行时(新增)model-auth.ts # 模型认证model-selection.ts # 模型选择sandbox.ts # 沙盒管理skills.ts # 技能管理bootstrap-files.ts # Bootstrap 文件5.2 被依赖模块(调用方)
Gateway 层
• src/gateway/server-methods/agent.ts- Agent 方法处理• src/gateway/server-chat.ts- 聊天消息处理
CLI 层
• src/commands/agent/message.ts- CLI 消息命令
自动回复层
• src/auto-reply/reply/agent-run.ts- 自动回复执行
6. 性能优化(实际策略)
6.1 队列管理
双层队列
// 会话级队列(防止同一会话并发)const sessionLane = resolveSessionLane(sessionKey);enqueueSession(() => ...)// 全局级队列(控制全局并发)const globalLane = resolveGlobalLane(lane);enqueueGlobal(() => ...)优点:
• 会话隔离:同一会话串行执行,避免状态冲突 • 全局限流:控制整体负载,防止资源耗尽
6.2 SessionManager 缓存
文件: src/agents/pi-embedded-runner/session-manager-cache.ts
// 预热会话文件prewarmSessionFile(sessionFile) → 提前加载 SessionManager 到内存// 跟踪访问trackSessionManagerAccess(sessionFile) → 记录最后访问时间 → 用于缓存淘汰优点:
• 减少重复文件 I/O • 加快会话加载速度
6.3 认证配置文件冷却
// 失败后设置冷却期markAuthProfileFailure(profileId) → 记录失败时间 + 失败计数 → 冷却期:30 分钟(可配置)// 检查冷却期isProfileInCooldown(authStore, profileId) → 如果在冷却期内,跳过该配置文件优点:
• 避免重复失败 • 快速故障转移到可用配置文件
6.4 流式分块优化
段落优先分块
blockReplyChunking: { paragraphPreference: true, softChunkSize: 500, hardChunkMaxSize: 2000}// 分块器逻辑EmbeddedBlockChunker: → 优先在段落边界分块(\n\n) → 软分块:500 字符(推荐点) → 硬分块:2000 字符(强制点)优点:
• 保持段落完整性 • 平衡响应速度与可读性
6.5 图像处理优化
懒加载 + 尺寸限制
detectAndLoadPromptImages(prompt, workspaceDir) → 仅加载提示中引用的图像 → 检查文件大小限制(MAX_IMAGE_BYTES) → Base64 编码injectHistoryImagesIntoMessages(messages, historyImages) → 去重:避免重复注入 → 仅注入用户消息优点:
• 减少不必要的磁盘 I/O • 控制内存使用
7. 关键实现细节(源码验证)
7.1 Anthropic 拒绝测试 Token 清理
// src/agents/pi-embedded-runner/run.tsconst ANTHROPIC_MAGIC_STRING_TRIGGER_REFUSAL = "ANTHROPIC_MAGIC_STRING_TRIGGER_REFUSAL";const ANTHROPIC_MAGIC_STRING_REPLACEMENT = "ANTHROPIC MAGIC STRING TRIGGER REFUSAL (redacted)";function scrubAnthropicRefusalMagic(prompt: string): string { if (!prompt.includes(ANTHROPIC_MAGIC_STRING_TRIGGER_REFUSAL)) return prompt; return prompt.replaceAll( ANTHROPIC_MAGIC_STRING_TRIGGER_REFUSAL, ANTHROPIC_MAGIC_STRING_REPLACEMENT );}// 使用const prompt = provider === "anthropic" ? scrubAnthropicRefusalMagic(params.prompt) : params.prompt;目的: 防止 Anthropic 测试 Token 污染会话转录
7.2 思考级别降级
// pi-embedded-helpers.tsfunction pickFallbackThinkingLevel(params: { message?: string; attempted: Set<ThinkLevel>;}): ThinkLevel | null { const msg = params.message?.toLowerCase() ?? ""; const isThinkingError = msg.includes("thinking") || msg.includes("extended_thinking"); if (!isThinkingError) return null; // 降级顺序: high → low → off if (!params.attempted.has("low")) return "low"; if (!params.attempted.has("off")) return "off"; return null;}// 使用const fallbackThinking = pickFallbackThinkingLevel({ message: errorText, attempted: attemptedThinking});if (fallbackThinking) { thinkLevel = fallbackThinking; continue; // 重试}支持的思考级别:
• high: 深度推理(如 o1-preview)• low: 轻量推理• off: 无推理
7.3 GitHub Copilot Token 交换
// run.tsif (model.provider === "github-copilot") { const { resolveCopilotApiToken } = await import("../../providers/github-copilot-token.js"); const copilotToken = await resolveCopilotApiToken({ githubToken: apiKeyInfo.apiKey }); authStorage.setRuntimeApiKey( model.provider, copilotToken.token );}特殊处理:
• GitHub Token → Copilot API Token • 动态交换,不持久化
7.4 消息工具去重
// pi-embedded-subscribe.tsconst isMessagingToolDuplicateNormalized( normalized: string, existing: string[]): boolean { return existing.some(prev => prev === normalized);}// 工具结果处理on tool_result: if (isMessagingTool(toolName)) { const normalized = normalizeTextForComparison(text); if (!isMessagingToolDuplicateNormalized( normalized, messagingToolSentTextsNormalized )) { messagingToolSentTexts.push(text); messagingToolSentTextsNormalized.push(normalized); } }目的: 防止消息工具重复发送相同文本
7.5 SessionManager 守卫
// session-tool-result-guard-wrapper.tsfunction guardSessionManager( sessionManager: SessionManager, params: { agentId: string; sessionKey: string; allowSyntheticToolResults: boolean; }): SessionManager { // 包装 appendMessage,拦截合成工具结果 const originalAppendMessage = sessionManager.appendMessage.bind(sessionManager); sessionManager.appendMessage = (message) => { if (message.role === "tool_result" && isSyntheticToolResult(message)) { if (!params.allowSyntheticToolResults) { // 跳过合成工具结果 return; } } originalAppendMessage(message); }; return sessionManager;}目的: 防止合成工具结果污染转录文件
7.6 上下文窗口守卫
// context-window-guard.tsconst CONTEXT_WINDOW_WARN_BELOW_TOKENS = 8_000;const CONTEXT_WINDOW_HARD_MIN_TOKENS = 4_000;const ctxGuard = evaluateContextWindowGuard({ info: ctxInfo, warnBelowTokens: CONTEXT_WINDOW_WARN_BELOW_TOKENS, hardMinTokens: CONTEXT_WINDOW_HARD_MIN_TOKENS});if (ctxGuard.shouldBlock) { throw new FailoverError( `Model context window too small (${ctxGuard.tokens} tokens). ` + `Minimum is ${CONTEXT_WINDOW_HARD_MIN_TOKENS}.`, { reason: "unknown", provider, model: modelId } );}验证层级:
• 警告阈值:8000 tokens(记录日志) • 硬性最小值:4000 tokens(阻止运行)
8. 错误处理与故障转移

8.1 错误分类
// pi-embedded-helpers.tsfunction classifyFailoverReason( message: string): FailoverReason | null { const lower = message.toLowerCase(); // 认证错误 if (lower.includes("unauthorized") || lower.includes("invalid api key")) { return "auth"; } // 速率限制 if (lower.includes("rate limit") || lower.includes("429")) { return "rate_limit"; } // 超时 if (lower.includes("timeout") || lower.includes("timed out")) { return "timeout"; } // 上下文溢出 if (lower.includes("context") && lower.includes("overflow")) { return "context_overflow"; } // 其他 return null;}8.2 故障转移决策树
错误发生 ↓classifyFailoverReason(errorMessage) ↓┌───────────────────────────────────┐│ auth / rate_limit / timeout ││ → markAuthProfileFailure() ││ → advanceAuthProfile() ││ → 重试 │└───────────────────────────────────┘ ↓ (所有配置文件耗尽)┌───────────────────────────────────┐│ fallbackConfigured? ││ Yes → throw FailoverError ││ (触发模型降级) ││ No → throw Error ││ (失败) │└───────────────────────────────────┘ ↓┌───────────────────────────────────┐│ context_overflow ││ → compactEmbeddedPiSession() ││ → 重试 │└───────────────────────────────────┘ ↓┌───────────────────────────────────┐│ unsupported_thinking ││ → pickFallbackThinkingLevel() ││ → 降级 thinkLevel ││ → 重试 │└───────────────────────────────────┘ ↓┌───────────────────────────────────┐│ image_size / image_dimension ││ → 返回用户友好错误 ││ → 不重试 │└───────────────────────────────────┘8.3 FailoverError 特殊处理
// failover-error.tsclass FailoverError extends Error { reason: FailoverReason; provider: string; model: string; profileId?: string; status?: number;}// 使用if (fallbackConfigured && isFailoverErrorMessage(errorText)) { throw new FailoverError(errorText, { reason: assistantFailoverReason ?? "unknown", provider, model: modelId, profileId: lastProfileId, status: resolveFailoverStatus(reason) });}特性:
• 携带详细故障信息 • 触发上层模型降级逻辑 • 保留原始错误上下文
7.7 Sessions Yield 工具(新增)
// attempt.tslet yieldDetected = false;let yieldMessage: string | null = null;// tools: sessions_yield 工具被 invoke 时触发onYield: (message) => { yieldDetected = true; yieldMessage = message; queueSessionsYieldInterruptMessage(activeSession); runAbortController.abort("sessions_yield"); abortSessionForYield?.();}// 检测 yield 中止(非错误)yieldAborted = yieldDetected && isRunnerAbortError(err) && err.cause === "sessions_yield";if (yieldAborted) { aborted = false; // 不视为错误中止 stripSessionsYieldArtifacts(activeSession); await persistSessionsYieldContextMessage(activeSession, yieldMessage);}目的: 允许 Agent 主动让出控制权,保留上下文继续后续轮次
7.8 Planning-Only 检测与重试(新增)
// run.tsconst nextPlanningOnlyRetryInstruction = resolvePlanningOnlyRetryInstruction({ provider, modelId, aborted, timedOut, attempt,});if (!incompleteTurnText && nextPlanningOnlyRetryInstruction && planningOnlyRetryAttempts < 1) { // 发出 Plan 事件通知上层 const planDetails = extractPlanningOnlyPlanDetails(planningOnlyText); if (planDetails) { emitAgentPlanEvent({ runId, data: { phase: "update", ... } }); params.onAgentEvent?.({ stream: "plan", data: { ... } }); } planningOnlyRetryAttempts += 1; planningOnlyRetryInstruction = nextPlanningOnlyRetryInstruction; continue; // 重试,注入 "act-now" 指令}目的: 当 Agent 仅输出规划内容但未执行时,自动注入"立即执行"指令
7.9 预检溢出检测(新增)
// attempt.ts - 在提交 prompt 前检测const preemptiveCompaction = shouldPreemptivelyCompactBeforePrompt({ messages: activeSession.messages, systemPrompt: systemPromptText, prompt: effectivePrompt, contextTokenBudget, reserveTokens,});// route: "truncate_tool_results_only"// → 先截断工具结果,不走压缩if (preemptiveCompaction.route === "truncate_tool_results_only") { truncateOversizedToolResultsInSessionManager(...) preflightRecovery = { route: "truncate_tool_results_only", handled: true } skipPromptSubmission = true; // 跳过提交,触发 preflightRecovery 重试}// route: "compact_only" | "compact_then_truncate"// → 标记 promptError,触发外层压缩逻辑if (preemptiveCompaction.shouldCompact) { preflightRecovery = { route: "compact_only" | "compact_then_truncate" } promptError = new Error(PREEMPTIVE_OVERFLOW_ERROR_TEXT) skipPromptSubmission = true;}目的: 在 prompt 提交前估算 token,主动防止溢出(避免模型报错后再压缩)
文档版本: v2.0 (基于源码验证)\ 验证日期: 2026-04-09
夜雨聆风