乐于分享
好东西不私藏

OpenClaw源码解读系列:Agent 引擎

OpenClaw源码解读系列:Agent 引擎

大年初四,今天继续OpenClaw源码解读——Agent 引擎。这部分是整个OpenClaw项目的核心中的核心,甚至可以单独拿来使用。

概述

Agent 引擎是 OpenClaw 的大脑——当一条消息通过通道进入、经过路由确定目标 Agent 后,就交给这个引擎。它负责:加载会话历史、构建 system prompt、选择模型和认证凭据、发起 API 调用、处理流式响应、调用工具、在上下文溢出时自动压缩会话、在认证失败时轮换凭据、在模型不可用时降级切换。整个引擎由 `src/agents/` 下的 236 个文件组成,核心是一个嵌入式 Pi Agent runner。

一、入口:`pi-embedded.ts`

`src/agents/pi-embedded.ts` 只有 17 行,是一个纯粹的 barrel 导出文件:

export {  abortEmbeddedPiRun,  compactEmbeddedPiSession,  isEmbeddedPiRunActive,  isEmbeddedPiRunStreaming,  queueEmbeddedPiMessage,  resolveEmbeddedSessionLane,  runEmbeddedPiAgent,  waitForEmbeddedPiRunEnd,from "./pi-embedded-runner.js";

外部代码(自动回复管线、Gateway 的 `agent` RPC handler)只需导入这个文件。`runEmbeddedPiAgent` 是主入口——传入消息内容、会话信息、模型参数,返回一个包含回复文本和 usage 统计的 `EmbeddedPiRunResult`。

二、主 Runner:`run.ts` 的编排逻辑

`src/agents/pi-embedded-runner/run.ts`(903 行)是引擎的指挥中心。`runEmbeddedPiAgent` 函数的执行流程分为六个阶段。

阶段 1:并发控制——双层 Lane

export async function runEmbeddedPiAgent(  params: RunEmbeddedPiAgentParams,): Promise<EmbeddedPiRunResult> {  const sessionLane = resolveSessionLane(params.sessionKey?.trim() || params.sessionId);  const globalLane = resolveGlobalLane(params.lane);  const enqueueSession = (task, opts) => enqueueCommandInLane(sessionLane, task, opts);  const enqueueGlobal = (task, opts) => enqueueCommandInLane(globalLane, task, opts);  return enqueueSession(() =>    enqueueGlobal(async () => {      // ... 整个 Agent 运行逻辑    }),  );}

Agent 运行被包裹在双层队列中。`sessionLane` 确保同一个会话的多条消息串行执行——你不能同时给同一个对话发两条消息让 AI 同时处理,会导致会话历史损坏。`globalLane` 控制全局并发——防止同时发起太多 API 调用导致限流或资源耗尽。

这是一个嵌套入队模式:先进 session 队列排队,排到后再进 global 队列排队,两层都通过后才真正开始执行。

阶段 2:模型解析与上下文窗口守卫

const { model, error, authStorage, modelRegistry } = resolveModel(  provider, modelId, agentDir, params.config,);if (!model) throw new Error(error ?? `Unknown model: ${provider}/${modelId}`);const ctxInfo = resolveContextWindowInfo({  cfg: params.config, provider, modelId,  modelContextWindow: model.contextWindow,  defaultTokensDEFAULT_CONTEXT_TOKENS,});const ctxGuard = evaluateContextWindowGuard({  info: ctxInfo,  warnBelowTokensCONTEXT_WINDOW_WARN_BELOW_TOKENS,  hardMinTokensCONTEXT_WINDOW_HARD_MIN_TOKENS,});if (ctxGuard.shouldBlock) {  throw new FailoverError(    `Model context window too small (${ctxGuard.tokens} tokens).`,    { reason"unknown", provider, model: modelId },  );}

`resolveModel`(`src/agents/pi-embedded-runner/model.ts`,323 行)做了很多工作:

1. 从 agent 目录的 `models.json` 注册表中查找模型定义

2. 如果找不到,回退到 provider 的内联配置

3. 对特定模型应用前向兼容 fallback——比如当用户配置了一个尚未发布的模型(如 `anthropic/opus-4.6`),系统会 fallback 到最接近的已有模型

4. 对完全未知的模型,构建一个泛型模型定义

`evaluateContextWindowGuard` 是一道安全网:如果模型的上下文窗口太小(低于 `CONTEXT_WINDOW_HARD_MIN_TOKENS`),直接抛出 `FailoverError` 阻止执行,避免浪费 API 调用。`FailoverError` 是一个特殊的错误类型——它携带 `reason`、`provider`、`model` 信息,告诉上层”这个模型不行,请尝试下一个”。

阶段 3:认证凭据轮换

const authStore ensureAuthProfileStore(agentDir, { allowKeychainPromptfalse });const profileOrder resolveAuthProfileOrder({  cfg: params.config, store: authStore, provider,  preferredProfile: preferredProfileId,});const profileCandidates = lockedProfileId  ? [lockedProfileId]  : profileOrder.length > 0 ? profileOrder : [undefined];try {  while(profileIndex < profileCandidates.length) {    const candidate = profileCandidates[profileIndex];    if(candidate && isProfileInCooldown(authStore, candidate)) {      profileIndex += 1;      continue;    }    await applyApiKeyInfo(profileCandidates[profileIndex]);    break;  }catch(err) {  const advanced = await advanceAuthProfile();  if(!advanced) throwAuthProfileFailover({ allInCooldownfalse, error: err });}

OpenClaw 支持为同一个 provider 配置多个 API Key(auth profile)。`resolveAuthProfileOrder` 按优先级排序所有候选 profile。初始化时尝试第一个可用的(跳过处于冷却期的)。如果某个 profile 在运行中出错(限流、欠费等),`advanceAuthProfile` 会切换到下一个候选。

冷却机制(`isProfileInCooldown`)防止短时间内反复尝试已知有问题的 profile。`markAuthProfileFailure` 在失败时标记冷却,`markAuthProfileGood` 在成功时清除。

阶段 4:重试循环——溢出、压缩、降级

const MAX_OVERFLOW_COMPACTION_ATTEMPTS = 3;let overflowCompactionAttempts = 0;let toolResultTruncationAttempted = false;while (true) {  const attempt = await runEmbeddedAttempt({ ... });  const contextOverflowError = detectContextOverflow(attempt);  if (contextOverflowError) {    // 策略 1:自动压缩会话    if (overflowCompactionAttempts < MAX_OVERFLOW_COMPACTION_ATTEMPTS) {      overflowCompactionAttempts++;      const compactResult = await compactEmbeddedPiSessionDirect({ ... });      if (compactResult.compacted) {        continue// 压缩成功,重试      }    }    // 策略 2:截断超大工具结果    if (!toolResultTruncationAttempted && hasOversizedToolResults) {      toolResultTruncationAttempted = true;      await truncateOversizedToolResultsInSession({ ... });      continue// 截断后重试    }    // 策略 3:切换 thinking level    // 策略 4:切换 auth profile    // 策略 5:抛出 FailoverError 让上层切换模型  }  // 正常完成,构建回复  const payloads = buildEmbeddedRunPayloads({ ... });  return { payloads, usagederiveUsageFromAccumulator(usageAccumulator) };}

这个 `while(true)` 循环是引擎的容错核心。每次 `runEmbeddedAttempt` 执行完毕后,系统检查结果。如果遇到上下文溢出(`isLikelyContextOverflowError`),会按优先级尝试五种恢复策略:

1. 自动压缩:调用 `compactEmbeddedPiSessionDirect` 压缩会话历史(最多 3 次)

2. 工具结果截断:如果某个工具的输出特别大(比如 `exec` 返回了几万行日志),截断到合理大小

3. Thinking level 降级:如果当前是 `extended` thinking 模式(占用大量上下文),降级到 `low` 或 `off`

4. Auth profile 轮换:某些限流错误可能是特定 API Key 的问题

5. FailoverError:所有本地恢复手段用尽,抛出 failover 错误,让调用方切换到配置中的 fallback 模型

这种层层降级的设计确保了高可用性——即使主模型出问题、API Key 被限流、会话太长,系统也能自愈。

三、单次 Attempt:`attempt.ts`

`src/agents/pi-embedded-runner/run/attempt.ts`(949 行)执行一次完整的 API 调用。它是引擎中最长的文件之一,按顺序完成以下工作。

沙箱与工作目录

const sandbox await resolveSandboxContext({  config: params.config,  sessionKey: sandboxSessionKey,  workspaceDir: resolvedWorkspace,});const effectiveWorkspace = sandbox?.enabled  ? sandbox.workspaceAccess === "rw" ? resolvedWorkspace : sandbox.workspaceDir  : resolvedWorkspace;process.chdir(effectiveWorkspace);

如果配置了沙箱(Docker 容器),工具执行会在隔离环境中进行。`effectiveWorkspace` 决定了工具看到的文件系统根目录。

技能加载

const skillEntries = shouldLoadSkillEntries  ? loadWorkspaceSkillEntries(effectiveWorkspace) : [];restoreSkillEnv = applySkillEnvOverrides({  skills: skillEntries, config: params.config,});const skillsPrompt resolveSkillsPromptForRun({  skillsSnapshot: params.skillsSnapshot,  entries: skillEntries,  config: params.config,  workspaceDir: effectiveWorkspace,});

技能(Skills)是 Agent 的可插拔知识模块。每个技能有一个 `SKILL.md` 描述文件和可选的脚本。`loadWorkspaceSkillEntries` 从工作目录扫描技能,`applySkillEnvOverrides` 将技能声明的环境变量注入 `process.env`,`resolveSkillsPromptForRun` 生成技能相关的 system prompt 片段。注意 `restoreSkillEnv` 是一个清理函数——attempt 结束后恢复环境变量,避免污染后续运行。

工具创建

const toolsRaw = params.disableTools  ? []  : createOpenClawCodingTools({      exec: { ...params.execOverrides, elevated: params.bashElevated },      sandbox,      messageProvider: params.messageChannel ?? params.messageProvider,      sessionKey: params.sessionKey ?? params.sessionId,      agentDir,      workspaceDir: effectiveWorkspace,      config: params.config,      abortSignal: runAbortController.signal,      modelProvider: params.model.provider,      modelId: params.modelId,      // ... 20+ 参数    });

`createOpenClawCodingTools`(`src/agents/pi-tools.ts`,457 行)是工具工厂。它根据当前的配置、模型、通道、沙箱状态,动态组装可用的工具集。工具分为几大类:

  • 文件操作:`read`、`write`、`edit`、`grep`、`find`、`ls`

  • 命令执行:`exec`(shell 命令)、`process`(后台进程管理)

  • 补丁:`apply_patch`(仅 OpenAI 系模型)

  • 网络:`web_search`(Brave API)、`web_fetch`(URL 内容提取)

  • 通道操作:`message`(跨通道发消息)、通道特有的 action(Telegram 贴纸、Discord 表情等)

  • 会话管理:`sessions_list`、`sessions_send`、`sessions_spawn`(子 Agent)、`sessions_history`

  • 系统:`gateway`(重启/更新)、`cron`(定时任务)、`nodes`(远程设备控制)

  • 媒体:`image`(图片分析)、`browser`(浏览器自动化)、`canvas`(画布)

  • 记忆:`memory_search`、`memory_store` 等

工具创建过程中会进行策略过滤。`resolveEffectiveToolPolicy` 从四个层级解析工具策略:全局策略 → provider 特定策略 → Agent 策略 → 群聊策略。每个层级可以声明 `allow`(白名单)或 `deny`(黑名单)。此外还有 `ownerOnly` 标记——某些敏感工具(如 `gateway`、`exec`)只有 owner 身份的发送者才能使用。

工具 schema 还要处理 provider 兼容性。比如 Gemini 不支持 `anyOf`/`oneOf` 等 JSON Schema 关键字,`sanitizeToolsForGoogle` 会清理不兼容的 schema 结构。

System Prompt 构建

const { runtimeInfo, userTimezone, userTime, userTimeFormat } = buildSystemPromptParams({  config: params.config,  agentId: sessionAgentId,  workspaceDir: effectiveWorkspace,  runtime: {    host: machineName, os: `${os.type()} ${os.release()}`,    model: `${params.provider}/${params.modelId}`,    shelldetectRuntimeShell(),    channel: runtimeChannel,    capabilities: runtimeCapabilities,  },});const systemPrompt buildAgentSystemPrompt({  workspaceDir: effectiveWorkspace,  defaultThinkLevel: params.thinkLevel,  toolNames: tools.map(t => t.name),  toolSummaries: coreToolSummaries,  skillsPrompt,  userTimezone, userTime,  runtimeInfo,  sandboxInfo,  contextFiles,  // ... 更多参数});

`buildAgentSystemPrompt`(`src/agents/system-prompt.ts`,652 行)组装完整的 system prompt,包含十几个章节:

  • Tooling:列出所有可用工具及其简介

  • Safety:安全规则

  • Skills:已加载技能的说明

  • Memory:记忆系统使用指南

  • Documentation:OpenClaw 文档路径

  • Workspace:当前工作目录信息

  • Sandbox:沙箱环境描述

  • User Identity:用户信息

  • Time:当前时间和时区

  • Reply Tags:回复格式约定

  • Messaging:跨通道发消息的工具提示

  • Voice (TTS):语音合成提示

  • Reasoning Format:推理标签格式

  • Project Context:bootstrap 文件内容(如 `AGENTS.md`、`CLAUDE.md`)

  • Runtime:运行时环境信息(agent ID、主机名、OS、模型、shell、通道、能力列表)

`promptMode` 参数控制生成哪些章节。子 Agent(subagent)使用 `”minimal”` 模式,只包含核心章节,减少 token 消耗。

会话管理与历史修复

const sessionManager = new SessionManager({ /* ... */ });await repairSessionFileIfNeeded(sessionFile);const messages = await sessionManager.loadHistory();sanitizeToolUseResultPairing(messages); // 修复工具调用/结果不配对validateAnthropicTurns(messages);       // Anthropic 要求严格的 user/assistant 交替validateGeminiTurns(messages);          // Gemini 有类似的要求

会话历史从 JSONL 文件加载。由于各种异常情况(进程崩溃、并发写入、模型返回格式错误),历史可能损坏。系统有三层修复:

1. 文件级修复:`repairSessionFileIfNeeded` 处理截断的 JSON 行

2. 工具调用配对修复:`sanitizeToolUseResultPairing` 确保每个 `tool_use` 都有对应的 `tool_result`

3. Provider 特定校验:Anthropic 要求严格的 user→assistant 交替,Gemini 有类似约束

执行 API 调用

const prompt = await sessionManager.prompt(params.prompt, {  toolstoClientToolDefinitions(tools),  images: detectedImages,  thinking: thinkingConfig,  timeout: params.timeoutMs,  abort: combinedAbortSignal,  streamParams: params.streamParams,});

`sessionManager.prompt` 是 `@mariozechner/pi-coding-agent` 库的方法,它做三件事:

1. 把用户消息追加到历史

2. 构建完整的 API payload(system prompt + 历史 + 用户消息 + 工具定义)

3. 发起流式 API 调用,处理工具调用循环(模型返回 tool_use → 执行工具 → 将 tool_result 发回模型 → 模型继续生成)

这个循环一直运行到模型返回最终文本(`end_turn`)或被中止。

四、流式事件订阅

`subscribeEmbeddedPiSession`(`src/agents/pi-embedded-subscribe.ts`,637 行)监听 `sessionManager` 的事件流,把底层 API 事件转化为 OpenClaw 的业务事件。

export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionParams) {  const stateEmbeddedPiSubscribeState = {    assistantTexts: [],    toolMetas: [],    blockState: { thinkingfalsefinalfalseinlineCodecreateInlineCodeState() },    deltaBuffer"",    blockBuffer"",    // ... 20+ 状态字段  };  // ...}

它维护了一个复杂的状态机。核心任务包括:

  • 文本流处理:把 AI 的逐 token 输出聚合成有意义的文本块。处理 `<thinking>` 标签(分离推理文本和最终回复),处理 `<final>` 标签(标记最终回复的开始)

  • Block Reply 分块:如果启用了 block streaming(通道支持分块发送),`EmbeddedBlockChunker` 在遇到段落边界时触发 `onBlockReply` 回调,实现”逐段发送”效果

  • 工具调用追踪:记录每个工具调用的名称、参数、结果、执行时间,生成 `toolMetas` 供后续 payload 构建使用

  • 去重:如果 AI 在文本中重复了工具已经发送过的消息内容(比如 `message` 工具发了一条消息,AI 又在回复中写了相同的文本),自动去重

  • Usage 统计:累积每次 API 调用的 input/output/cache token 使用量

流式事件最终通过 `emitAgentEvent` 推送到 Gateway 的广播系统,Web 控制台和移动端可以实时显示 AI 的”思考过程”和”打字中”状态。

五、会话压缩

当会话历史太长导致上下文溢出时,`compactEmbeddedPiSessionDirect`(`src/agents/pi-embedded-runner/compact.ts`,508 行)执行压缩。

它的流程和 `runEmbeddedAttempt` 非常相似——解析沙箱、加载技能、创建工具、构建 system prompt、初始化 session manager——然后调用 `session.compact(customInstructions)`。

底层的压缩策略由 `@mariozechner/pi-coding-agent` 库实现,通常是让模型自己总结历史记录,把冗长的工具调用/结果对替换为简短的摘要。压缩后重新估算 token 数量,确认确实减小了上下文。

压缩本身也通过 lane 队列串行化——你不能在压缩还没完成时就开始下一次 Agent 运行。

六、Payload 构建:从原始响应到最终回复

`buildEmbeddedRunPayloads`(`src/agents/pi-embedded-runner/run/payloads.ts`,267 行)把 AI 的原始输出转化为可以投递给通道的回复 payload。

它处理的细节包括:

  • 格式化错误消息(区分模型错误和工具错误)

  • 如果开启了 verbose 模式,内联展示工具调用结果

  • 格式化推理文本(如果 reasoning 模式开启)

  • 提取 AI 的纯文本回复

  • 去除与已发送消息重复的文本

  • 解析回复指令(如 `[[reply_to_current]]`、`[[silent]]`)

  • 过滤空回复和静默回复

最终返回一个 `ReplyPayload[]` 数组,每个 payload 包含 `text`、`mediaUrls`、`isError`、`replyToId` 等字段,交给通道系统的 `ReplyDispatcher` 投递。

七、完整调用链回顾

一条消息触发 Agent 运行的完整路径:

  • 入口:自动回复管线的 `runPreparedReply` → 调用 `runEmbeddedPiAgent`

  • 并发控制:session lane 排队 → global lane 排队 → 开始执行

  • 准备:解析模型 → 上下文窗口守卫 → 解析认证凭据(支持多 profile 轮换)

  • 重试循环

1. 调用 `runEmbeddedAttempt`(单次 attempt)

2. attempt 内部:解析沙箱 → 加载技能 → 创建工具(策略过滤)→ 构建 system prompt(十几个章节)→ 加载会话历史(三层修复)→ 发起流式 API 调用 → 订阅事件流(分块、去重、usage 统计)

3. 检查结果:上下文溢出?自动压缩 → 工具结果太大?截断 → thinking 模式不支持?降级 → API Key 限流?轮换 → 模型不可用?FailoverError 切换 fallback 模型

4. 成功则构建回复 payload

  • 返回:`EmbeddedPiRunResult` 包含 `payloads`(回复内容)、`usage`(token 使用量)、`meta`(模型信息、attempt 次数等)

这条链路中最精妙的设计是多层容错。从最内层的会话历史修复,到 attempt 级别的工具 schema 兼容,到 run 级别的压缩/截断/降级/轮换/failover,再到调用方可配置的 fallback 模型列表——每一层都在尽力自愈,只有在所有本地策略用尽后才向上抛出错误。这让 OpenClaw 在面对千变万化的模型 API 行为时保持高可用性。


下面是讲解项目的基本信息:

  • 使用的项目分支是:main

  • commit版本是:f5160ca6becaeeb6a4dfd892fffd2130a696f766

讲解模块如下:

1. CLI 框架与进程模型

2. 配置系统

3. Gateway 核心

4. 通道与路由

5. Agent 引擎(今日讲解)

6. 自动回复管线

7. 插件系统

8. 记忆系统

9. Web 控制台

10. 原生客户端

11. 浏览器自动化

12. 运维与测试

本站文章均为手工撰写未经允许谢绝转载:夜雨聆风 » OpenClaw源码解读系列:Agent 引擎

评论 抢沙发

8 + 7 =
  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
×
订阅图标按钮