乐于分享
好东西不私藏

【OpenClaw 架构设计】4-OpenClaw 处理管道

【OpenClaw 架构设计】4-OpenClaw 处理管道

1. 概述

OpenClaw 的处理管道是一个多阶段的处理流程,负责将用户消息从输入到输出的完整处理过程。处理管道采用模块化设计,每个阶段都有明确的职责和接口,支持灵活的扩展和定制。

2. 处理管道架构

2.1 整体架构图

2.2 核心组件

从源码结构来看,处理管道涉及以下核心组件:

// src/auto-reply/reply.ts - 自动回复引擎export class AutoReplyEngine {  async process(message: ChatMessage): Promise<ReplyResult>}// src/sessions/store.ts - 会话存储export class SessionStore {  async load(sessionKey: string): Promise<Session>  async save(sessionKey: string, session: Session): Promise<void>}// src/agents/pi-tools.ts - 工具执行器export class ToolExecutor {  async executeTools(toolCalls: ToolCall[]): Promise<ToolResult[]>}

3. 输入处理阶段

3.1 消息解析

消息解析是处理管道的第一步,负责将原始消息转换为统一格式:

// src/channels/chat-type.tsexport interface ChatMessage {  id: string  channel: ChatType  from: string  to: string  text?: string  media?: MediaPayload  timestamp: number  replyTo?: string  metadata?: Record<string, any>  isGroup?: boolean  groupId?: string}

解析流程:

  1. 提取基本信息
    消息 ID、发送者、接收者、时间戳
  2. 解析文本内容
    提取文本消息内容
  3. 解析媒体内容
    提取图片、音频、视频等媒体内容
  4. 解析元数据
    提取回复、引用、提及等元数据
  5. 确定消息类型
    判断是私聊还是群聊

3.2 媒体处理

如果消息包含媒体内容,进行媒体处理:

// src/media/pipeline.tsexport class MediaPipeline {  async process(media: MediaPayload): Promise<ProcessedMedia> {    // 1. 下载媒体文件    const buffer = await this.download(media.url)    // 2. 检查大小限制    if (buffer.length > this.maxSize) {      throw new Error('Media too large')    }    // 3. 转码/压缩    const processed = await this.transcode(buffer, media.type)    // 4. 转录(如果是音频/视频)    if (media.type === 'audio' || media.type === 'video') {      const transcript = await this.transcribe(processed)      return { ...processed, transcript }    }    // 5. 生成临时文件路径    const tempPath = await this.saveTempFile(processed)    return { ...processed, tempPath }  }}

媒体处理流程:

  1. 下载
    从渠道下载媒体文件
  2. 大小检查
    检查文件大小是否超过限制(默认 20MB)
  3. 转码
    根据需要转码或压缩
  4. 转录
    音频/视频文件进行语音转文字
  5. 临时文件管理
    保存到临时目录,处理完成后清理

3.3 格式转换

将不同渠道的消息格式转换为统一的内部格式:

// src/telegram/parser.tsexport class TelegramParser {  parse(ctx: Context): ChatMessage {    const msg = ctx.message    return {      id: msg.message_id.toString(),      channel: 'telegram',      from: msg.from.id.toString(),      to: ctx.me.id.toString(),      text: msg.text,      media: this.parseMedia(msg),      timestamp: msg.date * 1000,      replyTo: msg.reply_to_message?.message_id.toString(),      isGroup: msg.chat.type !== 'private',      groupId: msg.chat.id.toString(),      metadata: {        username: msg.from.username,        firstName: msg.from.first_name,        lastName: msg.from.last_name      }    }  }}

4. 身份验证阶段

4.1 DM 配对检查

检查发送者是否已配对:

// src/pairing/store.tsexport class PairingStore {  async checkPairing(channel: string, userId: string): Promise<PairingStatus> {    // 1. 检查是否已配对    const paired = await this.store.get(`${channel}:${userId}`)    if (paired) {      return { status: 'paired' }    }    // 2. 检查是否有待处理的配对请求    const pending = await this.getPendingPairing(channel, userId)    if (pending) {      return { status: 'pending', code: pending.code }    }    // 3. 创建新的配对请求    const code = this.generatePairingCode()    await this.createPendingPairing(channel, userId, code)    return { status: 'unpaired', code }  }}

配对流程:

  1. 检查白名单
    检查用户是否在白名单中
  2. 检查配对状态
    检查用户是否已配对
  3. 生成配对码
    如果未配对,生成配对码
  4. 发送配对请求
    向用户发送配对请求

4.2 白名单验证

验证用户是否在白名单中:

// src/security/allowlist.tsexport class AllowlistChecker {  async check(channel: string, userId: string): Promise<boolean> {    const config = await this.getConfig(channel)    // 1. 检查通配符    if (config.allowFrom.includes('*')) {      return true    }    // 2. 检查精确匹配    if (config.allowFrom.includes(userId)) {      return true    }    // 3. 检查正则表达式    for (const pattern of config.allowFromPatterns || []) {      if (new RegExp(pattern).test(userId)) {        return true      }    }    return false  }}

4.3 权限验证

验证用户的访问权限:

// src/security/permissions.tsexport class PermissionChecker {  async check(userId: string, action: string): Promise<boolean> {    // 1. 获取用户角色    const role = await this.getUserRole(userId)    // 2. 检查角色权限    const permissions = this.getRolePermissions(role)    // 3. 检查是否有执行操作的权限    return permissions.includes(action)  }}

5. 路由分发阶段

5.1 会话路由

确定消息应该路由到哪个会话:

// src/routing/session-key.tsexport class SessionKeyResolver {  resolve(message: ChatMessage): string {    // 1. 私聊消息    if (!message.isGroup) {      return 'main'    }    // 2. 群聊消息    if (message.isGroup) {      // 检查是否需要隔离      const config = this.getGroupConfig(message.groupId!)      if (config.isolated) {        return `group:${message.groupId}`      }      // 检查是否使用激活模式      if (config.activationMode === 'mention') {        // 检查是否提及了机器人        if (this.isMentioned(message)) {          return `group:${message.groupId}`        }        return 'inactive'      }      return 'main'    }    return 'main'  }}

会话路由规则:

  • 私聊
    使用 “main” 会话
  • 群聊(隔离模式)
    使用群组 ID 作为会话键
  • 群聊(提及模式)
    只有在被提及时才激活会话
  • 群聊(命令模式)
    只有命令消息才激活会话

5.2 代理选择

选择处理消息的 AI 代理:

// src/routing/router.tsexport class AgentRouter {  async select(message: ChatMessage): Promise<string> {    const config = await this.getConfig()    // 1. 检查广播组    if (this.isBroadcastGroup(message.groupId)) {      return this.getBroadcastAgents(message.groupId)    }    // 2. 检查渠道路由    const channelBinding = config.bindings.find(      b => b.channel === message.channel    )    if (channelBinding) {      return channelBinding.agentId    }    // 3. 检查用户路由    const userBinding = config.bindings.find(      b => b.user === message.from    )    if (userBinding) {      return userBinding.agentId    }    // 4. 检查群组路由    const groupBinding = config.bindings.find(      b => b.group === message.groupId    )    if (groupBinding) {      return groupBinding.agentId    }    // 5. 使用默认代理    return config.defaultAgentId || 'default'  }}

5.3 工作区确定

确定代理的工作区:

// src/agents/workspace.tsexport class WorkspaceResolver {  async resolve(agentId: string): Promise<string> {    const config = await this.getAgentConfig(agentId)    // 1. 使用配置的工作区    if (config.workspace) {      return config.workspace    }    // 2. 使用默认工作区    return this.getDefaultWorkspace()  }}

6. AI 处理阶段

6.1 模型调用

调用 AI 模型处理消息:

// src/agents/pi-embedded.tsexport class PiEmbeddedAgent {  async callModel(context: AgentContext): Promise<ModelResponse> {    // 1. 准备提示词    const prompt = this.buildPrompt(context)    // 2. 准备工具    const tools = this.prepareTools(context)    // 3. 调用模型    const response = await this.model.complete({      messages: prompt.messages,      tools: tools,      stream: true,      ...this.getModelOptions()    })    return response  }}

模型调用流程:

  1. 构建提示词
    根据会话历史和用户消息构建提示词
  2. 准备工具
    根据配置和权限准备可用工具
  3. 调用模型
    调用 AI 模型 API
  4. 处理响应
    处理模型的响应

6.2 工具执行

执行模型调用的工具:

// src/agents/pi-tools.tsexport class ToolExecutor {  async executeTools(toolCalls: ToolCall[], context: ToolContext): Promise<ToolResult[]> {    const results: ToolResult[] = []    for (const call of toolCalls) {      // 1. 检查工具权限      if (!this.hasPermission(call.name, context)) {        results.push({          toolCallId: call.id,          error: 'Permission denied'        })        continue      }      // 2. 发送工具开始事件      this.emit('tool.start', {        tool: call.name,        args: call.args,        timestamp: Date.now()      })      try {        // 3. 执行工具        const result = await this.executeTool(call.name, call.args, context)        // 4. 发送工具完成事件        this.emit('tool.end', {          tool: call.name,          result,          timestamp: Date.now()        })        results.push({          toolCallId: call.id,          result        })      } catch (error) {        // 5. 发送工具错误事件        this.emit('tool.error', {          tool: call.name,          error: error.message,          timestamp: Date.now()        })        results.push({          toolCallId: call.id,          error: error.message        })      }    }    return results  }}

工具执行流程:

  1. 权限检查
    检查是否有执行工具的权限
  2. 事件通知
    发送工具开始事件
  3. 工具执行
    执行工具逻辑
  4. 结果处理
    处理工具执行结果
  5. 事件通知
    发送工具完成或错误事件

6.3 流式输出

支持流式输出响应:

// src/auto-reply/chunk.tsexport class ChunkStreamer {  async *streamResponse(    response: AsyncIterable<ResponseChunk>  ): AsyncIterable<ReplyChunk> {    let buffer = ''    for await (const chunk of response) {      // 1. 处理文本块      if (chunk.type === 'text') {        buffer += chunk.content        // 检查是否达到分块大小        if (buffer.length >= this.chunkSize) {          yield { type: 'text', content: buffer }          buffer = ''        }      }      // 2. 处理推理块      if (chunk.type === 'reasoning') {        yield { type: 'reasoning', content: chunk.content }      }      // 3. 处理工具块      if (chunk.type === 'tool') {        yield { type: 'tool', name: chunk.name, args: chunk.args }      }    }    // 4. 发送剩余内容    if (buffer.length > 0) {      yield { type: 'text', content: buffer }    }  }}

7. 响应生成阶段

7.1 格式化

将响应格式化为目标渠道的格式:

// src/auto-reply/format.tsexport class ResponseFormatter {  async format(    response: AgentResponse,    channel: ChatType  ): Promise<FormattedMessage> {    switch (channel) {      case 'whatsapp':        return this.formatForWhatsApp(response)      case 'telegram':        return this.formatForTelegram(response)      case 'slack':        return this.formatForSlack(response)      case 'discord':        return this.formatForDiscord(response)      default:        return this.formatDefault(response)    }  }  private formatForWhatsApp(response: AgentResponse): FormattedMessage {    // WhatsApp 特定格式化    return {      type: 'text',      content: response.text,      // WhatsApp 支持 Markdown 转 WhatsApp 格式      formatting: this.convertMarkdownToWhatsApp(response.text)    }  }}

7.2 媒体渲染

如果响应包含媒体内容,进行渲染:

// src/auto-reply/media-renderer.tsexport class MediaRenderer {  async render(media: MediaPayload): Promise<RenderedMedia> {    // 1. 图片处理    if (media.type === 'image') {      return {        type: 'image',        url: await this.uploadImage(media.data),        caption: media.caption      }    }    // 2. 音频处理    if (media.type === 'audio') {      return {        type: 'audio',        url: await this.uploadAudio(media.data),        duration: await this.getAudioDuration(media.data)      }    }    // 3. 视频处理    if (media.type === 'video') {      return {        type: 'video',        url: await this.uploadVideo(media.data),        duration: await this.getVideoDuration(media.data),        thumbnail: await this.generateThumbnail(media.data)      }    }    // 4. 文档处理    if (media.type === 'document') {      return {        type: 'document',        url: await this.uploadDocument(media.data),        filename: media.filename      }    }  }}

7.3 工具摘要

生成工具执行的摘要:

// src/agents/tool-summaries.tsexport class ToolSummarizer {  summarize(toolResults: ToolResult[]): string {    const summaries: string[] = []    for (const result of toolResults) {      if (result.error) {        summaries.push(`❌ ${result.tool}: ${result.error}`)      } else {        summaries.push(`✅ ${result.tool}: ${this.truncate(result.result)}`)      }    }    return summaries.join('\n')  }  private truncate(result: any, maxLength: number = 100): string {    const str = JSON.stringify(result)    if (str.length <= maxLength) {      return str    }    return str.substring(0, maxLength) + '...'  }}

8. 输出分发阶段

8.1 渠道发送

将响应发送到目标渠道:

// src/channels/sender.tsexport class ChannelSender {  async send(    message: FormattedMessage,    channel: ChatType,    recipient: string  ): Promise<SendResult> {    // 1. 获取渠道发送器    const sender = this.getSender(channel)    // 2. 发送消息    const result = await sender.send(message, recipient)    // 3. 更新发送状态    await this.updateSendStatus(result)    return result  }}

8.2 状态更新

更新消息处理状态:

// src/status/tracker.tsexport class StatusTracker {  async update(messageId: string, status: MessageStatus): Promise<void> {    // 1. 更新数据库状态    await this.db.update(messageId, {      status,      updatedAt: Date.now()    })    // 2. 发送状态事件    this.emit('status.update', {      messageId,      status,      timestamp: Date.now()    })  }}

8.3 事件通知

发送事件通知:

// src/events/notifier.tsexport class EventNotifier {  async notify(event: ProcessingEvent): Promise<void> {    // 1. 记录事件日志    await this.logEvent(event)    // 2. 发送到事件总线    await this.eventBus.publish(event)    // 3. 触发 Webhook(如果配置)    if (this.webhookUrl) {      await this.triggerWebhook(event)    }  }}

9. 错误处理

9.1 错误类型

处理管道中可能出现的错误类型:

// src/errors/types.tsexport enum PipelineErrorType {  PARSE_ERROR = 'PARSE_ERROR',           // 消息解析错误  AUTH_ERROR = 'AUTH_ERROR',             // 身份验证错误  ROUTING_ERROR = 'ROUTING_ERROR',       // 路由错误  MODEL_ERROR = 'MODEL_ERROR',           // 模型调用错误  TOOL_ERROR = 'TOOL_ERROR',             // 工具执行错误  MEDIA_ERROR = 'MEDIA_ERROR',           // 媒体处理错误  NETWORK_ERROR = 'NETWORK_ERROR',       // 网络错误  TIMEOUT_ERROR = 'TIMEOUT_ERROR'        // 超时错误}

9.2 错误处理策略

// src/errors/handler.tsexport class ErrorHandler {  async handle(error: PipelineError): Promise<ErrorHandlerResult> {    // 1. 记录错误日志    this.logger.error('Pipeline error:', error)    // 2. 根据错误类型选择处理策略    switch (error.type) {      case PipelineErrorType.NETWORK_ERROR:        // 网络错误:重试        return this.retry(error)      case PipelineErrorType.TIMEOUT_ERROR:        // 超时错误:重试或降级        return this.retryOrFallback(error)      case PipelineErrorType.AUTH_ERROR:        // 认证错误:返回错误消息        return this.sendAuthError(error)      default:        // 其他错误:返回通用错误消息        return this.sendGenericError(error)    }  }}

10. 性能优化

10.1 缓存策略

// src/cache/manager.tsexport class CacheManager {  private cache: Map<string, CacheEntry> = new Map()  async get<T>(key: string): Promise<T | null> {    const entry = this.cache.get(key)    if (!entry) return null    // 检查是否过期    if (Date.now() > entry.expiry) {      this.cache.delete(key)      return null    }    return entry.value as T  }  async set<T>(key: string, value: T, ttl: number): Promise<void> {    this.cache.set(key, {      value,      expiry: Date.now() + ttl    })  }}

10.2 并发控制

// src/concurrency/limiter.tsexport class ConcurrencyLimiter {  private queue: Array<() => Promise<void>> = []  private active: number = 0  constructor(private maxConcurrent: number) {}  async run<T>(task: () => Promise<T>): Promise<T> {    return new Promise((resolve, reject) => {      const execute = async () => {        this.active++        try {          const result = await task()          resolve(result)        } catch (error) {          reject(error)        } finally {          this.active--          this.next()        }      }      if (this.active < this.maxConcurrent) {        execute()      } else {        this.queue.push(execute)      }    })  }  private next(): void {    if (this.queue.length > 0 && this.active < this.maxConcurrent) {      const task = this.queue.shift()      task?.()    }  }}

11. 监控与日志

11.1 性能监控

// src/monitoring/metrics.tsexport class PerformanceMonitor {  private metrics: Map<string, Metric> = new Map()  startTimer(name: string): () => void {    const start = Date.now()    return () => {      const duration = Date.now() - start      this.recordMetric(name, duration)    }  }  private recordMetric(name: string, value: number): void {    const metric = this.metrics.get(name) || {      count: 0,      sum: 0,      min: Infinity,      max: -Infinity    }    metric.count++    metric.sum += value    metric.min = Math.min(metric.min, value)    metric.max = Math.max(metric.max, value)    this.metrics.set(name, metric)  }  getStats(name: string): MetricStats | null {    const metric = this.metrics.get(name)    if (!metric) return null    return {      count: metric.count,      avg: metric.sum / metric.count,      min: metric.min,      max: metric.max    }  }}

11.2 日志记录

// src/logging/logger.tsexport class PipelineLogger {  constructor(private context: string) {}  info(message: string, data?: any): void {    this.log('INFO', message, data)  }  error(message: string, error?: Error): void {    this.log('ERROR', message, { error: error?.message, stack: error?.stack })  }  debug(message: string, data?: any): void {    if (process.env.DEBUG) {      this.log('DEBUG', message, data)    }  }  private log(level: string, message: string, data?: any): void {    console.log(JSON.stringify({      timestamp: new Date().toISOString(),      level,      context: this.context,      message,      data    }))  }}

12. 扩展性设计

12.1 插件系统

// src/plugins/interface.tsexport interface PipelinePlugin {  name: string  version: string  // 在处理管道的特定阶段注入  onBeforeProcess?(message: ChatMessage): Promise<ChatMessage>  onAfterProcess?(response: AgentResponse): Promise<AgentResponse>  onError?(error: PipelineError): Promise<void>}// src/plugins/manager.tsexport class PluginManager {  private plugins: Map<string, PipelinePlugin> = new Map()  register(plugin: PipelinePlugin): void {    this.plugins.set(plugin.name, plugin)  }  async executeHook<T>(    hookName: string,    data: T,    ...args: any[]  ): Promise<T> {    let result = data    for (const plugin of this.plugins.values()) {      const hook = (plugin as any)[hookName]      if (typeof hook === 'function') {        result = await hook.call(plugin, result, ...args)      }    }    return result  }}

12.2 中间件机制

// src/middleware/types.tsexport type Middleware = (  context: PipelineContext,  next: () => Promise<void>) => Promise<void>// src/middleware/stack.tsexport class MiddlewareStack {  private middlewares: Middleware[] = []  use(middleware: Middleware): void {    this.middlewares.push(middleware)  }  async execute(context: PipelineContext): Promise<void> {    let index = 0    const next = async (): Promise<void> {      if (index < this.middlewares.length) {        const middleware = this.middlewares[index++]        await middleware(context, next)      }    }    await next()  }}

13. 总结

OpenClaw 的处理管道采用了清晰的分层架构和模块化设计,具有以下特点:

  • 清晰的阶段划分
    从输入到输出经过多个明确的处理阶段
  • 灵活的路由机制
    支持多种渠道路由和会话管理策略
  • 强大的扩展能力
    通过插件和中间件机制支持功能扩展
  • 完善的错误处理
    针对不同类型的错误采用不同的处理策略
  • 性能优化
    通过缓存、并发控制等手段提升处理效率
  • 可观测性
    提供完善的监控和日志记录能力

这种架构设计使得 OpenClaw 能够灵活应对不同的业务场景,同时保持代码的可维护性和可扩展性。