OpenClaw 源码学习-Quick Start

还是老规矩,先从 QuickStart 开始。
Quick Start
import { Agent } from "@mariozechner/pi-agent-core";import { getModel } from "@mariozechner/pi-ai";const agent = new Agent({initialState: {systemPrompt: "You are a helpful assistant.",model: getModel("anthropic", "claude-sonnet-4-20250514"),},});agent.subscribe((event) => {if (event.type === "message_update" && event.assistantMessageEvent.type === "text_delta") {// Stream just the new text chunkprocess.stdout.write(event.assistantMessageEvent.delta);}});await agent.prompt("Hello!");
这个模块比较简单,从 quickstart 的逻辑就可以看出,其主要工作是封装了 agent。
agent 是什么,可以理解为带有了记忆、工具的大模型。大模型本省没有记忆和工具能力,agent 正好是对其能力的补充。
types.ts
首先理解数据模型,理解这个包的全部类型定义,这是读懂后续代码的前提。
1. StreamFn
export type StreamFn = (...args: Parameters<typeof streamSimple>) => ReturnType<typeof streamSimple> | Promise<ReturnType<typeof streamSimple>>;
流式调用函数:可同步返回,也可返回 Promise,这是 LLM 调用的抽象签名。
这里用 `Parameters` 和 `ReturnType` 两个 TypeScript 内置工具类型分别提取 `streamSimple` 的**参数元组**和**返回值类型**。
拆解一下:
// streamSimple 的签名大致是:function streamSimple(model: Model<any>,context: Context,options: SimpleStreamOptions): EventStream<AssistantMessageEvent, AssistantMessage>// 所以:Parameters<typeof streamSimple> // → [Model<any>, Context, SimpleStreamOptions]ReturnType<typeof streamSimple> // → EventStream<AssistantMessageEvent, AssistantMessage>
最终展开为:
export type StreamFn = (model: Model<any>,context: Context,options: SimpleStreamOptions) => EventStream<AssistantMessageEvent, AssistantMessage>| Promise<EventStream<AssistantMessageEvent, AssistantMessage>>;
这样做的好处是不用手动同步签名——如果 streamSimple 将来加了参数或改了返回类型,StreamFn 自动跟着变。
2. AgentLoopConfig:agent-loop 的运行配置。
-
model:LLM 大模型
-
convertToLlm : 在每次 LLM 调用之前,将 `AgentMessage[]` 转换为底层提供商可理解的 `Message[]`。每条 `AgentMessage` 都需要被映射为 LLM 可理解的 `UserMessage` / `AssistantMessage` / `ToolResultMessage` 之一。无法转换的消息(例如仅用于 UI 的通知、状态消息等)应该在此处被过滤掉。
-
transformContext:在 `convertToLlm` 之前对上下文做一次可选转换。适合做“以 AgentMessage 为单位”的操作,例如:上下文窗口管理(裁剪旧消息)或者注入来自外部的数据/上下文。
-
getApiKey:为指定 provider 动态解析 API key(每次 LLM 调用都会触发)。适合短期 OAuth token(例如 GitHub Copilot)可能在长时间工具执行期间过期的场景。
-
getSteeringMessages:返回需要在运行过程中注入的 steering 消息。loop 会在每次工具执行之后调用它,用于检查用户是否需要“插话/打断”。如果返回了消息,剩余工具调用会被跳过,并在下一次 LLM 调用前把这些消息加入上下文。
-
getFollowUpMessages:返回需要在 agent 本应结束时继续处理的 follow-up 消息。当 agent 没有更多工具调用且没有 steering 消息时会调用。如果返回了消息,会将其加入上下文并继续下一轮对话。
-
CustomAgentMessages:可扩展的自定义消息接口。
export interface CustomAgentMessages {}export type AgentMessage = Message | CustomAgentMessages[keyof CustomAgentMessages];
`AgentMessage`:LLM 消息 + 自定义消息的联合类型。该抽象允许应用添加自定义消息类型,同时保持类型安全并兼容基础 LLM 消息。
3. AgentState:Agent 的状态:包含配置与对话数据,一个非常重要的数据结构
export interface AgentState {systemPrompt: string;model: Model<any>;thinkingLevel: ThinkingLevel;tools: AgentTool<any>[];messages: AgentMessage[];isStreaming: boolean;streamMessage: AgentMessage | null;pendingToolCalls: Set<string>;error?: string;}
4. AgentTool:AgentTool 在 Tool 基础上增加 execute 能力
export interface AgentTool<TParameters extends TSchema = TSchema, TDetails = any> extends Tool<TParameters> {label: string; // 用于 UI 展示的工具名称/标签execute: (toolCallId: string,params: Static<TParameters>,signal?: AbortSignal,onUpdate?: AgentToolUpdateCallback<TDetails>,) => Promise<AgentToolResult<TDetails>>;}
5. AgentContext: AgentContext 类似于 pi-ai 的 Context,但 tools 使用 AgentTool
export interface AgentContext {systemPrompt: string;messages: AgentMessage[];tools?: AgentTool<any>[];}
6. AgentEvent:Agent 对外发出的事件(常用于 UI 更新)。这些事件提供了消息、轮次以及工具执行的细粒度生命周期信息。四种生命周期的事件联合类型,共 11 种事件。
export type AgentEvent =// Agent lifecycle| { type: "agent_start" }| { type: "agent_end"; messages: AgentMessage[] }// Turn lifecycle - a turn is one assistant response + any tool calls/results| { type: "turn_start" }| { type: "turn_end"; message: AgentMessage; toolResults: ToolResultMessage[] }// Message lifecycle - emitted for user, assistant, and toolResult messages| { type: "message_start"; message: AgentMessage }// Only emitted for assistant messages during streaming| { type: "message_update"; message: AgentMessage; assistantMessageEvent: AssistantMessageEvent }| { type: "message_end"; message: AgentMessage }// Tool execution lifecycle| { type: "tool_execution_start"; toolCallId: string; toolName: string; args: any }| { type: "tool_execution_update"; toolCallId: string; toolName: string; args: any; partialResult: any }| { type: "tool_execution_end"; toolCallId: string; toolName: string; result: any; isError: boolean };
agent-loop.ts
1. `agentLoop` 和 `agentLoopContinue`:
export function agentLoop(prompts: AgentMessage[],context: AgentContext,config: AgentLoopConfig,signal?: AbortSignal,streamFn?: StreamFn,): EventStream<AgentEvent, AgentMessage[]>
和
export function agentLoopContinue(context: AgentContext,config: AgentLoopConfig,signal?: AbortSignal,streamFn?: StreamFn,): EventStream<AgentEvent, AgentMessage[]>
两者的区别在于是否插入新消息。
可以理解为前者是 LLM 发起一个新对话,后者是 LLM 调用失败,针对上线问重试,二者共享`runLoop`。
2. createAgentStream:建了一个以 `agent_end` 为终止信号的异步事件流,终止时返回本轮产生的所有新消息。
`EventStream<T, R>` 是一个通用的异步事件流,两个泛型参数:
-
`T` — 事件类型(这里就是 `AgentEvent`)
-
R` — 最终结果类型(这里就是 `AgentMessage[]`)
关于 EventStream 是如何运行的:
-
push() 时发生了什么?当 `runLoop` 中调用 `stream.push(event)` 时:
-
检查 `isComplete(event)` — 如果是 `agent_end`,标记 `done = true`,将 `event.messages` 存入 `finalResultPromise`
-
有消费者在等(`waiting` 数组非空)→ 直接交付
-
没人等 → 放入 `queue` 缓冲
-
async *[Symbol.asyncIterator](): AsyncIterator<T> 是如何运行的?在消费端调用如下:
forawait (constevent of stream) { ... }
`for await` 调用 `[Symbol.asyncIterator]()`:
-
`queue` 有数据 → `yield` 出队
-
`done` → `return`,迭代结束
-
都没有 → 创建 Promise 挂起等待,直到 `push()` 或 `end()` 唤醒
-
result: 可以在迭代之前或之后调用,`await stream.result()` 拿到 `agent_end` 时的 `messages`。`agent.ts` 中没直接用这个方法,但 `streamAssistantResponse()` 中用了 LLM 层的 `response.result()`。
3. runLoop: 是整个 agent 的计算核心,用双层循环实现了 **LLM 响应 → 工具执行 → 检查中断/追加 → 继续或结束** 的完整生命周期。
while(true) { // 外层:follow-up 循环while(hasMoreToolCalls || pendingMessages.length > 0) { // 内层:工具+steering 循环① 注入 pendingMessages,这些消息可能是 steering 打断消息,也可能是 follow-up 追加消息。注入后清空。② streamAssistantResponse() ← 调 LLM,这里会做 transformContext → convertToLlm → 调用 LLM → 流式输出事件。③ 检查 stopReason → error/aborted 则退出,LLM 出错或被 abort,立即结束,不执行工具。④ 有工具调用?→ executeToolCalls(),逐个执行工具,每个执行后检查 steering。如果有 steering,剩余工具被跳过,返回 steeringMessages。工具结果会加入到上下文⑤ turn_end⑥ 检查 steering → 设为 pendingMessages,下一轮内层循环开头注入。}检查 follow-up → 有则设为 pendingMessages,continue无 follow-up → break}agent_end
内存循环:没有工具调用且没有待注入消息就结束。
外层循环:每次 agent “本来要停了” 的时候,再检查一次 follow-up 队列。有消息就继续干,没有就结束。
agent.ts
主要是对 agent-loop 的封装,增加了状态管理、队列和事件订阅。
1. defaultConvertToLlm, 默认的消息过滤,只保留`user` `assistant` 和 `toolResult` 消息。
function defaultConvertToLlm(messages: AgentMessage[]): Message[] {return messages.filter((m) => m.role === "user" || m.role === "assistant" || m.role === "toolResult");}
2. AgentOptions
对比 `AgentLoopConfig`,增加了:
-
`initialState`: 初始状态(浅合并)
-
`steeringMode` / `followUpMode`: 队列出队模式
-
`streamFn`: 自定义流函数
-
`sessionId`、`getApiKey`、`thinkingBudgets`、`transport`、`maxRetryDelayMs`
3. Agent类是 `agent-loop` 的**有状态封装**,面向应用层使用。它把无状态的函数式 API 包装成易用的面向对象接口,管理状态、事件订阅和消息队列。
-
状态:
private _state: AgentState = {systemPrompt: "",model: getModel("google", "gemini-2.5-flash-lite-preview-06-17"), // 默认模型thinkingLevel: "off",tools: [],messages: [],isStreaming: false,streamMessage: null, // 流式过程中的部分消息,供 UI 实时读取pendingToolCalls: new Set(), // 正在执行的工具 ID,供 UI 显示进度error: undefined,};
-
队列:
private steeringQueue: AgentMessage[] = []; // 打断消息队列private followUpQueue: AgentMessage[] = []; // 追加任务队列private steeringMode: "all" | "one-at-a-time";private followUpMode: "all" | "one-at-a-time";
-
运行时控制:
private abortController?: AbortController; // 中断当前请求private runningPrompt?: Promise<void>; // 当前是否在运行private resolveRunningPrompt?: () => void; // resolve runningPrompt 的引用
-
配置:
private convertToLlm: ...; // 消息转换函数private transformContext?: ...; // 上下文转换函数public streamFn: StreamFn; // LLM 调用函数(public,允许外部替换)private _sessionId?: string; // 会话 IDpublic getApiKey?: ...; // 动态 API key(public,方便外部访问)private _onPayload?: ...; // 请求拦截private _thinkingBudgets?: ...; // 思考预算private _transport: Transport; // 传输方式private _maxRetryDelayMs?: number;// 最大重试等待时间
-
构造函数:
constructor(opts: AgentOptions = {}) {this._state = { ...this._state, ...opts.initialState }; // 浅合并初始状态this.convertToLlm = opts.convertToLlm || defaultConvertToLlm;this.transformContext = opts.transformContext;this.steeringMode = opts.steeringMode || "one-at-a-time"; // 默认逐条出队this.followUpMode = opts.followUpMode || "one-at-a-time";this.streamFn = opts.streamFn || streamSimple; // 默认直连 LLMthis._transport = opts.transport ?? "sse"; // 默认 SSE...}
其中关键点:
-
`initialState` 是**浅合并**,只覆盖提供的字段
-
`convertToLlm` 默认只保留 `user`/`assistant`/`toolResult` 三种角色
-
`streamFn` 默认 `streamSimple`,浏览器场景替换为 `streamProxy`
-
状态的修改方法
全是检点的赋值操作,主要有配置修改、消息管理和队列操作。
两个`dequeue` 方法的逻辑相同,只是操作的不同的队列。
dequeueSteeringMessages() {if (mode === "one-at-a-time") {// 取第一个,其余保留const first = this.steeringQueue[0];this.steeringQueue = this.steeringQueue.slice(1);return [first];}// "all" 模式:全部取出const steering = this.steeringQueue.slice();this.steeringQueue = [];return steering;}
-
prompt 发起对话,流程如下:
prompt("读一下 config.json")│├─ 检查 isStreaming → 抛错(冲突保护)├─ 检查 model → 抛错│├─ 构造 AgentMessage:│ ├─ string → { role: "user", content: [{ type: "text", text: input }], timestamp }│ ├─ AgentMessage → [input]│ └─ AgentMessage[] → input│└─ _runLoop(msgs)
参数类型是一个联合型的,对于字符串输入会被包装为标准 user 消息,`images` 会追加到 `content` 数组中(与 text 并列)。其他类型是 AgentMessage 。
-
continue 继续对话,其流程如下:
continue()│├─ 检查 isStreaming → 抛错├─ 检查 messages 为空 → 抛错│├─ 末尾消息 role === "assistant"?│ ├─ 有 steering 消息?│ │ └─ _runLoop(steering, { skipInitialSteeringPoll: true })│ │ ↑ skipInitialSteeringPoll 避免 runLoop 入口重复消费 steering│ ├─ 有 followUp 消息?│ │ └─ _runLoop(followUp)│ └─ 都没有 → 抛错 "Cannot continue from message role: assistant"│└─ 末尾是 user 或 toolResult└─ _runLoop(undefined) ← 不传入新消息,直接继续
-
_runLoop 核心运行方法,这是 Agent 类最关键的方法,**桥接 Agent 状态和 agentLoop 函数**。
初始化阶段:
-
创建 running promise(用于 waitForIdle)
-
运行时状态
-
快照当前状态(防止运行期间状态被外部修改影响)
-
构建配置
-
特殊注意点:`messages` 用 `.slice()` 做了拷贝。agentLoop 操作的是 `currentContext`(拷贝),但 `message_end` 事件触发 `appendMessage()` (L511) 修改的是 `this._state.messages`。这样即使运行过程中外部读取 `state.messages` 也不会看到中间状态。
选择入口函数:
const stream = messages? agentLoop(messages, context, config, this.abortController.signal, this.streamFn): agentLoopContinue(context, config, this.abortController.signal, this.streamFn);
有新消息 → `agentLoop`,没有 → `agentLoopContinue`。
-
事件消息循环
forawait (constevent of stream) {switch (event.type) {……}this.emit(event); // 通知所有订阅者}
-
事件订阅
subscribe(fn: (e: AgentEvent) => void): () => void {this.listeners.add(fn);return () => this.listeners.delete(fn); // 返回取消订阅函数}private emit(e: AgentEvent) {for (const listener of this.listeners) {listener(e); // 同步调用所有订阅者}}
使用 `Set` 存储监听器,`subscribe` 返回 unsubscribe 函数,模式与 EventEmitter 一致。事件是**同步分发**的。
总结
有了 agent 的学习,才会懂得大模型是如何工作的,让自然语言如何执行,完成最终的目录。
后面会继续针对 pi-mono 以及 openclaw 源码进行深入学习,更快的掌握 ai 编程技能。

夜雨聆风