文档版本:2026.4.10 最后更新:2026-04-10
源码验证: ✅ 本文档基于实际源码分析,使用符号工具验证

1. 模块概述
频道管理模块是 OpenClaw 的多平台消息入口层,负责接收、路由和发送消息到多个即时通讯平台。该模块采用插件化架构,支持内置频道(Telegram、Discord、Slack、Signal等)和扩展频道(Teams、Matrix、Zalo等)。
核心职责
- • 消息接收: 监听各平台消息事件(Webhook、长轮询、WebSocket)
- • 路由决策: 根据频道、账户、对话、群组匹配 Agent 路由
- • 访问控制: Allowlist 验证、命令门控、提及检测
- • 消息发送: 多频道消息格式化、分块、重试
- • 会话管理: 记录入站会话、对话标签、发送者身份
支持的频道(基于 registry.ts 验证)
// 内置频道 (CHAT_CHANNEL_ORDER from src/channels/ids.ts)
const CHANNEL_IDS = [
"telegram", // Telegram Bot API
"whatsapp", // WhatsApp Web (Baileys)
"discord", // Discord.js Gateway
"irc", // IRC
"googlechat", // Google Chat
"slack", // Slack Bolt SDK
"signal", // Signal REST API
"imessage", // iMessage (Peekaboo 桥接)
"line", // LINE Messaging API
] as const;
// 注意:web、acp 不在 CHANNEL_IDS 中,它们是 Gateway 内部通道
// 扩展频道(如 Matrix、Zalo、Teams 等)通过插件注册
// 频道别名(从各频道的 openclaw.plugin.json 清单动态加载)
// 运行时通过 buildChatChannelMetaById() → CHAT_CHANNEL_ALIASES 构建
// 示例别名:tg → telegram, dsc → discord, wa → whatsapp 等关键文件位置
src/
├── channels/ # 频道通用逻辑
│ ├── registry.ts # 频道注册表
│ ├── ids.ts # 频道 ID 列表与别名
│ ├── channel-config.ts # 频道配置匹配(泛型)
│ ├── command-gating.ts # 命令门控
│ ├── allowlist-match.ts # Allowlist 原始匹配
│ ├── allow-from.ts # DM/群组 Allowlist 决策
│ ├── allowlists/
│ │ └── resolve-utils.ts # Allowlist 用户解析工具
│ ├── session.ts # 入站会话记录
│ ├── session-meta.ts # 会话元数据异步记录
│ ├── session-envelope.ts # 入站会话 Envelope 上下文
│ ├── sender-identity.ts # 发送者身份解析
│ ├── sender-label.ts # 发送者标签格式化
│ ├── conversation-label.ts # 对话标签生成
│ ├── conversation-binding-context.ts # 对话绑定上下文
│ ├── mention-gating.ts # 提及门控
│ ├── ack-reactions.ts # 确认反应(旧)
│ ├── status-reactions.ts # 状态反应控制器(含 stall)
│ ├── typing.ts # 输入状态
│ ├── typing-lifecycle.ts # 输入状态生命周期
│ ├── typing-start-guard.ts # 输入速率保护
│ ├── targets.ts # 目标解析
│ ├── inbound-debounce-policy.ts # 入站消息防抖策略
│ ├── run-state-machine.ts # 频道运行状态机(busy/heartbeat)
│ ├── draft-stream-controls.ts # 草稿流控制(可终结)
│ ├── draft-stream-loop.ts # 草稿流节流循环
│ ├── thread-bindings-policy.ts # 线程绑定策略
│ ├── thread-binding-id.ts # 线程绑定 ID 工具
│ ├── thread-bindings-messages.ts # 线程绑定消息
│ ├── model-overrides.ts # 频道级模型覆盖
│ ├── location.ts # 位置消息处理
│ ├── native-command-session-targets.ts # 原生命令会话目标
│ ├── model-overrides.ts # 按频道模型覆盖解析
│ ├── logging.ts # 频道日志工具
│ ├── reply-prefix.ts # 回复前缀
│ └── transport/
│ └── stall-watchdog.ts # 传输层停滞看门狗
├── routing/ # 路由逻辑
│ ├── resolve-route.ts # Agent 路由解析(带缓存)
│ ├── session-key.ts # Session Key 构建
│ ├── bindings.ts # 绑定管理
│ ├── account-id.ts # 账户 ID 规范化
│ ├── account-lookup.ts # 账户查找
│ └── default-account-warnings.ts # 默认账户警告
├── telegram/ # Telegram 实现
│ ├── bot.ts # Bot 创建 (362 行)
│ ├── bot-handlers.ts # 消息处理
│ ├── bot-message-dispatch.ts # 消息分发
│ ├── send.ts # 发送逻辑
│ ├── download.ts # 媒体下载
│ └── ...
├── discord/ # Discord 实现
│ ├── monitor.ts # Gateway 监听
│ ├── monitor.gateway.ts # Gateway 事件处理
│ ├── send.ts # 发送逻辑
│ └── ...
├── slack/ # Slack 实现
├── signal/ # Signal 实现
├── whatsapp/ # WhatsApp 实现
├── imessage/ # iMessage 实现
├── web/ # Web UI 实现
└── auto-reply/ # 自动回复协调层
├── reply.ts # 回复入口
├── dispatch.ts # 分发逻辑
├── envelope.ts # 消息封装
└── ...2. 核心组件(基于源码验证)

2.1 频道注册表(registry.ts)
频道元数据
// ChatChannelMeta = ChannelMeta (来自 src/channels/plugins/types.ts)
// 元数据从各频道的 openclaw.plugin.json 清单动态加载
type ChatChannelMeta = {
id: ChatChannelId;
label: string; // 显示名称
selectionLabel: string; // 选择列表标签
docsPath: string; // 文档路径
blurb: string; // 简介
aliases?: string[]; // 别名
order?: number; // 排序
markdownCapable?: boolean; // 是否支持 Markdown
// ... 其他可选字段
};
// 注意:元数据不包含 emoji 字段
// 注意:docs URL 使用 docsPath(如 "/channels/telegram"),非完整 URL关键函数
// 规范化频道 ID(委托给内部实现)
function normalizeChannelId(raw?: string | null): ChatChannelId | null {
return normalizeChatChannelId(raw);
// 内部:trim + lowercase → 直接匹配 CHANNEL_IDS → 别名匹配 CHAT_CHANNEL_ALIASES
}
// 列出所有频道(按 CHAT_CHANNEL_ORDER 排序)
function listChatChannels(): ChatChannelMeta[] {
return CHAT_CHANNEL_ORDER.map(id => CHAT_CHANNEL_META[id]);
}
// 列出注册的频道插件(包括运行时注册的扩展频道)
function listRegisteredChannelPluginEntries(): RegisteredChannelPluginEntry[] {
return getActivePluginRegistry()?.channels ?? [];
}2.2 路由解析(resolve-route.ts)

核心函数:resolveAgentRoute
文件: src/routing/resolve-route.ts (L26-L835)
type ResolveAgentRouteInput = {
cfg: OpenClawConfig;
channel: string; // 频道 ID
accountId?: string | null; // 账户 ID(Bot Token/账号)
peer?: RoutePeer | null; // 对等方(DM/线程对象)
parentPeer?: RoutePeer | null; // 线程的父消息对等方(用于线程绑定继承)
guildId?: string | null; // 群组 ID(Discord、Slack)
teamId?: string | null; // 团队 ID(Slack)
memberRoleIds?: string[]; // Discord 成员角色 ID(用于角色路由)
};
type ResolvedAgentRoute = {
agentId: string; // Agent ID
channel: string; // 频道
accountId: string; // 账户 ID
sessionKey: string; // Session Key(持久化 + 并发控制)
mainSessionKey: string; // 主会话 Key(DM 合并用)
lastRoutePolicy: "main" | "session"; // 入站 last-route 更新的目标会话
matchedBy:
| "binding.peer" // 匹配绑定:对等方(精确)
| "binding.peer.parent" // 匹配绑定:线程父消息(继承)
| "binding.peer.wildcard" // 匹配绑定:同类型对等方通配
| "binding.guild+roles" // 匹配绑定:Discord 服务器 + 角色
| "binding.guild" // 匹配绑定:群组/服务器
| "binding.team" // 匹配绑定:团队(Slack)
| "binding.account" // 匹配绑定:账户
| "binding.channel" // 匹配绑定:频道通配
| "default"; // 默认
};路由匹配优先级(分层 tier 架构)
路由解析采用 分层 方式,按优先级从高到低顺序首次匹配:
| Tier | matchedBy | 启用条件 | 说明 |
|---|---|---|---|
| 1 | binding.peer | peer 非空 | 精确对等方绑定 |
| 2 | binding.peer.parent | parentPeer.id 非空 | 线程父消息继承绑定 |
| 3 | binding.peer.wildcard | peer 非空 | 同类型对等方通配(kind:*) |
| 4 | binding.guild+roles | guildId + memberRoleIds 均非空 | Discord 角色路由 |
| 5 | binding.guild | guildId 非空 | 服务器/群组绑定 |
| 6 | binding.team | teamId 非空 | Slack 工作区绑定 |
| 7 | binding.account | 始终 | 精确账户 ID 绑定 |
| 8 | binding.channel | 始终 | accountId: "*" 频道通配 |
| - | default | 兜底 | cfg.agent.id 默认 Agent |

内部使用 EvaluatedBinding 预处理绑定并建立索引(by-peer、byGuild、byGuildWithRoles 等 Map),并对常用路由结果缓存(resolvedRouteCacheByCfg)。
Session Key 构建
function buildAgentSessionKey(params: {
agentId: string;
channel: string;
peer?: RoutePeer | null;
dmScope?: "main" | "per-peer";
identityLinks?: IdentityLink[];
}): string {
const { agentId, channel, peer, dmScope = "main", identityLinks } = params;
// DM 作用域决策
if (peer && dmScope === "per-peer") {
// 身份关联:跨频道合并会话
const linkedId = findLinkedIdentity(peer, identityLinks);
if (linkedId) {
return `${agentId}:dm:linked:${linkedId}`;
}
// Per-peer:独立会话
return `${agentId}:${channel}:dm:${peer.kind}:${peer.id}`;
}
// 主会话:所有 DM 共享
return `${agentId}:${channel}:dm:main`;
}2.3 频道配置匹配(channel-config.ts)

配置匹配逻辑(已重构为泛型 API)
// ChannelMatchSource 已简化为三种来源
type ChannelMatchSource = "direct" | "parent" | "wildcard";
// ChannelEntryMatch 现在是泛型,适用于任意配置类型
type ChannelEntryMatch<T> = {
entry?: T; // 直接匹配的条目
key?: string; // 匹配的 Key
wildcardEntry?: T; // 通配符候选("*" Key)
wildcardKey?: string;
parentEntry?: T; // 父条目(线程继承)
parentKey?: string;
matchKey?: string; // 最终使用的 Key
matchSource?: ChannelMatchSource; // "direct" | "parent" | "wildcard"
};
// 核心匹配函数 — 泛型,接受 entries Record 和有序 keys 列表
function resolveChannelEntryMatch<T>(params: {
entries?: Record<string, T>;
keys: string[]; // 按优先级排列的候选 Key(调用方预先构建)
wildcardKey?: string; // 全局通配(通常为 "*")
}): ChannelEntryMatch<T>;
// 带 parentKeys 的完整匹配(支持线程父条目回退)
function resolveChannelEntryMatchWithFallback<T>(params: {
entries?: Record<string, T>;
keys: string[]; // 直接候选 Key
parentKeys?: string[]; // 父线程候选 Key(继承)
wildcardKey?: string;
normalizeKey?: (value: string) => string;
}): ChannelEntryMatch<T>;
// 辅助:对变长 key 列表去重(调用方负责构建优先级顺序)
function buildChannelKeyCandidates(...keys: Array<string | undefined | null>): string[];匹配优先级:direct(直接 Key 命中)> parent(父线程继承)> wildcard("*" 通配)
各频道模块自行调用 buildChannelKeyCandidates 构建格式化的候选 key 列表(如 telegram:bot123:group456、telegram:bot123 等),不再由 channel-config.ts 内部枚举 slug 模式。
2.4 命令门控(command-gating.ts)
控制命令授权
type CommandAuthorizer = {
kind: "owner" | "allowlist" | "access-group";
ownerNumbers?: string[];
allowFrom?: string[];
accessGroups?: string[];
};
function resolveControlCommandGate(params: {
cfg: OpenClawConfig;
channelMatch: ChannelEntryMatch;
senderE164?: string;
senderId?: string;
senderUsername?: string;
}): { authorized: boolean; reason?: string } {
const { cfg, channelMatch, senderE164, senderId, senderUsername } = params;
// 获取授权者列表
const authorizers = resolveCommandAuthorizers({
cfg,
channelMatch
});
// 检查授权
for (const auth of authorizers) {
if (auth.kind === "owner") {
if (senderE164 && auth.ownerNumbers?.includes(senderE164)) {
return { authorized: true };
}
} else if (auth.kind === "allowlist") {
if (matchesAllowFrom(auth.allowFrom, {
senderId,
senderUsername,
senderE164
})) {
return { authorized: true };
}
} else if (auth.kind === "access-group") {
// 访问组逻辑(复杂,涉及 security.accessGroups)
if (matchesAccessGroup(auth.accessGroups, { ... })) {
return { authorized: true };
}
}
}
return {
authorized: false,
reason: "Not authorized for control commands"
};
}2.5 提及门控(mention-gating.ts)
群组消息提及检测
// 实际函数签名 — 基于布尔参数,非模式枚举
type MentionGateParams = {
wasMentioned: boolean; // 平台原生提及检测
implicitMention?: boolean; // 隐式提及(如回复机器人)
shouldBypassMention?: boolean; // 绕过提及检查(如控制命令)
requireMention: boolean; // 配置:是否要求提及
canDetectMention: boolean; // 频道是否支持提及检测
};
type MentionGateResult = {
effectiveWasMentioned: boolean;
shouldSkip: boolean;
};
function resolveMentionGating(params: MentionGateParams): MentionGateResult {
const implicit = params.implicitMention === true;
const bypass = params.shouldBypassMention === true;
const effectiveWasMentioned = params.wasMentioned || implicit || bypass;
const shouldSkip = params.requireMention && params.canDetectMention && !effectiveWasMentioned;
return { effectiveWasMentioned, shouldSkip };
}
// 带绕过的完整版本
function resolveMentionGatingWithBypass(
params: MentionGateWithBypassParams
): MentionGateWithBypassResult;关键逻辑:
- •
effectiveWasMentioned=wasMentioned||implicitMention||shouldBypassMention - •
shouldSkip=requireMention&&canDetectMention&& !effectiveWasMentioned - • 注意:不是基于"模式"(off/bot-mention/name-mention/patterns)的设计
- • 具体的提及检测(@username、名称匹配等)由各频道适配器在上游完成

2.6 入站会话记录(session.ts)
记录入站消息
type InboundLastRouteUpdate = {
channel: string;
accountId: string;
senderId: string;
senderUsername?: string;
senderE164?: string;
threadId?: string;
timestamp: number;
};
function recordInboundSession(params: {
cfg: OpenClawConfig;
route: ResolvedAgentRoute;
senderId: string;
senderUsername?: string;
senderE164?: string;
threadId?: string;
}): void {
const { cfg, route, senderId, senderUsername, senderE164, threadId } = params;
const update: InboundLastRouteUpdate = {
channel: route.channel,
accountId: route.accountId,
senderId,
senderUsername,
senderE164,
threadId,
timestamp: Date.now()
};
// 持久化到配置(用于 "回复" 命令)
const key = `${route.channel}:${route.accountId}:${senderId}`;
cfg._runtime = cfg._runtime || {};
cfg._runtime.inboundSessions = cfg._runtime.inboundSessions || {};
cfg._runtime.inboundSessions[key] = update;
// 清理旧会话(超过 7 天)
const now = Date.now();
const MAX_AGE_MS = 7 * 24 * 60 * 60 * 1000;
for (const [k, v] of Object.entries(cfg._runtime.inboundSessions)) {
if (now - v.timestamp > MAX_AGE_MS) {
delete cfg._runtime.inboundSessions[k];
}
}
}2.7 入站防抖策略(inbound-debounce-policy.ts)
控制是否对普通文本消息启用防抖,防止用户在短时间内连续输入时触发多次 Agent 运行。
function shouldDebounceTextInbound(params: {
text: string | null | undefined;
cfg: OpenClawConfig;
hasMedia?: boolean;
commandOptions?: CommandNormalizeOptions;
allowDebounce?: boolean;
}): boolean;规则:allowDebounce === false、含媒体、或文本为控制命令 → 不防抖;其余文本消息 → 防抖。
2.8 运行状态机(run-state-machine.ts)
跟踪频道的 Agent 运行活跃状态,向外发布 busy / activeRuns / lastRunActivityAt,支持 Gateway 状态同步。
function createRunStateMachine(params: {
setStatus?: (patch: RunStateStatusPatch) => void;
abortSignal?: AbortSignal;
heartbeatMs?: number; // 默认 DEFAULT_RUN_ACTIVITY_HEARTBEAT_MS
now?: () => number;
}): {
isActive(): boolean;
onRunStart(): void;
onRunEnd(): void;
deactivate(): void;
};
type RunStateStatusPatch = {
activeRuns?: number;
busy?: boolean;
lastRunActivityAt?: number;
};- •
onRunStart()/onRunEnd()在每次 Agent 运行前后调用 - • 心跳定时器在有活跃 run 时定期 publish,保证远端状态不过期
- •
abortSignal触发时自动 deactivate 并清理定时器
2.9 草稿流控制(draft-stream-controls.ts + draft-stream-loop.ts)
支持平台级"草稿消息":在 Agent 生成过程中实时更新消息(Telegram edit、Discord edit 等),完成后转为最终消息。
// 节流循环:以 throttleMs 间隔 sendOrEditStreamMessage
function createDraftStreamLoop(params: {
throttleMs: number;
isStopped: () => boolean;
sendOrEditStreamMessage: (text: string) => Promise<boolean>;
}): DraftStreamLoop;
// 高层封装:可终结草稿 + 停止时清除
function createFinalizableDraftStreamControls(params: {
throttleMs: number;
isStopped: () => boolean;
isFinal: () => boolean;
markStopped: () => void;
markFinal: () => void;
sendOrEditStreamMessage: (text: string) => Promise<boolean>;
}): {
loop: DraftStreamLoop;
update(text: string): void; // 更新草稿内容
stop(): Promise<void>; // 标记终态,flush 最终文本
stopForClear(): Promise<void>; // 取消并等待 in-flight 完成
};2.10 线程绑定策略(thread-bindings-policy.ts)
控制 Agent 何时在平台原生线程(Discord Thread、Slack Thread 等)中自动创建回复,以及线程的生命周期。
// 核心类型
type ThreadBindingSpawnPolicy = "always" | "on-new-session" | "never";
type ThreadBindingLifecycleRecord = {
spawnedAt: number;
lastActiveAt: number;
expiresAt?: number;
idleTimeoutMs?: number;
};
// 解析是否启用线程绑定
function resolveThreadBindingsEnabled(params: {
cfg: OpenClawConfig;
channel: string;
accountId: string;
}): boolean;
// 解析线程生成策略
function resolveThreadBindingSpawnPolicy(params: {
cfg: OpenClawConfig;
channel: string;
accountId: string;
}): ThreadBindingSpawnPolicy;
// 解析闲置超时(毫秒)和最大存活时间
function resolveThreadBindingIdleTimeoutMs(...): number;
function resolveThreadBindingMaxAgeMs(...): number;默认值:DEFAULT_THREAD_BINDING_IDLE_HOURS = 24h,DEFAULT_THREAD_BINDING_MAX_AGE_HOURS = 168h (7天)。
2.11 频道模型覆盖(model-overrides.ts)
根据频道、账户、群组等上下文解析 model 覆盖,支持 cfg.channels.modelByChannel 配置。
type ChannelModelOverride = {
channel: string;
model: string;
matchKey?: string;
matchSource?: ChannelMatchSource;
};
function resolveChannelModelOverride(params: {
cfg: OpenClawConfig;
channel?: string;
groupId?: string;
parentSessionKey?: string;
accountId?: string;
}): ChannelModelOverride | null;2.12 状态反应控制器(status-reactions.ts)
在 Agent 运行期间通过 emoji 反应反映当前阶段(排队 → 思考 → 工具调用 → 完成/错误/stall)。
function createStatusReactionController(params: {
enabled: boolean;
adapter: StatusReactionAdapter; // { setReaction, removeReaction? }
initialEmoji: string;
emojis?: StatusReactionEmojis;
timing?: StatusReactionTiming;
onError?: (err: unknown) => void;
}): StatusReactionController;
type StatusReactionController = {
setQueued(): void; // 排队等待
setThinking(): void; // 模型思考中
setTool(toolName?: string): void; // 工具调用(含编码/Web 专属 emoji)
setCompacting(): void; // 会话压缩中
cancelPending(): void; // 取消待处理变更
setDone(): Promise<void>; // 完成
setError(): Promise<void>; // 出错
clear(): Promise<void>; // 清除所有反应
restoreInitial(): Promise<void>; // 恢复初始 emoji
};内置 stall 软/硬超时:超过 stallSoftMs 后切换为"卡住"emoji,超过 stallHardMs 再升级;每次阶段变更自动重置计时器。
2.13 传输层可靠性(transport/stall-watchdog.ts)
为传输层提供空闲/停滞检测,超过 timeoutMs 无活动则触发回调(如中止连接、重连等)。
function createArmableStallWatchdog(params: {
label: string;
timeoutMs: number;
checkIntervalMs?: number;
abortSignal?: AbortSignal;
onTimeout: (meta: { idleMs: number; timeoutMs: number }) => void;
}): ArmableStallWatchdog;
type ArmableStallWatchdog = {
arm(atMs?: number): void; // 启动监控
touch(atMs?: number): void; // 更新活动时间
disarm(): void; // 停止监控(不停计时器)
stop(): void; // 完全停止并清理
isArmed(): boolean;
};3. 平台实现(主要频道)

3.1 Telegram(telegram/bot.ts)
Bot 创建流程
function createTelegramBot(opts: TelegramBotOptions) {
// 1. 解析账户配置
const account = resolveTelegramAccount({
cfg: opts.config,
accountId: opts.accountId
});
// 2. 创建 Bot(grammy)
const bot = new Bot(opts.token, {
client: {
fetch: resolveTelegramFetch(opts.proxyFetch, {
network: account.config.network
}),
timeoutSeconds: account.config.timeoutSeconds
}
});
// 3. 应用限流中间件
bot.api.config.use(apiThrottler());
// 4. 应用顺序化中间件(防止并发冲突)
bot.use(sequentialize(getTelegramSequentialKey));
// 5. 注册全局错误处理
bot.catch((err) => {
runtime.error?.(danger(`telegram bot error: ${formatUncaughtError(err)}`));
});
// 6. 去重中间件(防止重复更新)
const recentUpdates = createTelegramUpdateDedupe();
const shouldSkipUpdate = (ctx) => {
const key = buildTelegramUpdateKey(ctx);
return recentUpdates.check(key);
};
// 7. 注册消息处理器
bot.on("message", async (ctx) => {
recordUpdateId(ctx);
if (shouldSkipUpdate(ctx)) return;
// 解析消息上下文
const messageContext = buildTelegramMessageContext(ctx, opts.config);
// Allowlist 检查
const allowlistDecision = resolveAllowlistDecision(messageContext);
if (!allowlistDecision.allowed) {
await ctx.react("❌");
return;
}
// 路由到 Agent
const route = resolveAgentRoute({
cfg: opts.config,
channel: "telegram",
accountId: opts.accountId,
peer: { kind: "user", id: ctx.from.id.toString() }
});
// 记录入站会话
recordInboundSession({
cfg: opts.config,
route,
senderId: ctx.from.id.toString(),
senderUsername: ctx.from.username
});
// 分发到自动回复
await dispatchAutoReply({
route,
text: ctx.message.text,
channel: "telegram",
...messageContext
});
});
// 8. 其他处理器(callback_query、inline_query 等)
bot.on("callback_query", async (ctx) => { ... });
bot.on("inline_query", async (ctx) => { ... });
return bot;
}消息分发逻辑
// telegram/bot-message-dispatch.ts
async function dispatchTelegramMessage(params: {
ctx: TelegramContext;
route: ResolvedAgentRoute;
config: OpenClawConfig;
messageText: string;
media?: MediaAttachment[];
}) {
const { ctx, route, config, messageText, media } = params;
// 1. 检测命令
const commandMatch = detectCommand(messageText);
if (commandMatch) {
// 命令门控
const gateResult = resolveControlCommandGate({
cfg: config,
channelMatch: resolveChannelEntryMatch({ ... }),
senderId: ctx.from.id.toString(),
senderUsername: ctx.from.username
});
if (!gateResult.authorized) {
await ctx.react("🚫");
return;
}
// 执行命令
return await executeCommand(commandMatch, { ... });
}
// 2. 自动回复
await replyWithAgent({
route,
prompt: messageText,
media,
onPartialReply: async (text) => {
// 流式回复(编辑消息)
await ctx.editMessageText(text);
},
onBlockReply: async (chunk) => {
// 分块回复(发送新消息)
await ctx.reply(chunk);
}
});
}3.2 Discord(discord/monitor.ts)
Gateway 监听(文件结构验证)
// discord/monitor.gateway.ts
function createDiscordMonitor(opts: DiscordMonitorOptions) {
const client = new Client({
intents: [
GatewayIntentBits.Guilds,
GatewayIntentBits.GuildMessages,
GatewayIntentBits.MessageContent,
GatewayIntentBits.DirectMessages
]
});
// 消息处理
client.on("messageCreate", async (message) => {
if (message.author.bot) return;
// 群组提及门控
if (message.guild) {
const shouldAccept = shouldAcceptGroupMessage({
mode: resolveChannelConfig(...)?.mentionGating ?? "bot-mention",
text: message.content,
botUsername: client.user?.username,
isMentioned: message.mentions.has(client.user!)
});
if (!shouldAccept) return;
}
// 路由
const route = resolveAgentRoute({
cfg: opts.config,
channel: "discord",
accountId: opts.accountId,
peer: message.guild
? null
: { kind: "user", id: message.author.id },
guildId: message.guild?.id
});
// 分发
await dispatchAutoReply({
route,
text: message.content,
channel: "discord",
messageId: message.id,
threadId: message.channelId,
senderId: message.author.id,
senderUsername: message.author.username
});
});
// 交互命令(Slash Commands)
client.on("interactionCreate", async (interaction) => {
if (!interaction.isCommand()) return;
// 处理 /status、/reset 等命令
await handleSlashCommand(interaction);
});
await client.login(opts.token);
return client;
}4. 数据流(实际架构)
4.1 完整消息流
用户消息
↓ (平台 API)
平台监听器 (Bot/Gateway)
↓
┌─────────────────────────────────────┐
│ 1. 消息解析 │
│ - 提取文本、媒体、发送者 │
│ - 检测群组/DM/线程 │
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ 2. Allowlist 验证 │
│ - resolveAllowlistDecision() │
│ - 检查 allowFrom 列表 │
└─────────────────────────────────────┘
↓ (拒绝: ❌ 反应)
┌─────────────────────────────────────┐
│ 3. 提及门控(群组消息) │
│ - resolveMentionGating() │
│ - wasMentioned / implicitMention │
└─────────────────────────────────────┘
↓ (跳过: 不回复)
┌─────────────────────────────────────┐
│ 3.5 入站防抖(可选) │
│ - shouldDebounceTextInbound() │
│ - 普通文本防抖;命令/媒体跳过 │
└─────────────────────────────────────┘
↓ (防抖: 等待聚合)
┌─────────────────────────────────────┐
│ 4. 路由解析 │
│ - resolveAgentRoute() │
│ - tiers: peer > peer.parent > │
│ peer.wildcard > guild+roles > │
│ guild > team > account >channel │
│ - 构建 sessionKey + lastRoutePolicy│
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ 5. 频道配置匹配 │
│ - resolveChannelEntryMatchWithFallback() │
│ - matchSource: direct/parent/wildcard │
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ 6. 命令检测 │
│ - detectCommand() │
│ - /status, /reset, /model, ... │
└─────────────────────────────────────┘
↓ (命令)
┌─────────────────────────────────────┐
│ 7. 命令门控 │
│ - resolveControlCommandGate() │
│ - 检查 owner/allowlist/access-group│
└─────────────────────────────────────┘
↓ (未授权: 🚫 反应)
┌─────────────────────────────────────┐
│ 8. 命令执行 │
│ - executeCommand() │
│ - 返回状态/配置信息 │
└─────────────────────────────────────┘
↓ (普通消息)
┌─────────────────────────────────────┐
│ 9. 入站会话记录 │
│ - recordInboundSession() │
│ - 保存 senderId、timestamp │
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ 10. 运行状态机通知 │
│ - runStateMachine.onRunStart() │
│ - 推送 busy=true / activeRuns++ │
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ 11. 自动回复分发 │
│ - dispatchAutoReply() │
│ - 调用 Agent Runtime │
│ - 状态反应控制器 (setThinking…) │
└─────────────────────────────────────┘
↓
Agent 运行 (见 Agent Runtime 模块)
↓
┌─────────────────────────────────────┐
│ 12. 响应发送 │
│ - sendTelegramMessage() / ... │
│ - 草稿流 / 分块 / 重试 │
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ 13. 运行状态机通知 │
│ - runStateMachine.onRunEnd() │
│ - 推送 busy=false / activeRuns-- │
└─────────────────────────────────────┘
↓
用户收到回复
4.2 路由决策树
收到消息
↓
解析频道标识
channel = "discord"
accountId = "1234567890"
peer = { kind: "user", id: "user42" } // DM
parentPeer = { kind: "thread", id: "thread99" } // 线程父消息
guildId = "guild111"
memberRoleIds = ["role-admin"]
↓
tier 1 — 对等方精确绑定 (peer: user:user42)
匹配 → Agent: "personal-bot"
sessionKey: "personal-bot:discord:dm:user:user42"
lastRoutePolicy: "session"
↓ (无匹配)
tier 2 — 线程父消息绑定 (parentPeer: thread:thread99)
匹配 → Agent: "thread-agent"
sessionKey 继承父线程
↓ (无匹配)
tier 3 — 同类型对等方通配 (user:*)
匹配 → Agent: "user-wildcard"
↓ (无匹配)
tier 4 — Discord 服务器 + 角色绑定 (guild:guild111 + roles: role-admin)
匹配 → Agent: "admin-bot"
sessionKey: "admin-bot:discord:group:guild111"
↓ (无匹配)
tier 5 — Discord 服务器绑定 (guild:guild111, 无角色条件)
匹配 → Agent: "guild-default"
↓ (无匹配)
tier 6 — Slack 工作区绑定 (teamId)
不适用(非 Slack)
↓
tier 7 — 账户精确绑定 (accountId: "1234567890")
匹配 → Agent: "discord-acct"
sessionKey: "discord-acct:discord:dm:main"
lastRoutePolicy: "main"
↓ (无匹配)
tier 8 — 频道通配 (accountId: "*")
匹配 → Agent: "discord-global"
↓ (无匹配)
default — cfg.agent.id ?? "default"4.3 频道配置匹配流程
频道配置查找(示例:Discord 服务器中的消息)
channel = "discord"
accountId = "bot123"
guildId = "guild456"
↓
调用方构建候选 Key 列表(按优先级):
keys = [
"discord:bot123:guild456", // 精确
"discord:*:guild456", // 空间匹配
"discord:bot123:*", // 账户通配
"discord:bot123", // 账户
"discord:*", // 频道通配
"discord", // 频道
]
wildcardKey = "*"
↓
resolveChannelEntryMatchWithFallback({ entries: cfg.channels, keys, wildcardKey })
↓
┌─ 直接 key 命中?
│ Yes → matchSource: "direct",entry = 该配置
│
├─ 父线程 key 命中?(parentKeys,线程继承场景)
│ Yes → matchSource: "parent",entry = 父线程配置
│
├─ wildcardKey "*" 命中?
│ Yes → matchSource: "wildcard",entry = 全局配置
│
└─ 无匹配 → entry = undefined(使用全局默认配置)5. 关键实现细节(源码验证)
5.1 Telegram 更新去重
// telegram/bot.ts (约 107-200 行)
const recentUpdates = createTelegramUpdateDedupe();
let lastUpdateId = opts.updateOffset?.lastUpdateId ?? null;
const shouldSkipUpdate = (ctx: TelegramUpdateKeyContext) => {
// 1. Update ID 检查(顺序保证)
const updateId = resolveTelegramUpdateId(ctx);
if (typeof updateId === "number" && lastUpdateId !== null) {
if (updateId <= lastUpdateId) return true;
}
// 2. 内容去重(防止 Webhook 重复)
const key = buildTelegramUpdateKey(ctx);
const skipped = recentUpdates.check(key);
if (skipped && key && shouldLogVerbose()) {
logVerbose(`telegram dedupe: skipped ${key}`);
}
return skipped;
};
// 使用
bot.on("message", async (ctx) => {
recordUpdateId(ctx); // 记录 Update ID
if (shouldSkipUpdate(ctx)) return; // 去重检查
// 处理消息...
});去重键构建:
function buildTelegramUpdateKey(ctx: Context): string | null {
const update = ctx.update;
if ("message" in update && update.message) {
const msg = update.message;
return `msg:${msg.chat.id}:${msg.message_id}:${msg.date}`;
}
if ("callback_query" in update && update.callback_query) {
const cbq = update.callback_query;
return `cbq:${cbq.id}:${cbq.from.id}`;
}
return null;
}5.2 Discord 顺序化处理
// discord/monitor.gateway.ts
client.on("messageCreate", async (message) => {
// 防止并发冲突:同一频道消息串行处理
const channelKey = message.channelId;
await channelQueueManager.enqueue(channelKey, async () => {
await handleMessageCreate(message);
});
});
// 频道队列管理器
class ChannelQueueManager {
private queues = new Map<string, Promise<void>>();
async enqueue<T>(key: string, fn: () => Promise<T>): Promise<T> {
const existing = this.queues.get(key) ?? Promise.resolve();
const next = existing.then(() => fn());
this.queues.set(key, next.then(() => {}, () => {}));
return next;
}
}5.3 Allowlist 规范化匹配
// channels/allowlist-match.ts
function matchesAllowFrom(
allowFrom: string[] | undefined,
identity: {
senderId?: string;
senderUsername?: string;
senderE164?: string;
}
): boolean {
if (!allowFrom || allowFrom.length === 0) return true;
for (const entry of allowFrom) {
const normalized = entry.trim().toLowerCase();
// 1. 通配符
if (normalized === "*") return true;
// 2. ID 匹配
if (identity.senderId && normalized === identity.senderId.toLowerCase()) {
return true;
}
// 3. 用户名匹配(忽略 @ 前缀)
if (identity.senderUsername) {
const username = identity.senderUsername.toLowerCase();
if (normalized === username || normalized === `@${username}`) {
return true;
}
}
// 4. E.164 号码匹配
if (identity.senderE164) {
const e164 = identity.senderE164.replace(/\D/g, "");
if (normalized.replace(/\D/g, "") === e164) {
return true;
}
}
}
return false;
}5.4 DM 作用域(Identity Links)
// routing/session-key.ts
function buildAgentSessionKey(params: {
agentId: string;
channel: string;
peer?: RoutePeer | null;
dmScope?: "main" | "per-peer";
identityLinks?: IdentityLink[];
}): string {
const { agentId, channel, peer, dmScope = "main", identityLinks } = params;
if (peer && dmScope === "per-peer") {
// 身份关联:跨频道合并会话
// 例如: Telegram user:123 和 Discord user:456 是同一人
const linkedId = findLinkedIdentity(peer, identityLinks);
if (linkedId) {
return `${agentId}:dm:linked:${linkedId}`;
}
// 独立会话
return `${agentId}:${channel}:dm:${peer.kind}:${peer.id}`;
}
// 主会话:所有 DM 共享
return `${agentId}:${channel}:dm:main`;
}
function findLinkedIdentity(
peer: RoutePeer,
identityLinks?: IdentityLink[]
): string | null {
if (!identityLinks || identityLinks.length === 0) return null;
for (const link of identityLinks) {
for (const identity of link.identities) {
if (identity.channel === peer.kind && identity.id === peer.id) {
return link.linkId;
}
}
}
return null;
}配置示例:
// config.json
{
"session": {
"dmScope": "per-peer",
"identityLinks": [
{
"linkId": "user-alice",
"identities": [
{ "channel": "telegram", "id": "123456" },
{ "channel": "discord", "id": "789012" }
]
}
]
}
}5.5 消息分块(Telegram 字符限制)
// telegram/draft-chunking.ts
const TELEGRAM_MAX_MESSAGE_LENGTH = 4096;
function chunkTelegramMessage(text: string): string[] {
if (text.length <= TELEGRAM_MAX_MESSAGE_LENGTH) {
return [text];
}
const chunks: string[] = [];
let remaining = text;
while (remaining.length > 0) {
let chunkEnd = TELEGRAM_MAX_MESSAGE_LENGTH;
if (remaining.length > TELEGRAM_MAX_MESSAGE_LENGTH) {
// 在段落边界分块
const lastNewline = remaining.lastIndexOf("\n\n", chunkEnd);
if (lastNewline > chunkEnd * 0.6) {
chunkEnd = lastNewline + 2;
} else {
// 在单行边界分块
const lastSingleNewline = remaining.lastIndexOf("\n", chunkEnd);
if (lastSingleNewline > chunkEnd * 0.6) {
chunkEnd = lastSingleNewline + 1;
}
}
}
const chunk = remaining.slice(0, chunkEnd).trim();
chunks.push(chunk);
remaining = remaining.slice(chunkEnd);
}
return chunks;
}5.6 输入状态指示(Typing)
// channels/typing.ts
class TypingIndicator {
private active = false;
private intervalId?: NodeJS.Timeout;
async start(params: {
channel: string;
chatId: string;
sendAction: () => Promise<void>;
}) {
if (this.active) return;
this.active = true;
// 立即发送
await params.sendAction();
// 每 5 秒重新发送(避免超时)
this.intervalId = setInterval(async () => {
if (this.active) {
await params.sendAction();
}
}, 5000);
}
stop() {
this.active = false;
if (this.intervalId) {
clearInterval(this.intervalId);
this.intervalId = undefined;
}
}
}
// 使用
const typing = new TypingIndicator();
await typing.start({
channel: "telegram",
chatId: ctx.chat.id.toString(),
sendAction: () => ctx.api.sendChatAction(ctx.chat.id, "typing")
});
// Agent 运行中...
await runEmbeddedPiAgent({ ... });
typing.stop();6. 扩展机制(插件频道)
6.1 插件频道注册
// channels/plugins/registry.ts
type ChannelPlugin = {
id: string;
meta: ChatChannelMeta;
createMonitor: (opts: MonitorOptions) => Promise<ChannelMonitor>;
send: (params: SendParams) => Promise<SendResult>;
};
const pluginChannels = new Map<string, ChannelPlugin>();
function registerChannelPlugin(plugin: ChannelPlugin) {
pluginChannels.set(plugin.id, plugin);
// 添加到频道注册表
CHANNEL_IDS.push(plugin.id);
CHAT_CHANNEL_META[plugin.id] = {
...plugin.meta,
isExtension: true
};
}
// 使用
import { createMatrixMonitor, sendMatrixMessage } from "@openclaw/matrix";
registerChannelPlugin({
id: "matrix",
meta: {
id: "matrix",
label: "Matrix",
emoji: "🔮",
aliases: ["mtx"],
docs: "https://docs.openclaw.ai/plugins/matrix"
},
createMonitor: createMatrixMonitor,
send: sendMatrixMessage
});6.2 插件消息分发
// auto-reply/dispatch.ts
async function dispatchAutoReply(params: {
route: ResolvedAgentRoute;
text: string;
channel: string;
...
}) {
const { route, text, channel } = params;
// 检查是否为插件频道
const plugin = pluginChannels.get(channel);
if (plugin) {
// 使用插件的发送函数
await plugin.send({
route,
text,
...params
});
return;
}
// 内置频道处理
if (channel === "telegram") {
await sendTelegramMessage({ ... });
} else if (channel === "discord") {
await sendDiscordMessage({ ... });
} else {
throw new Error(`Unknown channel: ${channel}`);
}
}7. 性能优化(实际策略)
7.1 消息队列(防止并发冲突)
Telegram 顺序化
// telegram/bot.ts
bot.use(sequentialize(getTelegramSequentialKey));
function getTelegramSequentialKey(ctx: Context): string {
// 每个聊天独立队列
return ctx.chat?.id.toString() ?? "";
}Discord 频道队列
// discord/monitor.gateway.ts
const channelQueues = new Map<string, Promise<void>>();
async function enqueueChannelMessage(
channelId: string,
handler: () => Promise<void>
): Promise<void> {
const existing = channelQueues.get(channelId) ?? Promise.resolve();
const next = existing.then(handler, handler);
channelQueues.set(channelId, next);
return next;
}7.2 Allowlist 缓存
// channels/allowlist-match.ts
const allowlistCache = new Map<string, AllowlistMatch>();
function resolveAllowlistDecision(params: {
senderId: string;
senderUsername?: string;
allowFrom: string[];
}): AllowlistMatch {
const cacheKey = `${params.senderId}:${params.allowFrom.join(",")}`;
// 缓存命中
const cached = allowlistCache.get(cacheKey);
if (cached && Date.now() - cached.timestamp < 60000) {
return cached;
}
// 计算匹配
const match = matchesAllowFrom(params.allowFrom, params);
const result = {
allowed: match,
source: match ? "allowlist" : null,
timestamp: Date.now()
};
allowlistCache.set(cacheKey, result);
return result;
}7.3 路由缓存
// routing/resolve-route.ts
const routeCache = new Map<string, ResolvedAgentRoute>();
function resolveAgentRouteWithCache(input: ResolveAgentRouteInput): ResolvedAgentRoute {
const cacheKey = buildRouteCacheKey(input);
// 缓存命中
const cached = routeCache.get(cacheKey);
if (cached) return cached;
// 计算路由
const route = resolveAgentRoute(input);
routeCache.set(cacheKey, route);
return route;
}
function buildRouteCacheKey(input: ResolveAgentRouteInput): string {
const parts = [
input.channel,
input.accountId ?? "",
input.peer ? `${input.peer.kind}:${input.peer.id}` : "",
input.guildId ?? "",
input.teamId ?? ""
];
return parts.join(":");
}8. 错误处理与恢复
8.1 网络错误重试(Telegram)
// telegram/network-errors.ts
async function sendWithRetry<T>(
fn: () => Promise<T>,
options: {
maxRetries?: number;
backoff?: number[];
} = {}
): Promise<T> {
const maxRetries = options.maxRetries ?? 3;
const backoff = options.backoff ?? [1000, 2000, 4000];
for (let attempt = 0; attempt < maxRetries; attempt++) {
try {
return await fn();
} catch (err) {
const isRetryable = isRetryableError(err);
if (!isRetryable || attempt === maxRetries - 1) {
throw err;
}
const delay = backoff[attempt] ?? backoff[backoff.length - 1];
await new Promise(resolve => setTimeout(resolve, delay));
}
}
throw new Error("Unexpected retry loop exit");
}
function isRetryableError(err: unknown): boolean {
if (err instanceof GrammyError) {
const code = err.error_code;
// 重试:速率限制、超时、内部错误
return code === 429 || code === 504 || code === 500;
}
return false;
}8.2 速率限制处理
// telegram/bot.ts
bot.api.config.use(apiThrottler({
global: new Bottleneck({ reservoir: 30, reservoirRefreshAmount: 30, reservoirRefreshInterval: 1000 }),
group: new Bottleneck({ reservoir: 20, reservoirRefreshAmount: 20, reservoirRefreshInterval: 60000 }),
out: new Bottleneck({ reservoir: 1, reservoirRefreshAmount: 1, reservoirRefreshInterval: 1000 })
}));8.3 消息发送失败回退
// auto-reply/reply.ts
async function sendReplyWithFallback(params: {
route: ResolvedAgentRoute;
text: string;
channel: string;
}) {
try {
// 尝试发送主消息
await sendMessage(params);
} catch (err) {
// 发送失败:尝试简化回退
try {
await sendMessage({
...params,
text: "⚠️ 回复失败,请稍后重试",
formatting: "plain"
});
} catch {
// 完全失败:记录日志
logError(`Failed to send reply to ${params.route.sessionKey}`, err);
}
}
}文档版本: v1.0 (基于源码验证)\ 验证日期: 2025-04-10
夜雨聆风