CoPaw源码解析
版本: 1.0.0.post3
分析日期: 2026-04-06
1. 项目概述
CoPaw (Co Personal Agent Workstation) 是一个运行在本地环境的个人 AI 助手,支持多渠道(钉钉、飞书、微信、Discord、Telegram 等)交互,具备技能扩展、定时任务、多 Agent 协作等能力。所有数据和任务运行在本地,无需第三方托管。
核心依赖:
-
• agentscope==1.0.18– AgentScope 运行时 -
• agentscope-runtime==1.1.3– AgentScope Runtime 引擎 -
• FastAPI+Uvicorn– Web 框架 -
• ReMe(reme-ai==0.3.1.8) – 记忆系统后端
2. 前后端架构设计
2.1 前端架构 (Console)
技术栈
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
目录结构
console/src/
├── App.tsx # 根组件 + 路由配置
├── main.tsx # 入口点
├── api/ # API 调用层
│ ├── config.ts # API 配置 (URL, Token 管理)
│ ├── request.ts # 请求封装 (Axios)
│ ├── authHeaders.ts # 认证头
│ └── modules/ # 按功能模块划分 API
│ ├── agent.ts # 单 Agent 操作
│ ├── agents.ts # 多 Agent 管理
│ ├── chat.ts # 聊天操作
│ ├── channel.ts # 渠道配置
│ └── ...
├── stores/ # Zustand 状态管理
│ └── agentStore.ts # Agent 选择和列表管理
├── pages/ # 页面组件
│ ├── Chat/ # 聊天页面
│ ├── Agent/ # Agent 配置页面
│ ├── Control/ # 控制面板
│ └── Settings/ # 设置页面
├── components/ # 通用组件
├── hooks/ # 自定义 Hooks
│ └── useAppMessage.ts # WebSocket 实时消息
└── locales/ # 国际化资源
AgentStore (Zustand)
interfaceAgentStore {
selectedAgent: string; // 当前选中 Agent
agents: AgentSummary[]; // Agent 列表
setSelectedAgent: (id: string) =>void;
setAgents: (agents: AgentSummary[]) =>void;
}
-
• 持久化策略: 使用 sessionStorage存储,支持错误恢复 -
• 命名空间: copaw-agent-storage
路由设计
/login -> LoginPage (认证页)
/console/* -> MainLayout (主布局)
├── /chat -> Chat 主聊天界面
├── /agent/tools -> Agent 工具配置
├── /agent/config -> Agent 配置
├── /control/sessions -> 会话管理
├── /control/channels -> 渠道管理
└── /settings/* -> 各类设置
与后端交互
-
• RESTful API 通过 /api/*前缀 -
• 认证: Authorization: Bearer <token>头 -
• 支持 SSE (Server-Sent Events) 流式响应
2.2 后端架构 (FastAPI)
技术栈
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
核心模块划分
src/copaw/app/
├── _app.py # FastAPI 应用主入口,生命周期管理
├── routers/ # API 路由层
│ ├── agent.py # Agent 核心 API
│ ├── agents.py # 多 Agent 管理 API
│ ├── chat.py # 聊天 API
│ ├── messages.py # 消息 API
│ ├── skills.py # 技能管理
│ ├── workspace.py # 工作区 API
│ ├── voice.py # 语音通道
│ └── ...
├── runner/ # Agent 运行时核心
│ ├── runner.py # AgentRunner (查询处理)
│ ├── manager.py # ChatManager
│ ├── session.py # SafeJSONSession
│ ├── task_tracker.py # TaskTracker
│ ├── command_dispatch.py # 命令分发
│ ├── daemon_commands.py # /daemon 命令处理
│ └── repo/ # 存储仓库
├── workspace/ # 工作区封装
│ ├── workspace.py # Workspace 主类
│ └── service_manager.py # 服务生命周期管理
├── multi_agent_manager.py # 多 Agent 管理器
├── agent_context.py # Agent 上下文工具
├── channels/ # 通信渠道
│ ├── registry.py # 渠道注册表
│ ├── console/ # Console 渠道
│ ├── discord/ # Discord 渠道
│ ├── dingtalk/ # 钉钉渠道
│ └── feishu/ # 飞书渠道
├── crons/ # 定时任务
├── mcp/ # MCP 客户端管理
└── approvals/ # 工具审批服务
FastAPI 应用生命周期
启动 (lifespan):
1. 迁移旧配置
2. 初始化 MultiAgentManager
3. 启动所有已配置的 Agent
4. 初始化 ProviderManager (模型提供商)
5. 初始化 LocalModelManager (本地模型)
6. 设置审批服务
关闭:
1. 关闭本地模型服务器
2. 停止 MultiAgentManager (所有 Agent)
3. Agent 运行时详解
3.1 核心执行流程
HTTP Request (或 SSE 连接)
│
▼
DynamicMultiAgentRunner.stream_query()
│ (从 X-Agent-Id header 获取 agent_id)
▼
MultiAgentManager.get_agent(agent_id)
│ (懒加载,返回 Workspace)
▼
Workspace.runner (AgentRunner)
│
▼
AgentRunner.query_handler()
│
├──► _resolve_pending_approval() (工具 Guard 审批)
│
├──► _is_command() 检测
│ │
│ ├──► run_command_path() (命令路径)
│ │ │
│ │ ├──► DaemonCommandHandlerMixin.handle_daemon_command()
│ │ │ - /daemon restart/reload-config/status/logs/approve
│ │ │
│ │ ├──► ControlCommandHandler
│ │ │ - /stop 等控制命令
│ │ │
│ │ └──► CommandHandler (轻量级内存)
│ │ - /compact, /new 等会话命令
│
└──► CoPawAgent(msgs)
│
├──► memory.load_session_state()
│
├──► stream_printing_messages(agent(msgs))
│ │
│ ├──► _reasoning() [ReAct 循环]
│ │ │
│ │ └──► ToolGuardMixin._acting() [工具拦截]
│ │
│ └──► _summarizing()
│
└──► session.save_session_state()
3.2 AgentRunner 类
文件: src/copaw/app/runner/runner.py
classAgentRunner(Runner):
"""核心查询处理器"""
agent_id: str# Agent 标识
workspace_dir: Path # 工作目录
memory_manager: BaseMemoryManager # 记忆管理器
_task_tracker: TaskTracker # 任务跟踪器
_mcp_manager: MCPClientManager # MCP 客户端管理器
async query_handler(msgs, request) -> AsyncIterator[(Msg, bool)]
"""处理 Agent 查询的核心方法"""
async _resolve_pending_approval() -> (Msg|None, bool, dict|None)
"""解析待处理的工具 Guard 审批"""
async init_handler() # 初始化 (加载 .env, 创建 Session)
async shutdown_handler() # 关闭处理
关键特性:
-
• 工具审批: 在执行工具前检查是否有待审批的请求 -
• 命令分发: 识别并处理 /daemon,/stop,/compact等命令 -
• 会话管理: 自动加载/保存会话状态到 JSON 文件 -
• 流式响应: 支持 SSE 流式传输
3.3 CoPawAgent 类
文件: src/copaw/agents/react_agent.py
classCoPawAgent(ToolGuardMixin, ReActAgent):
"""CoPaw Agent with integrated tools, skills, and memory management."""
# MRO: CoPawAgent -> ToolGuardMixin -> ReActAgent
# ToolGuardMixin 拦截 _acting 和 _reasoning 方法进行工具安全审查
核心组件:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
内置工具:
-
• execute_shell_command– 执行 Shell 命令 -
• read_file/write_file/edit_file– 文件操作 -
• grep_search/glob_search– 文件搜索 -
• browser_use– 浏览器自动化 -
• desktop_screenshot– 桌面截图 -
• view_image/view_video– 媒体查看 -
• get_current_time/set_user_timezone– 时间工具 -
• get_token_usage– Token 统计 -
• send_file_to_user– 文件发送 -
• memory_search– 记忆搜索
Hook 机制:
-
1. BootstrapHook (pre_reasoning): 检查 BOOTSTRAP.md 引导文件 -
2. MemoryCompactionHook (pre_reasoning): 上下文满时自动压缩
4. 记忆系统 (Memory)
4.1 架构概述
BaseMemoryManager (抽象基类)
│
└──► ReMeLightMemoryManager (当前实现)
│
└──► ReMeLight (reme-ai 库)
│
├──► InMemoryMemory (内存记忆)
├──► FileStore (文件存储)
│ ├── local backend
│ └── chroma backend
└──► EmbeddingModel (向量嵌入)
4.2 BaseMemoryManager 接口
文件: src/copaw/agents/memory/base_memory_manager.py
classBaseMemoryManager(ABC):
working_dir: str
agent_id: str
chat_model: ChatModelBase
formatter: FormatterBase
asyncdefstart() -> None
"""启动记忆管理器"""
asyncdefclose() -> bool
"""关闭记忆管理器"""
asyncdefcompact_memory(messages, previous_summary) -> str
"""上下文压缩 - 将对话历史压缩为摘要"""
asyncdefsummary_memory(messages) -> str
"""记忆摘要 - 生成全面摘要"""
asyncdefmemory_search(query, max_results, min_score) -> ToolResponse
"""向量 + 全文混合搜索"""
defget_in_memory_memory() -> InMemoryMemory
"""获取内存记忆对象"""
4.3 ReMeLightMemoryManager 实现
文件: src/copaw/agents/memory/reme_light_memory_manager.py
存储后端选择:
if backend_env == "auto":
if platform.system() == "Windows":
memory_backend = "local"
else:
try:
import chromadb
memory_backend = "chroma"# 向量数据库
except:
memory_backend = "local"
核心能力:
-
1. compact_memory(): 使用 LLM 压缩对话历史,支持返回压缩字典 -
2. summary_memory(): 使用 LLM + 文件工具生成全面摘要 -
3. memory_search(): 向量相似度 + 全文搜索混合
嵌入配置优先级:
config > env var (EMBEDDING_API_KEY / EMBEDDING_BASE_URL / EMBEDDING_MODEL_NAME)
4.4 上下文压缩流程
check_context() 检测上下文超限
│
▼
MemoryCompactionHook (pre_reasoning 钩子)
│
▼
compact_memory(messages)
│
├── 1. 使用 LLM 压缩对话历史
│
└── 2. 返回摘要字符串
│
▼
memory.state_dict() 保存到 Session
5. 状态管理
5.1 多 Agent 管理 (MultiAgentManager)
文件: src/copaw/app/multi_agent_manager.py
classMultiAgentManager:
"""多 Agent 工作区管理器"""
agents: Dict[str, Workspace] # agent_id -> Workspace
_lock: asyncio.Lock # 并发访问锁
_cleanup_tasks: Set[asyncio.Task] # 延迟清理任务
async get_agent(agent_id) -> Workspace
"""懒加载获取 Agent (首次访问时创建)"""
async reload_agent(agent_id) -> bool
"""零宕机重载 Agent"""
async stop_agent(agent_id) -> bool
async stop_all()
async start_all_configured_agents() -> dict[str, bool]
"""并发启动所有已配置的 Agent"""
懒加载机制:
-
• Workspace 仅在首次请求时创建 -
• 已加载的 Agent 缓存在内存中 -
• 使用 asyncio.Lock 保证线程安全
零宕机重载流程:
/daemon restart
│
▼
MultiAgentManager.reload_agent(agent_id)
│
├── 1. 创建新 Workspace 实例
│
├── 2. 原子替换 (swap with lock)
│ - 新实例立即接收请求
│
├── 3. 优雅关闭旧实例
│ - 如果有活跃任务: 延迟清理 (等待完成)
│ - 如果无活跃任务: 立即停止
│
└── 结果: 请求不间断,组件热重载
5.2 Workspace 封装
文件: src/copaw/app/workspace/workspace.py
classWorkspace:
"""单个 Agent 的完整运行时封装"""
# 服务组件 (通过 ServiceManager 管理)
runner: AgentRunner
memory_manager: BaseMemoryManager
mcp_manager: MCPClientManager
chat_manager: ChatManager
channel_manager: BaseChannelManager
cron_manager: CronManager
# 非服务
task_tracker: TaskTracker
async start() # 启动所有服务
async stop(final=True) # 停止所有服务
5.3 服务生命周期管理 (ServiceManager)
文件: src/copaw/app/workspace/service_manager.py
@dataclass
classServiceDescriptor:
name: str# 服务名称
service_class: type# 服务类
init_args: Callable# 初始化参数
post_init: Callable# 创建后钩子
start_method: str# 启动方法名
stop_method: str# 停止方法名
reusable: bool# 是否可复用 (热 reload)
priority: int# 启动优先级
classServiceManager:
"""统一的服务生命周期管理器"""
启动优先级:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
服务复用 (热重载):
-
• reusable=True的服务在重载时保留 -
• 通过 set_reusable_components()传递给新实例
5.4 会话状态管理 (SafeJSONSession)
文件: src/copaw/app/runner/session.py
classSafeJSONSession(SessionBase):
"""JSON 文件会话存储 (跨平台安全)"""
save_dir: str# 存储目录
_UNSAFE_FILENAME_RE = r'[\\/:*?"<>|]'# Windows 非法字符过滤
async save_session_state(session_id, user_id, **state_modules)
async load_session_state(session_id, user_id, **state_modules)
async update_session_state(session_id, key, value)
async get_session_state_dict(session_id, user_id)
文件名安全:
# 原始: "discord:dm:12345"
# 清洗后: "discord--dm--12345"
sanitize_filename(name) -> str
5.5 任务跟踪器 (TaskTracker)
文件: src/copaw/app/runner/task_tracker.py
classTaskTracker:
"""后台任务跟踪: 流式传输、重连、多订阅者"""
_runs: dict[str, _RunState] # run_key -> (task, queues, buffer)
async attach_or_start(run_key, payload, stream_fn)
"""附加到现有运行或启动新运行"""
async attach(run_key) -> Queue | None
"""附加到现有运行 (用于重连)"""
async request_stop(run_key) -> bool
"""请求停止运行"""
async wait_all_done(timeout=300) -> bool
"""等待所有任务完成"""
运行状态:
@dataclass
class_RunState:
task: asyncio.Future
queues: list[asyncio.Queue] # 多个订阅者
buffer: list[str] # SSE 事件缓冲
6. 命令分发系统
6.1 三层命令架构
用户输入
│
▼
_is_command(query) 检测
│
├─► parse_daemon_query()
│ │
│ └──► DaemonCommandHandlerMixin.handle_daemon_command()
│ - /daemon restart -> 零宕机重载
│ - /daemon status -> 状态查询
│ - /daemon logs -> 日志查看
│ - /daemon reload-config
│ - /daemon approve
│
├─► _is_control_command()
│ │
│ └──► control_commands.handle_control_command()
│ - /stop -> 停止任务
│
└─► _is_conversation_command()
│
└──► CommandHandler.handle_conversation_command()
- /compact -> 内存压缩
- /new -> 新会话
- /save -> 保存记忆
6.2 工具审批流程
工具调用请求
│
▼
ToolGuardMixin._acting() 拦截
│
▼
ApprovalService.get_pending_by_session()
│
├──► 超时 -> 返回超时拒绝消息
│
├──► 用户输入 "approve" -> 批准工具调用
│
└──► 其他输入 -> 拒绝工具调用
7. 渠道系统 (Channels)
7.1 渠道架构
BaseChannel (ABC)
│
├── ConsoleChannel # Web 控制台
├── DiscordChannel # Discord 集成
├── DingTalkChannel # 钉钉集成
├── FeishuChannel # 飞书集成
├── WeChatChannel # 微信集成
├── VoiceChannel # 语音渠道 (Twilio)
└── ...
7.2 内置渠道列表
BUILTIN_CHANNELS = {
"imessage", "discord", "dingtalk", "feishu",
"qq", "telegram", "mattermost", "mqtt",
"console", "matrix", "voice", "wecom",
"xiaoyi", "weixin"
}
7.3 渠道注册表
文件: src/copaw/app/channels/registry.py
-
• 支持内置渠道自动注册 -
• 支持自定义渠道 ( CUSTOM_CHANNELS_DIR/)
8. MCP 客户端管理
8.1 MCPClientManager
文件: src/copaw/app/mcp/manager.py
classMCPClientManager:
"""MCP 客户端生命周期管理 (支持热重载)"""
_clients: Dict[str, StatefulClient]
async init_from_config(config)
async replace_client(key, client_config) # 热替换
async get_clients() -> List[Client] # 每次查询获取最新
async close_all()
支持的传输类型:
-
• stdio– 标准输入输出 -
• http– HTTP 请求
9. 定时任务系统 (Crons)
9.1 CronManager
文件: src/copaw/app/crons/manager.py
classCronManager:
"""定时任务管理器"""
_repo: JsonJobRepository # 任务仓库
_scheduler: APScheduler # 调度器
_runner: AgentRunner # 执行器
async start()
async stop()
async add_job(job_id, schedule, func, *args)
async remove_job(job_id)
9.2 Heartbeat 心跳机制
classHeartbeatManager:
"""定时检查和摘要推送"""
async run_heartbeat(agent_id)
"""执行心跳任务"""
10. 安全机制
10.1 Tool Guard (工具守卫)
文件: src/copaw/agents/tool_guard_mixin.py
-
• 自动拦截危险 Shell 命令 ( rm -rf /, fork bombs, reverse shells 等) -
• 文件访问控制 (限制访问敏感路径) -
• 工具审批超时机制
10.2 Skill 安全扫描
-
• 安装前自动扫描 Skill 风险 -
• 检测提示注入、命令注入、硬编码密钥等
11. 部署架构
11.1 部署模式
|
|
|
|
|
pip install copaw && copaw init && copaw app |
|
|
curl -fsSL https://copaw.agentscope.io/install.sh | bash |
|
|
docker run -p 8088:8088 agentscope/copaw:latest |
|
|
|
11.2 Docker 部署
docker run -p 127.0.0.1:8088:8088 \
-v copaw-data:/app/working \
-v copaw-secrets:/app/working.secret \
agentscope/copaw:latest
11.3 数据目录结构
WORKING_DIR/
├── agent.json # Agent 配置
├── config.json # 全局配置
├── .env # 环境变量 (API Keys)
├── sessions/ # 会话存储
│ └── {user_id}_{session_id}.json
└── workspaces/
└── {agent_id}/
├── agent.json
├── jobs.json # 定时任务
└── ...
12. 技术选型总结
12.1 核心技术栈
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
12.2 设计模式
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
13. 关键文件索引
|
|
|
src/copaw/app/_app.py |
|
src/copaw/app/runner/runner.py |
|
src/copaw/app/runner/session.py |
|
src/copaw/app/runner/task_tracker.py |
|
src/copaw/app/runner/command_dispatch.py |
|
src/copaw/app/runner/daemon_commands.py |
|
src/copaw/app/multi_agent_manager.py |
|
src/copaw/app/workspace/workspace.py |
|
src/copaw/app/workspace/service_manager.py |
|
src/copaw/app/agent_context.py |
|
src/copaw/app/agent_config_watcher.py |
|
src/copaw/agents/react_agent.py |
|
src/copaw/agents/memory/reme_light_memory_manager.py |
|
src/copaw/agents/memory/base_memory_manager.py |
|
src/copaw/agents/tool_guard_mixin.py |
|
src/copaw/agents/hooks/memory_compaction.py |
|
console/src/stores/agentStore.ts |
|
console/src/App.tsx |
|
14. 总结
CoPaw 是一个设计精良的个人 AI 助手项目,其架构特点包括:
-
1. 模块化设计: Workspace 封装每个 Agent 的完整运行时,服务化管理组件生命周期 -
2. 多 Agent 支持: MultiAgentManager 实现懒加载和零宕机热重载 -
3. 记忆系统: 基于 ReMe 的混合向量 + 全文搜索,支持自动上下文压缩 -
4. 安全机制: ToolGuardMixin 拦截危险工具调用,SkillScanner 安全扫描 -
5. 多渠道集成: 模块化的渠道系统,易于扩展新渠道 -
6. 状态持久化: SafeJSONSession 跨平台安全的会话存储 -
7. 任务追踪: TaskTracker 支持 SSE 流式、重连、多订阅者
夜雨聆风