OpenClaw源码解读系列:Agent 引擎
大年初四,今天继续OpenClaw源码解读——Agent 引擎。这部分是整个OpenClaw项目的核心中的核心,甚至可以单独拿来使用。

概述
Agent 引擎是 OpenClaw 的大脑——当一条消息通过通道进入、经过路由确定目标 Agent 后,就交给这个引擎。它负责:加载会话历史、构建 system prompt、选择模型和认证凭据、发起 API 调用、处理流式响应、调用工具、在上下文溢出时自动压缩会话、在认证失败时轮换凭据、在模型不可用时降级切换。整个引擎由 `src/agents/` 下的 236 个文件组成,核心是一个嵌入式 Pi Agent runner。

一、入口:`pi-embedded.ts`
`src/agents/pi-embedded.ts` 只有 17 行,是一个纯粹的 barrel 导出文件:
export {abortEmbeddedPiRun,compactEmbeddedPiSession,isEmbeddedPiRunActive,isEmbeddedPiRunStreaming,queueEmbeddedPiMessage,resolveEmbeddedSessionLane,runEmbeddedPiAgent,waitForEmbeddedPiRunEnd,} from "./pi-embedded-runner.js";
外部代码(自动回复管线、Gateway 的 `agent` RPC handler)只需导入这个文件。`runEmbeddedPiAgent` 是主入口——传入消息内容、会话信息、模型参数,返回一个包含回复文本和 usage 统计的 `EmbeddedPiRunResult`。
二、主 Runner:`run.ts` 的编排逻辑
`src/agents/pi-embedded-runner/run.ts`(903 行)是引擎的指挥中心。`runEmbeddedPiAgent` 函数的执行流程分为六个阶段。
阶段 1:并发控制——双层 Lane
export async function runEmbeddedPiAgent(params: RunEmbeddedPiAgentParams,): Promise<EmbeddedPiRunResult> {const sessionLane = resolveSessionLane(params.sessionKey?.trim() || params.sessionId);const globalLane = resolveGlobalLane(params.lane);const enqueueSession = (task, opts) => enqueueCommandInLane(sessionLane, task, opts);const enqueueGlobal = (task, opts) => enqueueCommandInLane(globalLane, task, opts);return enqueueSession(() =>enqueueGlobal(async () => {// ... 整个 Agent 运行逻辑}),);}
Agent 运行被包裹在双层队列中。`sessionLane` 确保同一个会话的多条消息串行执行——你不能同时给同一个对话发两条消息让 AI 同时处理,会导致会话历史损坏。`globalLane` 控制全局并发——防止同时发起太多 API 调用导致限流或资源耗尽。
这是一个嵌套入队模式:先进 session 队列排队,排到后再进 global 队列排队,两层都通过后才真正开始执行。
阶段 2:模型解析与上下文窗口守卫
const { model, error, authStorage, modelRegistry } = resolveModel(provider, modelId, agentDir, params.config,);if (!model) throw new Error(error ?? `Unknown model: ${provider}/${modelId}`);const ctxInfo = resolveContextWindowInfo({cfg: params.config, provider, modelId,modelContextWindow: model.contextWindow,defaultTokens: DEFAULT_CONTEXT_TOKENS,});const ctxGuard = evaluateContextWindowGuard({info: ctxInfo,warnBelowTokens: CONTEXT_WINDOW_WARN_BELOW_TOKENS,hardMinTokens: CONTEXT_WINDOW_HARD_MIN_TOKENS,});if (ctxGuard.shouldBlock) {throw new FailoverError(`Model context window too small (${ctxGuard.tokens} tokens).`,{ reason: "unknown", provider, model: modelId },);}
`resolveModel`(`src/agents/pi-embedded-runner/model.ts`,323 行)做了很多工作:
1. 从 agent 目录的 `models.json` 注册表中查找模型定义
2. 如果找不到,回退到 provider 的内联配置
3. 对特定模型应用前向兼容 fallback——比如当用户配置了一个尚未发布的模型(如 `anthropic/opus-4.6`),系统会 fallback 到最接近的已有模型
4. 对完全未知的模型,构建一个泛型模型定义
`evaluateContextWindowGuard` 是一道安全网:如果模型的上下文窗口太小(低于 `CONTEXT_WINDOW_HARD_MIN_TOKENS`),直接抛出 `FailoverError` 阻止执行,避免浪费 API 调用。`FailoverError` 是一个特殊的错误类型——它携带 `reason`、`provider`、`model` 信息,告诉上层”这个模型不行,请尝试下一个”。
阶段 3:认证凭据轮换
const authStore = ensureAuthProfileStore(agentDir, { allowKeychainPrompt: false });const profileOrder = resolveAuthProfileOrder({cfg: params.config, store: authStore, provider,preferredProfile: preferredProfileId,});const profileCandidates = lockedProfileId? [lockedProfileId]: profileOrder.length > 0 ? profileOrder : [undefined];try {while(profileIndex < profileCandidates.length) {const candidate = profileCandidates[profileIndex];if(candidate && isProfileInCooldown(authStore, candidate)) {profileIndex += 1;continue;}await applyApiKeyInfo(profileCandidates[profileIndex]);break;}} catch(err) {const advanced = await advanceAuthProfile();if(!advanced) throwAuthProfileFailover({ allInCooldown: false, error: err });}
OpenClaw 支持为同一个 provider 配置多个 API Key(auth profile)。`resolveAuthProfileOrder` 按优先级排序所有候选 profile。初始化时尝试第一个可用的(跳过处于冷却期的)。如果某个 profile 在运行中出错(限流、欠费等),`advanceAuthProfile` 会切换到下一个候选。
冷却机制(`isProfileInCooldown`)防止短时间内反复尝试已知有问题的 profile。`markAuthProfileFailure` 在失败时标记冷却,`markAuthProfileGood` 在成功时清除。
阶段 4:重试循环——溢出、压缩、降级
const MAX_OVERFLOW_COMPACTION_ATTEMPTS = 3;let overflowCompactionAttempts = 0;let toolResultTruncationAttempted = false;while (true) {const attempt = await runEmbeddedAttempt({ ... });const contextOverflowError = detectContextOverflow(attempt);if (contextOverflowError) {// 策略 1:自动压缩会话if (overflowCompactionAttempts < MAX_OVERFLOW_COMPACTION_ATTEMPTS) {overflowCompactionAttempts++;const compactResult = await compactEmbeddedPiSessionDirect({ ... });if (compactResult.compacted) {continue; // 压缩成功,重试}}// 策略 2:截断超大工具结果if (!toolResultTruncationAttempted && hasOversizedToolResults) {toolResultTruncationAttempted = true;await truncateOversizedToolResultsInSession({ ... });continue; // 截断后重试}// 策略 3:切换 thinking level// 策略 4:切换 auth profile// 策略 5:抛出 FailoverError 让上层切换模型}// 正常完成,构建回复const payloads = buildEmbeddedRunPayloads({ ... });return { payloads, usage: deriveUsageFromAccumulator(usageAccumulator) };}
这个 `while(true)` 循环是引擎的容错核心。每次 `runEmbeddedAttempt` 执行完毕后,系统检查结果。如果遇到上下文溢出(`isLikelyContextOverflowError`),会按优先级尝试五种恢复策略:
1. 自动压缩:调用 `compactEmbeddedPiSessionDirect` 压缩会话历史(最多 3 次)
2. 工具结果截断:如果某个工具的输出特别大(比如 `exec` 返回了几万行日志),截断到合理大小
3. Thinking level 降级:如果当前是 `extended` thinking 模式(占用大量上下文),降级到 `low` 或 `off`
4. Auth profile 轮换:某些限流错误可能是特定 API Key 的问题
5. FailoverError:所有本地恢复手段用尽,抛出 failover 错误,让调用方切换到配置中的 fallback 模型
这种层层降级的设计确保了高可用性——即使主模型出问题、API Key 被限流、会话太长,系统也能自愈。
三、单次 Attempt:`attempt.ts`
`src/agents/pi-embedded-runner/run/attempt.ts`(949 行)执行一次完整的 API 调用。它是引擎中最长的文件之一,按顺序完成以下工作。
沙箱与工作目录
const sandbox = await resolveSandboxContext({config: params.config,sessionKey: sandboxSessionKey,workspaceDir: resolvedWorkspace,});const effectiveWorkspace = sandbox?.enabled? sandbox.workspaceAccess === "rw" ? resolvedWorkspace : sandbox.workspaceDir: resolvedWorkspace;process.chdir(effectiveWorkspace);
如果配置了沙箱(Docker 容器),工具执行会在隔离环境中进行。`effectiveWorkspace` 决定了工具看到的文件系统根目录。
技能加载
const skillEntries = shouldLoadSkillEntries? loadWorkspaceSkillEntries(effectiveWorkspace) : [];restoreSkillEnv = applySkillEnvOverrides({skills: skillEntries, config: params.config,});const skillsPrompt = resolveSkillsPromptForRun({skillsSnapshot: params.skillsSnapshot,entries: skillEntries,config: params.config,workspaceDir: effectiveWorkspace,});
技能(Skills)是 Agent 的可插拔知识模块。每个技能有一个 `SKILL.md` 描述文件和可选的脚本。`loadWorkspaceSkillEntries` 从工作目录扫描技能,`applySkillEnvOverrides` 将技能声明的环境变量注入 `process.env`,`resolveSkillsPromptForRun` 生成技能相关的 system prompt 片段。注意 `restoreSkillEnv` 是一个清理函数——attempt 结束后恢复环境变量,避免污染后续运行。
工具创建
const toolsRaw = params.disableTools? []: createOpenClawCodingTools({exec: { ...params.execOverrides, elevated: params.bashElevated },sandbox,messageProvider: params.messageChannel ?? params.messageProvider,sessionKey: params.sessionKey ?? params.sessionId,agentDir,workspaceDir: effectiveWorkspace,config: params.config,abortSignal: runAbortController.signal,modelProvider: params.model.provider,modelId: params.modelId,// ... 20+ 参数});
`createOpenClawCodingTools`(`src/agents/pi-tools.ts`,457 行)是工具工厂。它根据当前的配置、模型、通道、沙箱状态,动态组装可用的工具集。工具分为几大类:
-
文件操作:`read`、`write`、`edit`、`grep`、`find`、`ls`
-
命令执行:`exec`(shell 命令)、`process`(后台进程管理)
-
补丁:`apply_patch`(仅 OpenAI 系模型)
-
网络:`web_search`(Brave API)、`web_fetch`(URL 内容提取)
-
通道操作:`message`(跨通道发消息)、通道特有的 action(Telegram 贴纸、Discord 表情等)
-
会话管理:`sessions_list`、`sessions_send`、`sessions_spawn`(子 Agent)、`sessions_history`
-
系统:`gateway`(重启/更新)、`cron`(定时任务)、`nodes`(远程设备控制)
-
媒体:`image`(图片分析)、`browser`(浏览器自动化)、`canvas`(画布)
-
记忆:`memory_search`、`memory_store` 等
工具创建过程中会进行策略过滤。`resolveEffectiveToolPolicy` 从四个层级解析工具策略:全局策略 → provider 特定策略 → Agent 策略 → 群聊策略。每个层级可以声明 `allow`(白名单)或 `deny`(黑名单)。此外还有 `ownerOnly` 标记——某些敏感工具(如 `gateway`、`exec`)只有 owner 身份的发送者才能使用。
工具 schema 还要处理 provider 兼容性。比如 Gemini 不支持 `anyOf`/`oneOf` 等 JSON Schema 关键字,`sanitizeToolsForGoogle` 会清理不兼容的 schema 结构。
System Prompt 构建
const { runtimeInfo, userTimezone, userTime, userTimeFormat } = buildSystemPromptParams({config: params.config,agentId: sessionAgentId,workspaceDir: effectiveWorkspace,runtime: {host: machineName, os: `${os.type()} ${os.release()}`,model: `${params.provider}/${params.modelId}`,shell: detectRuntimeShell(),channel: runtimeChannel,capabilities: runtimeCapabilities,},});const systemPrompt = buildAgentSystemPrompt({workspaceDir: effectiveWorkspace,defaultThinkLevel: params.thinkLevel,toolNames: tools.map(t => t.name),toolSummaries: coreToolSummaries,skillsPrompt,userTimezone, userTime,runtimeInfo,sandboxInfo,contextFiles,// ... 更多参数});
`buildAgentSystemPrompt`(`src/agents/system-prompt.ts`,652 行)组装完整的 system prompt,包含十几个章节:
-
Tooling:列出所有可用工具及其简介
-
Safety:安全规则
-
Skills:已加载技能的说明
-
Memory:记忆系统使用指南
-
Documentation:OpenClaw 文档路径
-
Workspace:当前工作目录信息
-
Sandbox:沙箱环境描述
-
User Identity:用户信息
-
Time:当前时间和时区
-
Reply Tags:回复格式约定
-
Messaging:跨通道发消息的工具提示
-
Voice (TTS):语音合成提示
-
Reasoning Format:推理标签格式
-
Project Context:bootstrap 文件内容(如 `AGENTS.md`、`CLAUDE.md`)
-
Runtime:运行时环境信息(agent ID、主机名、OS、模型、shell、通道、能力列表)
`promptMode` 参数控制生成哪些章节。子 Agent(subagent)使用 `”minimal”` 模式,只包含核心章节,减少 token 消耗。
会话管理与历史修复
const sessionManager = new SessionManager({ /* ... */ });await repairSessionFileIfNeeded(sessionFile);const messages = await sessionManager.loadHistory();sanitizeToolUseResultPairing(messages); // 修复工具调用/结果不配对validateAnthropicTurns(messages); // Anthropic 要求严格的 user/assistant 交替validateGeminiTurns(messages); // Gemini 有类似的要求
会话历史从 JSONL 文件加载。由于各种异常情况(进程崩溃、并发写入、模型返回格式错误),历史可能损坏。系统有三层修复:
1. 文件级修复:`repairSessionFileIfNeeded` 处理截断的 JSON 行
2. 工具调用配对修复:`sanitizeToolUseResultPairing` 确保每个 `tool_use` 都有对应的 `tool_result`
3. Provider 特定校验:Anthropic 要求严格的 user→assistant 交替,Gemini 有类似约束
执行 API 调用
const prompt = await sessionManager.prompt(params.prompt, {tools: toClientToolDefinitions(tools),images: detectedImages,thinking: thinkingConfig,timeout: params.timeoutMs,abort: combinedAbortSignal,streamParams: params.streamParams,});
`sessionManager.prompt` 是 `@mariozechner/pi-coding-agent` 库的方法,它做三件事:
1. 把用户消息追加到历史
2. 构建完整的 API payload(system prompt + 历史 + 用户消息 + 工具定义)
3. 发起流式 API 调用,处理工具调用循环(模型返回 tool_use → 执行工具 → 将 tool_result 发回模型 → 模型继续生成)
这个循环一直运行到模型返回最终文本(`end_turn`)或被中止。
四、流式事件订阅
`subscribeEmbeddedPiSession`(`src/agents/pi-embedded-subscribe.ts`,637 行)监听 `sessionManager` 的事件流,把底层 API 事件转化为 OpenClaw 的业务事件。
export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionParams) {const state: EmbeddedPiSubscribeState = {assistantTexts: [],toolMetas: [],blockState: { thinking: false, final: false, inlineCode: createInlineCodeState() },deltaBuffer: "",blockBuffer: "",// ... 20+ 状态字段};// ...}
它维护了一个复杂的状态机。核心任务包括:
-
文本流处理:把 AI 的逐 token 输出聚合成有意义的文本块。处理 `<thinking>` 标签(分离推理文本和最终回复),处理 `<final>` 标签(标记最终回复的开始)
-
Block Reply 分块:如果启用了 block streaming(通道支持分块发送),`EmbeddedBlockChunker` 在遇到段落边界时触发 `onBlockReply` 回调,实现”逐段发送”效果
-
工具调用追踪:记录每个工具调用的名称、参数、结果、执行时间,生成 `toolMetas` 供后续 payload 构建使用
-
去重:如果 AI 在文本中重复了工具已经发送过的消息内容(比如 `message` 工具发了一条消息,AI 又在回复中写了相同的文本),自动去重
-
Usage 统计:累积每次 API 调用的 input/output/cache token 使用量
流式事件最终通过 `emitAgentEvent` 推送到 Gateway 的广播系统,Web 控制台和移动端可以实时显示 AI 的”思考过程”和”打字中”状态。
五、会话压缩
当会话历史太长导致上下文溢出时,`compactEmbeddedPiSessionDirect`(`src/agents/pi-embedded-runner/compact.ts`,508 行)执行压缩。
它的流程和 `runEmbeddedAttempt` 非常相似——解析沙箱、加载技能、创建工具、构建 system prompt、初始化 session manager——然后调用 `session.compact(customInstructions)`。
底层的压缩策略由 `@mariozechner/pi-coding-agent` 库实现,通常是让模型自己总结历史记录,把冗长的工具调用/结果对替换为简短的摘要。压缩后重新估算 token 数量,确认确实减小了上下文。
压缩本身也通过 lane 队列串行化——你不能在压缩还没完成时就开始下一次 Agent 运行。
六、Payload 构建:从原始响应到最终回复
`buildEmbeddedRunPayloads`(`src/agents/pi-embedded-runner/run/payloads.ts`,267 行)把 AI 的原始输出转化为可以投递给通道的回复 payload。
它处理的细节包括:
-
格式化错误消息(区分模型错误和工具错误)
-
如果开启了 verbose 模式,内联展示工具调用结果
-
格式化推理文本(如果 reasoning 模式开启)
-
提取 AI 的纯文本回复
-
去除与已发送消息重复的文本
-
解析回复指令(如 `[[reply_to_current]]`、`[[silent]]`)
-
过滤空回复和静默回复
最终返回一个 `ReplyPayload[]` 数组,每个 payload 包含 `text`、`mediaUrls`、`isError`、`replyToId` 等字段,交给通道系统的 `ReplyDispatcher` 投递。
七、完整调用链回顾
一条消息触发 Agent 运行的完整路径:
-
入口:自动回复管线的 `runPreparedReply` → 调用 `runEmbeddedPiAgent`
-
并发控制:session lane 排队 → global lane 排队 → 开始执行
-
准备:解析模型 → 上下文窗口守卫 → 解析认证凭据(支持多 profile 轮换)
-
重试循环:
1. 调用 `runEmbeddedAttempt`(单次 attempt)
2. attempt 内部:解析沙箱 → 加载技能 → 创建工具(策略过滤)→ 构建 system prompt(十几个章节)→ 加载会话历史(三层修复)→ 发起流式 API 调用 → 订阅事件流(分块、去重、usage 统计)
3. 检查结果:上下文溢出?自动压缩 → 工具结果太大?截断 → thinking 模式不支持?降级 → API Key 限流?轮换 → 模型不可用?FailoverError 切换 fallback 模型
4. 成功则构建回复 payload
-
返回:`EmbeddedPiRunResult` 包含 `payloads`(回复内容)、`usage`(token 使用量)、`meta`(模型信息、attempt 次数等)
这条链路中最精妙的设计是多层容错。从最内层的会话历史修复,到 attempt 级别的工具 schema 兼容,到 run 级别的压缩/截断/降级/轮换/failover,再到调用方可配置的 fallback 模型列表——每一层都在尽力自愈,只有在所有本地策略用尽后才向上抛出错误。这让 OpenClaw 在面对千变万化的模型 API 行为时保持高可用性。
下面是讲解项目的基本信息:
-
使用的项目分支是:main
-
commit版本是:f5160ca6becaeeb6a4dfd892fffd2130a696f766
讲解模块如下:
1. CLI 框架与进程模型
2. 配置系统
3. Gateway 核心
4. 通道与路由
5. Agent 引擎(今日讲解)
6. 自动回复管线
7. 插件系统
8. 记忆系统
9. Web 控制台
10. 原生客户端
11. 浏览器自动化
12. 运维与测试
夜雨聆风
