理解openclaw的Pi Monorepo 的packages:agent
为什么要理解openclaw使用的pi-mono
• 因为 openclaw 是基于 pi-coding-agent SDK 的真实集成案例,而 pi-coding-agent 来自 pi-mono。
• agent.ts:Agent 类,面向使用者的 API 层(管理状态、队列、订阅、abort、prompt/continue)。
• agent-loop.ts:执行层(真正跑循环、调 LLM、执行工具、发事件)
理解 openclaw 用到的 pi-mono,核心价值是:
1. 快速定位问题
判断问题在 openclaw 业务层,还是 pi-ai / pi-agent-core / pi-coding-agent 基础层。2. 提高二次开发效率
可以直接复用 pi-mono 已有机制(skills、extensions、modes、storage),避免重复造轮子。3. 更好做性能和稳定性治理
性能瓶颈常在链路中间(流式事件、工具循环、渲染层),理解整条链路才能系统优化。

agent
packages:agent和ai
• 是“分层依赖”关系:packages/agent 构建在 packages/ai 之上。
• packages/ai:底层 AI 基础能力
提供模型/Provider 抽象、streamSimple 流式调用、消息与工具协议、参数校验、token/usage 等。• packages/agent:上层 Agent 编排能力
基于 ai 的能力实现 Agent loop、工具调用调度(串行/并行)、steering/follow-up 队列、状态管理、事件生命周期。
可理解为:
ai = 引擎与协议层
agent = 工作流与控制层
在代码里也能看到 agent 直接从 @mariozechner/pi-ai 导入 streamSimple、Model、Message、validateToolArguments 等。
agent
/media/vdb/code/pi-mono/packages/agent main ⇣138
❯ ll
Permissions Size User Date Modified Name
drwxrwxr-x@ - ctyun 16 3月 16:38 src
drwxrwxr-x@ - ctyun 16 3月 16:38 test
.rw-rw-r--@ 7.3k ctyun 16 3月 16:38 CHANGELOG.md
.rw-rw-r--@ 986 ctyun 16 3月 16:38 package.json
.rw-rw-r--@ 13k ctyun 16 3月 16:38 README.md
.rw-rw-r--@ 209 ctyun 16 3月 16:38 tsconfig.build.json
.rw-rw-r--@ 184 ctyun 16 3月 16:38 vitest.config.ts
/media/vdb/code/pi-mono/packages/agent main ⇣138
❯ cd src
/media/vdb/code/pi-mono/packages/agent/src main ⇣138
❯ ls
agent-loop.ts agent.ts index.ts proxy.ts types.tspackages/agent/src/index.ts
把 agent 包里几个模块重新导出,方便外部只通过一个入口引入:
• ./agent.js:核心 Agent 能力 • ./agent-loop.js:循环/调度相关函数 • ./proxy.js:代理工具函数 • ./types.js:类型定义
实际效果是:调用方可以 import {...} from "agent",不用分别从多个文件路径导入
packages/agent/src/agent.ts
主控制器,实现可直接使用的Agent 运行时封装,把消息状态管理+LLM循环调用+工具执行流程+事件分发
export class Agent {
private _state: AgentState = {
systemPrompt: "",
model: getModel("google", "gemini-2.5-flash-lite-preview-06-17"),
thinkingLevel: "off",
tools: [],
messages: [],
isStreaming: false,
streamMessage: null,
pendingToolCalls: new Set<string>(),
error: undefined,
};实例化 new Agent(opts) 时,主要会做这几件事:
1. 初始化内部状态
用默认 AgentState(默认模型、空消息、空工具、isStreaming=false 等),再合并 opts.initialState。2. 注入可选策略/能力
设置 convertToLlm、transformContext、streamFn、getApiKey、onPayload、beforeToolCall、afterToolCall 等钩子或函数。3. 设置运行参数
包括 sessionId、thinkingBudgets、transport、maxRetryDelayMs、toolExecution。4. 设置队列模式
steeringMode 和 followUpMode(默认都是 "one-at-a-time")。5. 建立事件与控制容器
创建监听器集合、steering/followUp 队列、运行中 Promise/AbortController 等内部字段(初始为空或未激活)
6. 维护 Agent 状态
在 packages/agent/src/agent.ts:116 里定义 Agent,内部维护 messages、tools、isStreaming、pendingToolCalls、error 等状态。7. 提供对外控制接口
在 packages/agent/src/agent.ts:391 提供 prompt() / continue() 发起与续跑;
在 packages/agent/src/agent.ts:310 提供 steer() / followUp() 队列化干预消息;
在 packages/agent/src/agent.ts:373 提供 abort()、reset()。8. 驱动底层 loop
在 packages/agent/src/agent.ts:507 的 _runLoop() 中调用 runAgentLoop / runAgentLoopContinue,并把模型、transport、
thinking、toolExecution、hook(before/after tool call)等配置传下去。9. 处理流式事件并同步状态
在 packages/agent/src/agent.ts:458 的 processLoopEvent() 中接收 message_start/update/end、tool_execution*、agent_end
等事件,更新内部状态并 emit 给订阅者。10. 错误兜底
在 packages/agent/src/agent.ts:573 捕获异常,生成一条 assistant 错误消息写入会话,确保上层可见错误而不是静默失败。
packages/agent/src/agent-loop.ts
是 Agent 的执行引擎,负责把“一次对话回合”真正跑起来
• agent.ts:Agent 类,面向使用者的 API 层(管理状态、队列、订阅、abort、prompt/continue)。 • agent-loop.ts:执行层(真正跑循环、调 LLM、执行工具、发事件)。
调用关系很直接:
1. 调用 new Agent().prompt(...)(在 agent.ts) 2. Agent._runLoop() 组装 context + config 3. Agent._runLoop() 调 runAgentLoop / runAgentLoopContinue(在 agent-loop.ts) 4. agent-loop.ts 持续 emit AgentEvent 5. agent.ts 的 _processLoopEvent() 接收事件并更新 Agent.state,再转发给订阅者

1. 提供两个入口
• agentLoop / runAgentLoop:带新 prompt 开始一轮 • agentLoopContinue / runAgentLoopContinue:基于已有上下文继续跑(重试/续跑)
2. 主循环调度
runLoop() 里反复执行:
• 处理 steering/follow-up 队列消息 • 调 LLM 流式生成 assistant 消息 • 检查是否有 toolCall,有就执行工具 • 发出 turn_end,直到没有后续消息后发 agent_end
3. LLM 边界转换
streamAssistantResponse() 在调用模型前做:
• transformContext(可选) • convertToLlm(AgentMessage[] -> LLM Message[])
然后消费流式事件并转成统一 AgentEvent(message_start/update/end 等)。
4. 工具调用编排
executeToolCalls() 支持 sequential 和 parallel 两种模式,并包含:
• 参数校验 validateToolArguments • beforeToolCall / afterToolCall hook • 工具执行中间更新事件 • 错误结果包装、跳过结果包装(如有用户插入 steering)
5. 事件流输出
通过 emit 和 EventStream 把整个生命周期统一对外广播:
agent_start -> turn_start -> message/tool events -> turn_end -> agent_end。
如何使用agent?
简单示例:最常见的直接用法是:new Agent(...) 后设置模型/工具,然后 prompt()
1. prompt() 发起一轮:prompt() 负责“接收用户输入并发起新回合”,不负责具体推理/工具执行细节 2. subscribe() 接收流式事件 3. setTools() 挂工具 4. steer() / followUp() 在运行中追加指令 5. abort() 取消当前轮 6. continue() 从当前上下文继续跑
import { Agent } from "@mariozechner/pi-agent";
import { getModel } from "@mariozechner/pi-ai";
const agent = new Agent({
initialState: {
model: getModel("openai", "gpt-4.1"),
systemPrompt: "你是一个简洁的技术助手",
},
toolExecution: "parallel", // 或 "sequential"
});
agent.subscribe((e) => {
if (e.type === "message_update") {
// 实时 token / 文本更新
}
if (e.type === "agent_end") {
console.log("done");
}
});
// 注册工具(可选)
agent.setTools([
{
name: "echo",
description: "回显文本",
inputSchema: {
type: "object",
properties: { text: { type: "string" } },
required: ["text"],
},
async execute(_id, args) {
return {
content: [{ type: "text", text: `echo: ${args.text}` }],
details: {},
};
},
},
]);
// 发起对话
await agent.prompt("帮我总结今天的待办");
// 运行中插话(可选)
agent.steer({
role: "user",
content: [{ type: "text", text: "先给结论,再给细节" }],
timestamp: Date.now(),
});
// 续跑(可选)
await agent.continue();
稍微复杂的示例:动态鉴权、上下文裁剪、工具并行执行、前后置 hook、流式事件订阅、steering/follow-up、continue 续跑

• import { Agent, type AgentMessage, type AgentTool } from "@mariozechner/pi-agent";
import { getModel } from "@mariozechner/pi-ai";
/**
* 构造一条标准 user 消息,减少重复模板代码。
*/
function makeMessage(text: string): AgentMessage {
return {
role: "user",
content: [{ type: "text", text }],
timestamp: Date.now(),
};
}
/**
* 示例工具:run_shell
* - 这里只模拟执行,不直接调用真实 shell
* - 实际落地时应接入你自己的执行器,并做命令白名单/沙箱限制
*/
const shellTool: AgentTool<{ cmd: string }> = {
// 工具名:必须和模型返回的 toolCall.name 对齐
name: "run_shell",
// 给模型看的工具描述,影响模型是否选择该工具
description: "Run safe read-only shell commands",
// 工具入参 JSON Schema:用于模型约束和运行前参数校验
inputSchema: {
type: "object",
properties: { cmd: { type: "string" } },
required: ["cmd"],
additionalProperties: false,
},
/**
* 工具执行函数
* @param _toolCallId 当前 tool call 的唯一 ID
* @param args 经过 schema 校验后的参数
* @param signal 取消信号(abort 时触发)
* @param onUpdate 用于向外发送中间进度
*/
async execute(_toolCallId, args, signal, onUpdate) {
// 上报开始状态(会触发 tool_execution_update 事件)
onUpdate?.({ stage: "start", cmd: args.cmd });
// 模拟耗时任务
await new Promise((r) => setTimeout(r, 200));
// 响应取消
if (signal?.aborted) throw new Error("aborted");
// 上报完成状态
onUpdate?.({ stage: "done" });
// 返回工具结果(会被包装成 toolResult 消息)
return {
content: [{ type: "text", text: `command output for: ${args.cmd}` }],
details: { exitCode: 0 },
};
},
};
/**
* Agent 实例:负责管理状态、驱动 loop、执行工具、发事件
*/
const agent = new Agent({
initialState: {
// 模型选择(provider + model id)
model: getModel("openai", "gpt-4.1"),
// 系统提示词:定义角色和输出风格
systemPrompt:
"You are a coding agent. Be precise. Use tools when useful. Return concise technical answers.",
},
// 多工具调用策略:parallel 并行 / sequential 串行
toolExecution: "parallel",
// steer 队列投递策略:每轮只投 1 条
steeringMode: "one-at-a-time",
// follow-up 队列投递策略:一轮可投全部
followUpMode: "all",
// 会话标识:给支持 session cache 的 provider 使用
sessionId: "repo:pi-mono#branch:feature/agent-demo",
// 流传输模式(provider 支持时生效)
transport: "sse",
// 服务端要求重试等待时间上限(毫秒)
maxRetryDelayMs: 20_000,
/**
* 动态取 API key
* 适合短期 token(如 OAuth)按调用实时刷新
*/
getApiKey: async (provider) => {
if (provider === "openai") return process.env.OPENAI_API_KEY;
return undefined;
},
/**
* LLM 调用前的上下文变换
* 这里做简单裁剪:仅保留最近 20 条消息
*/
transformContext: async (messages) => {
const keep = messages.slice(-20);
return keep;
},
/**
* 转换为 LLM 可接受的消息集合
* 过滤掉内部控制类消息,仅保留 user/assistant/toolResult
*/
convertToLlm: async (messages) => {
return messages.filter(
(m) => m.role === "user" || m.role === "assistant" || m.role === "toolResult",
);
},
/**
* 工具执行前钩子
* 可做参数审计、权限检查、阻断危险调用
*/
beforeToolCall: async ({ toolCall, args }) => {
if (toolCall.name !== "run_shell") return;
const cmd = String((args as { cmd: string }).cmd).toLowerCase();
const blocked = ["rm -rf", "shutdown", "reboot"];
if (blocked.some((x) => cmd.includes(x))) {
return { block: true, reason: `blocked command: ${cmd}` };
}
return { block: false };
},
/**
* 工具执行后钩子
* 可统一修饰输出、补充元数据、改写错误语义
*/
afterToolCall: async ({ result, isError }) => {
if (!isError) return;
return {
isError: true,
details: { ...result.details, retriable: false, source: "afterToolCall" },
};
},
/**
* provider payload 观测点
* 便于调试请求结构和参数
*/
onPayload: (payload) => {
console.log("[payload]", JSON.stringify(payload).slice(0, 300));
},
});
// 注册工具列表(会替换当前 tools)
agent.setTools([shellTool]);
/**
* 订阅 Agent 事件流:
* - 消息生命周期
* - 工具执行生命周期
* - turn/agent 结束事件
*/
agent.subscribe((e) => {
switch (e.type) {
case "message_start":
console.log("[message_start]", e.message.role);
break;
case "message_update":
// 流式增量阶段(可用于实时 UI 更新)
break;
case "tool_execution_start":
console.log("[tool_start]", e.toolName, e.args);
break;
case "tool_execution_update":
console.log("[tool_update]", e.partialResult);
break;
case "tool_execution_end":
console.log("[tool_end]", e.toolName, "error=", e.isError);
break;
case "turn_end":
console.log("[turn_end]", e.message.stopReason);
break;
case "agent_end":
console.log("[agent_end] new messages:", e.messages.length);
break;
}
});
/**
* 主流程:
* 1) 首次 prompt
* 2) 插入 steer(中途转向)
* 3) 插入 follow-up(收尾追加任务)
* 4) continue 继续执行
* 5) waitForIdle 等待完全空闲
*/
async function main() {
await agent.prompt([
makeMessage("检查仓库里最近的变更点,先给高层总结。"),
makeMessage("如果需要可以调用 run_shell 获取辅助信息。"),
]);
// 运行中插入“转向”指令(在下一个安全点生效)
agent.steer(makeMessage("先输出风险,再输出建议。"));
// 当前轮结束后追加 follow-up
agent.followUp(makeMessage("把建议改成可执行 checklist。"));
await agent.continue();
// 等待 agent 完全空闲,便于退出前收尾
await agent.waitForIdle();
// 最终错误状态(无错通常为 undefined)
console.log("final error:", agent.state.error);
}
main().catch((err) => {
console.error(err);
process.exit(1);
});

理财咨询的网站地址:http://vi-money.com/lof/overflow-ranking
小说阅读的网站(连载小说和短篇小说):https://024novel.com/
个人博客主页:MakeMoney: https://funkygod.vip/
轻量云主机长期优惠
RackNerd
海外主机显示特惠:只要80元(3TB流量,1vcpu,50GB硬盘)
购买地址:https://my.racknerd.com/aff.php?aff=14942
CloudCone
CloudCone 特惠轻量云主机:购买地址:https://app.cloudcone.com/?ref=12332

📢 腾讯云资源限时福利
有云服务器、CDN、对象存储、网络防护等需求的朋友,欢迎联系下方腾讯云官方销售 (yunhaoxu@tencent.com)👇
1. ✔️ 内部专属折扣,价格更优 2. ✔️ 量大可谈,支持定制方案 3. ✔️ 技术咨询与售后无忧

AI编程套餐

MiniMax:Coding plan
🎁 MiniMax 跨年福利来袭!邀好友享 Coding Plan 双重好礼,助力开发体验!
好友立享 9折 专属优惠 + Builder 权益,你赢返利 + 社区特权!
👉 立即参与:https://platform.minimaxi.com/subscribe/coding-plan?code=5oAzx7O6Sr&source=link

GLM: coding plan
🚀 速来拼好模,智谱 GLM Coding 超值订阅,邀你一起薅羊毛!Claude Code、Cline 等 20+ 大编程工具无缝支持,“码力”全开,越拼越爽!立即开拼,享限时惊喜价!
链接:https://www.bigmodel.cn/glm-coding?ic=RTWWS8HOD6

火山方舟:特惠编程plan
方舟 Coding Plan 支持 Doubao、GLM、DeepSeek、Kimi 等模型,工具不限,现在订阅折上9折,低至8.9元,订阅越多越划算!立即订阅:https://volcengine.com/L/vd1xvW2KKgg/ 邀请码:2DSAD6JL

夜雨聆风