【OpenClaw】通过Nanobot源码学习架构---(1)总体
0x00 概要 0x01 OpenClaw 基础 1.1 Harness 1.2 OpenClaw 1.3 OpenClaw 架构 1.4 关键组件 0x02 Nanobot 基础 2.1 核心功能 / 特色 2.2 🛠️ 技术栈 2.3 📁 主要目录结构 2.4🌐 支持的平台 0x03 Nanobot 总体架构 3.1 架构特点 3.2 架构图 0x04 Nanobot 消息分发机制详解 4.1 消息处理流程图 4.2 消息流入和流出完整流程 4.3 消息如何分发给不同 Session 4.4 消息如何从 Channel 发送到其他模块 0xFF 参考
0x00 概要
OpenClaw 应该有几十万行代码,阅读理解起来难度过大,因此,本系列通过Nanobot来学习 OpenClaw 的特色。
Nanobot是由香港大学数据科学实验室(HKUDS)开源的超轻量级个人 AI 助手框架,定位为"Ultra-Lightweight OpenClaw"。其核心定位如下,非常适合学习Agent架构:
超轻量:核心代码仅约 4,000 行,比 Clawdbot 的 43 万行代码少 99% 设计哲学:微内核架构 + 极致可读性 功能定位:个人AI助手,支持多平台接入 研究友好:代码清晰易读,易于理解、修改和扩展 快速启动:最小化占用意味着更快的启动速度和更低的资源消耗 即开即用:一键部署即可使用
注:本系列借鉴的文章过多,可能在参考文献中有遗漏的文章,如果有,还请大家指出。
0x01 OpenClaw 基础
我们首先来看看 OpenClaw 的基础概念,能让我们在后续利用 Nanobot 学习更加顺利。
OpenClaw 是 Harness,是面向个人与本地场景的、开箱即用的 Agent Harness 框架。它不生产模型,而是把模型 “套上马具”,让模型能稳定、安全、自主地在本地执行真实任务。或者说,OpenClaw 是 Agent 中"不是 AI 的部分",而 Agent 的实际"聪明程度"完全取决于背后接入的语言模型。
1.1 Harness
智能体 = 模型 + 控制壳(Harness)。
Harness 是包裹在 LLM 之外、负责让 Agent 稳定、可控、可落地执行任务的全套基础设施层,模型提供智能,控制壳让智能变得可用。或者说,Harness 是 Agent 在特定领域工作所需要的一切:Harness = (推理·上下文·记忆·状态) + (工具·编排·闭环) + Knowledge + Observation + Action Interfaces + Permissions,即:智能管理层 + 执行调度层 + 领域知识层 + 反馈观测层 + 安全权限层:
Tools: 文件读写、Shell、网络、数据库、浏览器 Knowledge: 产品文档、领域资料、API 规范、风格指南 Observation: git diff、错误日志、浏览器状态、传感器数据 Action: CLI 命令、API 调用、UI 交互 Permissions: 沙箱隔离、审批流程、信任边界、安全护栏、权限控制、错误处理
1.2 OpenClaw
OpenClaw 的架构与能力完全符合 Harness 的定义,是 Harness 在个人场景的落地:
OpenClaw 是 Agent 中"不是 AI 的部分"。它负责记忆管理、任务调度、工具执行、信道路由,而 Agent 的实际"聪明程度"完全取决于背后接入的LLM。 模型做决策。OpenClaw 执行。模型做推理。OpenClaw 提供上下文。模型是驾驶者。OpenClaw 是载具。
| 模型无关的执行层 | |
| 任务编排与执行闭环 | |
| 工具 / 系统调用管控 | |
| 上下文与记忆管理 | |
| 安全与护栏 | |
| 可扩展生态 |
1.3 OpenClaw 架构
OpenClaw 的架构可以概括为一个以Gateway(网关)为核心的控制平面的分布式系统,OpenClaw 的核心不是模型,而是网关(Gateway)。
OpenClaw 本质上是一个围绕集中式控制平面构建的、事件驱动的、会话隔离的单写入状态机,其整体架构是以网关为中心的星型拓扑:
Gateway 是流量调度器、唯一事实来源和控制平面,负责接收来自四面八方的各类事件并进行处理路由、排队、状态管理,然后才去调用 LLM。 智能体运行时(agent runtime)是负责“思考与执行”的工作单元,能够执行“轮次操作”:调用大语言模型、使用工具、写入状态,并回复。
下面两个图可以展示其架构。
1.3.1 OpenClaw 精简架构 1

1.3.2 OpenClaw 精简架构 2

1.4 关键组件
OpenClaw 的关键组件如下:
Channels(频道 / 用户接入层):
OpenClaw 不构建自己的 UI,而是把现有通讯渠道(WhatsApp、Telegram、Slack、Discord)作为交互层。每个渠道有不同的能力和约束,Agent 的行为要适应渠道特性。 OpenClaw 把每个平台(Telegram/Discord/Signal 等)都抽象成同一套 ChannelPlugin适配器合同。核心系统不需要知道“某个平台的私有细节”,只要按统一接口调用即可。账号生命周期由 ChannelManager 统一管理。Channels 负责多渠道统一接入,消息格式转换,核心功能是:监听各渠道消息,统一消息格式,用户身份识别,消息路由分发。消息从各个Channel进入时不会被同步阻塞,而是通过回调把事件推送进系统。Channel接入层只负责把“新消息、连接变化、错误”等信号变成统一的异步事件,再交给后续处理链路,从源头保证高并发和低耦合。 Channels是OpenClaw进行社交生态连接最重要的设计,它将AI能力真正注入到了用户的社交与工作动线中。这对产品设计的影响是深远的:AI 功能的入口,将越来越多地迁移到用户已经存在的工作流里,而不是要求用户打开一个新应用。 "让 AI 功能在用户已在的地方运作"将成为设计决策的起点,而不是"让用户来找 AI"。 Gateway(控制平面 / 信息调度中心):
OpenClaw 运行着一个持续在线的网关守护进程,负责维持所有连接并协调整个系统,这是坐在用户指令和模型调用之间的控制层。 OpenClaw 能够支持多种界面(CLI、Web UI、桌面应用、移动节点)的一个重要原因是,它将网关视为一个真正的控制平面。Gateway 连接各种聊天平台和控制界面,把收到的消息派发给 Agent 运行器处理。 Gateway 关键设计思想是: 把消息通信、接口层和AI 怎么思考和执行(Agent)彻底分开。它采用调度中心架构,所有消息都经过一个中央塔台进行分发。Gateway 是"总机" → Agent 是"接线员+执行者"。 Gateway 关注: 谁发来的?发到哪? Agent 关注: 什么意思?怎么做?做完回什么? Gateway 核心就是一个HTTP和WebSocket服务。其启动时与注册的Channel(比如Telegram机器人)建立WebSocket连接,随时准备接收消息。 Gateway 是持久运行的控制平面,负责保持与所有消息渠道的长连接、所有组件的调度与交互,管理会话状态、响应客户端请求、处理定时任务,以及Agent调度。同时还要负责监控各Channel和Node的联通状况(health check)。 智能体运行时(agent runtime / 思考核心):
智能体运行时具体负责:多模型统一接口,工具调用和执行,技能系统管理,会话上下文维护,记忆系统(短期+长期); 一旦网关决定了由哪个 agent 和哪个会话来处理输入,智能体运行时会执行这样一个常规循环:1)加载上下文(会话历史 + 工作区上下文);2)调用模型;3)执行工具调用(浏览器、文件系统、shell、节点、插件);4)持久化更新;5)响应(或故意保持沉默)。 OpenClaw并没有从0构造Agent核心,而是使用开源的Pi-Agent框架。Pi Agent构成整个系统执行的大脑思考核心,是处理逻辑和生成回复的核心引擎。系统中所有的运行逻辑都由推理循环架构来控制,也就是AgenticLoop。 AgentLoop 是 OpenClaw 最关键的执行循环。每次收到用户消息,都会进入这个循环。OpenClaw的推理循环是一个事件驱动的架构:主循环 ( run.ts) 负责错误处理、重试、profile轮换;尝试层 (attempt.ts) 负责单次LLM调用的完整生命周期;事件订阅 (subscribe.ts) 处理流式响应和工具调用。Nodes & Apps:
通过将不同设备定义为“节点”,OpenClaw 实现了跨设备的硬件控制; Channel是基于不同IM的开放平台能力,让OpenClaw与不同的IM的开放平台服务通信。可以说Channel是针对不同的IM程序的适配器。相对的,Node则是针对iOS、Android、macOS这种操作系统的适配器。 Node是一种实际运行在iOS、Android和macOS上的程序,并与运行在用户电脑上的OpenClaw主程序进行远程通信。用户将自己的设备的权限开放给Node,这样OpenClaw就可以通过Node来远程控制用户的设备,如执行任务,打开摄像头,屏幕截图,获取地理位置等。
0x02 Nanobot 基础
Nanobot遵循两条核心原则:
"当有疑问时,留白" :拒绝功能膨胀、保持核心精简、按需扩展 "代理逻辑不应埋藏在层层抽象之下":代码可读性优先、单一职责原则、模块边界清晰
因此,Nanobot的核心特色如下:
2.1 核心功能 / 特色
轻量级架构:无冗余设计,仅保留 Agent 核心能力。 异步非阻塞:全异步设计,支持并发任务处理,避免单任务阻塞整个引擎 多通道适配:支持 CLI、系统消息、自定义通道等多场景消息处理 消息驱动的 Agent 主循环:接收消息 → 构建上下文 → 调用 LLM → 执行工具 → 返回结果 完整的工具生态:支持文件操作、命令执行、网页搜索 / 爬取、子 Agent 生成、定时任务、消息发送等核心工具 可扩展:插件化工具注册机制,支持自定义工具 / MCP 扩展,适配不同场景 会话管理与记忆 Consolidation:自动管理会话历史,自动 Consolidate 长会话记忆,支持大窗口记忆压缩,避免上下文溢出,平衡上下文长度与记忆完整性 安全隔离:可限制 Agent 操作范围到指定工作区,防止越权访问 可中断性:支持 /stop 指令终止当前任务,保证 Agent 响应性
2.2 🛠️ 技术栈
以下是 Nanobot 的技术栈。
| 核心语言 | |
| CLI 工具 | |
| LLM 接入 | |
| 配置管理 | |
| 日志 | |
| WebSocket | |
| HTTP 客户端 | |
| OAuth | |
| Telegram | |
| Discord | |
| 飞书 | |
| 钉钉 | |
| Slack | |
| QQ 机器人 | |
| 定时任务 | |
| WhatsApp Bridge | |
| MCP 支持 | |
| Markdown 渲染 | |
| 代码规范 |
2.3 📁 主要目录结构
以下是 Nanobot 的主要目录结构。
ounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(linenanobot-main/├── nanobot/ # 核心包目录│ ├── agent/ # 🧠 核心代理逻辑│ │ ├── loop.py # 代理循环 (LLM ↔ 工具执行)│ │ ├── context.py # 提示构建器│ │ ├── memory.py # 持久化记忆│ │ ├── skills.py # 技能加载器│ │ ├── subagent.py # 后台任务执行│ │ └── tools/ # 内置工具│ ├── skills/ # 🎯 捆绑技能 (github, weather, tmux...)│ │ ├── clawhub/│ │ ├── cron/│ │ ├── github/│ │ ├── memory/│ │ ├── skill-creator/│ │ ├── summarize/│ │ ├── tmux/│ │ └── weather/│ ├── channels/ # 📱 聊天渠道集成│ ├── bus/ # 🚌 消息路由│ ├── cron/ # ⏰ 定时任务│ ├── heartbeat/ # 💓 主动唤醒│ ├── providers/ # 🤖 LLM 提供商配置│ ├── session/ # 💬 会话管理│ ├── config/ # ⚙️ 配置处理│ └── cli/ # 🖥️ 命令行界面├── bridge/ # Node.js WhatsApp 桥接器│ ├── package.json│ └── tsconfig.json├── tests/ # 测试目录├── case/ # 案例展示 (GIF)├── pyproject.toml # Python 项目配置├── Dockerfile # Docker 镜像配置├── docker-compose.yml # Docker Compose 配置├── README.md # 项目文档└── SECURITY.md # 安全文档
2.4🌐 支持的平台
以下是 Nanobot 支持的平台。
聊天渠道:Telegram、Discord、WhatsApp、飞书、Mochat、钉钉、Slack、Email、QQ、Matrix
LLM 提供商:OpenRouter、Anthropic (Claude)、OpenAI (GPT)、DeepSeek、Groq、Gemini、MiniMax、AiHubMix、SiliconFlow、VolcEngine、通义千问 (Dashscope)、Moonshot (Kimi)、智谱 (Zhipu)、vLLM、OpenAI Codex、GitHub Copilot
0x03 Nanobot 总体架构
3.1 架构特点
Nanobot 的架构特点如下:
消息驱动架构:通过 MessageBus 实现渠道与 Agent 的解耦 核心引擎:AgentLoop是核心处理引擎,负责 LLM 与工具执行的循环 多渠道支持:通过 ChannelManager 统一管理 10+ 种聊天平台 可扩展性: 工具通过 ToolRegistry 注册 LLM 提供商通过 Provider Registry 统一管理 支持技能插件系统和 MCP 协议 持久化:SessionManager和 MemoryStore 负责会话和记忆的持久化 后台任务:CronService 和 Heartbeat提供定时任务和主动唤醒功能
3.2 架构图
Nanobot 的架构图如下:

对应各组件职责为:
Gateway: 系统入口,协调各组件启动和运行 启动MessageBus、AgentLoop、ChannelManager 协调 CronService 和 HeartbeatService Channel(如QQchannel): 接收外部消息 将消息发布到.MessageBus 发送响应消息回外部平台 AgentLoop: 从MessageBus消费消息 执行 AI 推理和工具调用 将响应发布到 MessageBu MessageBus: 解耦 Channel 和 AgentLoop 提供异步消息队列机制
对应具体代码逻辑为

0x04 Nanobot 消息分发机制详解
我们梳理下Nanobot 消息分发机制,后续文章会针对各个环节进行解析学习。
4.1 消息处理流程图
nanobot 采用 异步消息总线 架构,实现消息的解耦分发:
MessageBus:异步队列,连接渠道和 Agent InboundQueue:入站消息(渠道 → Agent) OutboundQueue:出站消息(Agent → 渠道) SessionKey:会话标识符,用于区分不同用户/会话
nanobot 的消息处理流程图如下:
Gateway启动所有服务:启动 AgentLoop、所有 Channels、CronService 和 HeartbeatService
协调组件间通信:通过MessageBus 实现各组件间的解耦 维护整体状态:管理整个系统生命周期 Channels 与具体平台(QQ、Telegram等)对接,将消息标准化后发送到MessageBus
MessageBus 解耦Channels和Agent,实现消息传递
AgentLoop 统一处理来自所有渠道的消息,执行核心逻辑

4.2 消息流入和流出完整流程
以 QQ 用户发送消息为例的完整流程如下:
4.2.1 用户消息入站阶段
当 QQ 用户向 nanobot 发送消息(如"帮我分析这段代码")时,消息首先被 QQ 平台的服务器接收,然后通过 WebSocket 连接传递给 nanobot 的 QQ 机器人实例。
QQChannel 类通过继承 botpy.Client 并实现事件处理方法来接收消息:
on_c2c_message_create()- 处理 C2C(用户对机器人)消息on_direct_message_create()- 处理直接消息
当这两个事件被触发时,QQChannel 会调用内部方法 _on_message()。这个方法:
首先,进行消息去重处理,使用一个最大长度为 1000 的双端队列
_processed_ids来记录已处理的消息 ID,避免重复处理相同消息。然后提取用户信息,包括发送者 ID(author.id或author.user_openid)和消息内容。如果内容为空则直接返回。接着,QQChannel 调用基类 BaseChannel 的
_handle_message()方法进行权限检查和。这个方法首先调用is_allowed(sender_id)检查用户是否在白名单中。白名单通过配置文件的allowFrom字段设置,如果未配置白名单则允许所有用户访问。如果用户在白名单外,会记录警告日志并返回,拒绝处理此消息。
4.2.2 构建入站消息对象
通过权限检查后,BaseChannel 会构建一个 InboundMessage 数据类对象:
channel: 渠道名称,如 "qq"sender_id: 发送者 ID,如 "123456789"chat_id: 聊天 ID,QQ 私聊时等于 sender_idcontent: 消息文本内容timestamp: 消息时间戳(自动生成)media: 媒体文件列表(如图片 URL),默认为空metadata: 渠道特定元数据,如 QQ 的 message_idsession_key_override: 会话键覆盖,用于线程作用域会话
InboundMessage 有一个 session_key 属性,自动生成会话键:如果设置了 session_key_override 则使用它,否则使用 f"{channel}:{chat_id}" 格式。这样 QQ 用户的会话键就是 "qq:123456789"。
4.2.3 发布到消息总线
BaseChannel 调用 await self.bus.publish_inbound(msg) 将入站消息发布到消息总线。
MessageBus 维护两个异步队列:
inbound: asyncio.Queue[InboundMessage]- 入站消息队列(渠道 → Agent)outbound: asyncio.Queue[OutboundMessage]- 出站消息队列(Agent → 渠道)
publish_inbound() 方法使用 await self.inbound.put(msg) 将消息放入入站队列。这是一个非阻塞操作,如果队列满了会自动等待。
4.2.4 AgentLoop 消费入站消息
AgentLoop 的 run() 方法是主循环,持续从消息总线消费入站消息:
while self._running:try:msg = await asyncio.wait_for(self.bus.consume_inbound(), timeout=1.0)except asyncio.TimeoutError:continue
consume_inbound() 使用 await self.inbound.get() 从队列获取消息,这是一个阻塞操作,会等待直到有消息可用。这里设置了 1 秒超时,用于定期检查 _running 状态以便优雅停止。
获取到消息后,AgentLoop 会检查是否是特殊命令 /stop,如果是则调用 _handle_stop(msg) 取消该会话的所有活跃任务和子代理。否则,创建一个异步任务来处理这条消息:
task = asyncio.create_task(self._dispatch(msg))self._active_tasks.setdefault(msg.session_key, []).append(task)
AgentLoop 将任务添加到 _active_tasks 字典中,键是 session_key,值是该会话的任务列表。这样 /stop 命令可以取消特定会话的所有任务。
4.2.5 消息分发处理
_dispatch(msg) 方法是消息分发的核心,它使用全局处理锁 _processing_lock 确保消息串行化处理,避免并发问题:
async with self._processing_lock:try:response = await self._process_message(msg)if response is not None:await self.bus.publish_outbound(response)
_process_message(msg) 是完整的 Agent 处理流程,包括获取或创建会话、构建上下文、运行 Agent 迭代循环(LLM 与工具交互)、保存会话等。最终返回一个 OutboundMessage 对象。
4.2.6 发布出站消息
_process_message() 返回的 OutboundMessage 包含:
channel: 目标渠道名,如 "qq"chat_id: 目标聊天 ID,如 "123456789"content: Agent 的响应文本reply_to: 可选的回复消息 IDmedia: 可选的媒体文件列表metadata: 可选的元数据,如进度标记
AgentLoop 调用 await self.bus.publish_outbound(response) 将响应发布到出站队列。publish_outbound() 使用 await self.outbound.put(msg) 将消息放入出站队列。
4.2.7 ChannelManager 分发出站消息
ChannelManager 运行一个独立的协程 _dispatch_outbound() 来分发出站消息:
while True:try:msg = await asyncio.wait_for(self.bus.consume_outbound(), timeout=1.0)# 过滤进度消息(根据配置)if msg.metadata.get("_progress"):if msg.metadata.get("_tool_hint") and not self.config.channels.send_tool_hints:continueif not msg.metadata.get("_tool_hint") and not self.config.channels.send_progress:continue# 获取目标渠道channel = self.channels.get(msg.channel)if channel:await channel.send(msg)
这个循环持续从出站队列消费消息,支持根据配置过滤工具提示和进度消息。然后通过 self.channels.get(msg.channel) 获取目标渠道实例。channels 是一个字典,存储了所有启用的渠道,如 {"qq": QQChannel实例, "telegram": TelegramChannel实例}。
4.2.8 渠道发送消息到用户
获取到目标渠道实例后,调用其 send(msg) 方法。对于 QQChannel 具体如下:
async def send(self, msg: OutboundMessage) -> None:if not self._client:returntry:await self._client.api.post_c2c_message(openid=msg.chat_id,msg_type=0,content=msg.content,)except Exception as e:logger.error("Error sending QQ message: {}", e)
QQChannel 使用 botpy SDK 的 API post_c2c_message() 发送 C2C 私聊消息,msg_type=0 表示文本消息。
最终,QQ 用户在客户端收到 nanobot 的响应消息,完成了完整的消息流入和流出流程。
4.3 消息如何分发给不同 Session
Session 用以区分不同用户/会话的机制:
session_key = "{channel}:{chat_id}" - 唯一标识会话 SessionManager 用字典缓存 Session 对象 ({session_key: Session}) 每个 Session 独立存储到 session_key.jsonl 文件 不同会话的消息历史完全隔离, 互不影响 /stop 命令只取消当前 session_key 的任务
4.3.1 SessionKey 生成机制
nanobot 通过 session_key 来区分不同的会话。每个 InboundMessage 都有会话键属性:
@propertydef session_key(self) -> str:return self.session_key_override or f"{self.channel}:{self.chat_id}"
默认情况下,会话键使用 {渠道名}:{聊天ID} 格式生成:
Telegram 用户: "telegram:123456789"QQ 用户: "qq:987654321"Discord 群组: "discord:456789123"CLI 会话: "cli:direct"
如果设置了 session_key_override,则使用覆盖值。这用于特殊场景,如线程作用域会话、系统任务专用会话等。
4.3.2 SessionManager 持久化机制
SessionManager 负责会话的持久化管理:
def __init__(self, workspace: Path):self.workspace = workspaceself.sessions_dir = ensure_dir(self.workspace / "sessions")self._cache: dict[str, Session] = {}
_cache 是内存缓存字典,键是 session_key,值是 Session 对象。这避免频繁读取磁盘。
会话文件存储在 workspace/sessions/ 目录,每个会话一个 JSONL 文件,文件名是安全的会话键(将 : 替换为 _)。例如:telegram_123456789.jsonl。
4.3.3 Session 对象结构
Session 数据类存储会话的所有消息:
@dataclassclass Session:key: str # 会话键messages: list[dict[str, Any]] # 消息列表created_at: datetime # 创建时间updated_at: datetime # 更新时间metadata: dict[str, Any] # 元数据last_consolidated: int # 已归档的消息数
messages 是 append-only 的消息列表,存储完整的对话历史。last_consolidated 字段记录已经归档到 MEMORY.md 的消息数量,get_history() 方法只返回 messages[last_consolidated:] 的未归档消息,这样 LLM 不会看到重复的历史。
4.3.4 获取或创建会话流程
当 AgentLoop 处理消息时,调用 session = sessions.get_or_create(msg.session_key):
def get_or_create(self, key: str) -> Session:if key in self._cache:return self._cache[key]session = self._load(key)if session is None:session = Session(key=key)self._cache[key] = sessionreturn session
首先检查内存缓存,如果存在直接返回。否则尝试从磁盘加载会话文件,如果文件不存在或加载失败则创建新会话。最后将会话加入缓存并返回。
4.3.5 会话完全隔离机制
不同用户/会话的消息通过 session_key 完全隔离:
不同 session_key 映射到不同的 Session 对象 每个 Session 独立存储到 {safe_key}.jsonl文件Session.get_history() 只返回该会话的消息历史 AgentLoop 处理每条消息时使用对应的 Session 子代理可继承主会话键或使用独立会话键
这种设计确保用户 A 的对话历史不会影响用户 B 的会话,Telegram 群组的消息不会泄露给 Discord 私聊。
4.3.6 /stop 命令的会话级取消
/stop 命令只取消特定 session_key 的任务:
async def _handle_stop(self, msg: InboundMessage) -> None:tasks = self._active_tasks.pop(msg.session_key, [])cancelled = sum(1 for t in tasks if not t.done() and t.cancel())sub_cancelled = await self.subagents.cancel_by_session(msg.session_key)
这确保停止一个会话不会影响其他会话的运行任务。
4.4 消息如何从 Channel 发送到其他模块
4.4.1 CronService 消息发送机制
CronService 是定时任务服务,在 Gateway 启动时通过回调与消息系统连接:
async def on_cron_job(job: CronJob) -> str | None:channel, chat_id = _pick_cron_target(job)if job.payload.deliver:await bus.publish_inbound(InboundMessage(channel="system", # 使用 system 标识,不是具体渠道sender_id="cron",chat_id=f"{channel}:{chat_id}", # 目标渠道和 ID(特殊格式)content=job.payload.message,session_key_override=f"cron{job.id}", # 专用会话键))return "Job delivered"else:# 不发送到渠道,直接调用 Agent 处理return await agent.process_direct(content=job.payload.message,session_key=f"cron{job.id}",)
当定时任务到期时,CronService 调用 Gateway 设置的 on_job 回调。如果任务配置了 deliver=true,回调通过 MessageBus 发布一个 system 消息,这个消息的特点是:
channel="system":标识这是系统消息,不是直接来自聊天平台sender_id="cron":标识消息来源是 cron 定时任务chat_id=f"{channel}:{chat_id}":特殊格式,包含实际的目标渠道和聊天 IDsession_key_override=f"cron{job.id}":使用专用会话键
这个消息进入 inbound 队列后被 AgentLoop 接收。
4.4.2 AgentLoop 处理系统消息
AgentLoop 的 _process_message() 方法会识别并处理 system 消息:
if msg.channel == "system":if ":" in msg.chat_id:target_channel, target_chat_id = msg.chat_id.split(":", 1)else:target_channel = msg.channeltarget_chat_id = msg.chat_id
对于 system 消息,从 chat_id 解析出实际的目标渠道和聊天 ID。例如,chat_id="telegram:123456789" 会被解析为 target_channel="telegram" 和 target_chat_id="123456789"。
AgentLoop 使用解析的目标渠道和聊天 ID 构建出站消息,确保定时任务的响应发送到正确的用户。
4.4.3 HeartbeatService 消息发送机制
HeartbeatService 是心跳服务,用于定期检查和执行 HEARTBEAT.md 中的任务。它有两个回调:
on_execute 回调:执行心跳任务
async def on_heartbeat_execute(tasks: str) -> str:channel, chat_id = _pick_heartbeat_target()return await agent.process_direct(content=tasks,session_key="heartbeat",channel=channel,chat_id=chat_id,on_progress=_silent,)
这个回调直接调用 agent.process_direct(),不经过 MessageBus。process_direct() 内部处理消息但不自动发送结果到渠道。
on_notify 回调:通知用户心跳任务结果
async def on_heartbeat_notify(response: str) -> None:channel, chat_id = _pick_heartbeat_target()if channel == "cli":return # CLI 模式无法发送await bus.publish_outbound(OutboundMessage(channel=channel,chat_id=chat_id,content=response,))
这个回调在任务完成后通过 MessageBus 发布出站消息,ChannelManager 会将其分发给目标渠道。
4.4.4 MessageTool 跨渠道消息发送
MessageTool 允许 LLM 主动向任意启用的渠道发送消息:
async def execute(self, content, channel=None, chat_id=None, **kwargs) -> str:channel = channel or self._default_channelchat_id = chat_id or self._default_chat_idmsg = OutboundMessage(channel=channel,chat_id=chat_id,content=content,)if self._send_callback:await self._send_callback(msg) # -> bus.publish_outbound(msg)
AgentLoop 在处理每条消息时设置 MessageTool 的上下文:
self.tools.message_tool.set_context(msg.channel, msg.chat_id)
这样 LLM 可以在响应 Telegram 用户时主动发送消息到 Discord 群组,实现跨渠道通知。
4.4.5 模块间消息分发总结
CronService:通过 on_job回调 → MessageBus.publish_inbound() → AgentLoop 处理 → MessageBus.publish_outbound() → ChannelManager 分发HeartbeatService:通过 on_execute回调 → AgentLoop.process_direct()(内部处理),然后通过on_notify回调 → MessageBus.publish_outbound() → ChannelManager 分发MessageTool:LLM 调用工具 → MessageTool.execute() → _send_callback(msg)→ MessageBus.publish_outbound() → ChannelManager 分发到指定渠道
所有模块间的通信都通过 MessageBus 的异步队列实现,确保了系统的解耦和可扩展性。
0xFF 参考
3500 行代码打造轻量级AI Agent:Nanobot 架构深度解析
OpenClaw真完整解说:架构与智能体内核
https://github.com/shareAI-lab/learn-claude-code
OpenClaw架构-Agent Runtime 运行时深度拆解
夜雨聆风