OpenClaw 多 Agent 配置实战:从单兵到军团
当单个 Agent 力不从心时,如何让多个 Agent 协同作战?本文带你掌握 OpenClaw 的多 Agent orchestration 技术。
为什么需要多 Agent?
想象一下:你有一个复杂的任务——分析一份 100 页的市场报告,提取关键数据,生成图表,最后写成一篇公众号文章。单个 Agent 可能会:
上下文窗口爆炸 💥 推理链条断裂 某个环节出错导致全盘重来
多 Agent 架构的核心优势:
任务解耦 - 每个 Agent 专注一件事 并行加速 - 多个任务同时进行 容错隔离 - 单点失败不影响全局 能力扩展 - 不同 Agent 调用不同技能/模型
OpenClaw 多 Agent 架构概览
OpenClaw 提供了两种多 Agent 模式:
┌─────────────────────────────────────────────────────────┐
│ 主 Agent (你) │
├─────────────────────────────────────────────────────────┤
│ Mode 1: Sub-Agent (sessions_spawn + subagents) │
│ ├─ 轻量级任务委派 │
│ ├─ 自动继承工作目录 │
│ └─ 支持 push-based 完成通知 │
│ │
│ Mode 2: ACP Harness (Agent Coding Protocol) │
│ ├─ 代码密集型任务 │
│ ├─ 支持 Codex/Claude Code/Gemini 等主流工具 │
│ └─ Thread-bound 持久会话 │
└─────────────────────────────────────────────────────────┘
实战:从理论到代码
场景设定
假设我们要做一个「竞品监控机器人」:
数据收集 Agent - 每小时抓取竞品网站更新 分析 Agent - 提取价格、功能变化 报告 Agent - 生成 Markdown 报告 发布 Agent - 推送到飞书/钉钉
1. 使用 Sub-Agent 模式
# main_agent.py - 主控 Agent
import asyncio
async def competitor_monitor():
"""竞品监控主流程"""
# Step 1: 委派数据收集(并行启动多个)
collect_tasks = []
for competitor in ["竞对A", "竞对B", "竞对C"]:
task = await spawn_subagent(
task=f"抓取 {competitor} 官网的产品和价格信息",
label=f"collector-{competitor}",
timeout_seconds=300
)
collect_tasks.append(task)
# 等待所有收集完成
collected_data = await asyncio.gather(*collect_tasks)
# Step 2: 委派分析
analysis = await spawn_subagent(
task=f"分析以下竞品数据,提取价格变化和关键更新:\n{collected_data}",
label="analyzer",
timeout_seconds=180
)
# Step 3: 委派报告生成
report = await spawn_subagent(
task=f"基于分析结果生成 Markdown 报告:\n{analysis}",
label="reporter",
timeout_seconds=120
)
# Step 4: 本地执行发布(不需要子 Agent)
await publish_to_feishu(report)
return "监控任务完成"
OpenClaw 的 sessions_spawn 调用示例:
# 主 Agent 调用方式
sessions_spawn(
task="抓取竞对A官网数据",
label="collector-A",
runtime="subagent", # 轻量级子 Agent
mode="run", # one-shot 模式
timeout_seconds=300
)
2. 使用 ACP Harness 模式
对于需要复杂代码操作的场景(比如写爬虫、做数据分析),使用 ACP 模式调用专业工具:
# 数据清洗任务 - 需要写 Python 脚本
sessions_spawn(
task="""
写一个 Python 脚本,完成以下任务:
1. 读取 data/raw/competitor_prices.json
2. 清洗价格字段(去除货币符号,转为数值)
3. 按产品分类统计平均价格
4. 输出到 data/processed/price_analysis.csv
要求:使用 pandas,添加错误处理,代码要健壮。
""",
runtime="acp", # Agent Coding Protocol
agentId="claude-code", # 使用 Claude Code 工具
mode="run",
timeout_seconds=600
)
ACP 支持的工具:
| 工具 | Agent ID | 适用场景 |
|---|---|---|
| Claude Code | claude-code |
复杂重构、多文件编辑 |
| Codex | codex |
快速原型、功能实现 |
| Gemini CLI | gemini |
代码审查、文档生成 |
关键配置参数详解
sessions_spawn 参数表
| 参数 | 类型 | 说明 |
|---|---|---|
task |
string | 必填 - 任务描述,越详细越好 |
runtime |
enum | subagent (轻量) / acp (代码工具) |
agentId |
string | ACP 模式下指定工具 (claude-code/codex/gemini) |
mode |
enum | run (一次性) / session (持久会话) |
label |
string | 任务标签,用于日志和监控 |
timeoutSeconds |
number | 超时时间(秒) |
thread |
boolean | 是否绑定到 thread(Discord 等场景) |
cwd |
string | 工作目录(默认继承父级) |
attachments |
array | 附件列表(会被挂载到子 Agent) |
多 Agent 协调的关键技巧
1. 任务分解粒度
# ❌ 太粗 - 子 Agent 可能迷失
"分析竞品并生成报告"
# ✅ 适中 - 明确输入输出
"分析以下价格数据,输出3个关键发现:\n{data}"
2. 结果传递格式
# 使用结构化格式便于解析
output_format = """
请按以下 JSON 格式返回结果:
{
"summary": "执行摘要",
"details": {
"key1": "value1",
"key2": "value2"
},
"next_steps": ["建议1", "建议2"]
}
"""
3. 错误处理与重试
async def resilient_spawn(task, label, max_retries=3):
"""带重试的子 Agent 调用"""
for attempt in range(max_retries):
try:
result = await sessions_spawn(
task=task,
label=f"{label}-attempt-{attempt+1}",
timeout_seconds=180
)
return result
except TimeoutError:
if attempt < max_retries - 1:
await asyncio.sleep(2 ** attempt) # 指数退避
else:
raise
常见问题与解决方案
Q1: 子 Agent 经常超时怎么办?
可能原因:
任务描述不够清晰,子 Agent 在探索 任务本身过于复杂 网络/API 调用慢
解决方案:
将大任务拆成 2-3 个小任务串行执行 在 task 中明确写出「期望输出格式」 增加 timeoutSeconds(默认 600 秒)
Q2: 子 Agent 结果质量不稳定?
解决方案:
Few-shot 示例 - 在 task 中给 1-2 个输入输出示例 约束条件 - 明确要求「不要XX,要YY」 检查点 - 关键节点让子 Agent 先输出大纲/计划,确认后再执行
Q3: 如何调试多 Agent 流程?
调试技巧:
# 开启详细日志
import logging
logging.basicConfig(level=logging.DEBUG)
# 单步执行(用于调试)
async def debug_mode():
# 先只跑第一步
step1_result = await sessions_spawn(...)
print(f"Step 1 result: {step1_result}")
# 检查无误后再继续
input("Press Enter to continue...")
step2_result = await sessions_spawn(...)
高级应用:构建 Agent 集群
当任务复杂到一定程度,你需要一个「Agent 集群」——就像 Kubernetes 管理容器一样管理你的 Agents。
架构示例:智能客服系统
┌─────────────────────────────────────────────────────────┐
│ 调度器 Agent (Scheduler) │
│ - 接收用户消息,路由到对应 Agent │
└─────────────────────────────────────────────────────────┘
│
┌───────────────┼───────────────┐
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│售前咨询 │ │技术支持 │ │投诉处理 │
│Agent │ │Agent │ │Agent │
│(FAQ+报价)│ │(诊断+方案)│ │(安抚+记录)│
└──────────┘ └──────────┘ └──────────┘
│ │ │
└───────────────┼───────────────┘
▼
┌──────────────────────┐
│ 质检/归档 Agent │
│ - 会话总结 │
│ - 满意度分析 │
│ - 生成工单 │
└──────────────────────┘
实现代码框架:
class AgentCluster:
"""Agent 集群管理器"""
def __init__(self):
self.agents = {
"scheduler": self.create_scheduler(),
"sales": self.create_sales_agent(),
"tech_support": self.create_tech_agent(),
"complaint": self.create_complaint_agent(),
"quality_check": self.create_qc_agent()
}
async def route(self, user_message: str, context: dict):
"""路由消息到合适的 Agent"""
# 调度器分析意图
routing = await sessions_spawn(
task=f"""
分析用户消息,决定路由到哪个 Agent。
可选:sales(售前), tech_support(技术支持), complaint(投诉)
用户消息:{user_message}
上下文:{context}
只返回 JSON:{{"target": "agent_name", "confidence": 0.95}}
""",
label="router",
timeout_seconds=30
)
target = json.loads(routing)["target"]
# 调用对应 Agent
result = await self.agents[target].handle(user_message, context)
# 质检 Agent 审核
approved = await self.agents["quality_check"].review(result)
return approved if approved else await self.reprocess(result)
写在最后
多 Agent 不是银弹,但它确实解决了单 Agent 的诸多限制。关键是:
任务要拆得合理 - 太细有 overhead,太粗失去意义 接口要定义清楚 - 输入输出格式、错误处理 要有监控和回退 - Agent 也会失败,要有兜底
OpenClaw 的多 Agent 架构给了你一个灵活的工具箱。从简单的 sessions_spawn 到复杂的集群管理,你可以根据场景选择合适的方案。
记住:架构服务于需求,不要为了多 Agent 而多 Agent。
推荐阅读:
OpenClaw 官方文档 - Multi-Agent 章节[1] Agent 设计模式[2] OpenClaw Discord 社区[3]
本文基于 OpenClaw 最新版本撰写,如有更新请以官方文档为准。
引用链接
[1]OpenClaw 官方文档 - Multi-Agent 章节: https://docs.openclaw.ai
[2]Agent 设计模式: https://github.com/openclaw/patterns
[3]OpenClaw Discord 社区: https://discord.com/invite/clawd
夜雨聆风