乐于分享
好东西不私藏

OpenClaw源码解读之通道与路由系统

OpenClaw源码解读之通道与路由系统

大年初三,今天继续OpenClaw源码解读——通道与路由系统

使用的main分支,commit版本是:f5160ca6becaeeb6a4dfd892fffd2130a696f766

讲解计划如下:

1. CLI 框架与进程模型

2. 配置系统

3. Gateway 核心

4. 通道与路由(今日讲解)

5. Agent 引擎

6. 自动回复管线

7. 插件系统

8. 记忆系统

9. Web 控制台

10. 原生客户端

11. 浏览器自动化

12. 运维与测试

概述

OpenClaw 的通道系统是整个平台连接外部世界的入口——Telegram、Discord、WhatsApp、Slack、Signal、iMessage 以及 30 多个扩展通道。当一条消息从任何通道进来,它需要经过路由(确定由哪个 Agent 处理)、回复管线(调用 AI 生成回复)、出站投递(把回复发回来源通道)三个阶段。本文沿着这条完整路径,从通道插件的抽象接口到消息的最终投递,逐层拆解。

一、通道插件的抽象接口

每个通道——无论是核心的 Telegram 还是扩展的 MS Teams——都实现同一个 `ChannelPlugin` 接口。这是整个通道系统的根基,定义在 `src/channels/plugins/types.plugin.ts`:

export type ChannelPlugin<ResolvedAccount = anyProbe = unknownAudit = unknown> = {  idChannelId;  metaChannelMeta;  capabilitiesChannelCapabilities;  defaults?: { queue?: { debounceMs?: number } };  reload?: { configPrefixesstring[]; noopPrefixes?: string[] };  onboarding?: ChannelOnboardingAdapter;  configChannelConfigAdapter<ResolvedAccount>;  configSchema?: ChannelConfigSchema;  setup?: ChannelSetupAdapter;  pairing?: ChannelPairingAdapter;  security?: ChannelSecurityAdapter<ResolvedAccount>;  groups?: ChannelGroupAdapter;  mentions?: ChannelMentionAdapter;  outbound?: ChannelOutboundAdapter;  status?: ChannelStatusAdapter<ResolvedAccountProbeAudit>;  gateway?: ChannelGatewayAdapter<ResolvedAccount>;  actions?: ChannelMessageActionAdapter;  heartbeat?: ChannelHeartbeatAdapter;  agentTools?: ChannelAgentToolFactory | ChannelAgentTool[];  // ... 还有 streaming, threading, messaging, agentPrompt, directory, resolver 等};

这个类型的设计哲学是完全可选的适配器组合。除了 `id`、`meta`、`capabilities`、`config` 四个必需字段外,其余 20 多个适配器(adapter)都是可选的。每个适配器负责一个独立的能力维度。

`ChannelMeta` 描述通道的展示信息(名称、文档路径、排序、图标等)。`ChannelCapabilities` 声明通道支持的功能矩阵:

export type ChannelCapabilities = {  chatTypesArray<ChatType | "thread">;  polls?: boolean;  reactions?: boolean;  edit?: boolean;  unsend?: boolean;  reply?: boolean;  effects?: boolean;  groupManagement?: boolean;  threads?: boolean;  media?: boolean;  nativeCommands?: boolean;  blockStreaming?: boolean;};

`chatTypes` 是最关键的:它告诉系统这个通道支持哪些对话类型——`direct`(私聊)、`group`(群聊)、`channel`(频道)、`thread`(帖子/话题)。后续的路由和会话管理都基于这个信息做决策。

`ChannelConfigAdapter` 是连接配置系统与通道的桥梁,每个通道必须实现:

export type ChannelConfigAdapter<ResolvedAccount> = {  listAccountIds(cfgOpenClawConfig) => string[];  resolveAccount(cfgOpenClawConfigaccountId?: string | null) => ResolvedAccount;  defaultAccountId?: (cfgOpenClawConfig) => string;  isEnabled?: (accountResolvedAccountcfgOpenClawConfig) => boolean;  isConfigured?: (accountResolvedAccountcfgOpenClawConfig) => boolean | Promise<boolean>;  // ...};

`listAccountIds` 列出配置中该通道的所有账号(一个通道可以有多个 Bot 账号)。`resolveAccount` 从配置中提取出该账号的完整配置对象——这个返回类型是泛型的,每个通道自己定义。比如 Telegram 的 `ResolvedAccount` 包含 `botToken`、`webhookUrl`、`dmPolicy` 等字段,WhatsApp 的则包含 `authDir`、`service`、`region` 等。

二、Dock:轻量级通道元数据

在 `ChannelPlugin` 之外,还有一层更轻的抽象——`ChannelDock`(`src/channels/dock.ts`)。它的设计目标是:让不需要加载完整通道插件的共享代码路径也能获取通道的基本信息。

// Rules:// - keep this module *light* (no monitors, probes, puppeteer/web login, etc)// - OK: config readers, allowFrom formatting, mention stripping patterns, threading defaults// - shared code should import from here, not from the plugins registryconst DOCKSRecord<ChatChannelIdChannelDock> = {  telegram: {    id"telegram",    capabilities: {      chatTypes: ["direct""group""channel""thread"],      nativeCommandstrue,      blockStreamingtrue,    },    outbound: { textChunkLimit4000 },    // ... groups, mentions, threading 等轻量适配器  },  discord: { ... },  whatsapp: { ... },  // ...};

`ChannelDock` 只包含静态元数据和轻量级的配置读取函数,不会触发网络连接、浏览器启动或 monitor 初始化。路由、system prompt 构建、allowFrom 格式化等共享逻辑从 dock 获取通道信息,避免了对完整插件的依赖。

这种两层抽象的设计是有意为之:dock 用于”知道通道是什么”,plugin 用于”让通道真正工作”。

三、通道注册与发现

通道的注册通过插件系统完成。每个通道(无论核心还是扩展)本质上都是一个插件,在 `extensions/<channel>/index.ts` 中通过 `api.registerChannel({ plugin })` 注册自己。

核心通道注册表在 `src/channels/registry.ts` 中维护了一个有序列表:

export const CHAT_CHANNEL_ORDERChatChannelId[] = [  "telegram""whatsapp""discord""irc""googlechat",  "slack""signal""imessage",];

这个顺序决定了 onboarding 向导和状态输出中通道的显示顺序。

运行时通道发现走 `src/channels/plugins/index.ts`:

export function listChannelPlugins(): ChannelPlugin[] { ... }export function getChannelPlugin(id: ChannelId): ChannelPlugin | undefined { ... }

这些函数从活跃的插件注册表中获取所有已注册的通道插件。扩展通道(如 `extensions/msteams`)通过插件加载器在 Gateway 启动时被发现和加载,之后就和核心通道完全平等——使用相同的接口、走相同的路由和投递管线。

核心通道与扩展通道的唯一区别:核心通道在 `registry.ts` 和 `dock.ts` 中有硬编码的元数据入口,扩展通道的元数据完全来自插件注册。这意味着共享代码路径在找不到 dock 时会回退到插件注册表。

四、消息入站:从通道到路由

当一条消息到达通道时(比如用户在 Telegram 群里 @了 Bot),通道的 handler 负责:

1. 解析原始消息(文本、媒体、发送者信息、群组信息等)

2. 调用路由系统确定目标 Agent

3. 构建消息上下文(`MsgContext`)

4. 传递给自动回复管线

以 Telegram 为例,`src/telegram/bot-message-context.ts` 中的 `buildTelegramMessageContext` 是这个流程的核心。它先解析 Telegram 的消息结构(群/私聊/论坛话题/频道),然后调用路由:

const route = resolveAgentRoute({  cfgloadConfig(),  channel"telegram",  accountId: account.accountId,  peer: {    kind: isGroup ? "group" : "direct",    id: peerId,  },  parentPeer,});const baseSessionKey = route.sessionKey;

`resolveAgentRoute` 返回的 `route` 包含三个关键信息:`agentId`(由哪个 Agent 处理)、`sessionKey`(使用哪个会话)、`matchedBy`(匹配规则描述,用于调试)。

五、路由决策:`resolveAgentRoute`

路由逻辑在 `src/routing/resolve-route.ts` 中,只有一个函数,但包含了一套级联优先级匹配规则

export function resolveAgentRoute(input: ResolveAgentRouteInput): ResolvedAgentRoute {  const bindings = listBindings(input.cfg).filter((binding) => {    if (!matchesChannel(binding.match, channel)) return false;    return matchesAccountId(binding.match?.accountId, accountId);  });  // 优先级 1:精确 peer 匹配  if (peer) {    const peerMatch = bindings.find((b) => matchesPeer(b.match, peer));    if (peerMatch) return choose(peerMatch.agentId"binding.peer");  }  // 优先级 2:线程父 peer 继承  if (parentPeer && parentPeer.id) {    const parentPeerMatch = bindings.find((b) => matchesPeer(b.match, parentPeer));    if (parentPeerMatch) return choose(parentPeerMatch.agentId"binding.peer.parent");  }  // 优先级 3:Guild + 角色匹配(Discord 特有)  if (guildId && memberRoleIds.length > 0) { ... }  // 优先级 4:Guild 匹配  if (guildId) { ... }  // 优先级 5:Team 匹配(Slack 特有)  if (teamId) { ... }  // 优先级 6:Account 匹配(绑定到特定 Bot 账号)  const accountMatch = bindings.find((b) =>    b.match?.accountId?.trim() !== "*" && !b.match?.peer && !b.match?.guildId && !b.match?.teamId  );  if (accountMatch) return choose(accountMatch.agentId"binding.account");  // 优先级 7:Channel 匹配(通配符,任何账号)  const anyAccountMatch = bindings.find((b) =>    b.match?.accountId?.trim() === "*" && !b.match?.peer && !b.match?.guildId && !b.match?.teamId  );  if (anyAccountMatch) return choose(anyAccountMatch.agentId"binding.channel");  // 兜底:使用默认 Agent  return choose(resolveDefaultAgentId(input.cfg), "default");}

第一步是预过滤:从配置的 `bindings` 列表中筛选出匹配当前通道和账号的绑定规则。然后按优先级逐层尝试匹配。

这个优先级设计覆盖了从最具体到最宽泛的所有场景。比如你可以为 Telegram 群 12345 指定 Agent A,为同一个 Telegram Bot 的其他群指定 Agent B,为 Discord 某个角色组指定 Agent C——不同规则之间通过优先级自然降级。

`choose` 函数在确定 `agentId` 后,调用 `buildAgentSessionKey` 生成会话键。

六、会话键构建

会话键(Session Key)决定了消息属于哪个对话上下文。它的结构是:`agent:<agentId>:<channel>:<peerKind>:<peerId>`。

`buildAgentPeerSessionKey`(`src/routing/session-key.ts`)的核心逻辑处理了四种 DM 作用域:

export function buildAgentPeerSessionKey(params: {  agentId: string;  channel: string;  accountId?: string | null;  peerKind?: ChatType | null;  peerId?: string | null;  dmScope?: "main" | "per-peer" | "per-channel-peer" | "per-account-channel-peer";  identityLinks?: Record<stringstring[]>;}): string {  const peerKind = params.peerKind ?? "direct";  if (peerKind === "direct") {    const dmScope = params.dmScope ?? "main";    // ...    if (dmScope === "per-account-channel-peer" && peerId) {      return `agent:${agentId}:${channel}:${accountId}:direct:${peerId}`;    }    if (dmScope === "per-channel-peer" && peerId) {      return `agent:${agentId}:${channel}:direct:${peerId}`;    }    if (dmScope === "per-peer" && peerId) {      return `agent:${agentId}:direct:${peerId}`;    }    return buildAgentMainSessionKey({ agentId, mainKey });  }  // 群聊/频道:总是按 channel + peerKind + peerId 隔离  return `agent:${agentId}:${channel}:${peerKind}:${peerId}`;}

DM 作用域是一个重要的配置选项(`session.dmScope`),它控制私聊消息的会话隔离粒度:

  • `main`(默认):所有私聊消息共享一个会话。你在 Telegram 私聊和 Discord 私聊跟同一个 Agent 说话,共享上下文

  • `per-peer`:按发送者隔离。不同人的私聊互不影响

  • `per-channel-peer`:按通道 + 发送者隔离。同一个人在 Telegram 和 Discord 的对话分开

  • `per-account-channel-peer`:最细粒度,连同一通道的不同 Bot 账号都分开

群聊没有 `dmScope` 的概念——它们总是按 `channel:peerKind:peerId` 隔离,每个群/频道一个独立会话。

还有一个精妙的设计——身份链接(`identityLinks`)。如果用户在配置中声明了 `session.identityLinks: { “alice”: [“telegram:12345”, “discord:67890”] }`,那么 Alice 在 Telegram 和 Discord 的消息会被路由到同一个会话(即使 dmScope 是 `per-peer`)。`resolveLinkedPeerId` 函数在构建会话键时查找链接关系,把不同通道的 peerId 映射到同一个 canonical name。

线程(Thread)会在基础会话键后追加 `:thread:<threadId>` 后缀,形成独立的子会话。这在 Slack 话题、Telegram 论坛、Discord 帖子等场景下使用。

七、自动回复管线

路由完成后,消息进入自动回复管线。入口是 `src/auto-reply/reply/get-reply.ts` 中的 `getReplyFromConfig`——这是一个 342 行的编排函数,按顺序执行约 10 个阶段:

export async function getReplyFromConfig(  ctx: MsgContext,  opts?: GetReplyOptions,): Promise<ReplyPayload | ReplyPayload[] | undefined> {  // 1. 从会话键解析 Agent ID  const agentId = resolveSessionAgentId({ sessionKey: agentSessionKey, config: cfg });  // 2. 合并技能过滤器  const mergedSkillFilter = mergeSkillFilters(opts?.skillFilterresolveAgentSkillsFilter(cfg, agentId));  // 3. 解析模型(默认模型 or 心跳覆盖模型)  const { defaultProvider, defaultModel, aliasIndex } = resolveDefaultModel({ cfg, agentId });  // 4. 确保 Agent 工作目录存在  const workspace = await ensureAgentWorkspace({ dir: workspaceDirRaw });  // 5. 媒体/链接理解(图片转文字、URL 抓取等)  await applyMediaUnderstanding({ ctx: finalized, cfg, agentDir, activeModel });  await applyLinkUnderstanding({ ctx: finalized, cfg });  // 6. 命令授权检查  resolveCommandAuthorization({ ctx: finalized, cfg, commandAuthorized });  // 7. 初始化会话状态(加载历史、检查是否新会话、处理 reset)  const sessionState = await initSessionState({ ctx: finalized, cfg, commandAuthorized });  // 8. 解析指令(/think, /verbose, /model 等用户指令)  const directiveResult = await resolveReplyDirectives({ ... });  // 9. 处理内联动作(/status, /compact, /session 等内置命令)  const inlineActionResult = await handleInlineActions({ ... });  // 10. 运行 Agent 生成回复  return runPreparedReply({ ... });}

每个阶段的结果可能是一个”提前返回”:指令可能产生直接回复(比如 `/status` 显示状态),内联动作可能不需要调用 AI。只有到第 10 步才真正调用 `runPreparedReply`,后者把消息发给 AI Agent 引擎(`runEmbeddedPiAgent`),等待流式回复。

八、回复投递:ReplyDispatcher

AI 生成的回复需要发回来源通道。这里有一个序列化问题:AI 的回复可能是流式的——先产生工具调用结果,再产生文本分块,最后是最终回复。`ReplyDispatcher`(`src/auto-reply/reply/reply-dispatcher.ts`)解决了这个问题:

export function createReplyDispatcher(options: ReplyDispatcherOptions): ReplyDispatcher {  let sendChainPromise<void> = Promise.resolve();  let pending = 0;  let sentFirstBlock = false;  const enqueue = (kind: ReplyDispatchKind, payload: ReplyPayload) => {    const normalized = normalizeReplyPayloadInternal(payload, { ... });    if (!normalized) return false;    pending += 1;    const shouldDelay = kind === "block" && sentFirstBlock;    if (kind === "block") sentFirstBlock = true;    sendChain = sendChain      .then(async () => {        if (shouldDelay) {          const delayMs = getHumanDelay(options.humanDelay);          if (delayMs > 0await sleep(delayMs);        }        await options.deliver(normalized, { kind });      })      .catch((err) => options.onError?.(err, { kind }))      .finally(() => {        pending -= 1;        if (pending === 0) options.onIdle?.();      });    return true;  };  return {    sendToolResult(payload) => enqueue("tool", payload),    sendBlockReply(payload) => enqueue("block", payload),    sendFinalReply(payload) => enqueue("final", payload),    waitForIdle() => sendChain,  };}

核心是一个 Promise 链(`sendChain`)。每次 `enqueue` 都把新的投递任务追加到链尾,确保 tool → block → final 的严格顺序。`pending` 计数器跟踪在飞的投递数量,归零时触发 `onIdle` 回调。

一个有趣的细节是仿人延迟(human delay)。如果配置启用了,分块回复之间会插入随机延迟(默认 800-2500ms),让 AI 的回复节奏更自然,不会瞬间刷屏。第一个分块不加延迟——用户等了那么久,第一条消息应该立刻送出去。

`ReplyDispatcher` 还有一个带 Typing 指示器的增强版 `createReplyDispatcherWithTyping`,它在等待 AI 回复期间显示”正在输入…”状态,idle 时取消。

九、出站适配器:最后一公里

回复投递的”最后一公里”由每个通道的 `ChannelOutboundAdapter` 完成:

export type ChannelOutboundAdapter = {  deliveryMode"direct" | "gateway" | "hybrid";  chunker?: ((textstringlimitnumber) => string[]) | null;  textChunkLimit?: number;  sendPayload?: (ctxChannelOutboundPayloadContext) => Promise<OutboundDeliveryResult>;  sendText?: (ctxChannelOutboundContext) => Promise<OutboundDeliveryResult>;  sendMedia?: (ctxChannelOutboundContext) => Promise<OutboundDeliveryResult>;  sendPoll?: (ctxChannelPollContext) => Promise<ChannelPollResult>;};

`deliveryMode` 决定投递方式:`direct` 表示通道自己直接发送(如 Telegram Bot API 调用),`gateway` 表示通过 Gateway WebSocket 转发给远程节点(如 iMessage 需要 Mac 节点中转),`hybrid` 表示两种都支持。

`chunker` 和 `textChunkLimit` 处理消息分块。每个平台有不同的消息长度限制(Telegram 4000 字符、WhatsApp 2000 字符等),超长回复需要拆分发送。

以 Telegram 为例,`sendPayload` 最终调用 Grammy 库的 Bot API 方法,处理文本消息、媒体消息、inline keyboard、回复引用、线程定位等所有 Telegram 特有的发送逻辑。

十、完整链路回顾

一条消息从外部通道到 AI 回复投递的完整链路:

  1. 入站:外部平台 → 通道 handler(如 Grammy Telegram bot handler)→ 解析消息结构 → 构建 `MsgContext`

  2. 路由:`resolveAgentRoute` 根据配置中的 `bindings` 列表做级联匹配 → 确定 `agentId` + `sessionKey`

  3. 回复生成:`getReplyFromConfig` 编排管线 → 媒体/链接理解 → 指令解析 → 会话初始化 → `runPreparedReply` 调用 AI Agent

  4. 出站:AI 流式输出 → `ReplyDispatcher` 序列化 tool/block/final → 通道的 `ChannelOutboundAdapter.sendPayload` → 平台 API 投递

这条管线中,路由系统和通道适配器是完全解耦的。路由只看配置中的绑定规则和通道/peer 标识符,不关心底层平台细节。通道适配器只负责消息格式转换和 API 调用,不关心路由决策。中间的自动回复管线通过 `MsgContext` 这个统一的消息上下文对象把两端连接起来。

这种架构使得新增一个通道非常清晰:实现 `ChannelPlugin` 接口的必要适配器,通过 `api.registerChannel` 注册,剩下的路由、回复、投递管线自动适配。扩展通道(`extensions/` 下的独立 workspace 包)和核心通道走完全相同的代码路径,没有任何二等公民。

十一、扩展通道的差异

扩展通道和核心通道在运行时是完全平等的,但加载方式略有不同:

  • 核心通道在 `src/channels/registry.ts` 中硬编码元数据和顺序,在 `src/channels/dock.ts` 中提供轻量级 dock

  • 扩展通道(如 `extensions/msteams`、`extensions/matrix`)是独立的 workspace 包,通过插件加载器从 `extensions/` 目录或 `plugins.load.paths` 配置项动态发现

  • 扩展通道的依赖安装用 `npm install –omit=dev`,运行时依赖必须在 `dependencies` 中声明

  • 扩展通道可以在注册时提供 `dock` 对象,供共享代码路径使用;如果没提供,系统会从插件注册信息自动构建

扩展通道甚至可以通过 `meta.preferOver` 字段声明自己应该替代某个核心通道,实现通道的”热替换”。

这套设计让 OpenClaw 的通道系统既有核心通道的稳定性(编译时类型检查、硬编码元数据),又有扩展通道的灵活性(运行时发现、独立依赖、即插即用)。

本站文章均为手工撰写未经允许谢绝转载:夜雨聆风 » OpenClaw源码解读之通道与路由系统

评论 抢沙发

6 + 1 =
  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
×
订阅图标按钮