拆解 OpenClaw 的底层引擎:从 ReAct 循环到树形会话
本文拆解的是 earendil-works/pi 整个项目,不只
pi-coding-agent一个包。 pi 是 TypeScript monorepo,包含 4 个核心包,本文覆盖pi-agent-core(Agent 运行时)和pi-ai(LLM API 层)的底层实现。
项目定位
obra/superpowers 是纯 prompt 层面的技能系统,而 earendil-works/pi 是真正的 agent 运行时引擎。
OpenClaw(37 万 star)的底层就是 pi——npm 包名 @mariozechner/pi-*。
架构总览
pi 项目是 TypeScript monorepo,包含 4 个核心包:
packages/├── ai/ — @earendil-works/pi-ai (LLM 统一 API 层)├── agent/ — @earendil-works/pi-agent-core (Agent 运行时核心)├── coding-agent/ — @earendil-works/pi-coding-agent (交互式编程 agent CLI)└── tui/ — @earendil-works/pi-tui (终端 UI 库)
┌─────────────────────────────────────────────────┐│ pi-coding-agent (CLI 应用层) ││ - 命令解析、交互式 TUI ││ - 文件/终端工具注册 ││ - SessionManager 持久化 ││ - Compaction 编排 │├─────────────────────────────────────────────────┤│ pi-agent-core (Agent 运行时) ││ ││ ┌─────────────┐ ┌─────────────┐ ┌───────────┐ ││ │ Agent │ │ AgentLoop │ │ Harness │ ││ │ (状态管理) │ │ (工具调用) │ │ (会话/压缩)│ ││ └─────────────┘ └─────────────┘ └───────────┘ ││ ││ ┌─────────────────────────────────────────────┐ ││ │ EventStream (agent_start → message_*) │ ││ │ → tool_execution_* → turn_end → agent_end │ ││ └─────────────────────────────────────────────┘ ││ ┌─────────────────────────────────────────────┐ ││ │ AgentMessage 统一消息模型 │ ││ │ (role: user/assistant/toolResult) │ ││ └─────────────────────────────────────────────┘ │├─────────────────────────────────────────────────┤│ pi-ai (LLM 统一 API 层) ││ ││ ┌─────────────────────────────────────────────┐ ││ │ Model { provider, api, id, cost, context } │ ││ │ Context { systemPrompt, messages, tools } │ ││ │ stream() → AssistantMessageEventStream │ ││ └─────────────────────────────────────────────┘ ││ ││ Provider 注册表: ││ - Anthropic Messages API ││ - OpenAI Responses API / Completions API ││ - Google Vertex / Google Generative ││ - AWS Bedrock ││ - Azure OpenAI Responses ││ - Cloudflare, Mistral, OpenRouter, ... │└─────────────────────────────────────────────────┘
一、Agent 调度:Agent + AgentLoop + Harness 三层模型
1.1 Agent 类:状态管理的门面
Agent 类 (packages/agent/src/agent.ts) 是一个有状态的会话对象,管理着:
classAgent {private_state: MutableAgentState; // systemPrompt + model + tools + messagesprivatereadonly listeners = newSet<...>(); // 事件监听器privatereadonlysteeringQueue: PendingMessageQueue; // steering 消息队列privatereadonlyfollowUpQueue: PendingMessageQueue; // follow-up 消息队列private activeRun?: ActiveRun; // 当前运行状态// 核心 APIprompt(message) // 发起新对话continue() // 从当前转录继续steer(message) // 注入 steering 消息(在下一轮 assistant 响应前处理)followUp(message) // 注入 follow-up 消息(在 agent 停止后继续)abort() // 中止当前运行waitForIdle() // 等待当前运行完成subscribe(listener) // 订阅生命周期事件reset() // 清空所有状态}
关键设计:PendingMessageQueue
classPendingMessageQueue {constructor(publicmode: QueueMode) {} // "all" | "one-at-a-time"drain(): AgentMessage[] {if (this.mode === "all") {const drained = this.messages.slice();this.messages = [];return drained; }// "one-at-a-time": 每次只取出一条const first = this.messages[0];this.messages = this.messages.slice(1);return [first]; }}
QueueMode 决定了 drain() 的行为:
-
"all"— 一次性取出所有消息并清空队列,适合批量处理 -
"one-at-a-time"— 每次只取出一条,其余留在队列中,适合逐条消化
steering queue 和 follow-up queue 默认都用 one-at-a-time,这样 agent 每轮循环只处理一条新消息,不会在一次注入中吃掉所有 pending 消息,保持处理节奏的可控性。两者的区别在于注入时机:
-
steering — 在 agent 运行中注入,内循环每轮开头轮询 -
follow-up — 在 agent 停止后注入,触发外循环重新启动
生命周期事件流:
agent_start → turn_start → message_start (user prompt) → message_end (user prompt) → message_start (assistant streaming) → message_update (delta chunks) → message_end (assistant final) → tool_execution_start → tool_execution_update (partial results) → tool_execution_end → turn_end (with toolResults) → [继续循环或] → agent_end (with all messages)
1.2 AgentLoop:工具调用循环
agent-loop.ts 是整个引擎最核心的部分——ReAct 循环的具体实现。
asyncfunctionrunLoop(currentContext, newMessages, config, signal, emit, streamFn) {let firstTurn = true;let pendingMessages = await config.getSteeringMessages();// 外循环:agent 停止后检查是否有 follow-up 消息while (true) {let hasMoreToolCalls = true;// 内循环:处理工具调用和 steering 消息while (hasMoreToolCalls || pendingMessages.length > 0) {// 1. 注入 pending messagesif (pendingMessages.length > 0) {for (const message of pendingMessages) { currentContext.messages.push(message); } pendingMessages = []; }// 2. 流式获取 assistant 响应const message = awaitstreamAssistantResponse(currentContext, config, signal, emit, streamFn);// 3. 检查是否出错或被中止if (message.stopReason === "error" || message.stopReason === "aborted") {return; }// 4. 执行工具调用const toolCalls = message.content.filter(c => c.type === "toolCall");if (toolCalls.length > 0) {const executedToolBatch = awaitexecuteToolCalls(...); hasMoreToolCalls = !executedToolBatch.terminate; }// 5. 检查是否应该停止if (await config.shouldStopAfterTurn?.(...)) {return; }// 6. 轮询新的 steering 消息 pendingMessages = await config.getSteeringMessages(); }// 7. 检查 follow-up 消息const followUpMessages = await config.getFollowUpMessages();if (followUpMessages.length > 0) { pendingMessages = followUpMessages;continue; // 继续外循环 }break; // 没有更多消息,退出 }}
双循环结构的设计意图:
外循环 (while true) └─ 内循环 (while hasMoreToolCalls || pendingMessages) ├─ 注入 steering 消息 ├─ 调用 LLM 获取回复 ├─ 执行工具调用(并行或串行) ├─ 检查结果是否需要继续 └─ 轮询新 steering 消息 └─ 检查 follow-up 消息 → 有则继续外循环 └─ 没有消息 → agent_end
这个结构完美支持了以下场景:
-
用户在 agent 思考时输入新消息 → steering queue 接收,下一轮注入 -
工具调用完成后需要继续思考 → hasMoreToolCalls = true,内循环继续 -
工具全部执行完 → agent 停止,follow-up queue 检查是否有后续任务
1.2+ Agent 中断与恢复:从哪断的,就从哪续?
执行任务时被用户打断,后续怎么知道从哪里继续?pi 的处理取决于中断发生的时机。
场景一:工具正在执行中(比如 Bash 在跑命令)
user: "帮我跑 1000 个数据文件分析"assistant: [调用 Bash 工具] → Bash: "正在执行中..." ← 用户 Ctrl+C 打断
Agent 通过 AbortSignal 传播中断信号到所有子操作:
classAgent {private activeRun?: ActiveRun;abort() {this.activeRun?.abortController.abort(); // 传播到所有正在运行的子操作 }}
AbortController 会同时终止:
-
正在流式调用的 LLM 请求 -
正在执行的 Bash命令(进程被 kill) -
正在运行的 steering / follow-up 队列(清空)
结果:执行中的工具直接终止,不保存中间状态。 如果要继续,用户需要重新发起请求。pi 没有”记录工具执行到哪一步”的机制。
场景二:Agent 已完成一轮响应,等待用户回复
user: "帮我重构认证模块"assistant: "好的,我先看看现有代码"assistant: [Read auth.ts] ← 读完了assistant: "现有代码是这样的..." ← 用户关掉终端,会话暂停
这时不需要”断点记录”——SessionTree 的叶子节点天然就是断点:
SessionTree 此时存储: ├─ user: "帮我重构认证模块" (e1) └─ assistant: "好的..." (e2) └─ tool: Read(auth.ts) (e3) └─ toolResult: [...] (e4) ← 当前叶子节点(leafId)用户回来后 resume: 1. /resume → 找到这个会话 2. getPathToRoot(leafId) → 加载 e1 → e2 → e3 → e4 的完整上下文 3. 用户说 "继续" 4. prompt("继续") → AgentLoop 从 e4 继续
场景三:压缩后的会话恢复
SessionTree 压缩后: ├─ compaction summary: "用户要求重构认证模块,已读取 auth.ts..." (c1) │ firstKeptEntryId = e2 ← 压缩标记 └─ [e1 被跳过,只保留 c1 摘要 + e2 之后的消息]用户回来继续: → buildSessionContext() 注入 compactionSummary → LLM 虽然没看到 e1 的原始消息,但通过摘要知道上下文 → 正常继续
总结:
|
|
|
|
|---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1.3 AgentHarness:会话 + 压缩 + 工具注册的高层封装
AgentHarness 类 (packages/agent/src/harness/agent-harness.ts) 在 Agent 之上提供了更高层的抽象:
classAgentHarness {readonlyagent: Agent; // 底层 Agent 实例readonlyenv: ExecutionEnv; // 执行环境(Node.js、浏览器等)readonlyconversation: ConversationState; // 对话状态readonlyoperation: OperationState; // 操作状态privatesession: Session; // 会话持久化private toolRegistry = newMap<string, AgentTool>(); // 工具注册表private hooks = newMap<...>(); // 事件钩子// 核心功能 - 系统 prompt 模板化 (expandPromptTemplate) - Skill 命令解析 (expandSkillCommand) - 分支摘要生成 (generateBranchSummary) - 会话压缩编排 (compact, prepareCompaction) - 事件钩子 (context, tool_call, tool_result, before_provider_request)}
钩子系统:
// 上下文变换钩子this.agent.transformContext = async (messages, signal) => {const result = awaitthis.emitHook("context", { messages }, signal);return result?.messages ?? messages;};// 工具调用前钩子this.agent.beforeToolCall = async ({ toolCall, args }, signal) => {const result = awaitthis.emitHook("tool_call", { toolCallId, toolName, input }, signal);return result ? { block: result.block, reason: result.reason } : undefined;};// 工具调用后钩子this.agent.afterToolCall = async ({ toolCall, args, result, isError }, signal) => {const patch = awaitthis.emitHook("tool_result", { toolCallId, content, details, isError }, signal);return patch ? { content: patch.content, details: patch.details } : undefined;};// Provider 请求前钩子(可以修改 payload)this.agent.onPayload = async (payload) => {const result = awaitthis.emitHook("before_provider_request", { payload });return result?.payload ?? payload;};// Provider 响应后钩子(观察/修改响应数据)this.agent.onResponse = async (response, signal) => {const result = awaitthis.emitHook("after_provider_response", { response }, signal);return result?.response ?? response;};
钩子是怎么工作的?
1. 注册:AgentHarness 通过 emitHook(hookName, data, signal) 把监听器注册到 Agent 的事件监听器 Set 中2. 触发时机: transformContext → AgentLoop 每次向 LLM 发送请求前 beforeToolCall → LLM 返回了工具调用,但尚未执行 tool_result → 工具执行完成,结果即将返回给 LLM before_provider_request → 构建好 payload,发给 LLM 之前 after_provider_response ← LLM 返回了响应数据3. 修改/拦截:每个钩子返回修改后的数据,Agent 用新数据替代原始数据继续执行 - transformContext:返回修改后的 messages → LLM 看到的上下文被改变了 - beforeToolCall:返回 block: true → 工具不会执行,直接拦截 - tool_result:返回修改后的 content → LLM 看到的工具结果被改变了 - before_provider_request:返回修改后的 payload → 发给 LLM 的请求被改变了 - after_provider_response:返回修改后的 response → 拿到的响应被改变了4. 无监听时:默认走 ?? 路径,返回原始数据,不影响正常流程
核心设计:钩子是”装饰器”而不是”拦截器”。 它不改变 Agent 的执行流程(除了 beforeToolCall 可以阻止执行),而是在关键数据流转的节点上提供修改机会。上层应用可以在此注入逻辑,而不需要改引擎本身的代码。
二、上下文管理:压缩 + 消息清理
2.1 会话存储:SessionTree 模型
pi 的会话不是简单的消息列表,而是一棵树结构:
typeSessionTreeEntry = | MessageEntry// 普通消息 | CustomEntry// 自定义条目 | CustomMessageEntry// 自定义消息 | BranchSummaryEntry// 分支摘要 | CompactionEntry// 压缩条目 | LabelEntry// 标签 | SessionInfoEntry// 会话信息 | ThinkingLevelChangeEntry// 思考级别变更 | ModelChangeEntry// 模型变更
每个条目有 id、parentId,形成树结构。这种设计支持:
-
分支会话:从任意节点分叉,比较不同对话路径 -
压缩回溯:压缩后只保留 firstKeptEntryId之后的消息 -
完整审计:所有变更(模型切换、思考级别)都记录在树中
为什么用树而不是简单的消息列表?
工程师的工作方式本质上是非线性的——试了一个方案不行,回到之前的某个点换另一个方向。列表结构无法表达这种”分叉”:
列表设计(传统聊天): user: 试试方案 A assistant: [方案 A 代码] user: 报错了,试试方案 B ← 方案 A 和 B 混在同一条线上 assistant: [方案 B 代码] ← 如果想回到 A 的基础上改,只能翻回去手动找问题:压缩后谁先谁后丢失?废弃的方案 A 还能不能恢复?两条路径在同一条线上交织,最终变成一团混乱的长对话。
树形设计(pi): user: "帮我重构认证模块" └─ assistant: "好的" ├─ user: "试试方案 A" │ └─ assistant: "这是方案 A..." ← 废弃分支,压缩时只保留摘要 └─ user: "方案 A 不行,试试 B" └─ assistant: "这是方案 B..." ← 当前活跃分支好处:1. 废弃的方案 A 被压缩成摘要,不影响主干 B 的上下文2. 随时可以用 /fork 回到 A 的任意节点拉出新分支3. /tree 命令可视化整棵对话树,支持过滤(只看用户消息、只看有标签的等)4. 压缩时生成分支摘要(branch summary),记录"为什么方案 A 不行"
树形会话的核心价值:它让”试错”变成了有结构的数据,而不是一堆废弃的聊天记录。
2.2 压缩机制:三层策略
compaction.ts 实现了完整的压缩逻辑:
策略一:分支压缩 (branch-summarization) - 将废弃的分支压缩成摘要 - 保留主干,压缩分支内容策略二:会话压缩 (compact) - 当上下文 token 数超过阈值时触发 - 使用 LLM 压缩旧消息 - 生成 compaction summary message - 记录 firstKeptEntryId(保留哪些消息)策略三:文件追踪 - 压缩时记录哪些文件被读取/修改 - 避免压缩丢失文件操作信息
压缩配置:
constDEFAULT_COMPACTION_SETTINGS = {enabled: true,reserveTokens: 16384, // 为回复预留 16K tokenkeepRecentTokens: 20000, // 保留最近 20K token 不压缩};
Token 计算策略:
// 优先使用 provider 返回的 totalTokensfunctioncalculateContextTokens(usage: Usage): number {return usage.totalTokens || usage.input + usage.output + usage.cacheRead + usage.cacheWrite;}// 没有 usage 数据时估算functionestimateTokens(message: AgentMessage): number {// 遍历 message.content,根据类型估算// text: text.length / 4 (英文约 4 chars/token)// images: 固定 token 数}
2.3 消息清理
pi 的消息清理逻辑相对简单,主要集中在:
-
convertToLlm:将 AgentMessage 过滤为 LLM 可接受的 Message 格式 -
角色过滤:只保留 user、assistant、toolResult 角色 -
自定义消息处理:compaction summary、branch summary 等特殊消息的转换
三、多模型支持:统一 API 层 + Provider 注册表
3.1 统一数据模型
// Model:描述一个模型interfaceModel<TApiextendsApi> {id: string;name: string;api: TApi; // "anthropic-messages" | "openai-responses" | ...provider: string; // "anthropic" | "openai" | "google" | ...reasoning: boolean; // 是否支持 reasoning/thinkinginput: string[]; // 支持的输入类型cost: { input: number; output: number; ... };contextWindow: number;maxTokens: number;}// Context:一次对话的完整上下文interfaceContext {systemPrompt: string;messages: Message[]; // 统一消息格式 tools?: Tool[]; // 工具定义}// Message:统一消息格式typeMessage = UserMessage | AssistantMessage | ToolResultMessage;
3.2 Provider 注册机制
// packages/ai/src/providers/register-builtins.tsimport { registerProvider } from"../api-registry.js";import { anthropicMessagesProvider } from"./anthropic.js";import { openaiResponsesProvider } from"./openai-responses.js";import { googleProvider } from"./google.js";import { bedrockProvider } from"./amazon-bedrock.js";// ...registerProvider("anthropic", anthropicMessagesProvider);registerProvider("openai", openaiResponsesProvider);registerProvider("google", googleProvider);registerProvider("bedrock", bedrockProvider);// ...
每个 Provider 实现两个接口:
interfaceProvider {// 流式调用stream(model: Model, context: Context, options: StreamOptions): AssistantMessageEventStream;// 简化流式调用(无高级特性)streamSimple(model: Model, context: Context, options: SimpleStreamOptions): AssistantMessageEventStream;}
3.3 流式事件标准化
所有 provider 的流式输出统一转换为 AssistantMessageEventStream:
// EventStream 抽象classAssistantMessageEventStream {async *(): AsyncIterable<AssistantMessageEvent> {// 事件类型:// - start: 消息开始// - text_start/text_delta/text_end: 文本流// - thinking_start/thinking_delta/thinking_end: 思考流// - toolcall_start/toolcall_delta/toolcall_end: 工具调用流// - done: 完成// - error: 错误 }asyncresult(): Promise<AssistantMessage> {// 返回最终的完整消息 }}
3.4 “Stealth Mode”:模拟 Claude Code
pi 的 Anthropic provider 中有一段有趣的代码:
// Stealth mode: Mimic Claude Code's tool naming exactlyconst claudeCodeVersion = "2.1.75";const claudeCodeTools = ["Read", "Write", "Edit", "Bash", "Grep", "Glob","AskUserQuestion", "EnterPlanMode", "ExitPlanMode","KillShell", "NotebookEdit", "Skill", "Task","TaskOutput", "TodoWrite", "WebFetch", "WebSearch",];// 将工具名转换为 Claude Code 的规范大小写consttoClaudeCodeName = (name: string) => ccToolLookup.get(name.toLowerCase()) ?? name;
这段代码让 pi 的行为与 Claude Code 完全一致——包括工具名的大小写、system prompt 的结构等。
四、工具调用:统一注册 + 参数校验 + 并行/串行执行
4.1 工具定义
interfaceAgentTool<TArgs> {name: string;description: string;parameters: JSONSchema;executionMode: "sequential" | "parallel"; prepareArguments?: (args: Record<string, any>) =>Record<string, any>;execute: (id: string,args: TArgs, signal?: AbortSignal, onUpdate?: (partialResult: ToolResult) => void,) =>Promise<AgentToolResult>;}
4.2 工具执行:三阶段流程
阶段一:Preparation - 查找工具定义 - prepareArguments(参数预处理) - validateToolArguments(参数校验) - beforeToolCall 钩子(可以阻止执行)阶段二:Execution - 并行模式:Promise.all 并发执行 - 串行模式:逐个执行,前一个完成后执行下一个阶段三:Finalization - afterToolCall 钩子(可以修改结果) - 创建 ToolResultMessage - 检查 terminate 标志(是否需要停止循环)
4.3 并行 vs 串行
asyncfunctionexecuteToolCalls(currentContext, assistantMessage, config, signal, emit) {const toolCalls = assistantMessage.content.filter(c => c.type === "toolCall");// 检查是否有工具要求串行执行const hasSequentialToolCall = toolCalls.some(tc => currentContext.tools?.find(t => t.name === tc.name)?.executionMode === "sequential" );if (config.toolExecution === "sequential" || hasSequentialToolCall) {returnexecuteToolCallsSequential(...); }returnexecuteToolCallsParallel(...);}
并行执行的实现细节:
asyncfunctionexecuteToolCallsParallel(...) {// 先发送所有 tool_execution_start 事件for (const toolCall of toolCalls) {awaitemit({ type: "tool_execution_start", toolCallId, toolName, args });const preparation = awaitprepareToolCall(...); finalizedCalls.push(async () => {const executed = awaitexecutePreparedToolCall(preparation, signal, emit);returnawaitfinalizeExecutedToolCall(...); }); }// 并发执行所有工具调用const orderedFinalizedCalls = awaitPromise.all( finalizedCalls.map(entry =>typeof entry === "function" ? entry() : Promise.resolve(entry)) );// 按顺序发送 tool_execution_end 和结果消息for (const finalized of orderedFinalizedCalls) {awaitemitToolExecutionEnd(finalized, emit);awaitemitToolResultMessage(createToolResultMessage(finalized), emit); }}
4.4 工具结果终止符
functionshouldTerminateToolBatch(finalizedCalls): boolean {return finalizedCalls.length > 0 && finalizedCalls.every(finalized => finalized.result.terminate === true);}
如果所有工具都标记 terminate: true,agent 循环停止。这允许工具主动结束对话(比如 Bash 执行了 exit 命令)。
四+、工具从哪来:Skill 体系与 MCP
1. Skill:pi 的一等公民
pi 内建了完整的 skill 体系,这是它区别于其他 agent 框架的重要特征。
Skill 的目录结构:
~/.pi/agent/skills/brave-search/├── SKILL.md # 必需:元数据 + 指令├── scripts/│ └── search.js # 可执行脚本├── references/│ └── api-reference.md # 详细文档,按需加载└── assets/ └── template.json # 模板文件
Skill 的生命周期:
1. 启动扫描 - loadSkillsFromDir() 扫描所有 skill 目录 - 只读取 SKILL.md 的 YAML frontmatter(name + description) - 验证:必须有 name 和 description,否则跳过2. 注入 System Prompt - 将所有 skill 的 name + description 以 XML 格式写入 system prompt - agent 在系统提示中看到了完整的技能列表 - 完整内容(SKILL.md 正文)不加载3. 按需激活 - 当用户请求匹配某个 skill 的描述时,agent 用 Skill 工具调用 - 调用时:完整 SKILL.md 内容加载到对话上下文 - agent 按 SKILL.md 中的指令执行,可以引用 scripts/ 和 references/4. 执行完成 - 工具调用结果返回给 LLM - skill 内容保留在上下文中,直到被压缩
渐进式披露:元数据常驻,完整内容按需加载。
system prompt 中始终加载(几十个字): <skills> <skill name="brave-search" description="用 Brave 搜索网络信息"/> <skill name="github-tool" description="操作 GitHub 仓库"/> </skills>用户说:"帮我搜索最新的 React 19 特性"agent 判断:这是搜索任务 → 调用 Skill 工具激活 brave-search完整 SKILL.md 加载到上下文(约 2000 字): → 搜索参数格式 → 结果解析规则 → 示例
Skill 的作用域(五级):
全局作用域: ~/.pi/agent/skills/ (用户级 skill) ~/.agents/skills/ (兼容 Claude Code)项目作用域: .pi/skills/ (项目级) .agents/skills/ (兼容 Claude Code,从 cwd 向上遍历到 git root)包级别: 项目的 skills/ 目录 package.json 中的 pi.skills 条目设置级别: settings.json 中的 skills 数组CLI 级别: --skill <path> (临时加载)
发现规则:
-
~/.pi/agent/skills/和.pi/skills/:根目录的.md文件也算 skill -
.agents/skills/:忽略根目录的.md文件(避免误加载 Claude Code 的独立 skill 文件) -
名称冲突:保留第一个找到的
禁用模式:
# SKILL.md frontmatterdisable-model-invocation:true
设置后,skill 不会出现在 system prompt 中,只能通过 /skill:name 手动调用。适合敏感操作或需要用户确认的场景。
2. MCP:pi 不处理,由 OpenClaw 负责
pi 本身不包含任何 MCP(Model Context Protocol)相关代码。
pi 的工具体系是通用的 AgentTool 注册机制:
interfaceAgentTool<TArgs> {name: string;description: string;parameters: JSONSchema;execute: (...) =>Promise<AgentToolResult>;}
它只管”怎么调用工具”,不管”工具从哪里来”。工具可以来自:
-
Skill 内置的 scripts/ -
AgentHarness 手动注册 -
上层封装(如 OpenClaw)把 MCP server 的工具包装成 AgentTool
MCP 的完整处理(server 发现、工具 catalog、session 级 runtime 管理)都在 OpenClaw 的封装层 实现:
OpenClaw 封装层(pi 之上):├── pi-bundle-mcp-runtime.ts — MCP server 生命周期管理├── pi-bundle-mcp-tools.ts — MCP 工具发现和注册├── pi-bundle-lsp-runtime.ts — LSP 工具绑定└── tool-split.ts — SDK 工具 vs 本地工具的拆分pi 本身(底层):└── AgentTool 注册表 + execute() — 只管调用
OpenClaw 的 MCP 集成流程:
1. Session 启动时: getOrCreateSessionMcpRuntime(sessionKey) → 发现当前项目配置的所有 MCP server → 按需启动(懒加载) → 构建工具 catalog2. 工具注册: 将 MCP 工具包装成 AgentTool → 注册到 pi 的工具注册表 → pi 不关心这些工具来自 MCP,只当作普通工具调用3. Session 结束时: retireSessionMcpRuntime(sessionKey) → 清理 MCP server 进程 → 释放资源
总结: pi 提供通用的工具调用框架,OpenClaw 在其之上实现了 MCP 协议的支持。这使得 pi 保持简洁——它不需要知道 MCP、LSP 或其他任何协议,只需要一个统一的 AgentTool 接口。
五、失败处理:错误分类 + 重试策略
5.1 Agent 级别的错误处理
privateasynchandleRunFailure(error: unknown, aborted: boolean): Promise<void> {const failureMessage = {role: "assistant",content: [{ type: "text", text: "" }],stopReason: aborted ? "aborted" : "error",errorMessage: error instanceofError ? error.message : String(error), };awaitthis.processEvents({ type: "message_start", message: failureMessage });awaitthis.processEvents({ type: "message_end", message: failureMessage });awaitthis.processEvents({ type: "turn_end", message: failureMessage, toolResults: [] });awaitthis.processEvents({ type: "agent_end", messages: [failureMessage] });}
Agent 级别的错误处理相对简单:将错误包装成一条 assistant 消息,正常发送 agent_end 事件。
真正的错误分类和回退逻辑在 OpenClaw 的封装层实现(上一篇文章分析的 failover-policy.ts 等)。
5.2 工具级别的错误处理
asyncfunctionexecutePreparedToolCall(prepared, signal, emit): Promise<ExecutedToolCallOutcome> {try {const result = await prepared.tool.execute(prepared.toolCall.id, prepared.args, signal, (partialResult) => {emit({ type: "tool_execution_update", toolCallId, toolName, args, partialResult }); });return { result, isError: false }; } catch (error) {return {result: createErrorToolResult(error instanceofError ? error.message : String(error)),isError: true, }; }}
工具执行失败不会中断循环,而是将错误作为 tool result 返回给 LLM,让 LLM 自行决定下一步。
六、设计原则总结
原则一:消息模型统一
AgentMessage 是唯一的消息表示。所有 provider 的差异在边界处转换:
-
输入时: AgentMessage[] → Message[](convertToLlm) -
输出时: Provider response → AgentMessage
原则二:事件驱动
整个 agent 循环通过事件流驱动。AgentLoop 不直接修改状态,而是通过 emit(event) 通知所有监听者。Agent 类订阅这些事件来更新自身状态。
原则三:Hook 系统
AgentHarness 提供了 5 个核心钩子:
-
context— 变换对话上下文 -
tool_call— 拦截工具调用(可以阻止) -
tool_result— 修改工具结果 -
before_provider_request— 修改发给 provider 的 payload -
after_provider_response— 观察 provider 的响应
原则四:树形会话
SessionTree 模型支持分支、压缩、回溯。这比简单的消息列表灵活得多,也是 pi 区别于其他 agent 框架的核心竞争力。
原则五:Skill 内建,MCP 外延
pi 内建了完整的 skill 体系(发现、渐进式披露、五级作用域),但不管 MCP、LSP 等具体协议。pi 提供通用的 AgentTool 接口,由 OpenClaw 在其之上实现 MCP server 的发现和绑定。pi 保持简洁——不需要知道工具从哪里来,只需要知道怎么调用。
原则六:Stealth Mode
pi 刻意模仿 Claude Code 的行为(工具名大小写、system prompt 结构),这样可以用 pi 构建与 Claude Code 行为一致的自定义 agent,同时支持任意 provider 和模型。
七、代码统计
|
|
|
|
|---|---|---|
packages/agent/ |
|
|
packages/ai/ |
|
|
packages/coding-agent/ |
|
|
packages/tui/ |
|
|
总代码量约 160 个源文件(不含测试)。其中:
-
agent.ts— 约 450 行,Agent 状态管理门面 -
agent-loop.ts— 约 400 行,ReAct 循环核心 -
agent-harness.ts— 约 600 行,高层封装(会话、压缩、钩子) -
compaction.ts— 约 500 行,压缩逻辑 -
anthropic.ts— 约 400 行,Anthropic provider -
stream.ts— 约 50 行,流式调用入口
八、总结
拆解完 pi 的源码,最大的感受是:一个好的 agent 引擎不需要复杂的架构,但需要清晰的边界。
pi 只做了五件事:
-
统一消息模型 — 不管底层是 Anthropic、OpenAI 还是 Google,消息进来出去都是同一种格式 -
ReAct 双循环 — 内循环处理工具调用,外循环处理 follow-up,结构简单但覆盖了所有场景 -
树形会话 — 用 JSONL 文件存一棵树,而不是一个列表,分支、回溯、压缩都有了基础 -
钩子系统 — 5 个钩子让上层应用可以拦截和修改任意环节,而不需要改引擎本身 -
Skill 体系 — 渐进式披露的技能加载机制,元数据常驻、完整内容按需激活,五级作用域
剩下的——失败回退、多模型 failover、MCP 绑定、认证轮换、上下文溢出检测——都是 OpenClaw 在这个基础上叠加的。
160 个源文件,几个核心文件加起来不到 3000 行,这就是 OpenClaw 37 万 star 的引擎底座。
夜雨聆风