乐于分享
好东西不私藏

02. nanobot 源码解读:Agent Loop

02. nanobot 源码解读:Agent Loop

02. nanobot 源码解读:Agent Loop

文档内容基于 HKUDS/nanobot: “🐈 nanobot: The Ultra-Lightweight Personal AI Agent” 的 main 分支 3c06db7 提交进行说明。


目录

  • • 02. nanobot 源码解读:Agent Loop
    • • 目录
    • • Agent Loop 核心概念
      • • LLM 接口与 finish_reason
      • • Agent Loop 工作原理
    • • nanobot 的 AgentLoop 类
      • • AgentLoop 类的职责与定位
      • • AgentLoop.run 方法
      • • AgentLoop._dispatch 方法
      • • AgentLoop._process_message 方法
      • • AgentLoop._run_agent_loop 方法
    • • nanobot 的 AgentRunner 类
      • • AgentRunner.run 方法
        • • messages 的裁剪
        • • hook 机制
        • • finish_reason 处理
        • • injection 机制

Agent Loop 核心概念

在深入解读 nanobot 源码之前,需要先理解 Agent Loop 的核心概念。而要理解 Agent Loop,首先需要了解 LLM 接口的基本定义。

LLM 接口与 finish_reason

这里以 DeepSeek 的 chat/completions 接口为例进行说明:

在接口响应中,需要重点关注 choices 字段中的 finish_reason 属性。该属性表示模型停止生成 token 的原因,通俗来说就是接口返回的终止条件。常见的 finish_reason 值包括:

  • • stop:模型正常完成了生成
  • • length:生成长度达到限制
  • • content_filter:内容被过滤
  • • tool_calls:模型请求调用工具

Agent Loop 的实现机制正是依赖于 finish_reason 字段(注:不同 LLM 提供商的接口格式可能存在差异)。

Agent Loop 工作原理

Agent Loop 的核心思想:针对一次用户输入,可能需要多次调用 LLM 才能生成合理的回复。这是因为模型可能需要调用工具、获取外部信息、进行多轮推理来回复用户输入,这些都需要通过循环调用来实现。

以下是用伪代码描述的 Agent Loop 基本流程:

# available_tools = [...]# history = [history_conversations, current_user_input]while condition:    request = build_llm_request(history, available_tools)    response = call_llm(history)    append_into_history(history, response)    if should_call_tools(response):        tool_results = call_tools(response, available_tools)        append_into_history(history, tool_results)    elif should_end(response):        return parse_response(response)

nanobot 的 AgentLoop 类

代码位置在 nanobot/agent/loop.py

重要说明:nanobot 这里的 AgentLoop 类封装的是整个应用运行期间所有消息的处理 Loop,而前面说的 Agent Loop 是针对单条消息的处理 Loop。两者层次不同,请注意区分。

AgentLoop 类的职责与定位

AgentLoop 的核心职责可以参考其 docstring

"""The agent loop is the core processing engine.It:1. Receives messages from the bus2. Builds context with history, memory, skills3. Calls the LLM4. Executes tool calls5. Sends responses back"""

简单来说,AgentLoop 不断从 MessageBus 中获取 InboundMessage 消息,触发 Agent Loop 流程处理消息,再将生成的 OutboundMessage 发送回 MessageBus

相关概念说明

  • • MessageBus:消息总线,负责在不同组件之间传递消息。InboundMessage 表示进入系统的消息(如用户输入),OutboundMessage 表示系统输出的消息(如 AI 回复)。
  • • Session:会话对象,存储了某个用户或对话的所有历史消息,用于维护对话上下文。

AgentLoop.run 方法

run 方法是启动 InboundMessage 处理流程的入口。

# 为便于解读,仅保留核心代码并添加注释,省略的细节请查看项目源码(后续引入代码都按照这个方式处理)    while self._running:        # 从 MessageBus 拿 InboundMessage        try:            msg = await asyncio.wait_for(self.bus.consume_inbound(), timeout=1.0)        except:            ...            continue        raw = msg.content.strip()        if self.commands.is_priority(raw):            # 高优先级的指令,如 /stop、/status,会阻塞 Loop 执行            ctx = CommandContext(msg=msg, session=None, key=msg.session_key, raw=raw, loop=self)            result = await self.commands.dispatch_priority(ctx)            if result:                # 处理结果发送回 MessageBus                await self.bus.publish_outbound(result)            continue        ...        # InboundMessage 的实质处理逻辑在 _dispatch 方法中        task = asyncio.create_task(self._dispatch(msg))

AgentLoop._dispatch 方法

_dispatch 控制并发,限制同一时间执行的 Agent Loop 数目,然后将 Agent Loop 的处理结果推送到 MessageBus

    # lock 和 gate 用来控制并发    # lock 控制同一个 session 同时只能有一个消息被处理,不同的 session 不做限制    # gate 用来控制全局并发,若设置了 gate,不同的 session 同时也只能只有一个消息被处理    lock = self._session_locks.setdefault(msg.session_key, asyncio.Lock())    gate = self._concurrency_gate or nullcontext()    async with lock, gate:        on_stream = on_stream_end = None        if msg.metadata.get("_wants_stream"):            # 略,定义了 on_stream 和 on_stream_end 函数            ...        # msg 的实际处理过程在 _process_message 方法中        response = await self._process_message(            msg, on_stream=on_stream, on_stream_end=on_stream_end,            pending_queue=pending,        )        if response is not None:            await self.bus.publish_outbound(response)        elif msg.channel == "cli":            # CLI 的交互模式接收用户输入后会一直阻塞等待消息返回,因此手动构造返回消息避免阻塞            await self.bus.publish_outbound(OutboundMessage(                channel=msg.channel, chat_id=msg.chat_id,                content="", metadata=msg.metadata or {},            ))

AgentLoop._process_message 方法

_process_message 方法负责 Agent Loop 调用前后的预处理和后处理工作,包括调用前的 token 裁剪、tool 上下文设置,调用后的重复消息检查等。

    # system 表示消息是 subagent 返回的结果,此时需要重新激活 main agent 去处理,细节略    if msg.channel == "system":        ...        return OutboundMessage(...)    # 消息处理时要找到对应的 session,session 中存储了所有的历史对话    key = session_key or msg.session_key    session = self.sessions.get_or_create(key)    # 可能有进程崩溃等问题导致 session 上下文丢失(例如 LLM 要求 tool 结果,但 tool 还没执行完成进程就挂了),此时可以通过 checkpoint 尝试恢复    # checkpoint 机制:nanobot 支持在关键节点保存 session 状态,进程崩溃后可从最近一次 checkpoint 恢复对话上下文    if self._restore_runtime_checkpoint(session):        self.sessions.save(session)    # Slash commands    raw = msg.content.strip()    ctx = CommandContext(msg=msg, session=session, key=key, raw=raw, loop=self)    # 有些指令不需要 LLM 参与,直接执行返回即可,例如 /new 新建 session 等    if result := await self.commands.dispatch(ctx):        return result    # LLM 的上下文窗口也是有限制的,session 历史消息太多可能超出限制,这里提供了裁剪合并逻辑    await self.consolidator.maybe_consolidate_by_tokens(session)    # 有些 tool 执行时可能用到 channel/chat_id/message_id 等字段,在LLM调用前先设置对应的上下文    self._set_tool_context(msg.channel, msg.chat_id, msg.metadata.get("message_id"))    if message_tool := self.tools.get("message"):        # MessageTool 允许在 Agent Loop 期间直接回复 channel 消息        # 前面 _dispatch 逻辑中 response 为 None 的场景就是因为 MessageTool 发送回复了,为了避免重复发送就返回 None        if isinstance(message_tool, MessageTool):            # Agent Loop 前重置标记,Agent Loop 后检查标记是否发送了消息(MessageTool 发送消息后会修改标记),以避免重复发送            message_tool.start_turn()    history = session.get_history(max_messages=0)    # 重头戏,这里是构造 prompt 的核心逻辑,memory、skill 等都是在这个阶段添加进去的    # 后面展开研究    initial_messages = self.context.build_messages(        history=history,        current_message=msg.content,        session_summary=pending,        media=msg.media if msg.media else None,        channel=msg.channel,        chat_id=msg.chat_id,    )    ...    # 调用 Agent Loop    final_content, _, all_msgs, stop_reason, had_injections = await self._run_agent_loop(        initial_messages,        on_progress=on_progress or _bus_progress,        on_stream=on_stream,        on_stream_end=on_stream_end,        session=session,        channel=msg.channel,        chat_id=msg.chat_id,        message_id=msg.metadata.get("message_id"),        # 和后续的 injection 机制有关        pending_queue=pending_queue,    )    ...    # 通过 MessageTool 发送过回复的就返回 None,避免外层将 OutboundMessage 发送到 MessageBus 导致重复发送    if (mt := self.tools.get("message")) and isinstance(mt, MessageTool) and mt._sent_in_turn:        return None    ...    return OutboundMessage(        channel=msg.channel, chat_id=msg.chat_id, content=final_content,        metadata=meta,    )

AgentLoop._run_agent_loop 方法

_run_agent_loop 方法内容是准备 AgentRunner.run 方法的参数并调用。

nanobot 的 AgentRunner 类

AgentRunner 代码位置在 nanobot/agent/runner.py,实现了 Agent Loop 流程。

AgentLoop 类:负责整个应用运行期间的消息循环,从 MessageBus 消费 InboundMessage,协调并发控制、session 管理、checkpoint 恢复等工作。AgentRunner 类:负责单条消息的 Agent Loop 执行,包括 LLM 调用、工具执行、hook 触发、injection 处理等核心逻辑。

简单来讲,AgentLoop 是消息分发器AgentRunner 是消息处理器

AgentRunner.run 方法

在伪代码示例的 Agent Loop 基础上,nanobot 增加了如下修改:

  • • messages 的裁剪:传递给 LLM 的 messages 会基于原始 messages 进行裁剪,以减少 token 消耗
  • • hook 机制:在特定位置暴露 hook 函数,如 iteration 开始结束、tool 调用前等时机
  • • finish_reason 处理:增加了 errorlength 等原因的处理逻辑
  • • injection 机制:在第一条消息的 Agent Loop 期间,允许后续的消息直接注入当前 Agent Loop,而不是等待第一条消息处理完后再使用新的 Agent Loop 来处理
  • • 其他信息记录:如 token 消耗记录、tool 调用记录、checkpoint 记录等

messages 的裁剪

主要是从如下两个维度展开:

  • • tool 结果裁剪
  • • 历史消息窗口裁剪

hook 机制

nanobot 定义了 AgentHook 类(nanobot/agent/hook.py):

然后在 Agent Loop 的特定位置触发对应的 hook 回调。

finish_reason 处理

finish_reason
处理方式
tool调用
执行 tool 并将 tool 结果添加到历史消息,然后继续 Agent Loop
length
添加一行让 LLM 继续输出剩下内容的消息,然后继续 Agent Loop
error
添加 error 记录,然后结束 Agent Loop
空回复
添加空回复记录,然后结束 Agent Loop
正常结束
正常结束 Agent Loop

injection 机制

在第一条消息的 Agent Loop 期间,允许后续的消息直接注入到当前 Agent Loop一起处理;而不是等待此轮Agent Loop结束后,使用新的Agent Loop来处理后面的新消息。

Injection 机制要求外层负责消息分发的逻辑与 Agent Loop 消息处理逻辑共同配合实现:

  • • 在 Agent Loop 消息处理逻辑中增加一个能获取到新消息的操作,在Agent Loop迭代期间(如 tool 调用结束后)将新消息加入到 messages 中
  • • 消息分发逻辑中优先将新消息提供给上述操作

injection 机制下,消息要么作为 Agent Loop 的第一条消息,要么作为 Agent Loop 逻辑中的 injection 消息。逻辑实现时必须保证两点:

  • • 不漏处理新消息:新消息作为 injection 消息时,可能因为 Agent Loop 结束的太快导致 injection 消息没有被处理。所以为了避免漏处理消息,AgentLoop._dispatch 中会将未处理的 injection 消息重新发送到 MessageBus 等待处理。
  • • 不重复处理新消息:当存在 Agent Loop 时,消息只作为 injection 消息。当且仅当作为 injection 消息没有被处理时,才会重新进入 MessageBus等待处理。

这部分的逻辑还存在一些不完善之处,举一个明显的例子:

当 finish_reason 为 error 时,injection 成功和 injection 失败时的 messages 会不一致:

  • • injection 成功时
    • • Agent Loop 1: [history] + {current_llm_response} + {injection_message}, 继续 Agent Loop
  • • injection 失败
    • • Agent Loop 1:[history] + {error_message}, 退出 Agent Loop
    • • 新消息(即上文的 injection_message)触发新的 Agent Loop:[history] + {error_message} + {injection_message}