乐于分享
好东西不私藏

01. nanobot 源码解读:IM 接入

01. nanobot 源码解读:IM 接入

01. nanobot 源码解读:IM 接入

文档内容基于 HKUDS/nanobot: “🐈 nanobot: The Ultra-Lightweight Personal AI Agent” 的main分支4dac0a8提交进行说明


目录

  • • 01. nanobot 源码解读:IM 接入
    • • 目录
    • • 消息数据模型
      • • InboundMessage / OutboundMessage
      • • MessageBus
    • • 消息收发机制
      • • BaseChannel 抽象设计
        • • 消息接收接口
        • • 消息发送接口
        • • 完整消息流程
        • • QQ 平台实现示例
          • • 消息监听
          • • 消息回调处理
          • • 消息发送
      • • Channel 的接入机制
        • • 源码接入
        • • 外部接入
    • • 启动流程
    • • 设计亮点
      • • 统一抽象与解耦设计
      • • 策略模式的应用

nanobot 是一个超轻量级个人 AI Agent 框架。与 openclaw 及其它 类 Claw 应用类似,nanobot 基于 Agent Loop 的消息处理能力,进一步接入了多个 IM 平台 的消息收发功能,从而构建了一个可交互的智能助手系统。

作为系列文档的第一篇,本文将解读 nanobot 的 IM 接入能力。Agent Loop 等内容将在后续章节中进行解读。

消息数据模型

InboundMessage / OutboundMessage

nanobot 定义了 InboundMessagenanobot/bus/events.py)和 OutboundMessagenanobot/bus/events.py)两个核心数据类,用于统一表示消息的接收和发送。

  • • InboundMessage:从 IM 平台接收的消息都会转换为 InboundMessage 格式,Agent Loop 处理的也是 InboundMessage
  • • OutboundMessage:Agent Loop 处理后的输出是 OutboundMessage,实际发送给 IM 平台的消息也是通过 OutboundMessage 转换而来

InboundMessage 必须能够表示 这是来自哪个 IM 平台的哪个用户在什么场景下发送的什么消息。以 QQ 为例:是私聊时用户 A 发送的消息,还是群聊时群 X 中的用户 Y 发送的消息?nanobot 的 InboundMessage 使用了如下字段来定义:

  • • channel: str:消息来自哪个 IM 平台(如 “qq”)
  • • sender_id: str:消息是哪个用户发送的
  • • chat_id: str:消息是在什么场景下发送的(私聊时为用户 ID,群聊时为群 ID)
  • • content: str:消息内容是什么
  • • metadata: dict:其他元数据(如消息 ID、附件信息等)
@dataclassclass InboundMessage:    """Message received from a chat channel."""    channel: str    sender_id: str    chat_id: str    content: str    metadata: dict    # 其他字段省略

同样地,OutboundMessage 也必须能够表示 这个消息需要发送给哪个 IM 平台的哪个场景下的哪个用户。nanobot 使用如下字段来定义:

  • • channel: str:消息通过哪个 IM 平台发送(如 “qq”)
  • • chat_id: str:消息需要在什么场景下发送(私聊/群聊/其他)
  • • content: str:发送的消息内容是什么
  • • reply_to: str | None:消息需要发送给哪个用户,此处允许为 None。群聊或类似场景需要指定用户,私聊或类似场景则不需要
  • • metadata: dict:其他元数据(如回复的消息 ID、附件信息等)
@dataclassclass OutboundMessage:    """Message to send to a chat channel."""    channel: str    chat_id: str    content: str    reply_to: str | None = None    metadata: dict    # 其他字段省略

MessageBus

nanobot 定义了 MessageBusnanobot/bus/queue.py),即 InboundMessage queue + OutboundMessage queue,用于解耦消息的收发和消息的处理。后续所有对 InboundMessage / OutboundMessage 消息的生产和消费操作都通过 MessageBus

class MessageBus:    """    Async message bus that decouples chat channels from the agent core.    Channels push messages to the inbound queue, and the agent processes    them and pushes responses to the outbound queue.    """    def __init__(self):        self.inbound: asyncio.Queue[InboundMessage] = asyncio.Queue()        self.outbound: asyncio.Queue[OutboundMessage] = asyncio.Queue()    # 其他方法省略

消息收发机制

本节重点介绍消息的收发机制,暂不涉及 Agent Loop 对消息的处理逻辑(后续章节将详细解读)。

消息收发机制主要解决两个核心问题:

  • • 消息接收:如何将来自不同 IM 平台的消息统一为 InboundMessage 格式
  • • 消息发送:如何将统一格式的 OutboundMessage 发送到不同的 IM 平台

不同的 IM 平台(QQ、微信、Telegram 等)提供了不同的 API 接口和消息格式。例如,QQ 使用 botpy 库,Telegram 使用 python-telegram-bot 库,每个库的接口和消息结构都不相同。如果不进行统一抽象,Agent Loop 就需要针对每个平台编写不同的处理逻辑,这将导致代码重复、难以维护。

BaseChannel 抽象设计

从设计模式角度来看,nanobot 使用了策略模式来处理不同 IM 平台的消息收发逻辑。策略模式允许在运行时选择不同的算法实现,这里即为不同的 IM 平台适配策略。

策略模式属于行为型设计模式,它定义了一系列算法(策略),将每个算法封装起来,并使它们可以互相替换。策略模式让算法独立于使用它的客户端而变化。在 nanobot 中,每个 IM 平台就是一个策略,BaseChannel 是策略接口,具体平台实现(如 QQChannelWeChatChannel)是具体策略。

BaseChannel 基类定义了消息收发的核心接口,提供以下能力:

  • • 消息接收:将从 IM 平台接收的消息转换为 InboundMessage
  • • 消息发送:将 OutboundMessage 转换成符合 IM 平台格式的消息发送

消息接收接口

针对消息接收,nanobot 在基类 BaseChannelnanobot/channels/base.py)中定义了如下方法:

  • • start:启动消息监听。当 IM 平台有新消息到达时,nanobot 能够感知并获取该消息。
  • • _handle_message:由于不同的 IM 平台有不同的消息格式,需要子类根据具体 IM 平台的消息格式实现 InboundMessage 的构造。基类封装了 _handle_message 方法,根据传入的参数构造 InboundMessage 实例后传入 MessageBus

消息发送接口

针对消息发送,则定义了:

  • • send:发送消息,即按照 IM 平台要求的消息格式,将 OutboundMessage 转换并发送。

完整消息流程

BaseChannel 定义的消息收发逻辑如下:启动消息监听后,当 IM 平台有新消息到达时会触发回调函数,在回调中解析消息并构造 InboundMessage,将其推送到 MessageBus 的 inbound queue。随后 Agent Loop 从 inbound queue 获取消息并处理,处理完成后生成 OutboundMessage 并推送到 MessageBus 的 outbound queue,最后由 Channel 从 outbound queue 获取消息并按 IM 平台的格式要求转换后回复。

流程说明

  1. 1. 消息接收阶段:IM 平台 => Channel 监听回调 => 解析消息 => 构造 InboundMessage => 入队列 inbound queue
  2. 2. 消息处理阶段:inbound queue 取出 => Agent Loop 处理逻辑 => 生成 OutboundMessage => 入队列 outbound queue
  3. 3. 消息发送阶段:outbound queue 取出 => Channel处理 => 转换为IM 平台格式 => 发送到 IM 平台

QQ 平台实现示例

本节以 QQ 为例,具体说明消息的收发流程。QQ 消息的收发逻辑实现在 nanobot/channels/qq.py

nanobot 选择了 botpy 库接入 QQ 平台。botpy 是一个第三方 Python 库,封装了 QQ 机器人相关的 API,提供了消息监听和发送能力,包括:

  • • 私聊消息和群聊消息的接收
  • • 文本、图片、语音等多媒体消息的支持
  • • 消息回复和转发功能
消息监听

消息监听的实现步骤如下:

  • • 创建 botpy.Client 实例,并在该实例中注册多种类型的消息回调函数
  • • 调用 botpy.Client 实例的 start 方法,开始监听 QQ 平台的消息
  • • 使用 while 循环确保在运行期间 botpy.Client 始终处于监听状态
消息回调处理

当收到消息后,回调函数的处理步骤如下:

  • • 提取 chat_id 和 sender_id。私聊场景下两者相同,群聊场景下用 chat_id 记录群聊 ID,用 sender_id 记录发送者 ID
  • • 提取 content,如果包含多媒体附件则下载到本地
  • • 提取其他必要参数作为 metadata 内容
  • • 构造 InboundMessage 实例
消息发送

回复消息的处理步骤如下:

  • • 回复 QQ 消息时需要提供 message_id,该参数存储在 metadata 中,优先获取
  • • 如果回复内容包含多媒体附件,先将多媒体内容上传
  • • 发送回复的 content

Channel 的接入机制

nanobot 中使用 Channel 表示不同的 IM 平台nanobot/channels/registry.py 文件中定义了 Channel 的接入方式,支持以下两种接入机制:

源码接入

源码接入通过扫描 nanobot/channels/ 包下的 Python 文件(排除几个特定的 .py 文件)来实现。若在某个模块中找到了 BaseChannel 的子类实现,则接入了一个名称为该模块名的 IM 平台 Channel。后续需要接入新的 Channel 时,只需要添加对应的新 .py 文件即可,扩展非常方便。

外部接入

外部接入通过获取 nanobot.channels 组下的 entry points(扩展点)来实现。

entry points 是 Python 提供的一种插件机制(通过 importlib.metadata 实现),允许第三方库通过配置的方式注册自定义实现。

在 nanobot 中,entry points 的配置格式如下:

# pyproject.toml[project.entry-points."nanobot.channels"]webhook = "nanobot_channel_webhook:WebhookChannel"

每个 key-value 对表示接入一个 Channel:

  • • key:Channel 名称(即 IM 平台标识,如 “telegram”、”slack”)
  • • value:对应的 class 实现路径(格式为 模块路径:类名

注意事项:若源码接入的 Channel 名称和外部接入的 Channel 名称重复时,优先使用源码接入的实现。

启动流程

根据前文所述,结合源码 nanobot/cli/command.py 和 nanobot/channels/manager.py,梳理出 nanobot 启动时对 Channel 的处理流程:

  1. 1. 初始化 MessageBus:创建 InboundMessage 和 OutboundMessage 队列
  2. 2. 获取 Channel 配置:读取 Channel 相关的配置,同时通过源码扫描和 entry points 获取所有支持的 Channel,根据配置确定最终生效的 Channel 列表
  3. 3. 创建 Channel 实例:对每个生效的 Channel 创建实例,传入 MessageBus
  4. 4. 启动消息监听:对所有生效的 Channel 调用 start 方法,开启监听。当 IM 平台有新消息到达时,Channel 会将其转换为 InboundMessage 并推入 MessageBus 的 inbound queue
  5. 5. 启动消息处理循环:启动异步任务,不断从 InboundMessage queue 中获取消息并交给 Agent Loop 处理,处理完成后将结果作为 OutboundMessage 推入 outbound queue(此循环由 Agent Loop 管理)
  6. 6. 启动消息发送循环:启动异步任务,不断从 OutboundMessage queue 中获取消息并交给 Channel 处理,调用 Channel 的 send 方法将消息发送给对应的 IM 平台

关键点说明

  • • 所有步骤都是异步的(async),不会相互阻塞
  • • 消息监听、处理、发送是并行的,提高系统吞吐量

设计亮点

尽管现在有AI辅助甚至主导代码编写,但是还是要对代码有一定的品鉴能力,随手列举两个亮点:

统一抽象与解耦设计

核心思想:通过定义统一的内部消息格式(InboundMessage / OutboundMessage)和消息总线(MessageBus),实现消息处理逻辑与外部 IM 平台的完全解耦。

设计优势

  • • 平台无关性:Agent Loop 只需处理统一格式的消息,无需关心消息来自哪个 IM 平台
  • • 易扩展:新增 IM 平台只需实现 BaseChannel 接口,无需修改核心处理逻辑
  • • 易测试:可以轻松模拟 MessageBus 进行单元测试,无需依赖真实 IM 平台

适用场景

  • • 系统需要对接多个外部数据源或 API
  • • 数据源格式各异但处理逻辑相似
  • • 需要支持动态扩展新的数据源

策略模式的应用

核心思想:通过 BaseChannel 抽象基类定义消息收发的接口,具体平台(QQ、微信等)实现各自的具体策略。

设计优势

  • • 运行时切换:可以通过配置动态启用/禁用不同的 Channel
  • • 代码隔离:各平台的实现相互独立,互不影响
  • • 开闭原则:对扩展开放(新增 Channel),对修改关闭(核心逻辑不变)

关键要点

  • • 定义清晰的抽象接口