乐于分享
好东西不私藏

OpenClaw 源码学习-Quick Start

OpenClaw 源码学习-Quick Start

前一篇OpenClaw 源码学习-Quick Start介绍了 pi-mono 项目的核心计算模块 pi-ai,今天将继续学习 pi-mono 另一个重要模块 pi-agent。

还是老规矩,先从 QuickStart 开始。

Quick Start

import { Agent } from "@mariozechner/pi-agent-core";import { getModel } from "@mariozechner/pi-ai";const agent = new Agent({  initialState: {    systemPrompt"You are a helpful assistant.",    modelgetModel("anthropic""claude-sonnet-4-20250514"),  },});agent.subscribe((event) => {  if (event.type === "message_update" && event.assistantMessageEvent.type === "text_delta") {    // Stream just the new text chunk    process.stdout.write(event.assistantMessageEvent.delta);  }});await agent.prompt("Hello!");

这个模块比较简单,从 quickstart 的逻辑就可以看出,其主要工作是封装了 agent。

agent 是什么,可以理解为带有了记忆、工具的大模型。大模型本省没有记忆和工具能力,agent 正好是对其能力的补充。

types.ts

首先理解数据模型,理解这个包的全部类型定义,这是读懂后续代码的前提。

1. StreamFn

export type StreamFn = (	...argsParameters<typeof streamSimple>) => ReturnType<typeof streamSimple> | Promise<ReturnType<typeof streamSimple>>;

流式调用函数:可同步返回,也可返回 Promise,这是 LLM 调用的抽象签名。

这里用 `Parameters` 和 `ReturnType` 两个 TypeScript 内置工具类型分别提取 `streamSimple` 的**参数元组**和**返回值类型**。

拆解一下:

// streamSimple 的签名大致是:function streamSimple(  model: Model<any>,  context: Context,  options: SimpleStreamOptions): EventStream<AssistantMessageEventAssistantMessage>// 所以:Parameters<typeof streamSimple>   // → [Model<any>, Context, SimpleStreamOptions]ReturnType<typeof streamSimple>   // → EventStream<AssistantMessageEvent, AssistantMessage>

最终展开为:

export type StreamFn = (  modelModel<any>,  contextContext,  optionsSimpleStreamOptions) => EventStream<AssistantMessageEventAssistantMessage>   | Promise<EventStream<AssistantMessageEventAssistantMessage>>;

这样做的好处是不用手动同步签名——如果 streamSimple 将来加了参数或改了返回类型,StreamFn 自动跟着变。

2. AgentLoopConfig:agent-loop 的运行配置。

  • model:LLM 大模型

  • convertToLlm : 在每次 LLM 调用之前,将 `AgentMessage[]` 转换为底层提供商可理解的 `Message[]`。每条 `AgentMessage` 都需要被映射为 LLM 可理解的 `UserMessage` / `AssistantMessage` / `ToolResultMessage` 之一。无法转换的消息(例如仅用于 UI 的通知、状态消息等)应该在此处被过滤掉。

  • transformContext:在 `convertToLlm` 之前对上下文做一次可选转换。适合做“以 AgentMessage 为单位”的操作,例如:上下文窗口管理(裁剪旧消息)或者注入来自外部的数据/上下文。

  • getApiKey:为指定 provider 动态解析 API key(每次 LLM 调用都会触发)。适合短期 OAuth token(例如 GitHub Copilot)可能在长时间工具执行期间过期的场景。

  • getSteeringMessages:返回需要在运行过程中注入的 steering 消息。loop 会在每次工具执行之后调用它,用于检查用户是否需要“插话/打断”。如果返回了消息,剩余工具调用会被跳过,并在下一次 LLM 调用前把这些消息加入上下文。

  • getFollowUpMessages:返回需要在 agent 本应结束时继续处理的 follow-up 消息。当 agent 没有更多工具调用且没有 steering 消息时会调用。如果返回了消息,会将其加入上下文并继续下一轮对话。

  • CustomAgentMessages:可扩展的自定义消息接口。

export interface CustomAgentMessages {}export type AgentMessage = Message | CustomAgentMessages[keyof CustomAgentMessages];

`AgentMessage`:LLM 消息 + 自定义消息的联合类型。该抽象允许应用添加自定义消息类型,同时保持类型安全并兼容基础 LLM 消息。

3. AgentState:Agent 的状态:包含配置与对话数据,一个非常重要的数据结构

export interface AgentState {	systemPromptstring;	modelModel<any>;	thinkingLevelThinkingLevel;	toolsAgentTool<any>[];	messagesAgentMessage[];	isStreamingboolean;	streamMessageAgentMessage | null;	pendingToolCallsSet<string>;	error?: string;}

4. AgentTool:AgentTool 在 Tool 基础上增加 execute 能力

export interface AgentTool<TParameters extends TSchema = TSchemaTDetails = anyextends Tool<TParameters> {	labelstring// 用于 UI 展示的工具名称/标签 	execute(		toolCallIdstring,		paramsStatic<TParameters>,		signal?: AbortSignal,		onUpdate?: AgentToolUpdateCallback<TDetails>,	) => Promise<AgentToolResult<TDetails>>;}

5. AgentContext: AgentContext 类似于 pi-ai 的 Context,但 tools 使用 AgentTool

export interface AgentContext {	systemPromptstring;	messagesAgentMessage[];	tools?: AgentTool<any>[];}

6. AgentEvent:Agent 对外发出的事件(常用于 UI 更新)。这些事件提供了消息、轮次以及工具执行的细粒度生命周期信息。四种生命周期的事件联合类型,共 11 种事件。

export type AgentEvent =	// Agent lifecycle	| { type"agent_start" }	| { type"agent_end"messagesAgentMessage[] }	// Turn lifecycle - a turn is one assistant response + any tool calls/results	| { type"turn_start" }	| { type"turn_end"messageAgentMessagetoolResultsToolResultMessage[] }	// Message lifecycle - emitted for user, assistant, and toolResult messages	| { type"message_start"messageAgentMessage }	// Only emitted for assistant messages during streaming	| { type"message_update"messageAgentMessageassistantMessageEventAssistantMessageEvent }	| { type"message_end"messageAgentMessage }	// Tool execution lifecycle	| { type"tool_execution_start"toolCallIdstringtoolNamestringargsany }	| { type"tool_execution_update"toolCallIdstringtoolNamestringargsanypartialResultany }	| { type"tool_execution_end"toolCallIdstringtoolNamestringresultanyisErrorboolean };

agent-loop.ts

1. `agentLoop` 和 `agentLoopContinue`:

export function agentLoop(	prompts: AgentMessage[],	context: AgentContext,	config: AgentLoopConfig,	signal?: AbortSignal,	streamFn?: StreamFn,): EventStream<AgentEventAgentMessage[]>

export function agentLoopContinue(	context: AgentContext,	config: AgentLoopConfig,	signal?: AbortSignal,	streamFn?: StreamFn,): EventStream<AgentEventAgentMessage[]>

两者的区别在于是否插入新消息。

可以理解为前者是 LLM 发起一个新对话,后者是 LLM 调用失败,针对上线问重试,二者共享`runLoop`。

2. createAgentStream:建了一个以 `agent_end` 为终止信号的异步事件流,终止时返回本轮产生的所有新消息。

`EventStream<T, R>` 是一个通用的异步事件流,两个泛型参数:

  • `T` — 事件类型(这里就是 `AgentEvent`)

  •  R` — 最终结果类型(这里就是 `AgentMessage[]`)

关于 EventStream 是如何运行的:

  • push() 时发生了什么?当 `runLoop` 中调用 `stream.push(event)` 时:

    • 检查 `isComplete(event)` — 如果是 `agent_end`,标记 `done = true`,将 `event.messages` 存入 `finalResultPromise`

    • 有消费者在等(`waiting` 数组非空)→ 直接交付

    • 没人等 → 放入 `queue` 缓冲

  • async *[Symbol.asyncIterator](): AsyncIterator<T> 是如何运行的?在消费端调用如下:

forawait (constevent of stream) { ... }

  `for await` 调用 `[Symbol.asyncIterator]()`:

    • `queue` 有数据 → `yield` 出队

    • `done` → `return`,迭代结束

    • 都没有 → 创建 Promise 挂起等待,直到 `push()` 或 `end()` 唤醒

  • result: 可以在迭代之前或之后调用,`await stream.result()` 拿到 `agent_end` 时的 `messages`。`agent.ts` 中没直接用这个方法,但 `streamAssistantResponse()` 中用了 LLM 层的 `response.result()`。

3. runLoop: 是整个 agent 的计算核心,用双层循环实现了 **LLM 响应 → 工具执行 → 检查中断/追加 → 继续或结束** 的完整生命周期。

while(true) {                    // 外层:follow-up 循环     while(hasMoreToolCalls || pendingMessages.length > 0) {  // 内层:工具+steering 循环       ① 注入 pendingMessages,这些消息可能是 steering 打断消息,也可能是 follow-up 追加消息。注入后清空。       ② streamAssistantResponse()  ← 调 LLM,这里会做 transformContext → convertToLlm → 调用 LLM → 流式输出事件。       ③ 检查 stopReason → error/aborted 则退出,LLM 出错或被 abort,立即结束,不执行工具。       ④ 有工具调用?→ executeToolCalls(),逐个执行工具,每个执行后检查 steering。如果有 steering,剩余工具被跳过,返回 steeringMessages。工具结果会加入到上下文       ⑤ turn_end       ⑥ 检查 steering → 设为 pendingMessages,下一轮内层循环开头注入。     }     检查 follow-up → 有则设为 pendingMessages,continue     无 follow-up → break   }   agent_end

   内存循环:没有工具调用且没有待注入消息就结束。

   外层循环:每次 agent “本来要停了” 的时候,再检查一次 follow-up 队列。有消息就继续干,没有就结束。

agent.ts

   主要是对 agent-loop 的封装,增加了状态管理、队列和事件订阅。

   1. defaultConvertToLlm, 默认的消息过滤,只保留`user` `assistant` 和 `toolResult` 消息。

function defaultConvertToLlm(messages: AgentMessage[]): Message[] {   	return messages.filter((m) => m.role === "user" || m.role === "assistant" || m.role === "toolResult");   }

   2. AgentOptions

      对比 `AgentLoopConfig`,增加了:

    • `initialState`: 初始状态(浅合并)

    • `steeringMode` / `followUpMode`: 队列出队模式

    • `streamFn`: 自定义流函数

    • `sessionId`、`getApiKey`、`thinkingBudgets`、`transport`、`maxRetryDelayMs`

   3. Agent类是 `agent-loop` 的**有状态封装**,面向应用层使用。它把无状态的函数式 API 包装成易用的面向对象接口,管理状态、事件订阅和消息队列。

    • 状态:

private _stateAgentState = {        systemPrompt"",        modelgetModel("google""gemini-2.5-flash-lite-preview-06-17"),  // 默认模型        thinkingLevel"off",        tools: [],        messages: [],        isStreamingfalse,        streamMessagenull,        // 流式过程中的部分消息,供 UI 实时读取        pendingToolCallsnew Set(), // 正在执行的工具 ID,供 UI 显示进度        errorundefined,      };
    • 队列:

private steeringQueue: AgentMessage[] = [];   // 打断消息队列private followUpQueue: AgentMessage[] = [];   // 追加任务队列private steeringMode: "all" | "one-at-a-time";private followUpMode: "all" | "one-at-a-time";
    • 运行时控制:

private abortController?: AbortController;     // 中断当前请求private runningPrompt?: Promise<void>;          // 当前是否在运行private resolveRunningPrompt?: () => void;      // resolve runningPrompt 的引用
    • 配置:

private convertToLlm: ...;        // 消息转换函数private transformContext?: ...;    // 上下文转换函数public streamFnStreamFn;        // LLM 调用函数(public,允许外部替换)private _sessionId?: string;      // 会话 IDpublic getApiKey?: ...;           // 动态 API key(public,方便外部访问)private _onPayload?: ...;         // 请求拦截private _thinkingBudgets?: ...;   // 思考预算private _transportTransport;    // 传输方式private _maxRetryDelayMs?: number;// 最大重试等待时间
    • 构造函数:

constructor(opts: AgentOptions = {}) {        this._state = { ...this._state, ...opts.initialState };  // 浅合并初始状态        this.convertToLlm = opts.convertToLlm || defaultConvertToLlm;        this.transformContext = opts.transformContext;        this.steeringMode = opts.steeringMode || "one-at-a-time";   // 默认逐条出队        this.followUpMode = opts.followUpMode || "one-at-a-time";        this.streamFn = opts.streamFn || streamSimple;               // 默认直连 LLM        this._transport = opts.transport ?? "sse";                   // 默认 SSE        ...      }

      其中关键点:

    • `initialState` 是**浅合并**,只覆盖提供的字段

    • `convertToLlm` 默认只保留 `user`/`assistant`/`toolResult` 三种角色

    • `streamFn` 默认 `streamSimple`,浏览器场景替换为 `streamProxy`

    • 状态的修改方法

全是检点的赋值操作,主要有配置修改、消息管理和队列操作。

两个`dequeue` 方法的逻辑相同,只是操作的不同的队列。

dequeueSteeringMessages() {        if (mode === "one-at-a-time") {          // 取第一个,其余保留          const first = this.steeringQueue[0];          this.steeringQueue = this.steeringQueue.slice(1);          return [first];        }        // "all" 模式:全部取出        const steering = this.steeringQueue.slice();        this.steeringQueue = [];        return steering;      }
    • prompt 发起对话,流程如下:

prompt("读一下 config.json")      │      ├─ 检查 isStreaming → 抛错(冲突保护)      ├─ 检查 model → 抛错      │      ├─ 构造 AgentMessage:      │   ├─ string → { role: "user", content: [{ type: "text", text: input }], timestamp }      │   ├─ AgentMessage → [input]      │   └─ AgentMessage[] → input      │      └─ _runLoop(msgs)

      参数类型是一个联合型的,对于字符串输入会被包装为标准 user 消息,`images` 会追加到 `content` 数组中(与 text 并列)。其他类型是 AgentMessage 。

    • continue 继续对话,其流程如下:

continue()      │      ├─ 检查 isStreaming → 抛错      ├─ 检查 messages 为空 → 抛错      │      ├─ 末尾消息 role === "assistant"?      │   ├─ 有 steering 消息?      │   │   └─ _runLoop(steering, { skipInitialSteeringPolltrue })      │   │       ↑ skipInitialSteeringPoll 避免 runLoop 入口重复消费 steering      │   ├─ 有 followUp 消息?      │   │   └─ _runLoop(followUp)      │   └─ 都没有 → 抛错 "Cannot continue from message role: assistant"      │      └─ 末尾是 user 或 toolResult          └─ _runLoop(undefined)  ← 不传入新消息,直接继续
    • _runLoop 核心运行方法,这是 Agent 类最关键的方法,**桥接 Agent 状态和 agentLoop 函数**。

      初始化阶段:

      • 创建 running promise(用于 waitForIdle)

      • 运行时状态

      • 快照当前状态(防止运行期间状态被外部修改影响)

      • 构建配置

      • 特殊注意点:`messages` 用 `.slice()` 做了拷贝。agentLoop 操作的是 `currentContext`(拷贝),但 `message_end` 事件触发 `appendMessage()` (L511) 修改的是 `this._state.messages`。这样即使运行过程中外部读取 `state.messages` 也不会看到中间状态。

      选择入口函数:

const stream = messages        ? agentLoop(messages, context, config, this.abortController.signal, this.streamFn)        : agentLoopContinue(context, config, this.abortController.signal, this.streamFn);

      有新消息 → `agentLoop`,没有 → `agentLoopContinue`。

    • 事件消息循环

forawait (constevent of stream) {        switch (event.type) {          ……        }        this.emit(event);  // 通知所有订阅者      }
    • 事件订阅

subscribe(fn(eAgentEvent) => void): () => void {        this.listeners.add(fn);        return () => this.listeners.delete(fn);  // 返回取消订阅函数      }      private emit(eAgentEvent) {        for (const listener of this.listeners) {          listener(e);  // 同步调用所有订阅者        }      }

      使用 `Set` 存储监听器,`subscribe` 返回 unsubscribe 函数,模式与 EventEmitter 一致。事件是**同步分发**的。

总结

有了 agent 的学习,才会懂得大模型是如何工作的,让自然语言如何执行,完成最终的目录。

后面会继续针对 pi-mono 以及 openclaw 源码进行深入学习,更快的掌握 ai 编程技能。

本站文章均为手工撰写未经允许谢绝转载:夜雨聆风 » OpenClaw 源码学习-Quick Start

猜你喜欢

  • 暂无文章