【OpenClaw 架构设计】4-OpenClaw 处理管道

1. 概述
OpenClaw 的处理管道是一个多阶段的处理流程,负责将用户消息从输入到输出的完整处理过程。处理管道采用模块化设计,每个阶段都有明确的职责和接口,支持灵活的扩展和定制。
2. 处理管道架构
2.1 整体架构图

2.2 核心组件
从源码结构来看,处理管道涉及以下核心组件:
// src/auto-reply/reply.ts - 自动回复引擎export class AutoReplyEngine { async process(message: ChatMessage): Promise<ReplyResult>}// src/sessions/store.ts - 会话存储export class SessionStore { async load(sessionKey: string): Promise<Session> async save(sessionKey: string, session: Session): Promise<void>}// src/agents/pi-tools.ts - 工具执行器export class ToolExecutor { async executeTools(toolCalls: ToolCall[]): Promise<ToolResult[]>}
3. 输入处理阶段
3.1 消息解析
消息解析是处理管道的第一步,负责将原始消息转换为统一格式:
// src/channels/chat-type.tsexport interface ChatMessage { id: string channel: ChatType from: string to: string text?: string media?: MediaPayload timestamp: number replyTo?: string metadata?: Record<string, any> isGroup?: boolean groupId?: string}
解析流程:
- 提取基本信息
消息 ID、发送者、接收者、时间戳 - 解析文本内容
提取文本消息内容 - 解析媒体内容
提取图片、音频、视频等媒体内容 - 解析元数据
提取回复、引用、提及等元数据 - 确定消息类型
判断是私聊还是群聊
3.2 媒体处理
如果消息包含媒体内容,进行媒体处理:
// src/media/pipeline.tsexport class MediaPipeline { async process(media: MediaPayload): Promise<ProcessedMedia> { // 1. 下载媒体文件 const buffer = await this.download(media.url) // 2. 检查大小限制 if (buffer.length > this.maxSize) { throw new Error('Media too large') } // 3. 转码/压缩 const processed = await this.transcode(buffer, media.type) // 4. 转录(如果是音频/视频) if (media.type === 'audio' || media.type === 'video') { const transcript = await this.transcribe(processed) return { ...processed, transcript } } // 5. 生成临时文件路径 const tempPath = await this.saveTempFile(processed) return { ...processed, tempPath } }}
媒体处理流程:
- 下载
从渠道下载媒体文件 - 大小检查
检查文件大小是否超过限制(默认 20MB) - 转码
根据需要转码或压缩 - 转录
音频/视频文件进行语音转文字 - 临时文件管理
保存到临时目录,处理完成后清理
3.3 格式转换
将不同渠道的消息格式转换为统一的内部格式:
// src/telegram/parser.tsexport class TelegramParser { parse(ctx: Context): ChatMessage { const msg = ctx.message return { id: msg.message_id.toString(), channel: 'telegram', from: msg.from.id.toString(), to: ctx.me.id.toString(), text: msg.text, media: this.parseMedia(msg), timestamp: msg.date * 1000, replyTo: msg.reply_to_message?.message_id.toString(), isGroup: msg.chat.type !== 'private', groupId: msg.chat.id.toString(), metadata: { username: msg.from.username, firstName: msg.from.first_name, lastName: msg.from.last_name } } }}
4. 身份验证阶段
4.1 DM 配对检查
检查发送者是否已配对:
// src/pairing/store.tsexport class PairingStore { async checkPairing(channel: string, userId: string): Promise<PairingStatus> { // 1. 检查是否已配对 const paired = await this.store.get(`${channel}:${userId}`) if (paired) { return { status: 'paired' } } // 2. 检查是否有待处理的配对请求 const pending = await this.getPendingPairing(channel, userId) if (pending) { return { status: 'pending', code: pending.code } } // 3. 创建新的配对请求 const code = this.generatePairingCode() await this.createPendingPairing(channel, userId, code) return { status: 'unpaired', code } }}
配对流程:
- 检查白名单
检查用户是否在白名单中 - 检查配对状态
检查用户是否已配对 - 生成配对码
如果未配对,生成配对码 - 发送配对请求
向用户发送配对请求
4.2 白名单验证
验证用户是否在白名单中:
// src/security/allowlist.tsexport class AllowlistChecker { async check(channel: string, userId: string): Promise<boolean> { const config = await this.getConfig(channel) // 1. 检查通配符 if (config.allowFrom.includes('*')) { return true } // 2. 检查精确匹配 if (config.allowFrom.includes(userId)) { return true } // 3. 检查正则表达式 for (const pattern of config.allowFromPatterns || []) { if (new RegExp(pattern).test(userId)) { return true } } return false }}
4.3 权限验证
验证用户的访问权限:
// src/security/permissions.tsexport class PermissionChecker { async check(userId: string, action: string): Promise<boolean> { // 1. 获取用户角色 const role = await this.getUserRole(userId) // 2. 检查角色权限 const permissions = this.getRolePermissions(role) // 3. 检查是否有执行操作的权限 return permissions.includes(action) }}
5. 路由分发阶段
5.1 会话路由
确定消息应该路由到哪个会话:
// src/routing/session-key.tsexport class SessionKeyResolver { resolve(message: ChatMessage): string { // 1. 私聊消息 if (!message.isGroup) { return 'main' } // 2. 群聊消息 if (message.isGroup) { // 检查是否需要隔离 const config = this.getGroupConfig(message.groupId!) if (config.isolated) { return `group:${message.groupId}` } // 检查是否使用激活模式 if (config.activationMode === 'mention') { // 检查是否提及了机器人 if (this.isMentioned(message)) { return `group:${message.groupId}` } return 'inactive' } return 'main' } return 'main' }}
会话路由规则:
- 私聊
使用 “main” 会话 - 群聊(隔离模式)
使用群组 ID 作为会话键 - 群聊(提及模式)
只有在被提及时才激活会话 - 群聊(命令模式)
只有命令消息才激活会话
5.2 代理选择
选择处理消息的 AI 代理:
// src/routing/router.tsexport class AgentRouter { async select(message: ChatMessage): Promise<string> { const config = await this.getConfig() // 1. 检查广播组 if (this.isBroadcastGroup(message.groupId)) { return this.getBroadcastAgents(message.groupId) } // 2. 检查渠道路由 const channelBinding = config.bindings.find( b => b.channel === message.channel ) if (channelBinding) { return channelBinding.agentId } // 3. 检查用户路由 const userBinding = config.bindings.find( b => b.user === message.from ) if (userBinding) { return userBinding.agentId } // 4. 检查群组路由 const groupBinding = config.bindings.find( b => b.group === message.groupId ) if (groupBinding) { return groupBinding.agentId } // 5. 使用默认代理 return config.defaultAgentId || 'default' }}
5.3 工作区确定
确定代理的工作区:
// src/agents/workspace.tsexport class WorkspaceResolver { async resolve(agentId: string): Promise<string> { const config = await this.getAgentConfig(agentId) // 1. 使用配置的工作区 if (config.workspace) { return config.workspace } // 2. 使用默认工作区 return this.getDefaultWorkspace() }}
6. AI 处理阶段
6.1 模型调用
调用 AI 模型处理消息:
// src/agents/pi-embedded.tsexport class PiEmbeddedAgent { async callModel(context: AgentContext): Promise<ModelResponse> { // 1. 准备提示词 const prompt = this.buildPrompt(context) // 2. 准备工具 const tools = this.prepareTools(context) // 3. 调用模型 const response = await this.model.complete({ messages: prompt.messages, tools: tools, stream: true, ...this.getModelOptions() }) return response }}
模型调用流程:
- 构建提示词
根据会话历史和用户消息构建提示词 - 准备工具
根据配置和权限准备可用工具 - 调用模型
调用 AI 模型 API - 处理响应
处理模型的响应
6.2 工具执行
执行模型调用的工具:
// src/agents/pi-tools.tsexport class ToolExecutor { async executeTools(toolCalls: ToolCall[], context: ToolContext): Promise<ToolResult[]> { const results: ToolResult[] = [] for (const call of toolCalls) { // 1. 检查工具权限 if (!this.hasPermission(call.name, context)) { results.push({ toolCallId: call.id, error: 'Permission denied' }) continue } // 2. 发送工具开始事件 this.emit('tool.start', { tool: call.name, args: call.args, timestamp: Date.now() }) try { // 3. 执行工具 const result = await this.executeTool(call.name, call.args, context) // 4. 发送工具完成事件 this.emit('tool.end', { tool: call.name, result, timestamp: Date.now() }) results.push({ toolCallId: call.id, result }) } catch (error) { // 5. 发送工具错误事件 this.emit('tool.error', { tool: call.name, error: error.message, timestamp: Date.now() }) results.push({ toolCallId: call.id, error: error.message }) } } return results }}
工具执行流程:
- 权限检查
检查是否有执行工具的权限 - 事件通知
发送工具开始事件 - 工具执行
执行工具逻辑 - 结果处理
处理工具执行结果 - 事件通知
发送工具完成或错误事件
6.3 流式输出
支持流式输出响应:
// src/auto-reply/chunk.tsexport class ChunkStreamer { async *streamResponse( response: AsyncIterable<ResponseChunk> ): AsyncIterable<ReplyChunk> { let buffer = '' for await (const chunk of response) { // 1. 处理文本块 if (chunk.type === 'text') { buffer += chunk.content // 检查是否达到分块大小 if (buffer.length >= this.chunkSize) { yield { type: 'text', content: buffer } buffer = '' } } // 2. 处理推理块 if (chunk.type === 'reasoning') { yield { type: 'reasoning', content: chunk.content } } // 3. 处理工具块 if (chunk.type === 'tool') { yield { type: 'tool', name: chunk.name, args: chunk.args } } } // 4. 发送剩余内容 if (buffer.length > 0) { yield { type: 'text', content: buffer } } }}
7. 响应生成阶段
7.1 格式化
将响应格式化为目标渠道的格式:
// src/auto-reply/format.tsexport class ResponseFormatter { async format( response: AgentResponse, channel: ChatType ): Promise<FormattedMessage> { switch (channel) { case 'whatsapp': return this.formatForWhatsApp(response) case 'telegram': return this.formatForTelegram(response) case 'slack': return this.formatForSlack(response) case 'discord': return this.formatForDiscord(response) default: return this.formatDefault(response) } } private formatForWhatsApp(response: AgentResponse): FormattedMessage { // WhatsApp 特定格式化 return { type: 'text', content: response.text, // WhatsApp 支持 Markdown 转 WhatsApp 格式 formatting: this.convertMarkdownToWhatsApp(response.text) } }}
7.2 媒体渲染
如果响应包含媒体内容,进行渲染:
// src/auto-reply/media-renderer.tsexport class MediaRenderer { async render(media: MediaPayload): Promise<RenderedMedia> { // 1. 图片处理 if (media.type === 'image') { return { type: 'image', url: await this.uploadImage(media.data), caption: media.caption } } // 2. 音频处理 if (media.type === 'audio') { return { type: 'audio', url: await this.uploadAudio(media.data), duration: await this.getAudioDuration(media.data) } } // 3. 视频处理 if (media.type === 'video') { return { type: 'video', url: await this.uploadVideo(media.data), duration: await this.getVideoDuration(media.data), thumbnail: await this.generateThumbnail(media.data) } } // 4. 文档处理 if (media.type === 'document') { return { type: 'document', url: await this.uploadDocument(media.data), filename: media.filename } } }}
7.3 工具摘要
生成工具执行的摘要:
// src/agents/tool-summaries.tsexport class ToolSummarizer { summarize(toolResults: ToolResult[]): string { const summaries: string[] = [] for (const result of toolResults) { if (result.error) { summaries.push(`❌ ${result.tool}: ${result.error}`) } else { summaries.push(`✅ ${result.tool}: ${this.truncate(result.result)}`) } } return summaries.join('\n') } private truncate(result: any, maxLength: number = 100): string { const str = JSON.stringify(result) if (str.length <= maxLength) { return str } return str.substring(0, maxLength) + '...' }}
8. 输出分发阶段
8.1 渠道发送
将响应发送到目标渠道:
// src/channels/sender.tsexport class ChannelSender { async send( message: FormattedMessage, channel: ChatType, recipient: string ): Promise<SendResult> { // 1. 获取渠道发送器 const sender = this.getSender(channel) // 2. 发送消息 const result = await sender.send(message, recipient) // 3. 更新发送状态 await this.updateSendStatus(result) return result }}
8.2 状态更新
更新消息处理状态:
// src/status/tracker.tsexport class StatusTracker { async update(messageId: string, status: MessageStatus): Promise<void> { // 1. 更新数据库状态 await this.db.update(messageId, { status, updatedAt: Date.now() }) // 2. 发送状态事件 this.emit('status.update', { messageId, status, timestamp: Date.now() }) }}
8.3 事件通知
发送事件通知:
// src/events/notifier.tsexport class EventNotifier { async notify(event: ProcessingEvent): Promise<void> { // 1. 记录事件日志 await this.logEvent(event) // 2. 发送到事件总线 await this.eventBus.publish(event) // 3. 触发 Webhook(如果配置) if (this.webhookUrl) { await this.triggerWebhook(event) } }}
9. 错误处理
9.1 错误类型
处理管道中可能出现的错误类型:
// src/errors/types.tsexport enum PipelineErrorType { PARSE_ERROR = 'PARSE_ERROR', // 消息解析错误 AUTH_ERROR = 'AUTH_ERROR', // 身份验证错误 ROUTING_ERROR = 'ROUTING_ERROR', // 路由错误 MODEL_ERROR = 'MODEL_ERROR', // 模型调用错误 TOOL_ERROR = 'TOOL_ERROR', // 工具执行错误 MEDIA_ERROR = 'MEDIA_ERROR', // 媒体处理错误 NETWORK_ERROR = 'NETWORK_ERROR', // 网络错误 TIMEOUT_ERROR = 'TIMEOUT_ERROR' // 超时错误}
9.2 错误处理策略
// src/errors/handler.tsexport class ErrorHandler { async handle(error: PipelineError): Promise<ErrorHandlerResult> { // 1. 记录错误日志 this.logger.error('Pipeline error:', error) // 2. 根据错误类型选择处理策略 switch (error.type) { case PipelineErrorType.NETWORK_ERROR: // 网络错误:重试 return this.retry(error) case PipelineErrorType.TIMEOUT_ERROR: // 超时错误:重试或降级 return this.retryOrFallback(error) case PipelineErrorType.AUTH_ERROR: // 认证错误:返回错误消息 return this.sendAuthError(error) default: // 其他错误:返回通用错误消息 return this.sendGenericError(error) } }}
10. 性能优化
10.1 缓存策略
// src/cache/manager.tsexport class CacheManager { private cache: Map<string, CacheEntry> = new Map() async get<T>(key: string): Promise<T | null> { const entry = this.cache.get(key) if (!entry) return null // 检查是否过期 if (Date.now() > entry.expiry) { this.cache.delete(key) return null } return entry.value as T } async set<T>(key: string, value: T, ttl: number): Promise<void> { this.cache.set(key, { value, expiry: Date.now() + ttl }) }}
10.2 并发控制
// src/concurrency/limiter.tsexport class ConcurrencyLimiter { private queue: Array<() => Promise<void>> = [] private active: number = 0 constructor(private maxConcurrent: number) {} async run<T>(task: () => Promise<T>): Promise<T> { return new Promise((resolve, reject) => { const execute = async () => { this.active++ try { const result = await task() resolve(result) } catch (error) { reject(error) } finally { this.active-- this.next() } } if (this.active < this.maxConcurrent) { execute() } else { this.queue.push(execute) } }) } private next(): void { if (this.queue.length > 0 && this.active < this.maxConcurrent) { const task = this.queue.shift() task?.() } }}
11. 监控与日志
11.1 性能监控
// src/monitoring/metrics.tsexport class PerformanceMonitor { private metrics: Map<string, Metric> = new Map() startTimer(name: string): () => void { const start = Date.now() return () => { const duration = Date.now() - start this.recordMetric(name, duration) } } private recordMetric(name: string, value: number): void { const metric = this.metrics.get(name) || { count: 0, sum: 0, min: Infinity, max: -Infinity } metric.count++ metric.sum += value metric.min = Math.min(metric.min, value) metric.max = Math.max(metric.max, value) this.metrics.set(name, metric) } getStats(name: string): MetricStats | null { const metric = this.metrics.get(name) if (!metric) return null return { count: metric.count, avg: metric.sum / metric.count, min: metric.min, max: metric.max } }}
11.2 日志记录
// src/logging/logger.tsexport class PipelineLogger { constructor(private context: string) {} info(message: string, data?: any): void { this.log('INFO', message, data) } error(message: string, error?: Error): void { this.log('ERROR', message, { error: error?.message, stack: error?.stack }) } debug(message: string, data?: any): void { if (process.env.DEBUG) { this.log('DEBUG', message, data) } } private log(level: string, message: string, data?: any): void { console.log(JSON.stringify({ timestamp: new Date().toISOString(), level, context: this.context, message, data })) }}
12. 扩展性设计
12.1 插件系统
// src/plugins/interface.tsexport interface PipelinePlugin { name: string version: string // 在处理管道的特定阶段注入 onBeforeProcess?(message: ChatMessage): Promise<ChatMessage> onAfterProcess?(response: AgentResponse): Promise<AgentResponse> onError?(error: PipelineError): Promise<void>}// src/plugins/manager.tsexport class PluginManager { private plugins: Map<string, PipelinePlugin> = new Map() register(plugin: PipelinePlugin): void { this.plugins.set(plugin.name, plugin) } async executeHook<T>( hookName: string, data: T, ...args: any[] ): Promise<T> { let result = data for (const plugin of this.plugins.values()) { const hook = (plugin as any)[hookName] if (typeof hook === 'function') { result = await hook.call(plugin, result, ...args) } } return result }}
12.2 中间件机制
// src/middleware/types.tsexport type Middleware = ( context: PipelineContext, next: () => Promise<void>) => Promise<void>// src/middleware/stack.tsexport class MiddlewareStack { private middlewares: Middleware[] = [] use(middleware: Middleware): void { this.middlewares.push(middleware) } async execute(context: PipelineContext): Promise<void> { let index = 0 const next = async (): Promise<void> { if (index < this.middlewares.length) { const middleware = this.middlewares[index++] await middleware(context, next) } } await next() }}
13. 总结
OpenClaw 的处理管道采用了清晰的分层架构和模块化设计,具有以下特点:
- 清晰的阶段划分
从输入到输出经过多个明确的处理阶段 - 灵活的路由机制
支持多种渠道路由和会话管理策略 - 强大的扩展能力
通过插件和中间件机制支持功能扩展 - 完善的错误处理
针对不同类型的错误采用不同的处理策略 - 性能优化
通过缓存、并发控制等手段提升处理效率 - 可观测性
提供完善的监控和日志记录能力
这种架构设计使得 OpenClaw 能够灵活应对不同的业务场景,同时保持代码的可维护性和可扩展性。

夜雨聆风