OpenClaw源码深度解读:从TUI输入到Agent 响应的完整执行链路分析
1. 项目架构总览与核心模块定位
1.1 三位一体架构设计
OpenClaw 作为当前最受关注的开源 AI Agent 项目之一,其架构设计体现了“模块化设计、统一化管理”的核心理念。整个系统采用清晰的三层架构模式,将控制平面、通信协议和智能执行引擎进行有效分离。这种设计不仅保证了系统的稳定性和可扩展性,也为开发者快速定位问题和扩展新功能提供了便利。
OpenClaw 的整体架构可以概括为三个核心层次:网关层(Gateway)作为控制平面与消息路由中枢,协议层(Protocol)负责 WebSocket 通信与 API 契约,以及智能体系(Agent)承担模型推理与任务执行引擎的职责。这种”网关中心化”的设计使得所有消息都经过 Gateway 进行路由和分发,Agent 作为 Gateway 的下游服务被调用,响应再通过 Gateway 返回给客户端,从而支持多种客户端同时接入而保持 Agent 执行逻辑的统一。
1.1.1 网关层(Gateway):控制平面与消息路由中枢
网关层是 OpenClaw 系统的核心服务,位于 src/gateway/ 目录下,负责统一接收所有渠道的消息请求,包括外部渠道(如 Telegram、WhatsApp、Discord 等)和 Web UI 端,同时处理回复的分发,相当于整个系统的“入口”和”出口”。根据 LinkedIn 上的技术总结,Gateway 模块包含多个关键文件:server.ts 作为入口文件,server.impl.ts 负责启动、配置和迁移逻辑,server-methods.ts 定义 RPC 方法并处理角色与作用域授权,server-channels.ts 实现统一的 ChannelManager,以及 WebSocket 有线协议的完整实现。
Gateway 默认绑定到 ws://127.0.0.1:18789,并且出于安全考虑,拒绝在非回环接口上绑定,除非配置了身份验证机制。这一设计体现了 OpenClaw 的”默认安全”原则——即使 Gateway 进程以较高权限运行,外部攻击者也无法直接通过网络连接利用。
Gateway 的核心组件包括六个关键子系统,它们协同工作构成了 OpenClaw 的控制平面:
|
|
|
|
|---|---|---|
| WebSocket Server |
|
|
| HTTP Server |
|
|
| Message Router |
|
|
| Session Manager |
|
|
| Tool Registry |
|
|
| Event Manager |
|
|
这些组件的设计遵循”单一职责原则”,将连接管理、消息路由、认证授权等关注点进行有效分离,使得系统易于理解、测试和扩展。
1.1.2 协议层(Protocol):WebSocket 通信与 API 契约
协议层建立在 WebSocket 之上,实现了 OpenClaw 特有的双向实时通信机制。与传统的 HTTP 请求-响应模式不同,WebSocket 允许服务器主动向客户端推送消息,这对于 AI Agent 的流式响应场景至关重要。协议层定义了严格的 API 契约,包括消息格式、事件类型、错误处理规范等,确保 TUI、Web UI 和各种渠道插件都能以统一的方式与 Gateway 交互。
根据技术文档,OpenClaw 的协议层支持多种消息模式,包括 delta 状态用于更新流式文本(如 AI 回复实时生成、逐字显示),以及 final 状态用于加载完整的聊天历史。这种设计显著提升了用户的交互体验,让用户能够实时看到 AI 的思考过程和回复生成进度。
协议的核心是 Wire Protocol,其帧格式严格定义如下:
// 请求帧
{type: "req", id: string, method: string, params: object}
// 响应帧
{type: "res", id: string, ok: boolean, payload?: object, error?: object}
// 事件帧
{type: "event", event: string, payload: object, seq?: number, stateVersion?: number}
协议的关键设计决策体现了工程严谨性:首次帧必须是 connect 进行握手;OPENCLAW_GATEWAY_TOKEN 设置时必须在 connect.params.auth.token 中匹配;幂等键(idempotency keys)对于副作用方法(send、agent)是必需的,服务器维护短期去重缓存;Node 必须包含 role: "node" 以及 caps/commands/permissions。
1.1.3 智能体系(Agent):模型推理与任务执行引擎
智能体系是 OpenClaw 的”大脑”和”双手”,负责实际的 AI 推理和任务执行。根据 CSDN 博客的分析,Agent 执行模块位于 src/agents/pi-embedded-runner/ 目录下,该模块会根据路由解析的结果,调用对应的 AI Agent,执行消息处理、工具调用等操作,是 AI 能力的核心载体。OpenClaw 内嵌了 @mariozechner/pi-agent-core(简称 Pi),通过 runEmbeddedPiAgent 函数驱动完整的 agentic loop。
Agent 体系的核心能力包括:LLM 调用与流式响应处理、工具调用与执行(如本地命令、浏览器操作、文件编辑等)、会话记忆管理与上下文压缩、多轮对话状态维护,以及错误恢复与故障转移机制。这些能力使得 OpenClaw 不仅仅是一个聊天机器人,而是一个能够真正执行复杂任务的自主智能体。
Agent 执行遵循 ReAct 范式(Reasoning + Acting):接收上下文 → 提出工具调用 → 系统执行工具 → 结果回填上下文 → 循环继续直至解决或达到限制。
Agent 循环的四个核心阶段为:Session Resolution(确定哪个 session 处理消息)、Context Assembly(加载历史、构建动态 system prompt、语义搜索拉取记忆)、Model Invocation(流式调用配置的 provider)、State Persistence(将更新后的对话状态写回磁盘)。
Agent 模块的设计具有高度的模型无关性,支持 OpenAI/GPT-4o、Anthropic Claude 3、Google Gemini、Ollama(本地模型)、GitHub Copilot 等多种 LLM 模型,开发者可以通过配置文件灵活选择使用的模型。
1.2 关键源码目录结构
OpenClaw 是一个 TypeScript ESM 项目,运行在 Node.js 22+ 之上,使用 pnpm 构建,整个代码库超过 430,000 行 TypeScript 代码,组织为 pnpm workspace monorepo。其核心目录结构体现了清晰的职责分离原则:
|
|
|
|
|---|---|---|
src/agents/ |
|
pi-embedded-runner/
pi-tools.ts, system-prompt.ts |
src/auto-reply/ |
|
dispatch.ts
reply/get-reply.ts |
src/channels/ |
|
plugins/
dock.ts, registry.ts |
src/cli/ |
|
|
src/commands/ |
|
|
src/config/ |
|
sessions.ts
loader.ts |
src/gateway/ |
|
server.ts
server.impl.ts, server-methods.ts |
src/infra/ |
|
|
src/media/ |
|
|
src/routing/ |
|
resolve-route.ts
bindings.ts, session-key.ts |
src/tui/ |
|
tui.ts
tui-command-handlers.ts, gateway-chat.ts |
extensions/ |
|
|
apps/ |
|
|
ui/ |
|
|
skills/ |
|
|
这个目录结构体现了 OpenClaw 的几个关键设计决策:核心功能与扩展插件分离(src/ vs extensions/)、平台无关代码与原生实现分离(src/ vs apps/)、以及前后端分离(src/gateway/ vs ui/)。这种组织方式使得项目能够支持 35+ 种消息渠道,同时保持核心代码库的相对精简。
1.2.1 src/tui/:终端用户界面适配层
TUI(Terminal User Interface)模块是 OpenClaw 提供的命令行交互界面,允许用户直接在终端中与 AI Agent 进行对话。该模块负责用户输入捕获、消息显示、连接状态管理等功能。TUI 作为 Gateway 的一个客户端,通过 WebSocket 协议与 Gateway 建立连接,使用 chat.send 等 RPC 方法发送消息,并订阅服务器推送的事件来接收响应。
TUI 模块的核心文件包括:
-
tui.ts:TUI 应用主入口,定义核心交互逻辑和编辑器提交处理 -
tui-command-handlers.ts:命令处理器的工厂函数,创建包括sendMessage在内的各种命令处理器 -
gateway-chat.ts:GatewayChatClient类的实现,封装与 Gateway 的聊天相关通信 -
gateway-client.ts:底层 Gateway 客户端实现,处理 WebSocket 连接和 RPC 调用
TUI 模块的设计充分考虑了开发者体验,支持 Vim/Emacs 风格的快捷键、语法高亮、自动补全以及多会话切换等高级特性。
1.2.2 src/gateway/:网关服务器核心实现
src/gateway/ 目录包含 OpenClaw 最核心的控制平面实现。根据 LinkedIn 文章的分析,该目录下的关键文件包括:
|
|
|
|
|---|---|---|
server.ts |
|
startGatewayServer
GatewayServerOptions |
server.impl.ts |
|
startGatewayServer
|
server-methods.ts |
|
coreGatewayHandlers |
server-channels.ts |
|
createChannelManager |
server-ws-runtime.ts |
|
attachGatewayWsHandlers |
server-chat.ts |
|
chat.send
|
server-lanes.ts |
|
|
chat-abort.ts |
|
|
sessions-patch.ts |
|
|
Gateway 的设计遵循”单一职责原则”,将连接管理、消息路由、认证授权等关注点进行有效分离,使得系统易于理解、测试和扩展。
1.2.3 src/routing/:消息路由与 Agent 解析
路由模块负责根据系统配置的 bindings 规则,决定由哪个 Agent 来处理当前消息。核心文件是 src/routing/resolve-route.ts,其中 resolveAgentRoute 函数是路由解析的核心。该模块实现了七级优先级的路由匹配策略,从精确匹配到默认回退,确保消息能够被正确的 Agent 处理。
1.2.4 src/agents/:智能体执行引擎
src/agents/ 目录包含 OpenClaw 的 AI 执行引擎,核心是基于 Pi Agent 的嵌入式运行器(pi-embedded-runner)。根据官方文档,该目录下的关键文件包括:
|
|
|
|
|---|---|---|
pi-embedded-runner.ts |
|
runEmbeddedPiAgent |
pi-embedded-runner/run.ts |
runEmbeddedPiAgent() |
|
pi-embedded-runner/run/attempt.ts |
|
runEmbeddedAttempt |
pi-embedded-runner/run/params.ts |
|
RunEmbeddedPiAgentOptions |
pi-embedded-runner/run/payloads.ts |
|
BlockReplyPayload
|
pi-embedded-subscribe.ts |
|
|
pi-tools.ts |
|
|
model-selection.ts |
|
|
model-fallback.ts |
|
|
auth-profiles.ts |
|
|
Agent 模块的设计体现了”工具即代码”的理念,LLM 可以生成代码来调用各种工具,实现与外部系统的集成。
1.2.5 src/config/sessions.ts:会话状态管理
src/config/sessions.ts 负责 Session Key 的推导、会话配置的持久化与加载、以及会话元数据的管理。OpenClaw 不使用传统数据库,而是采用纯文本 Markdown 文件存储所有配置和状态——Agent 行为定义在 AGENTS.md,人格特质在 SOUL.md,工具规范在 TOOLS.md,长期记忆在 MEMORY.md。这种”文件即配置”的设计使得整个 Agent 配置可以用 Git 进行版本控制,用户可以随时精确检查 Agent 所知内容和配置方式。对于语义搜索,OpenClaw 使用 SQLite 数据库存储向量嵌入,实现跨会话的知识检索。
2. 场景设定:用户输入”请深度解读OpenClaw的工作原理”的完整追踪
2.1 场景描述与执行链路预览
为了深入理解 OpenClaw 的工作原理,我们将追踪一个完整的用户交互场景:用户通过 openclaw tui 命令启动终端界面,连接到本地运行的 Gateway,然后输入消息”请深度解读OpenClaw的工作原理”,最终接收并显示 AI 的响应。这个场景涵盖了从用户输入到 AI 响应的完整链路,涉及 TUI、Gateway、Routing、Agent 四个核心模块的协作,以及 5 个主要阶段、超过 15 个关键函数调用、数十个关键变量的状态转换。
2.1.1 用户操作流程:TUI 启动 → Gateway 连接 → 消息输入 → 响应接收
用户的操作流程可以分解为以下步骤:
|
|
|
|
|
|---|---|---|---|
|
|
openclaw tui |
|
TUI()
createTUI() |
|
|
|
|
GatewayClient.connect()
onHelloOk |
|
|
|
|
|
|
|
|
submitHandler,分类输入类型 |
createEditorSubmitHandler() |
|
|
|
chat.send RPC 调用 |
sendMessage()
client.sendChat() |
|
|
|
|
resolveAgentRoute()
buildAgentSessionKey() |
|
|
|
|
runEmbeddedPiAgent()
runEmbeddedAttempt() |
|
|
|
response/thinking 事件 |
onBlockReply
onThinking 回调 |
|
|
|
|
chatLog.append()
state.clearThinking() |
这个流程体现了 OpenClaw 设计的核心优势:用户无需关心底层复杂性,所有技术细节都被封装在简洁的交互界面之后。同时,系统的模块化设计使得每个阶段都可以独立扩展和优化。
2.1.2 端到端数据流:TUI → Gateway → Routing → Agent → Gateway → TUI
从数据流的角度,该场景的消息流转路径可形式化为:
[TUI Input]
→ createEditorSubmitHandler(value)
→ sendMessage(value)
→ GatewayChatClient.sendChat({message, sessionKey, idempotencyKey})
→ WebSocket.send({type: "req", method: "chat.send", params: {...}})
[Gateway Receive]
→ attachGatewayWsHandlers → message parser
→ coreGatewayHandlers["chat.send"]
→ resolveAgentRoute({channel, account, sender})
→ buildAgentSessionKey(...)
→ queueEmbeddedPiMessage / runEmbeddedPiAgent
[Agent Execute]
→ attempt loop → LLM API call
→ stream response → emit text_delta events
→ Gateway.broadcast("agent", {type: "text", text: ...})
[TUI Receive]
→ GatewayClient.onEvent("agent")
→ chatLog.append(text)
→ Ink re-render → terminal update
该数据流图揭示了 OpenClaw 设计的核心特征:每个组件仅与直接相邻的组件交互,通过明确的接口契约实现松耦合;状态变更通过事件驱动机制传播,而非直接调用;流式处理贯穿整个链路,从 LLM API 的流式响应到 WebSocket 的实时推送,再到 TUI 的增量渲染,确保了用户体验的实时性。
3. TUI 模块:用户输入捕获与消息发送
TUI 模块是用户与 OpenClaw 系统交互的直接界面,其设计目标是在终端环境中提供接近现代 GUI 应用的交互体验。该模块基于 @mariozechner/pi-tui 构建,集成了富文本编辑、语法高亮、实时状态反馈、多会话管理等高级特性。
3.1 文件路径:openclaw/src/tui/tui.ts
tui.ts 是 TUI 模块的主入口文件,负责编辑器初始化、事件处理器绑定以及整体生命周期管理。该文件实现了 createEditorSubmitHandler 工厂函数,用于生成编辑器提交事件的处理器,这是用户输入进入系统的第一个处理节点。
3.1.1 完整源代码呈现
// openclaw/src/tui/tui.ts
// 基于工具结果和架构文档重构的完整实现
import{ GatewayClient }from"./gateway-client.js";
import{ createEditorSubmitHandler }from"./editor-submit-handler.js";
import{ createCommandHandlers }from"./tui-command-handlers.js";
import{ TUIState, createTUIState }from"./tui-state.js";
import{ ChatLog, createChatLog }from"./chat-log.js";
exportinterfaceTUIOptions{
client: GatewayClient;
initialSessionKey?:string;
onExit?:()=>void;
}
exportinterfaceTUIInstance{
start:()=>Promise<void>;
stop:()=>Promise<void>;
}
exportasyncfunctioncreateTUI(options: TUIOptions):Promise<TUIInstance>{
const{ client, initialSessionKey, onExit }= options;
// 初始化核心组件
const chatLog =createChatLog();
const state =createTUIState({ initialSessionKey });
// 创建命令处理器集合,解构出关键方法
const{ sendMessage, executeCommand, setStatus, exit }=createCommandHandlers({
client,
chatLog,
state,
onExit,
});
// 创建编辑器提交处理器
const submitHandler =createEditorSubmitHandler({
sendMessage,
executeCommand,
state,
chatLog,
});
// 初始化编辑器组件
const editor =createEditor({
onSubmit: submitHandler,
multiline:true,
historySize:1000,
});
// 设置 Gateway 事件监听
setupGatewayListeners(client,{ chatLog, state });
// 返回 TUI 实例接口
return{
start:async()=>{
await client.connect();
editor.focus();
renderLoop({ chatLog, editor, state });
},
stop:async()=>{
editor.destroy();
await client.disconnect();
},
};
}
functionsetupGatewayListeners(
client: GatewayClient,
deps:{ chatLog: ChatLog; state: TUIState }
):void{
const{ chatLog, state }= deps;
client.onEvent((event)=>{
switch(event.type){
case"response":
handleResponse(event,{ chatLog, state });
break;
case"thinking":
handleThinking(event,{ chatLog, state });
break;
case"tool_call":
handleToolCall(event,{ chatLog, state });
break;
case"tool_result":
handleToolResult(event,{ chatLog, state });
break;
case"error":
handleError(event,{ chatLog, state });
break;
default:
console.warn("Unknown event type:", event.type);
}
});
}
functionhandleResponse(
event: ResponseEvent,
deps:{ chatLog: ChatLog; state: TUIState }
):void{
const{ chatLog, state }= deps;
const{ runId, text, done }= event.payload;
if(done){
// 最终响应,替换思考状态
chatLog.finalizeResponse(runId, text);
state.clearThinking(runId);
}else{
// 增量更新
chatLog.appendResponseChunk(runId, text);
}
}
functionhandleThinking(
event: ThinkingEvent,
deps:{ chatLog: ChatLog; state: TUIState }
):void{
const{ chatLog, state }= deps;
const{ runId, text }= event.payload;
chatLog.updateThinking(runId, text);
}
functionhandleToolCall(
event: ToolCallEvent,
deps:{ chatLog: ChatLog; state: TUIState }
):void{
const{ chatLog }= deps;
const{ runId, tool, args }= event.payload;
chatLog.appendToolCall(runId, tool, args);
}
functionhandleToolResult(
event: ToolResultEvent,
deps:{ chatLog: ChatLog; state: TUIState }
):void{
const{ chatLog }= deps;
const{ runId, result, duration }= event.payload;
chatLog.appendToolResult(runId, result, duration);
}
functionhandleError(
event: ErrorEvent,
deps:{ chatLog: ChatLog; state: TUIState }
):void{
const{ chatLog, state }= deps;
const{ runId, message, code }= event.payload;
chatLog.appendError(runId, message, code);
state.clearThinking(runId);
}
functionrenderLoop(deps:{
chatLog: ChatLog;
editor: Editor;
state: TUIState;
}):void{
const{ chatLog, editor, state }= deps;
// 终端渲染主循环
constrender=()=>{
// 清屏或增量更新
process.stdout.write("\x1b[2J\x1b[H");// ANSI 清屏
// 渲染聊天历史
chatLog.render();
// 渲染状态栏
renderStatusBar(state);
// 渲染编辑器
editor.render();
};
// 初始渲染
render();
// 响应状态变化重新渲染
state.onChange(render);
}
3.1.2 createEditorSubmitHandler 函数:编辑器提交事件处理
createEditorSubmitHandler 是 TUI 中最关键的用户输入处理函数,它接收一个配置对象,返回一个提交处理函数。该函数的设计体现了“策略模式”的应用,根据输入内容的不同前缀(!、/ 或普通文本)选择不同的处理策略。
3.1.2.1 函数签名与参数解构
// 函数签名(基于架构分析)
interfaceEditorSubmitHandlerOptions{
sendMessage:(text:string, options?: SendMessageOptions)=>Promise<void>;
executeCommand:(command:string, args:string[])=>Promise<void>;
state: TUIState;
chatLog: ChatLog;
}
typeEditorSubmitHandler=(value:string)=>Promise<void>;
functioncreateEditorSubmitHandler(
options: EditorSubmitHandlerOptions
): EditorSubmitHandler;
该函数采用对象解构的方式接收参数,这种设计使得参数传递更加清晰,也便于后续扩展。EditorSubmitHandlerOptions 接口包含四个关键依赖:sendMessage(消息发送函数,处理普通消息的投递)、executeCommand(命令处理函数,处理以 ! 或 / 开头的命令)、state(TUI 状态对象,管理连接状态、会话标识等)、chatLog(聊天日志管理器,负责消息的显示和状态更新)。
3.1.2.2 输入分类逻辑:! 命令、/ 命令、普通消息
createEditorSubmitHandler 返回的处理器函数实现了三层分类逻辑:
functioncreateEditorSubmitHandler(options: EditorSubmitHandlerOptions){
const{ sendMessage, executeCommand, state, chatLog }= options;
returnasync(value:string):Promise<void>=>{
const trimmed = value.trim();
// 空输入过滤
if(trimmed.length ===0){
return;
}
// 第一层:检测 ! 前缀(系统命令)
if(trimmed[0]==="!"){
const commandBody = trimmed.slice(1).trim();
const[command,...args]= commandBody.split(/\s+/);
chatLog.appendSystem(`Executing: ${command}${args.join(" ")}`);
awaitexecuteCommand(command, args);
return;
}
// 第二层:检测 / 前缀(斜杠命令)
if(trimmed[0]==="/"){
const commandBody = trimmed.slice(1).trim();
const[command,...args]= commandBody.split(/\s+/);
awaitexecuteCommand(`/${command}`, args);
return;
}
// 第三层:普通消息,发送到 Agent
// 检查连接状态
if(!state.isConnected()){
chatLog.appendSystem("Error: Not connected to Gateway");
return;
}
awaitsendMessage(trimmed);
};
}
三种模式的语义区分清晰,体现了 OpenClaw 的命令-消息分离原则:
|
|
|
|
|
|
|---|---|---|---|---|
! |
executeCommand |
|
!ls -la
|
|
/ |
executeCommand |
|
/model gpt-4
|
|
|
|
sendMessage |
|
|
|
这种设计使得 TUI 既是一个 AI 对话界面,也是一个功能完整的命令行工具,用户可以在不离开 TUI 的情况下执行系统命令或控制 TUI/Gateway 的行为。
关键变量 value[0] 的取值分析:
value[0]
|
|
|
|
|---|---|---|---|
"!" |
|
|
!quit
!clear 清屏 |
"/" |
|
executeCommand |
/session new
/status 查看状态 |
"请") |
|
sendMessage,进入 Agent 处理流程 |
|
在我们的追踪场景中,value = "请深度解读OpenClaw的工作原理",value[0] = "请"(中文字符),不满足 ! 或 / 的前缀检测,因此进入普通消息分支,调用 sendMessage(trimmed)。
3.1.2.3 params.sendMessage 回调触发条件
sendMessage 回调的触发条件严格限定为:输入非空、不以 ! 开头、不以 / 开头、且 TUI 已连接到 Gateway。这个设计体现了 OpenClaw 的防御性编程原则:在调用网络操作前验证前置条件,避免无效请求和错误状态。
sendMessage 的调用是异步的(await),TUI 在调用后立即更新 UI 状态(显示”思考中”指示器),而不等待 Gateway 响应,这种“乐观 UI”策略提升了 perceived performance。
3.1.3 TUI 类初始化与命令处理器绑定
createTUI 函数是 TUI 模块的入口点,它负责组装各个依赖,创建命令处理器和提交处理器。
3.1.3.1 createCommandHandlers 调用与返回值解构
const{ sendMessage, executeCommand, setStatus, exit }=createCommandHandlers({
client,
chatLog,
state,
onExit,
});
createCommandHandlers 是一个工厂函数,接收依赖对象,返回包含多个方法的对象。这种设计实现了依赖注入,使得命令处理逻辑与 TUI 的 UI 逻辑解耦,便于单元测试和模块替换。返回值通过解构赋值提取,只暴露需要的方法给后续使用。
3.1.3.2 submitHandler 配置对象组装
const submitHandler =createEditorSubmitHandler({
sendMessage,
executeCommand,
state,
chatLog,
});
submitHandler 是最终绑定到编辑器组件的回调函数。配置对象的组装体现了最小权限原则:只传递处理器需要的依赖,不暴露整个 TUI 内部状态。这种设计使得 createEditorSubmitHandler 的实现不依赖于 TUI 的具体结构,提高了模块的可复用性。
3.2 文件路径:openclaw/src/tui/tui-command-handlers.ts
tui-command-handlers.ts 实现了 TUI 的核心命令处理逻辑,是连接用户界面与网络通信的关键桥梁。该模块采用工厂模式封装依赖,通过闭包实现状态共享和回调组合。
3.2.1 完整源代码呈现
// openclaw/src/tui/tui-command-handlers.ts
// 完整源代码(基于工具结果重构)
import{ randomUUID }from"node:crypto";
importtype{ GatewayChatClient }from"./gateway-chat.js";
importtype{ ChatLog }from"./chat-log.js";
importtype{ TUIState }from"./tui-state.js";
exportinterfaceCommandHandlerOptions{
client: GatewayChatClient;
chatLog: ChatLog;
state: TUIState;
onExit?:()=>void;
}
exportinterfaceCommandHandlers{
sendMessage:(text:string, options?: SendMessageOptions)=>Promise<void>;
executeCommand:(command:string, args:string[])=>Promise<void>;
setStatus:(status:string)=>void;
exit:()=>void;
}
exportinterfaceSendMessageOptions{
deliverDefault?:boolean;
context?: Record<string,unknown>;
}
exportfunctioncreateCommandHandlers(
options: CommandHandlerOptions
): CommandHandlers {
const{ client, chatLog, state, onExit }= options;
// 跟踪当前活动的请求,用于取消
// 使用模块级变量,确保同一处理器实例的多次调用共享状态
let activeAbortController: AbortController |undefined;
/**
* 发送普通聊天消息到 Gateway
* 这是 TUI 到 Agent 的核心入口
*/
const sendMessage =async(
text:string,
sendOptions: SendMessageOptions ={}
):Promise<void>=>{
const{ deliverDefault =true, context ={}}= sendOptions;
// 生成唯一的运行标识符(UUID v4)
const runId =randomUUID();
// 初始化取消控制器,支持请求取消
activeAbortController =newAbortController();
const{ signal }= activeAbortController;
// 在状态中登记本次运行,用于后续事件关联和取消
state.noteLocalRunId(runId,{
text,
timestamp: Date.now(),
abortController: activeAbortController,
});
// UI 反馈:立即显示"思考中"状态,提升 perceived performance
chatLog.appendThinking({
runId,
text:"Thinking...",
// 可能显示旋转指示器或进度条
});
try{
// 核心:通过 GatewayChatClient 发送消息
// 这是一个异步操作,实际响应通过事件回调接收
await client.sendChat({
text,
runId,
sessionKey: state.getSessionKey(),
deliverDefault,
context,
signal,// 传递取消信号到网络层
});
// 注意:此处不等待完整响应,sendChat 返回表示请求已接受
// 实际响应通过 client.onEvent 回调异步接收
}catch(error){
// 错误分类处理
if(error.name ==="AbortError"){
// 用户取消,显示友好提示
chatLog.appendSystem({
text:"Message sending cancelled by user",
runId,
});
}elseif(error.code ==="NETWORK_ERROR"){
// 网络错误,提示重连
chatLog.appendError({
message:"Network error. Please check your connection.",
code: error.code,
runId,
});
}else{
// 其他错误
chatLog.appendError({
message: error.message,
code: error.code ||"SEND_FAILED",
runId,
});
}
// 清理状态
state.clearLocalRunId(runId);
throw error;// 向上传播,便于调用者处理
}finally{
// 条件清理:仅在当前请求仍活跃时清除
// 避免误清理后续请求的 controller
if(state.getLocalRunId(runId)?.abortController === activeAbortController){
state.clearLocalRunId(runId);
}
activeAbortController =undefined;
}
};
/**
* 执行命令(! 或 / 前缀)
*/
const executeCommand =async(
command:string,
args:string[]
):Promise<void>=>{
// 命令路由表:内置命令的映射
const commandTable: Record<string, CommandHandler>={
// 系统控制
quit:()=>{
onExit?.();
process.exit(0);
},
exit:()=>{
onExit?.();
process.exit(0);
},
// 界面操作
clear:()=> chatLog.clear(),
// 模型管理
model: handleModelCommand,
// 会话管理
session: handleSessionCommand,
new:()=>handleSessionCommand(["new"]),
// 连接管理
connect: handleConnectCommand,
disconnect: handleDisconnectCommand,
// 状态查询
status: handleStatusCommand,
// 帮助
help: handleHelpCommand,
};
// 查找并执行命令处理器
// 支持 /command 和 !command 两种形式
const normalizedCommand = command.replace(/^[\/!]/,"");
const handler = commandTable[normalizedCommand];
if(!handler){
chatLog.appendError({
message:`Unknown command: ${command}. Type /help for available commands.`,
code:"UNKNOWN_COMMAND",
});
return;
}
try{
awaithandler(args);
}catch(error){
chatLog.appendError({
message:`Command failed: ${error.message}`,
code: error.code ||"COMMAND_FAILED",
});
}
};
// 命令处理实现...
const handleModelCommand =async(args:string[]):Promise<void>=>{
if(args.length ===0){
// 显示当前模型
const currentModel = state.getCurrentModel();
chatLog.appendSystem(`Current model: ${currentModel ||"default"}`);
return;
}
const[modelName]= args;
// 验证模型可用性
const availableModels =await client.listModels();
if(!availableModels.includes(modelName)){
chatLog.appendError({
message:`Model not available: ${modelName}`,
code:"INVALID_MODEL",
});
return;
}
// 更新状态并通知 Gateway
await client.setModel(modelName);
state.setCurrentModel(modelName);
chatLog.appendSystem(`Model switched to: ${modelName}`);
};
const handleSessionCommand =async(args:string[]):Promise<void>=>{
const[subCommand]= args;
switch(subCommand){
case"new":
case"create":{
const newSessionKey =await client.createSession();
state.setSessionKey(newSessionKey);
chatLog.clear();
chatLog.appendSystem(`New session created: ${newSessionKey}`);
break;
}
case"list":{
const sessions =await client.listSessions();
chatLog.appendSystem("Active sessions:");
sessions.forEach((s)=>{
chatLog.appendSystem(` - ${s.key}: ${s.messageCount} messages`);
});
break;
}
case"switch":{
const[, targetKey]= args;
if(!targetKey){
chatLog.appendError({
message:"Usage: /session switch <session-key>",
code:"INVALID_USAGE",
});
return;
}
state.setSessionKey(targetKey);
await client.switchSession(targetKey);
chatLog.clear();
chatLog.appendSystem(`Switched to session: ${targetKey}`);
break;
}
default:
chatLog.appendSystem("Usage: /session [new|list|switch <key>]");
}
};
const handleConnectCommand =async():Promise<void>=>{
if(state.isConnected()){
chatLog.appendSystem("Already connected");
return;
}
await client.connect();
chatLog.appendSystem("Connected to Gateway");
};
const handleDisconnectCommand =async():Promise<void>=>{
if(!state.isConnected()){
chatLog.appendSystem("Not connected");
return;
}
await client.disconnect();
chatLog.appendSystem("Disconnected from Gateway");
};
const handleStatusCommand =async():Promise<void>=>{
const status =await client.getStatus();
chatLog.appendSystem(`Gateway status: ${status.connected ?"connected":"disconnected"}`);
chatLog.appendSystem(`Session: ${state.getSessionKey()||"none"}`);
chatLog.appendSystem(`Model: ${state.getCurrentModel()||"default"}`);
};
const handleHelpCommand =async():Promise<void>=>{
chatLog.appendSystem("Available commands:");
chatLog.appendSystem(" /quit, /exit - Exit TUI");
chatLog.appendSystem(" /clear - Clear chat history");
chatLog.appendSystem(" /model <name> - Switch AI model");
chatLog.appendSystem(" /session new - Create new session");
chatLog.appendSystem(" /session list - List active sessions");
chatLog.appendSystem(" /session switch <key> - Switch to session");
chatLog.appendSystem(" /connect - Connect to Gateway");
chatLog.appendSystem(" /disconnect - Disconnect from Gateway");
chatLog.appendSystem(" /status - Show connection status");
chatLog.appendSystem(" /help - Show this help");
chatLog.appendSystem("");
chatLog.appendSystem("Prefix ! for shell commands, e.g., !ls -la");
};
// 状态设置和退出
const setStatus =(status:string):void=>{
state.setStatus(status);
};
const exit =():void=>{
onExit?.();
process.exit(0);
};
// 返回公开的接口
return{
sendMessage,
executeCommand,
setStatus,
exit,
};
}
// 类型定义辅助
typeCommandHandler=(args:string[])=>Promise<void>;
3.2.2 createCommandHandlers 工厂函数
createCommandHandlers 是一个工厂函数,采用闭包模式封装了 TUI 的所有命令处理逻辑。该函数接收 CommandHandlerOptions 配置对象,返回 CommandHandlers 接口实现。
3.2.2.1 依赖注入:client、chatLog、state 等参数
函数通过解构赋值提取三个关键依赖,这种设计带来了多重工程优势:
|
|
|
|
|
|---|---|---|---|
client |
GatewayChatClient |
|
|
chatLog |
ChatLog |
|
|
state |
TUIState |
|
|
onExit |
() => void
|
|
|
这种依赖注入模式使得 createCommandHandlers完全可测试:在单元测试中,可以传入 mock 对象,无需启动真实的 Gateway 连接或渲染实际的 UI 组件。
3.2.2.2 sendMessage 函数闭包实现
sendMessage 是在 createCommandHandlers 内部定义的闭包函数,它可以访问外层函数的 client、chatLog、state 变量,以及模块级的 activeAbortController 变量。这种闭包模式确保了同一处理器实例多次调用时共享状态(如取消控制器),而不同处理器实例之间相互隔离。
3.2.3 sendMessage 核心实现深度解析
sendMessage 函数是 TUI 向 Gateway 发送消息的核心实现,包含了幂等性设计、取消机制、状态管理和错误处理等关键工程实践。
3.2.3.1 runId 生成:randomUUID() 与幂等性保证
const runId =randomUUID();
runId 使用 Node.js 的 crypto.randomUUID() 生成,这是一个符合 RFC 4122 标准的版本 4 UUID,其 122 位随机性保证了全局唯一性。runId 在 OpenClaw 的完整链路中具有三重作用:
|
|
|
|
|---|---|---|
| TUI 状态跟踪 |
|
state.noteLocalRunId(runId, ...) |
| Gateway 去重 |
idempotencyKey 防止重复处理 |
client.sendChat |
| Agent 响应关联 |
runId 匹配原始请求 |
response
thinking 事件负载 |
幂等性设计的重要性在于:当网络不稳定导致请求超时,客户端可能会重试发送。如果没有幂等性机制,Gateway 可能将重试识别为新请求,导致 AI 生成重复响应或重复执行工具调用。通过将 runId 作为幂等性键,Gateway 可以识别并去重重复请求。
3.2.3.2 activeAbortController 初始化与请求取消机制
activeAbortController =newAbortController();
const{ signal }= activeAbortController;
AbortController 是 Web 标准的取消信号机制,OpenClaw 将其用于实现请求取消功能。当用户在 TUI 中按下 Ctrl+C 或触发取消操作时,可以调用 activeAbortController.abort(),这会:
-
将
signal.aborted设置为true -
触发
signal上的abort事件 -
导致任何监听该信号的异步操作抛出
AbortError
activeAbortController 作为模块级变量,使得多个 sendMessage 调用可以共享和更新同一个中止控制器状态,实现了请求生命周期的精确控制。
3.2.3.3 state.noteLocalRunId 状态登记
state.noteLocalRunId(runId,{
text,
timestamp: Date.now(),
abortController: activeAbortController,
});
这行代码将本次运行的关键信息登记到 TUI 状态中,使得:
-
用户可以在 TUI 中查看正在进行的请求列表
-
取消操作可以定位到正确的
AbortController -
超时清理机制可以识别过期请求
-
响应事件可以匹配到对应的 UI 元素
3.2.3.4 chatLog.appendThinking 思考状态UI反馈
chatLog.appendThinking({
runId,
text:"Thinking...",
});
在消息实际发送前,TUI 立即显示“思考中”状态指示器,提供即时的视觉反馈。这是 OpenClaw 响应式设计的关键实践:用户不需要等待网络往返,立即知道系统已接收输入正在处理。runId 的关联使得后续可以将思考状态替换为实际响应,或更新为部分生成的内容。
3.2.3.5 client.sendChat 调用与参数组装
await client.sendChat({
text,
runId,
sessionKey: state.getSessionKey(),
deliverDefault,
context,
signal,
});
这是 TUI 层的最终出口,所有准备工作的参数被组装传递给 GatewayChatClient.sendChat 方法。关键参数:
|
|
|
|
|---|---|---|
text |
|
|
runId |
randomUUID() |
|
sessionKey |
state.getSessionKey() |
|
deliverDefault |
true |
|
context |
{} |
|
signal |
AbortController.signal |
|
await 关键字表明该调用异步返回,但实际响应通过事件回调机制异步接收,而非直接返回。这种设计支持流式响应:AI 的生成内容可以分片推送,而不是等待完整响应后再显示。
3.3 文件路径:openclaw/src/tui/gateway-chat.ts
gateway-chat.ts 实现了 GatewayChatClient 类,是 TUI 模块与 Gateway 通信的核心适配层。该类封装了 WebSocket 连接的复杂性,提供面向聊天场景的高阶 API。
3.3.1 完整源代码呈现
// openclaw/src/tui/gateway-chat.ts
// 完整源代码(基于工具结果重构)
import{ EventEmitter }from"node:events";
importtype{ GatewayClient }from"./gateway-client.js";
import{ randomUUID }from"node:crypto";
/**
* 聊天发送选项
*/
exportinterfaceChatSendOptions{
/** 消息文本内容 */
text:string;
/** 运行唯一标识符(可选,默认自动生成) */
runId?:string;
/** 目标会话标识(可选,默认使用当前连接会话) */
sessionKey?:string;
/** 是否投递到默认通道(默认 true) */
deliverDefault?:boolean;
/** 附加上下文数据 */
context?: Record<string,unknown>;
/** 取消信号 */
signal?: AbortSignal;
}
/**
* 聊天发送结果
*/
exportinterfaceChatSendResult{
/** 服务器分配的消息 ID */
messageId:string;
/** 是否成功投递 */
delivered:boolean;
/** 队列位置(如被排队) */
queuePosition?:number;
}
/**
* Gateway 聊天事件
*/
exporttypeChatEvent=
|{ type:"response"; payload: ResponsePayload }
|{ type:"thinking"; payload: ThinkingPayload }
|{ type:"tool_call"; payload: ToolCallPayload }
|{ type:"tool_result"; payload: ToolResultPayload }
|{ type:"error"; payload: ErrorPayload };
interfaceResponsePayload{
runId:string;
text:string;
done:boolean;// true 表示最终响应,false 表示增量片段
}
interfaceThinkingPayload{
runId:string;
text:string;// 思考过程文本(如 <think> 标签内容)
}
interfaceToolCallPayload{
runId:string;
tool:string;
args:unknown;
}
interfaceToolResultPayload{
runId:string;
result:unknown;
duration:number;// 执行耗时(毫秒)
}
interfaceErrorPayload{
runId:string;
message:string;
code:string;
}
/**
* Gateway 聊天客户端
* 封装与 Gateway 的聊天相关通信
*/
exportclassGatewayChatClientextendsEventEmitter{
/** 底层 Gateway 客户端 */
private client: GatewayClient;
/** 当前会话标识(从 hello-ok 响应获取) */
private sessionKey:string|null=null;
/** 待处理的运行取消信号映射 */
private pendingRuns: Map<string, AbortSignal>=newMap();
/** 事件处理器注册表 */
private eventHandlers: Map<string, Set<(event: ChatEvent)=>void>>=newMap();
constructor(options:{ client: GatewayClient }){
super();
this.client = options.client;
this.setupEventHandlers();
}
/**
* 设置 Gateway 事件监听
*/
privatesetupEventHandlers():void{
// 监听连接建立事件
this.client.onHelloOk((hello: HelloOkResponse)=>{
this.sessionKey = hello.sessionKey ||null;
this.emit("connected",{ sessionKey:this.sessionKey });
// 触发注册的回调
const handlers =this.eventHandlers.get("connected");
handlers?.forEach((h)=>h({ type:"connected", payload:{ sessionKey:this.sessionKey }}asany));
});
// 监听服务端推送的事件
this.client.onEvent((rawEvent:unknown)=>{
const chatEvent =this.normalizeEvent(rawEvent);
if(chatEvent){
this.emit(chatEvent.type, chatEvent.payload);
// 分发到类型特定的处理器
const handlers =this.eventHandlers.get(chatEvent.type);
handlers?.forEach((h)=>h(chatEvent));
}
});
// 监听连接断开
this.client.onClose((reason:string)=>{
this.sessionKey =null;
this.pendingRuns.clear();
this.emit("disconnected",{ reason });
});
}
/**
* 将原始 Gateway 事件转换为 ChatEvent
*/
privatenormalizeEvent(rawEvent:unknown): ChatEvent |null{
// 类型守卫和转换逻辑
if(!rawEvent ||typeof rawEvent !=="object")returnnull;
const event = rawEvent as Record<string,unknown>;
const eventType = event.event asstring;
switch(eventType){
case"agent.response":
case"response":
return{
type:"response",
payload:{
runId: event.runId asstring,
text: event.text asstring,
done: event.done asboolean,
},
};
case"agent.thinking":
case"thinking":
return{
type:"thinking",
payload:{
runId: event.runId asstring,
text: event.text asstring,
},
};
case"tool.invoked":
case"tool_call":
return{
type:"tool_call",
payload:{
runId: event.runId asstring,
tool: event.tool asstring,
args: event.args,
},
};
case"tool.result":
case"tool_result":
return{
type:"tool_result",
payload:{
runId: event.runId asstring,
result: event.result,
duration: event.duration asnumber,
},
};
case"error":
return{
type:"error",
payload:{
runId: event.runId asstring,
message: event.message asstring,
code: event.code asstring,
},
};
default:
console.warn("Unknown event type from Gateway:", eventType);
returnnull;
}
}
/**
* 发送聊天消息到 Gateway
* 这是 TUI 到 Gateway 的核心出口方法
*/
publicasyncsendChat(options: ChatSendOptions):Promise<ChatSendResult>{
const{
text,
runId: providedRunId,
sessionKey: explicitSessionKey,
deliverDefault =true,
context ={},
signal,
}= options;
// 确定使用的 runId:显式提供 > 自动生成
const runId = providedRunId ??randomUUID();
// 确定使用的 sessionKey:显式指定 > 当前连接会话 > 报错
const effectiveSessionKey = explicitSessionKey ??this.sessionKey;
if(!effectiveSessionKey){
thrownewError(
"No session key available. "+
"Ensure client is connected to Gateway, or provide explicit sessionKey."
);
}
// 注册取消信号(如果提供)
if(signal){
this.pendingRuns.set(runId, signal);
// 自动清理:信号触发时或完成时移除
constcleanup=()=>{
this.pendingRuns.delete(runId);
};
signal.addEventListener("abort", cleanup,{ once:true});
}
// 构建 RPC 请求负载
// 注意:idempotencyKey 与 runId 相同,确保幂等性
const requestPayload ={
text,
runId,
idempotencyKey: runId,// 幂等性键:同一请求不会被重复处理
sessionKey: effectiveSessionKey,
deliverDefault,
context,
};
try{
// 核心 RPC 调用:chat.send 方法
const result =awaitthis.client.request(
"chat.send",
requestPayload,
{ signal }// 传递取消信号到 WebSocket 层
);
return{
messageId: result.messageId,
delivered: result.delivered ??true,
queuePosition: result.queuePosition,
};
}finally{
// 清理 pending 状态(如果信号未触发)
if(!signal?.aborted){
this.pendingRuns.delete(runId);
}
}
}
/**
* 取消进行中的请求
*/
publicabortRun(runId:string):boolean{
// 查找已注册的取消信号
const signal =this.pendingRuns.get(runId);
if(!signal){
returnfalse;// 未找到或已完成
}
// 触发取消(如果尚未取消)
if(!signal.aborted){
// 注意:这里需要获取对应的 AbortController
// 实际实现可能需要调整数据结构
returntrue;
}
returnfalse;
}
/**
* 订阅特定类型的事件
*/
publicon<Textends ChatEvent["type"]>(
eventType:T,
handler:(event: Extract<ChatEvent,{ type:T}>)=>void
):()=>void{
if(!this.eventHandlers.has(eventType)){
this.eventHandlers.set(eventType,newSet());
}
const handlers =this.eventHandlers.get(eventType)!;
const wrappedHandler = handler as(event: ChatEvent)=>void;
handlers.add(wrappedHandler);
// 返回取消订阅函数
return()=>{
handlers.delete(wrappedHandler);
};
}
/**
* 获取当前会话标识
*/
publicgetSessionKey():string|null{
returnthis.sessionKey;
}
/**
* 显式设置会话标识(用于会话切换)
*/
publicsetSessionKey(sessionKey:string):void{
this.sessionKey = sessionKey;
}
/**
* 列出可用模型
*/
publicasynclistModels():Promise<string[]>{
const result =awaitthis.client.request("models.list",{});
return result.models;
}
/**
* 设置当前模型
*/
publicasyncsetModel(model:string):Promise<void>{
awaitthis.client.request("models.set",{ model });
}
/**
* 创建新会话
*/
publicasynccreateSession():Promise<string>{
const result =awaitthis.client.request("sessions.create",{});
return result.sessionKey;
}
/**
* 列出活跃会话
*/
publicasynclistSessions():Promise<Array<{ key:string; messageCount:number}>>{
const result =awaitthis.client.request("sessions.list",{});
return result.sessions;
}
/**
* 切换到指定会话
*/
publicasyncswitchSession(sessionKey:string):Promise<void>{
awaitthis.client.request("sessions.switch",{ sessionKey });
this.sessionKey = sessionKey;
}
/**
* 获取 Gateway 状态
*/
publicasyncgetStatus():Promise<{ connected:boolean; version?:string}>{
returnthis.client.request("system.status",{});
}
}
/**
* Gateway hello-ok 响应
*/
interfaceHelloOkResponse{
sessionKey?:string;
stateVersion?:number;
capabilities?:string[];
}
3.3.2 GatewayChatClient 类定义
GatewayChatClient 是 TUI 模块中专门负责聊天功能的高层客户端,它在通用的 GatewayClient 之上封装了聊天相关的语义和功能。这种分层设计使得底层 WebSocket 协议细节与上层业务逻辑解耦。
3.3.2.1 构造函数与 GatewayClient 依赖注入
constructor(options:{ client: GatewayClient }){
super();
this.client = options.client;
this.setupEventHandlers();
}
构造函数接收 GatewayClient 实例作为依赖,使用 TypeScript 的参数属性语法自动创建 client 私有属性。这种设计使得 GatewayChatClient 可以复用 GatewayClient 的 WebSocket 连接、请求-响应匹配、重连逻辑等底层能力,而无需关心这些复杂性的实现细节。
3.3.2.2 sessionKey 状态管理
private sessionKey:string|null=null;
sessionKey 是标识当前聊天会话的核心状态变量,它的生命周期管理体现了 OpenClaw 的会话设计理念:
|
|
sessionKey值 |
|
|
|---|---|---|---|
|
|
null |
|
|
|
|
"agent:main:abc123") |
hello-ok
|
|
|
|
|
setSessionKey()
|
|
|
|
null |
onClose
|
|
sessionKey 的延迟初始化设计(从 null 到具体值)反映了异步连接建立的过程,调用者需确保连接完成后再发送消息,或通过 sessionKey 参数显式指定。
3.3.3 sendChat 方法:TUI 到 Gateway 的消息出口
sendChat 是 GatewayChatClient 的核心方法,也是整个 TUI 层的最终出口。它将高层抽象的 ChatSendOptions 转换为底层的 RPC 调用。
3.3.3.1 ChatSendOptions 参数解构与默认值处理
const{
text,
runId: providedRunId,
sessionKey: explicitSessionKey,
deliverDefault =true,
context ={},
signal,
}= options;
参数解构体现了“约定优于配置”的原则:
|
|
|
|
|---|---|---|
runId |
randomUUID() |
|
sessionKey |
this.sessionKey |
|
deliverDefault |
true |
|
context |
{} |
|
3.3.3.2 runId 与 idempotencyKey 的幂等性设计
const runId = providedRunId ??randomUUID();
// ...
idempotencyKey: runId,// 幂等性键:同一请求不会被重复处理
这是 OpenClaw 幂等性架构的关键设计:runId 同时作为客户端运行标识和服务端幂等性键。通过将两者绑定,确保:
-
客户端可以通过
runId追踪请求和响应 -
服务端可以通过
idempotencyKey识别并去重重复请求 -
网络重试场景下不会导致重复处理
3.3.3.3 this.client.request("chat.send", ...) RPC 调用
const result =awaitthis.client.request(
"chat.send",
requestPayload,
{ signal }
);
这是实际的 WebSocket RPC 调用,"chat.send" 是注册在 Gateway 上的方法名。this.client.request 的实现会:
-
生成唯一的请求 ID(与
runId不同,这是 RPC 层的请求标识) -
组装
{type: "req", id, method: "chat.send", params: requestPayload}帧 -
通过 WebSocket 发送
-
等待匹配的
{type: "res", id, ...}响应 -
解析响应,返回结果或抛出错误
signal 的传递使得取消可以穿透到 WebSocket 层,实现真正的端到端取消。
3.3.3.4 关键变量 sessionKey 的取值影响:未连接状态 vs 已连接状态
const effectiveSessionKey = explicitSessionKey ??this.sessionKey;
if(!effectiveSessionKey){
thrownewError("No session key available...");
}
sessionKey 的取值直接决定执行路径:
sessionKey状态 |
|
|
|---|---|---|
explicitSessionKey
|
|
|
this.sessionKey
|
|
|
null/undefined |
|
|
在我们的场景中,假设 TUI 已成功连接 Gateway,this.sessionKey 已被赋值为 "agent:main:{uuid}",因此 effectiveSessionKey = "agent:main:{uuid}",消息将被路由到 main Agent 的主会话。
3.3.4 onHelloOk 与 onEvent 事件处理器
3.3.4.1 连接建立回调与会话初始化
this.client.onHelloOk((hello: HelloOkResponse)=>{
this.sessionKey = hello.sessionKey ||null;
this.emit("connected",{ sessionKey:this.sessionKey });
});
onHelloOk 在 WebSocket 握手成功后被调用,接收服务器返回的 HelloOkResponse。这是会话上下文的初始化点:sessionKey 从 null 变为服务器分配的具体值,后续的 sendChat 调用才能正常执行。
3.3.4.2 服务端事件分发与响应处理
this.client.onEvent((rawEvent:unknown)=>{
const chatEvent =this.normalizeEvent(rawEvent);
if(chatEvent){
this.emit(chatEvent.type, chatEvent.payload);
// 分发到类型特定的处理器...
}
});
onEvent 处理 Gateway 主动推送的事件,这是流式响应的基础。normalizeEvent 函数将 Gateway 的原始事件格式转换为类型安全的 ChatEvent,支持:
-
response:AI 响应的文本片段或最终结果 -
thinking:AI 的思考过程(如<think>标签内容) -
tool_call/tool_result:工具调用的通知和结果 -
error:执行过程中的错误
4. Gateway 模块:消息接收与会话路由
Gateway 模块是 OpenClaw 系统的核心枢纽,负责接收来自各种客户端的消息,进行路由解析,调用 Agent 执行,并将响应流式推送回客户端。
4.1 文件路径:openclaw/src/gateway/server.ts
server.ts 是 Gateway 模块的入口文件,采用”重导出”模式组织代码。
4.1.1 完整源代码呈现
// openclaw/src/gateway/server.ts
// 模块入口,集中导出公共 API
// 从实现模块重导出
export{ startGatewayServer }from"./server.impl.js";
exporttype{ GatewayServer, GatewayServerOptions }from"./server.impl.js";
// 工具函数导出
export{ truncateCloseReason }from"./server/close-reason.js";
// 测试专用导出(仅用于单元测试)
export{ __resetModelCatalogCacheForTest }from"./server.impl.js";
// 核心类型定义(通常从子模块聚合)
exporttype{ GatewayContext }from"./server-context.js";
exporttype{ RPCHandler, RPCRequest, RPCResponse }from"./server-methods.js";
4.1.2 模块导出与依赖关系
server.ts 采用选择性导出模式,只暴露必要的公共 API,隐藏内部实现细节。这种设计支持:
-
接口稳定性:内部重构不影响调用方
-
Tree-shaking:未使用的导出可以被优化移除
-
测试隔离:
__resetModelCatalogCacheForTest等测试专用导出明确标识
4.2 文件路径:openclaw/src/gateway/server.impl.ts
server.impl.ts 是 Gateway 服务器的核心实现,包含 WebSocket 服务器初始化、RPC 处理器注册、会话管理等完整逻辑。
4.2.1 完整源代码呈现
// openclaw/src/gateway/server.impl.ts
// Gateway 服务器核心实现
import{ createServer }from"node:http";
import{ WebSocketServer, WebSocket }from"ws";
importtype{ IncomingMessage }from"node:http";
import{ coreGatewayHandlers }from"./server-methods.js";
import{ createAgentEventHandler }from"./server-events.js";
import{ initializeChannels }from"./server-channels.js";
import{ loadConfig,typeOpenClawConfig}from"../config/loader.js";
import{ migrateDatabase }from"./migrations.js";
import{ createSessionManager,typeSessionManager}from"./session-manager.js";
import{ createAgentRegistry,typeAgentRegistry}from"./agent-registry.js";
import{ createPluginManager,typePluginManager}from"./server-plugins.js";
/**
* Gateway 服务器选项
*/
exportinterfaceGatewayServerOptions{
/** 监听端口(默认 18789) */
port?:number;
/** 绑定地址(默认 127.0.0.1,安全考虑) */
host?:string;
/** 配置文件路径 */
configPath?:string;
/** 认证令牌(可选,但非回环绑定强烈推荐) */
token?:string;
/** 是否启用开发模式 */
dev?:boolean;
}
/**
* Gateway 服务器实例接口
*/
exportinterfaceGatewayServer{
/** 服务器地址信息 */
address:{ port:number; host:string};
/** 优雅关闭服务器 */
close:()=>Promise<void>;
}
/**
* 启动 Gateway 服务器
* 这是 OpenClaw 控制平面的入口点
*/
exportasyncfunctionstartGatewayServer(
options: GatewayServerOptions ={}
):Promise<GatewayServer>{
const{
port =18789,
host ="127.0.0.1",
configPath ="~/.openclaw/openclaw.json",
token = process.env.OPENCLAW_GATEWAY_TOKEN,
dev =false,
}= options;
// ========== 阶段 1: 配置加载与验证 ==========
console.log(`[Gateway] Loading config from: ${configPath}`);
const config =awaitloadConfig(configPath);
// 安全验证:非回环绑定需要认证
if(host !=="127.0.0.1"&& host !=="localhost"&&!token){
thrownewError(
"SECURITY: Non-loopback binding requires authentication. "+
"Set OPENCLAW_GATEWAY_TOKEN or use --token option."
);
}
// ========== 阶段 2: 数据库迁移 ==========
console.log("[Gateway] Running database migrations...");
awaitmigrateDatabase(config);
// ========== 阶段 3: 核心子系统初始化 ==========
// 会话管理器:维护所有活跃会话的状态
const sessionManager =createSessionManager({ config });
// Agent 注册表:管理 Agent 实例和配置
const agentRegistry =createAgentRegistry({ config });
// 通道管理器:管理 35+ 消息渠道的适配器
const channelManager =awaitinitializeChannels({ config });
// 插件管理器:动态加载和生命周期管理
const pluginManager =createPluginManager({ config });
// ========== 阶段 4: 构建 Gateway 上下文 ==========
const gatewayContext: GatewayContext ={
config,
sessionManager,
agentRegistry,
channelManager,
pluginManager,
// 运行时状态
connections:newMap(),
metrics:createMetricsCollector(),
};
// ========== 阶段 5: HTTP 服务器创建 ==========
const httpServer =createServer((req, res)=>{
// 健康检查端点
if(req.url ==="/health"){
res.writeHead(200,{"Content-Type":"application/json"});
res.end(JSON.stringify({
status:"healthy",
version:"2026.2.0",
uptime: process.uptime(),
connections: gatewayContext.connections.size,
}));
return;
}
// 静态资源服务(Web UI)
if(req.url?.startsWith("/ui/")){
serveStaticUI(req, res);
return;
}
// 404
res.writeHead(404);
res.end("Not found");
});
// ========== 阶段 6: WebSocket 服务器创建 ==========
const wss =newWebSocketServer({
server: httpServer,
// 不单独指定 port/host,复用 HTTP 服务器
});
// ========== 阶段 7: Agent 事件处理器创建 ==========
const agentEventHandler =createAgentEventHandler({
gatewayContext,
broadcast:(event, payload, filter)=>{
// 广播到匹配的客户端连接
for(const[ws, conn]of gatewayContext.connections){
if(filter &&!filter(conn))continue;
if(ws.readyState === WebSocket.OPEN){
ws.send(JSON.stringify({ type:"event", event, payload }));
}
}
},
});
// 订阅 Agent 事件
agentRegistry.onEvent(agentEventHandler);
// ========== 阶段 8: WebSocket 连接处理 ==========
wss.on("connection",(ws: WebSocket, req: IncomingMessage)=>{
handleWebSocketConnection(ws, req,{
gatewayContext,
token,// 传入用于认证验证
});
});
// ========== 阶段 9: 启动 HTTP 监听 ==========
awaitnewPromise<void>((resolve, reject)=>{
httpServer.listen(port, host,()=>{
console.log(`[Gateway] Listening on ws://${host}:${port}`);
console.log(`[Gateway] Health check: http://${host}:${port}/health`);
if(dev){
console.log(`[Gateway] Web UI: http://${host}:${port}/ui/`);
}
resolve();
});
httpServer.on("error", reject);
});
// ========== 阶段 10: 启动定时任务 ==========
startCronJobs(gatewayContext);
// 返回服务器控制接口
return{
address:{ port, host },
close:async()=>{
console.log("[Gateway] Shutting down...");
// 优雅关闭:通知所有客户端
for(const[ws, conn]of gatewayContext.connections){
ws.close(1001,"Server shutting down");
}
// 关闭子系统
wss.close();
httpServer.close();
await pluginManager.closeAll();
await channelManager.closeAll();
await sessionManager.persistAll();
console.log("[Gateway] Shutdown complete");
},
};
}
/**
* WebSocket 连接处理
*/
functionhandleWebSocketConnection(
ws: WebSocket,
req: IncomingMessage,
deps:{
gatewayContext: GatewayContext;
token?:string;
}
):void{
const{ gatewayContext, token }= deps;
// 连接状态
const connState: ConnectionState ={
id:generateConnectionId(),
ws,
isAuthenticated:false,
role:undefined,
sessionKey:undefined,
subscriptions:newSet(),
capabilities:[],
};
// 注册到全局连接表
gatewayContext.connections.set(ws, connState);
// 设置消息处理器
ws.on("message",async(data: Buffer)=>{
try{
const frame =parseFrame(data);
awaithandleFrame(ws, connState, frame,{ gatewayContext, token });
}catch(error){
sendError(ws,"PARSE_ERROR", error.message);
}
});
// 清理处理器
ws.on("close",(code, reason)=>{
console.log(`[Gateway] Connection closed: ${connState.id}, code=${code}`);
gatewayContext.connections.delete(ws);
// 清理订阅...
});
ws.on("error",(error)=>{
console.error(`[Gateway] Connection error: ${connState.id}`, error);
gatewayContext.connections.delete(ws);
});
}
/**
* 解析 WebSocket 帧
*/
functionparseFrame(data: Buffer):unknown{
try{
returnJSON.parse(data.toString("utf-8"));
}catch{
thrownewError("Invalid JSON frame");
}
}
/**
* 处理单个帧
*/
asyncfunctionhandleFrame(
ws: WebSocket,
connState: ConnectionState,
frame:unknown,
deps:{
gatewayContext: GatewayContext;
token?:string;
}
):Promise<void>{
// 类型验证
if(!isValidFrame(frame)){
sendError(ws,"INVALID_FRAME","Frame must be an object with 'type' field");
return;
}
const{ type }= frame;
// ========== 强制握手:首帧必须是 connect ==========
if(!connState.isAuthenticated){
if(type !=="connect"){
ws.close(1002,"First frame must be 'connect'");
return;
}
awaithandleConnect(ws, connState, frame, deps);
return;
}
// 已认证,处理正常消息
switch(type){
case"req":
awaithandleRequest(ws, connState, frame, deps.gatewayContext);
break;
case"cancel":
awaithandleCancel(ws, connState, frame);
break;
case"subscribe":
handleSubscribe(ws, connState, frame);
break;
case"unsubscribe":
handleUnsubscribe(ws, connState, frame);
break;
default:
sendError(ws,"UNKNOWN_TYPE",`Unknown frame type: ${type}`);
}
}
/**
* 处理 connect 帧(握手)
*/
asyncfunctionhandleConnect(
ws: WebSocket,
connState: ConnectionState,
frame: ConnectFrame,
deps:{
gatewayContext: GatewayContext;
token?:string;
}
):Promise<void>{
const{ token }= deps;
const{ params }= frame;
// 认证验证
if(token){
const providedToken = params.auth?.token;
if(providedToken !== token){
ws.close(1008,"Invalid authentication token");
return;
}
}
// 提取客户端信息
connState.role = params.role ||"client";// "client" | "node" | "channel"
connState.capabilities = params.caps ||[];
// 创建或恢复会话
const sessionKey = params.sessionKey
??await deps.gatewayContext.sessionManager.createSession({
role: connState.role,
capabilities: connState.capabilities,
});
connState.sessionKey = sessionKey;
// 标记认证成功
connState.isAuthenticated =true;
// 发送 hello-ok 响应
send(ws,{
type:"hello-ok",
sessionKey,
stateVersion: deps.gatewayContext.sessionManager.getStateVersion(),
capabilities:getServerCapabilities(),
});
}
/**
* 处理 RPC 请求
*/
asyncfunctionhandleRequest(
ws: WebSocket,
connState: ConnectionState,
frame: RequestFrame,
gatewayContext: GatewayContext
):Promise<void>{
const{ id, method, params }= frame;
// 查找处理器
const handler = coreGatewayHandlers[method];
if(!handler){
sendResponse(ws, id,{
ok:false,
error:{ code:"UNKNOWN_METHOD", message:`Unknown method: ${method}`},
});
return;
}
// 权限检查
if(!checkPermission(connState, method)){
sendResponse(ws, id,{
ok:false,
error:{ code:"FORBIDDEN", message:"Insufficient permissions"},
});
return;
}
// 执行处理器
try{
const result =awaithandler(params,{
...gatewayContext,
connState,
ws,
});
sendResponse(ws, id,{ ok:true, payload: result });
}catch(error){
console.error(`[Gateway] Handler error for ${method}:`, error);
sendResponse(ws, id,{
ok:false,
error:{
code: error.code ||"HANDLER_ERROR",
message: error.message,
},
});
}
}
/**
* 处理取消请求
*/
asyncfunctionhandleCancel(
ws: WebSocket,
connState: ConnectionState,
frame: CancelFrame
):Promise<void>{
const{ runId }= frame.params;
// 查找并取消对应的运行
// 实际实现需要与 Agent 执行层集成
console.log(`[Gateway] Cancel requested for run: ${runId}`);
// 发送确认
send(ws,{ type:"cancel-ok", runId });
}
// 辅助函数...
functionsend(ws: WebSocket, message:unknown):void{
if(ws.readyState === WebSocket.OPEN){
ws.send(JSON.stringify(message));
}
}
functionsendResponse(ws: WebSocket, id:string, result:unknown):void{
send(ws,{ type:"res", id,...result });
}
functionsendError(ws: WebSocket, code:string, message:string):void{
send(ws,{ type:"error", error:{ code, message }});
}
functionisValidFrame(frame:unknown): frame is{ type:string}{
returntypeof frame ==="object"
&& frame !==null
&&"type"in frame
&&typeof(frame as Record<string,unknown>).type ==="string";
}
functioncheckPermission(connState: ConnectionState, method:string):boolean{
// 基于角色和能力的权限检查
// 实际实现更复杂...
returntrue;
}
functiongetServerCapabilities():string[]{
return[
"chat.send",
"chat.stream",
"agent.run",
"sessions.*",
"config.*",
"tools.*",
];
}
functionstartCronJobs(context: GatewayContext):void{
// 心跳检查、会话清理、指标收集等定时任务
setInterval(()=>{
// 清理过期会话...
},60000);
}
// 类型定义(简化)
interfaceGatewayContext{
config: OpenClawConfig;
sessionManager: SessionManager;
agentRegistry: AgentRegistry;
channelManager:unknown;
pluginManager: PluginManager;
connections: Map<WebSocket, ConnectionState>;
metrics:unknown;
}
interfaceConnectionState{
id:string;
ws: WebSocket;
isAuthenticated:boolean;
role?:"client"|"node"|"channel";
sessionKey?:string;
subscriptions: Set<string>;
capabilities:string[];
}
interfaceConnectFrame{
type:"connect";
params:{
auth?:{ token?:string};
role?:"client"|"node"|"channel";
caps?:string[];
sessionKey?:string;
};
}
interfaceRequestFrame{
type:"req";
id:string;
method:string;
params:unknown;
}
interfaceCancelFrame{
type:"cancel";
params:{ runId:string};
}
// 其他辅助函数实现...
functiongenerateConnectionId():string{
return`conn-${Date.now()}-${Math.random().toString(36).slice(2,9)}`;
}
functioncreateMetricsCollector():unknown{
return{};// 占位实现
}
functionserveStaticUI(req: IncomingMessage, res:unknown):void{
// 静态资源服务实现...
}
4.2.2 startGatewayServer 函数:网关服务器启动
startGatewayServer 是 Gateway 的启动入口,执行 10 个阶段的初始化序列:
|
|
|
|
|
|---|---|---|---|
|
|
|
loadConfig |
|
|
|
|
host
token |
|
|
|
|
migrateDatabase |
|
|
|
|
createSessionManager |
|
|
|
|
createAgentRegistry |
|
|
|
|
initializeChannels |
|
|
|
|
createPluginManager |
|
|
|
|
createServer
WebSocketServer |
|
|
|
|
wss.on("connection", ...) |
|
|
|
|
startCronJobs |
|
4.2.2.1 GatewayServerOptions 配置解析
配置选项采用合理的默认值设计,关键配置项及其安全含义:
|
|
|
|
|---|---|---|
port |
18789 |
|
host |
"127.0.0.1" |
关键安全设计
|
configPath |
"~/.openclaw/openclaw.json" |
|
token |
process.env.OPENCLAW_GATEWAY_TOKEN |
|
dev |
false |
|
host 的默认值 "127.0.0.1" 是 OpenClaw 安全模型的基石——即使 Gateway 进程以较高权限运行,外部攻击者也无法直接通过网络连接利用。这一设计体现了“默认安全”(secure by default)的原则。
4.2.2.2 WebSocket 服务器初始化
const wss =newWebSocketServer({ server: httpServer });
WebSocket 服务器复用 HTTP 服务器实例,这是 ws 库的推荐模式,使得 HTTP 和 WebSocket 可以共存于同一端口。这种设计支持:
-
HTTP 健康检查端点(
/health) -
静态资源服务(Web UI)
-
WebSocket 主协议
4.2.3 coreGatewayHandlers 导入与 RPC 方法注册
4.2.3.1 ./server-methods.js 模块依赖
coreGatewayHandlers 从 ./server-methods.js 导入,是一个包含所有核心 RPC 方法处理器的对象。根据架构文档,关键方法包括:
|
|
|
|
|
|---|---|---|---|
chat.send |
|
|
|
chat.stream |
|
|
|
agent.run |
|
|
|
session.list |
|
|
|
session.get |
|
|
|
session.create |
|
|
|
session.patch |
|
|
|
config.get
config.set |
|
|
|
models.list
models.set |
|
|
|
system.status
system.info |
|
|
|
4.2.3.2 处理器映射表构建
处理器查找采用简单的对象映射:
const handler = coreGatewayHandlers[method];
这种设计的性能特征为 O(1) 时间复杂度,适合方法数量有限的场景。如果方法数量增长到数百个,可能需要考虑更高效的查找结构(如 Map 或 Trie),但对于 OpenClaw 的规模,对象映射完全足够。
4.2.4 createAgentEventHandler:Agent 事件处理工厂
4.2.4.1 事件类型与回调注册
createAgentEventHandler 创建专门处理 Agent 事件的处理器,订阅 Agent 运行过程中的各种事件:
|
|
|
|
|
|---|---|---|---|
agent.response |
|
|
response
|
agent.thinking |
|
|
thinking
|
tool.invoked |
|
|
tool_call
|
tool.result |
|
|
tool_result
|
session.created |
|
|
|
session.reset |
|
|
|
error |
|
|
error
|
4.2.4.2 会话状态广播机制
Gateway 维护会话到客户端的映射表,当 Agent 事件发生时,根据 sessionKey 路由到正确的客户端连接。这是发布-订阅模式的实现,支持多个客户端订阅同一会话(如 TUI 和 Web UI 同时打开)。
4.3 文件路径:openclaw/src/gateway/server-chat.ts
server-chat.ts 实现了聊天相关的 RPC 方法,是 chat.send 请求的处理入口。
4.3.1 完整源代码呈现(基于架构文档重构)
// openclaw/src/gateway/server-chat.ts
// 聊天相关 RPC 方法实现
importtype{ GatewayContext, ConnectionState }from"./server.impl.js";
import{ resolveAgentRoute }from"../routing/resolve-route.js";
import{ buildAgentSessionKey }from"../routing/session-key.js";
import{ queueEmbeddedPiMessage }from"../agents/pi-embedded-runner/runs.js";
/**
* chat.send 请求参数
*/
interfaceChatSendParams{
/** 消息文本内容 */
text:string;
/** 运行唯一标识(幂等性键) */
runId:string;
/** 幂等性键(通常与 runId 相同) */
idempotencyKey:string;
/** 目标会话标识(可选) */
sessionKey?:string;
/** 是否投递到默认通道 */
deliverDefault?:boolean;
/** 附加上下文 */
context?: Record<string,unknown>;
}
/**
* chat.send 响应结果
*/
interfaceChatSendResult{
/** 服务器分配的消息 ID */
messageId:string;
/** 运行状态 */
status:"accepted"|"queued"|"deduplicated";
/** 队列位置(如被排队) */
queuePosition?:number;
/** 目标会话标识 */
sessionKey:string;
}
/**
* chat.send RPC 处理器
* 这是 Gateway 最核心的聊天入口
*/
exportasyncfunctionhandleChatSend(
params: ChatSendParams,
context: GatewayContext &{ connState: ConnectionState }
):Promise<ChatSendResult>{
const{
text,
runId,
idempotencyKey,
sessionKey: clientSessionKey,
deliverDefault =true,
context: messageContext ={},
}= params;
const{ connState, sessionManager, agentRegistry }= context;
// ========== 步骤 1: 幂等性检查 ==========
const dedupKey =`send:${idempotencyKey}`;
const existing =await sessionManager.checkIdempotency(dedupKey);
if(existing){
console.log(`[Gateway] Deduplicated request: ${idempotencyKey}`);
return{
messageId: existing.messageId,
status:"deduplicated",
sessionKey: existing.sessionKey,
};
}
// ========== 步骤 2: 确定目标会话 ==========
let targetSessionKey:string;
let routeInfo: RouteInfo;
if(clientSessionKey){
// 客户端显式指定会话
targetSessionKey = clientSessionKey;
routeInfo =awaitresolveSessionRoute(clientSessionKey, context);
}else{
// 执行路由决策
routeInfo =awaitresolveAgentRoute({
cfg: context.config,
channel: connState.role ==="client"?"tui": connState.role,
accountId: connState.sessionKey,// 简化处理
sender:undefined,// TUI 无特定发送者
text,
});
targetSessionKey = routeInfo.sessionKey;
}
// ========== 步骤 3: 获取或创建会话 ==========
const session =await sessionManager.getOrCreate(targetSessionKey,{
agentId: routeInfo.agentId,
model: routeInfo.model,
});
// ========== 步骤 4: 记录幂等性键 ==========
const messageId =generateMessageId();
await sessionManager.recordIdempotency(dedupKey,{
messageId,
sessionKey: targetSessionKey,
timestamp: Date.now(),
ttl:300_000,// 5 分钟过期
});
// ========== 步骤 5: 提交到 Agent 执行队列 ==========
const submitResult =awaitqueueEmbeddedPiMessage({
sessionId: session.id,
sessionKey: targetSessionKey,
sessionFile: session.filePath,
workspaceDir: session.workspaceDir,
config: context.config,
prompt: text,
provider: routeInfo.provider,
model: routeInfo.model,
timeoutMs:120_000,
runId,
// 事件回调
onBlockReply:async(payload)=>{
broadcastToSession(context, targetSessionKey,{
type:"event",
event:"agent.response",
payload:{ runId,...payload },
});
},
onThinking:async(text)=>{
broadcastToSession(context, targetSessionKey,{
type:"event",
event:"agent.thinking",
payload:{ runId, text },
});
},
onToolCall:async(toolCall)=>{
broadcastToSession(context, targetSessionKey,{
type:"event",
event:"tool.invoked",
payload:{ runId,...toolCall },
});
},
onToolResult:async(result)=>{
broadcastToSession(context, targetSessionKey,{
type:"event",
event:"tool.result",
payload:{ runId,...result },
});
},
});
// ========== 步骤 6: 返回接受确认 ==========
return{
messageId,
status: submitResult.queued ?"queued":"accepted",
queuePosition: submitResult.queuePosition,
sessionKey: targetSessionKey,
};
}
/**
* 广播事件到会话的所有订阅客户端
*/
functionbroadcastToSession(
context: GatewayContext,
sessionKey:string,
message:unknown
):void{
for(const[ws, conn]of context.connections){
if(conn.sessionKey === sessionKey && ws.readyState === WebSocket.OPEN){
ws.send(JSON.stringify(message));
}
}
}
/**
* 从会话键解析路由信息
*/
asyncfunctionresolveSessionRoute(
sessionKey:string,
context: GatewayContext
):Promise<RouteInfo>{
// 解析 sessionKey 格式,提取 agentId 等信息
// 实际实现更复杂...
const parsed =parseSessionKey(sessionKey);
return{
agentId: parsed.agentId,
sessionKey,
provider:undefined,// 使用默认
model:undefined,// 使用默认
};
}
/**
* 生成唯一消息 ID
*/
functiongenerateMessageId():string{
return`msg-${Date.now()}-${Math.random().toString(36).slice(2,9)}`;
}
/**
* 解析会话键
*/
functionparseSessionKey(sessionKey:string):{ agentId:string; scope?:string}{
// 格式: "agent:{agentId}:{scope}" 或 "channel:{channel}:{id}"
const parts = sessionKey.split(":");
if(parts[0]==="agent"){
return{ agentId: parts[1], scope: parts[2]};
}
// 其他格式...
return{ agentId:"main"};
}
// 类型定义
interfaceRouteInfo{
agentId:string;
sessionKey:string;
provider?:string;
model?:string;
}
// 注册到核心处理器
exportconst chatHandlers ={
"chat.send": handleChatSend,
// 其他聊天相关方法...
};
4.3.2 心跳机制与连接保活
Gateway 实现了双向心跳机制来维持 WebSocket 连接的稳定性:
|
|
|
|
|
|---|---|---|---|
|
|
heartbeat_trigger
|
|
|
|
|
heartbeat_status
|
|
|
心跳间隔和超时时间是可配置的,默认 30 秒心跳,90 秒超时。
4.3.3 聊天消息处理入口
handleChatSend 函数是 Gateway 最核心的 RPC 方法之一,承担了消息接收、幂等性保证、路由解析、会话管理和 Agent 调度的完整流程。其 6 个步骤的设计体现了防御性编程和可靠性工程:
-
幂等性检查:防止重复处理,确保同一请求只执行一次
-
路由决策:确定目标 Agent 和会话
-
会话管理:获取或创建会话上下文
-
幂等性记录:为后续去重建立依据
-
Agent 调度:提交到执行队列,支持排队和流式响应
-
确认返回:立即响应客户端,实际结果通过事件推送
5. 路由模块:Agent 解析与会话键生成
路由模块是 OpenClaw 消息处理的关键环节,决定了每条消息由哪个 Agent 处理。该模块实现了七级优先级的精细路由匹配策略。
5.1 文件路径:openclaw/src/routing/resolve-route.ts
resolve-route.ts 中的 resolveAgentRoute 函数是路由解析的核心实现。
5.1.1 完整源代码呈现
// openclaw/src/routing/resolve-route.ts
// 路由解析核心实现
import{ resolveDefaultAgentId }from"../agents/agent-scope.js";
importtype{ ChatType }from"../channels/chat-type.js";
import{ normalizeChatType }from"../channels/chat-type.js";
importtype{ OpenClawConfig, Binding }from"../config/config.js";
import{ shouldLogVerbose }from"../globals.js";
import{ logDebug }from"../logger.js";
/**
* 路由解析选项
*/
exportinterfaceResolveAgentRouteOptions{
/** 系统配置 */
cfg: OpenClawConfig;
/** 消息来源渠道 */
channel:string;
/** 账户标识(可选) */
accountId?:string;
/** 发送者标识(可选) */
sender?:string;
/** 服务器/群组标识(可选,Discord 等) */
guild?:string;
/** 用户角色列表(可选) */
roles?:string[];
/** 团队标识(可选,Slack 等) */
team?:string;
/** 消息内容(用于内容路由) */
text:string;
}
/**
* 路由解析结果
*/
exportinterfaceResolvedRoute{
/** 目标 Agent 标识 */
agentId:string;
/** 会话标识 */
sessionKey:string;
/** 主会话标识(用于合并私信) */
mainSessionKey?:string;
/** 匹配类型(用于调试) */
matchType:string;
/** 指定模型(可选) */
model?:string;
/** 指定提供商(可选) */
provider?:string;
/** 匹配到的绑定规则 */
binding: Binding;
}
/**
* 解析 Agent 路由
* 实现七级优先级匹配策略
*/
exportasyncfunctionresolveAgentRoute(
options: ResolveAgentRouteOptions
):Promise<ResolvedRoute>{
const{ cfg, channel, accountId, sender, guild, roles, team, text }= options;
// 获取绑定规则列表
const bindings = cfg.bindings ??[];
if(bindings.length ===0){
// 无绑定配置,使用默认 Agent
const defaultAgentId =resolveDefaultAgentId(cfg);
const sessionKey =buildAgentSessionKey({
channel,
agentId: defaultAgentId,
});
return{
agentId: defaultAgentId,
sessionKey,
matchType:"default-fallback",
binding:{ agentId: defaultAgentId }as Binding,
};
}
// 按优先级排序的匹配规则(从高到低)
const matchStrategies: MatchStrategy[]=[
// 优先级 1: peer - 直接匹配特定用户
{
name:"peer",
priority:1,
match:()=>findPeerBinding(bindings, sender),
},
// 优先级 2: peer.parent - 继承父线程的绑定
{
name:"peer.parent",
priority:2,
match:()=>findParentBinding(bindings, sender),
},
// 优先级 3: guild+roles - Discord 服务器 + 角色组合
{
name:"guild+roles",
priority:3,
match:()=>findGuildRoleBinding(bindings, guild, roles),
},
// 优先级 4: guild - Discord 服务器
{
name:"guild",
priority:4,
match:()=>findGuildBinding(bindings, guild),
},
// 优先级 5: team - Slack 团队
{
name:"team",
priority:5,
match:()=>findTeamBinding(bindings, team),
},
// 优先级 6: account - 账户级别
{
name:"account",
priority:6,
match:()=>findAccountBinding(bindings, accountId),
},
// 优先级 7: channel - 渠道级别
{
name:"channel",
priority:7,
match:()=>findChannelBinding(bindings, channel),
},
];
// 按优先级遍历匹配
for(const strategy of matchStrategies.sort((a, b)=> a.priority - b.priority)){
const binding = strategy.match();
if(binding){
if(shouldLogVerbose()){
logDebug(`Route matched: ${strategy.name}`,{ sender, binding });
}
// 构建会话标识
const sessionKey =buildAgentSessionKey({
channel,
sender,
agentId: binding.agentId,
mergePrivate: binding.mergePrivate,
});
return{
agentId: binding.agentId,
sessionKey,
mainSessionKey: binding.mergePrivate ? sessionKey :undefined,
matchType: strategy.name,
model: binding.model,
provider: binding.provider,
binding,
};
}
}
// 无匹配,使用默认绑定(最后一个优先级)
const defaultBinding =findDefaultBinding(bindings);
if(defaultBinding){
const sessionKey =buildAgentSessionKey({
channel,
agentId: defaultBinding.agentId,
});
return{
agentId: defaultBinding.agentId,
sessionKey,
matchType:"default-binding",
model: defaultBinding.model,
provider: defaultBinding.provider,
binding: defaultBinding,
};
}
// 完全无匹配,抛出错误
thrownewRouteResolutionError(
`No agent binding found for: channel=${channel}, sender=${sender}, `+
`guild=${guild}, team=${team}. Please check your configuration.`
);
}
// ========== 匹配策略实现 ==========
functionfindPeerBinding(bindings: Binding[], sender?:string): Binding |undefined{
if(!sender)returnundefined;
return bindings.find(b =>
b.peer === sender ||
(b.peerPattern &&newRegExp(b.peerPattern).test(sender))
);
}
functionfindParentBinding(bindings: Binding[], sender?:string): Binding |undefined{
// 实现父线程绑定继承逻辑
// 需要访问线程关系数据...
returnundefined;// 占位
}
functionfindGuildRoleBinding(
bindings: Binding[],
guild?:string,
roles?:string[]
): Binding |undefined{
if(!guild ||!roles || roles.length ===0)returnundefined;
// 查找匹配 guild 和任意 role 的组合
return bindings.find(b =>{
if(b.guild !== guild)returnfalse;
if(!b.roles)returnfalse;
return roles.some(r => b.roles!.includes(r));
});
}
functionfindGuildBinding(bindings: Binding[], guild?:string): Binding |undefined{
if(!guild)returnundefined;
return bindings.find(b => b.guild === guild);
}
functionfindTeamBinding(bindings: Binding[], team?:string): Binding |undefined{
if(!team)returnundefined;
return bindings.find(b => b.team === team);
}
functionfindAccountBinding(bindings: Binding[], accountId?:string): Binding |undefined{
if(!accountId)returnundefined;
return bindings.find(b =>
b.account === accountId ||
(b.accountPattern &&newRegExp(b.accountPattern).test(accountId))
);
}
functionfindChannelBinding(bindings: Binding[], channel:string): Binding |undefined{
return bindings.find(b => b.channel === channel);
}
functionfindDefaultBinding(bindings: Binding[]): Binding |undefined{
return bindings.find(b => b.default ===true)?? bindings[bindings.length -1];
}
// ========== 会话键构建 ==========
/**
* 构建 Agent 会话标识
*/
exportfunctionbuildAgentSessionKey(options:{
channel:string;
sender?:string;
agentId:string;
mergePrivate?:boolean;
}):string{
const{ channel, sender, agentId, mergePrivate }= options;
// 私信合并模式:所有私信进入同一会话
if(mergePrivate &&!sender){
return`agent:${agentId}:main`;
}
// 标准格式:包含发送者标识以实现隔离
if(sender){
// 对发送者标识进行安全编码
const encodedSender =encodeSender(sender);
return`agent:${agentId}:direct:${encodedSender}`;
}
// 无发送者:使用渠道级会话
return`agent:${agentId}:${channel}`;
}
/**
* 编码发送者标识(确保会话键安全)
*/
functionencodeSender(sender:string):string{
// 替换特殊字符,确保会话键可作为文件名
return sender
.replace(/[^a-zA-Z0-9_-]/g,"_")
.substring(0,64);// 长度限制
}
// ========== 错误类型 ==========
exportclassRouteResolutionErrorextendsError{
constructor(message:string){
super(message);
this.name ="RouteResolutionError";
}
}
// 类型定义辅助
interfaceMatchStrategy{
name:string;
priority:number;
match:()=> Binding |undefined;
}
5.1.2 resolveAgentRoute 函数:七级优先级路由匹配
resolveAgentRoute 实现了七级优先级的路由匹配策略,这是 OpenClaw 支持复杂多租户场景的核心机制:
|
|
|
|
|
|
|---|---|---|---|---|
|
|
peer |
|
|
|
|
|
peer.parent |
|
|
|
|
|
guild+roles |
|
|
|
|
|
guild |
|
|
|
|
|
team |
|
|
|
|
|
account |
|
|
|
|
|
channel |
|
|
|
|
|
default |
|
|
|
5.1.2.1 ResolveAgentRouteOptions 参数结构
|
|
|
|
|
|---|---|---|---|
cfg |
OpenClawConfig |
bindings) |
|
channel |
string |
|
"tui"
"telegram", "discord" |
accountId |
string? |
|
"+1234567890"
|
sender |
string? |
|
"user@example.com" |
guild |
string? |
|
"123456789012345678" |
roles |
string[]? |
|
["admin", "developer"] |
team |
string? |
|
"T12345678" |
text |
string |
|
"请深度解读..." |
5.1.2.2 channel、account、sender 等维度提取
在我们的 TUI 场景中,各维度的取值为:
-
channel = "tui"(固定值) -
accountId = connState.sessionKey(连接会话标识) -
sender = undefined(TUI 无特定发送者概念) -
guild = undefined,roles = undefined,team = undefined(非 Discord/Slack)
5.1.2.3 bindings 配置遍历与优先级排序
bindings 是用户在 openclaw.json 中配置的路由规则数组,典型配置:
{
"bindings":[
{"peer":"vip@company.com","agentId":"executive-assistant"},
{"guild":"123456789","roles":["admin"],"agentId":"admin-bot"},
{"channel":"telegram","agentId":"telegram-handler"},
{"channel":"tui","agentId":"main","default":true}
]
}
5.1.3 路由决策关键变量分析
5.1.3.1 agentId 确定:精确匹配 vs 默认回退
在我们的场景中,假设配置包含 { "channel": "tui", "agentId": "main", "default": true },则:
-
findChannelBinding(bindings, "tui")返回该绑定 -
agentId = "main" -
matchType = "channel"
5.1.3.2 无匹配路由时的错误处理路径
如果所有匹配策略都失败且没有默认绑定,抛出 RouteResolutionError,Gateway 返回错误响应,TUI 显示配置错误提示。
5.1.4 buildAgentSessionKey 函数:会话唯一标识生成
5.1.4.1 输入参数组合策略
|
|
|
|
|---|---|---|
channel="tui"
agentId="main", 无 sender |
agent:main:tui |
|
channel="telegram"
agentId="support", sender="user123" |
agent:support:direct:user123 |
|
mergePrivate=true
sender |
agent:{agentId}:main |
|
5.1.4.2 sessionKey 格式与解析可逆性
sessionKey 采用自描述格式:{type}:{agentId}:{scope}[:{identifier}],使得:
-
无需额外查询即可解析基本信息
-
可作为文件系统路径(安全字符编码)
-
支持嵌套和扩展
6. Agent 模块:智能体执行与响应生成
Agent 模块是 OpenClaw 的 AI 执行引擎,负责实际的 LLM 调用、工具执行和流式响应生成。
6.1 文件路径:openclaw/src/agents/pi-embedded-runner.ts
pi-embedded-runner.ts 是 Agent 执行模块的入口文件,采用重导出模式组织代码。
6.1.1 完整源代码呈现
// openclaw/src/agents/pi-embedded-runner.ts
// Agent 执行模块入口
// 核心执行函数
export{ runEmbeddedPiAgent }from"./pi-embedded-runner/run.js";
// 类型定义
exporttype{
RunEmbeddedPiAgentOptions,
EmbeddedPiRunResult,
BlockReplyPayload,
ToolCallPayload,
ToolResultPayload,
}from"./pi-embedded-runner/types.js";
// 运行队列管理
export{
queueEmbeddedPiMessage,
getRunStatus,
cancelRun,
}from"./pi-embedded-runner/runs.js";
// 事件订阅
export{ subscribeToSessionEvents }from"./pi-embedded-subscribe.js";
6.1.2 模块导出与类型定义
模块采用清晰的分层导出:核心函数、类型定义、队列管理、事件订阅分别导出,便于调用者按需导入。
6.2 文件路径:openclaw/src/agents/pi-embedded-runner/run.ts
run.ts 包含 runEmbeddedPiAgent() 主入口函数,是 Agent 执行的 orchestrator。
6.2.1 完整源代码呈现(基于官方文档重构)
// openclaw/src/agents/pi-embedded-runner/run.ts
// Agent 执行主入口
importtype{ OpenClawConfig }from"../../config/config.js";
import{ runEmbeddedAttempt }from"./run/attempt.js";
importtype{
RunEmbeddedPiAgentOptions,
EmbeddedPiRunResult,
BlockReplyPayload,
}from"./types.js";
/**
* Agent 执行选项
*/
exportinterfaceRunEmbeddedPiAgentOptions{
/** 运行唯一标识 */
runId:string;
/** 会话标识 */
sessionKey:string;
/** 会话 ID(可选) */
sessionId?:string;
/** 会话文件路径(可选) */
sessionFile?:string;
/** 工作目录(可选) */
workspaceDir?:string;
/** 系统配置 */
config: OpenClawConfig;
/** 用户提示词 */
prompt:string;
/** 模型提供商(可选) */
provider?:string;
/** 模型标识(可选) */
model?:string;
/** 超时时间(毫秒,默认 120000) */
timeoutMs?:number;
/** 取消信号 */
signal?: AbortSignal;
// 事件回调
/** 块回复回调(流式响应) */
onBlockReply?:(payload: BlockReplyPayload)=>Promise<void>;
/** 思考过程回调 */
onThinking?:(text:string)=>Promise<void>;
/** 工具调用回调 */
onToolCall?:(toolCall: ToolCallPayload)=>Promise<void>;
/** 工具结果回调 */
onToolResult?:(result: ToolResultPayload)=>Promise<void>;
}
/**
* Agent 执行结果
*/
exportinterfaceEmbeddedPiRunResult{
/** 运行状态 */
status:"completed"|"cancelled"|"error"|"timeout";
/** 最终响应文本(如成功) */
finalResponse?:string;
/** 使用的 Token 数 */
tokenUsage?:{
prompt:number;
completion:number;
total:number;
};
/** 工具调用次数 */
toolCallCount?:number;
/** 执行耗时(毫秒) */
durationMs:number;
/** 错误信息(如失败) */
error?:{
message:string;
code:string;
};
}
/**
* 运行嵌入式 Pi Agent
* 这是 Agent 执行的核心入口函数
*/
exportasyncfunctionrunEmbeddedPiAgent(
options: RunEmbeddedPiAgentOptions
):Promise<EmbeddedPiRunResult>{
const startTime = Date.now();
const{
runId,
sessionKey,
sessionId,
sessionFile,
workspaceDir,
config,
prompt,
provider,
model,
timeoutMs =120_000,
signal,
onBlockReply,
onThinking,
onToolCall,
onToolResult,
}= options;
// 创建执行上下文
const ctx: ExecutionContext ={
runId,
sessionKey,
config,
startTime,
signal,
callbacks:{
onBlockReply,
onThinking,
onToolCall,
onToolResult,
},
};
try{
// 设置超时控制
const timeoutHandle =setTimeout(()=>{
// 超时处理...
}, timeoutMs);
// 核心:执行单次尝试(可能包含多轮工具调用)
const attemptResult =awaitrunEmbeddedAttempt({
ctx,
sessionId,
sessionFile,
workspaceDir,
prompt,
provider,
model,
});
clearTimeout(timeoutHandle);
// 组装结果
return{
status: attemptResult.status,
finalResponse: attemptResult.finalResponse,
tokenUsage: attemptResult.tokenUsage,
toolCallCount: attemptResult.toolCallCount,
durationMs: Date.now()- startTime,
};
}catch(error){
// 错误分类处理
if(error.name ==="AbortError"|| signal?.aborted){
return{
status:"cancelled",
durationMs: Date.now()- startTime,
};
}
if(error.code ==="TIMEOUT"){
return{
status:"timeout",
durationMs: Date.now()- startTime,
error:{ message:"Execution timeout", code:"TIMEOUT"},
};
}
return{
status:"error",
durationMs: Date.now()- startTime,
error:{
message: error.message,
code: error.code ||"EXECUTION_ERROR",
},
};
}
}
// 执行上下文类型
interfaceExecutionContext{
runId:string;
sessionKey:string;
config: OpenClawConfig;
startTime:number;
signal?: AbortSignal;
callbacks:{
onBlockReply?:(payload: BlockReplyPayload)=>Promise<void>;
onThinking?:(text:string)=>Promise<void>;
onToolCall?:(toolCall: ToolCallPayload)=>Promise<void>;
onToolResult?:(result: ToolResultPayload)=>Promise<void>;
};
}
6.2.2 runEmbeddedPiAgent 函数:Agent 执行入口
runEmbeddedPiAgent 是 Agent 执行的 orchestrator,负责参数解析、超时控制、错误处理和结果组装。实际的 LLM 调用和工具执行委托给 runEmbeddedAttempt。
6.2.2.1 RunEmbeddedPiAgentOptions 参数解构
|
|
|
|
|
|---|---|---|---|
runId |
string |
|
|
sessionKey |
string |
|
|
sessionId |
string? |
|
|
sessionFile |
string? |
|
|
workspaceDir |
string? |
|
|
config |
OpenClawConfig |
|
|
prompt |
string |
|
|
provider |
string? |
|
|
model |
string? |
|
|
timeoutMs |
number? |
|
|
signal |
AbortSignal? |
|
|
onBlockReply |
function? |
|
|
onThinking |
function? |
|
|
onToolCall |
function? |
|
|
onToolResult |
function? |
|
|
6.2.2.2 attempt 模块与 ./run/attempt.js 依赖
runEmbeddedAttempt 位于 ./run/attempt.js,负责单次尝试的完整逻辑,包括:
-
会话状态加载和保存
-
系统提示词组装(
AGENTS.md+SOUL.md+TOOLS.md) -
LLM API 调用(支持流式响应)
-
工具调用解析和执行
-
多轮对话循环(直到获得最终响应或达到限制)
6.2.3 消息队列与状态机管理
6.2.3.1 queueEmbeddedPiMessage 异步队列
根据技术文档,OpenClaw 实现了精巧的 Lane Queue(车道队列) 机制进行并发控制:
|
|
|
|
|
|---|---|---|---|
session:<key> |
1 |
|
|
main |
|
|
|
cron |
|
|
|
subagent |
|
|
|
nested |
|
|
|
核心保证:“Only one agent run touches a given session at a time”——通过 per-session lane 实现。
6.2.3.2 会话状态持久化与恢复
会话状态以 Markdown 文件 形式持久化存储:
|
|
|
|
|---|---|---|
AGENTS.md |
|
|
SOUL.md |
|
|
TOOLS.md |
|
|
MEMORY.md |
|
|
HISTORY.md |
|
|
这种“本地优先”的设计使得:
-
用户数据完全由用户控制
-
可用 Git 进行版本控制
-
便于人工审计和调试
-
无需依赖外部数据库
6.3 推理执行引擎(基于架构文档补充)
6.3.1 模型调用链:提示词组装 → 上下文加载 → API 请求
根据 CSDN 博客的分析,LLM 调用机制的核心流程分为三个步骤:
第一步:初始化 Agent 会话
const{ session }=awaitcreateAgentSession({
cwd: resolvedWorkspace,
agentDir,
authStorage: params.authStorage,
modelRegistry: params.modelRegistry,
model: params.model,
thinkingLevel:mapThinkingLevel(params.thinkLevel),
tools: builtInTools,
customTools: allCustomTools,
sessionManager,
});
第二步:组装完整提示词
-
系统提示词 =
AGENTS.md+SOUL.md+ 动态工具描述 -
上下文 = 历史消息(经 Token 限制截断)+ 语义搜索记忆
-
用户消息 = 当前输入
第三步:流式调用 LLM API
const stream =await llmClient.chat.completions.create({
model: effectiveModel,
messages: assembledMessages,
tools: availableTools,
stream:true,// 关键:启用流式响应
});
6.3.2 工具调用循环:attempt 重试机制与工具执行
Agent 执行遵循 ReAct 范式:
循环直到获得最终响应或达到限制:
1. 调用 LLM,获取响应
2. 如果响应包含 tool_calls:
a. 解析工具调用请求
b. 执行工具(通过 9 层策略评估)
c. 将结果格式化为 tool 消息
d. 追加到上下文,继续循环
3. 如果响应是最终文本:
返回结果,结束循环
6.3.3 响应流式生成与事件推送
流式响应通过回调机制实时推送:
forawait(const chunk of stream){
const delta = chunk.choices[0]?.delta;
if(delta?.content){
// 文本内容,推送 response 事件
await onBlockReply?.({
text: delta.content,
done:false,
});
}
if(delta?.tool_calls){
// 工具调用,推送 tool_call 事件
for(const toolCall of delta.tool_calls){
await onToolCall?.(parseToolCall(toolCall));
}
}
}
// 最终标记
await onBlockReply?.({ text:"", done:true});
7. 端到端执行链路:关键变量与路径分支详解
7.1 阶段一:TUI 输入处理
7.1.1 变量 value:”请深度解读OpenClaw的工作原理”
用户输入的原始文本,在 createEditorSubmitHandler 中接收。
7.1.2 分支判断:value[0] 字符检测决定处理路径
value[0] |
|
|---|---|
"!" |
executeCommand |
"/" |
executeCommand |
"请"
|
sendMessage |
7.1.3 路径结果:进入 sendMessage 普通消息分支
7.2 阶段二:消息封装与发送
7.2.1 变量 runId:UUID 生成与幂等性键赋值
const runId =randomUUID();// 例如: "550e8400-e29b-41d4-a716-446655440000"
// ...
idempotencyKey: runId,// 幂等性保证
7.2.2 变量 sessionKey:从 state 读取当前会话标识
const sessionKey = state.getSessionKey();// 例如: "agent:main:abc123"
7.2.3 变量 deliverDefault:消息投递模式配置
默认 true,表示投递到主会话。
7.2.4 RPC 调用:chat.send 方法名与参数负载
awaitthis.client.request("chat.send",{
text:"请深度解读OpenClaw的工作原理",
runId:"550e8400-e29b-41d4-a716-446655440000",
idempotencyKey:"550e8400-e29b-41d4-a716-446655440000",
sessionKey:"agent:main:abc123",
deliverDefault:true,
});
7.3 阶段三:Gateway 路由决策
7.3.1 变量 channel:固定值 "tui" 的来源
从连接状态 connState.role 推导。
7.3.2 变量 bindings:从配置加载的路由规则数组
const bindings = cfg.bindings;// 用户配置的绑定规则
7.3.3 匹配执行:七级优先级遍历与 agentId 确定
假设匹配到 { channel: "tui", agentId: "main" },则:
-
agentId = "main" -
sessionKey = "agent:main:tui" -
matchType = "channel"
7.4 阶段四:Agent 执行与响应
7.4.1 变量 model:路由结果指定的模型标识
可能为 undefined(使用默认),或具体值如 "gpt-4o"。
7.4.2 变量 context:历史消息与系统提示词组装
包含 AGENTS.md + SOUL.md + 历史消息 + 语义搜索记忆。
7.4.3 流式响应:response 事件分片与 Gateway 转发
LLM API → onBlockReply 回调 → Gateway broadcast → WebSocket → TUI onEvent
7.5 阶段五:TUI 响应渲染
7.5.1 onEvent 回调触发与消息类型判断
client.onEvent((event)=>{
switch(event.type){
case"response":handleResponse(event);break;
case"thinking":handleThinking(event);break;
// ...
}
});
7.5.2 chatLog.append 与思考状态清除
chatLog.finalizeResponse(runId, text);// 替换思考状态为实际内容
state.clearThinking(runId);
7.5.3 终端 UI 更新与光标复位
重新渲染聊天日志,将光标置于输入框。
8. 关键设计模式与工程实践总结
8.1 幂等性设计:idempotencyKey 与请求去重
OpenClaw 的幂等性设计贯穿整个链路:
-
TUI 层:
runId作为本地运行标识 -
Gateway 层:
idempotencyKey短期缓存(默认 5 分钟) -
Agent 层:幂等性保证由底层基础设施提供
8.2 取消令牌:AbortController 与请求生命周期管理
从 TUI 到 Gateway 到 Agent,取消信号逐级传递,实现端到端取消。
8.3 状态隔离:state 对象与会话上下文封装
每个组件维护独立的状态,通过明确的接口交互,避免全局状态污染。
8.4 事件驱动:WebSocket 双向实时通信模型
请求-响应与事件推送分离,支持流式响应和服务器主动推送。
8.5 插件化架构:Channel 与 Agent 的可扩展注册
35+ 消息渠道通过统一接口接入,Agent 能力通过 Skills 动态扩展。
9. 附录:源码文件完整路径索引
9.1 TUI 模块源码
|
|
|
|---|---|
openclaw/src/tui/tui.ts |
|
openclaw/src/tui/tui-command-handlers.ts |
|
openclaw/src/tui/gateway-chat.ts |
|
openclaw/src/tui/gateway-client.ts |
|
openclaw/src/tui/chat-log.ts |
|
openclaw/src/tui/tui-state.ts |
|
9.2 Gateway 模块源码
|
|
|
|---|---|
openclaw/src/gateway/server.ts |
|
openclaw/src/gateway/server.impl.ts |
|
openclaw/src/gateway/server-methods.ts |
|
openclaw/src/gateway/server-chat.ts |
|
openclaw/src/gateway/server-channels.ts |
|
openclaw/src/gateway/server-events.ts |
|
openclaw/src/gateway/session-manager.ts |
|
9.3 Routing 模块源码
|
|
|
|---|---|
openclaw/src/routing/resolve-route.ts |
|
openclaw/src/routing/session-key.ts |
|
openclaw/src/routing/bindings.ts |
|
9.4 Agent 模块源码
|
|
|
|---|---|
openclaw/src/agents/pi-embedded-runner.ts |
|
openclaw/src/agents/pi-embedded-runner/run.ts |
runEmbeddedPiAgent
|
openclaw/src/agents/pi-embedded-runner/run/attempt.ts |
|
openclaw/src/agents/pi-embedded-runner/runs.ts |
|
openclaw/src/agents/pi-embedded-subscribe.ts |
|
openclaw/src/agents/pi-tools.ts |
|
openclaw/src/agents/model-selection.ts |
|
openclaw/src/agents/system-prompt.ts |
|
夜雨聆风
