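OpenClaw Deep-Dive Series, Day 22: Multi-Agent Collaboration Systems
From a Single Intelligent Agent to an Intelligent Team
Why Is a Single Agent Not Enough?
When we hand a complex task to AI, a familiar problem shows up: a single agent asked to do everything either loses track of part of the job or produces output of uneven quality. The cause is usually not a lack of model capability but a lack of division of labor.
The same holds in human organizations: a team of specialists usually outperforms one generalist. A research team has one person gathering sources, another writing, and another reviewing, and the same structure works for agents.
This article covers the design and implementation of multi-agent collaboration systems:
- Why multi-agent collaboration is needed
- The MCP protocol in detail
- Collaboration patterns and communication mechanisms
- Hands-on: three agents collaborating to write an article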
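Part 1: Why Multi-Agent Collaboration?
1.1 Limitations of a Single Agent
A single agent runs into the following problems:
Problems with a single agent:
├─ Capability limits: does everything, excels at nothing
├─ Context limits: long tasks lose early information
├─ Error amplification: one failed step can sink the whole task
├─ Resource contention: multiple tasks compete for the same agent
└─ Hard to extend: every new feature means changing existing code
A concrete example:
Suppose you ask one agent to "write a technical article". It has to:
- search for up-to-date material
- understand the technical details
- write clear, readable prose
- check that the logic holds together
- revise repeatedly based on feedback
Asking one agent to do all of this well typically ends like this:
- while searching, it loses sight of the writing goal
- while writing, it omits important technical details
- while reviewing, it struggles to spot its own mistakes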
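1.2 Advantages of Multi-Agent Collaboration
A multi-agent system addresses these problems through specialization:
Advantages of multi-agent collaboration:
├─ Specialization: each agent focuses on one job
├─ Extensibility: adding an agent does not disturb existing logic
├─ Fault tolerance: one agent failing need not fail the whole task
├─ Traceability: each stage keeps its own logs
└─ Resource optimization: different agents can use different models
Comparison:
| Dimension | Single agent | Multi-agent collaboration |
|---|---|---|
| Task handling | Serial, prone to forgetting | Divided and parallel, each agent on its own job |
| Output quality | Uneven | Specialized and more consistent |
| Error handling | Hard to localize | Traceable to a specific agent |
| Extensibility | Any change ripples through everything | Agents added like plugins |
| Cost control | Same model for every task | Small models for simple tasks |
1.3 Three Tiers of Collaboration
Multi-agent collaboration is more than "several agents working at once". It comes in three tiers:
Tier 1: Division of Labor
The simplest pattern: each agent is responsible for a different task.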
# Division-of-labor example
class DivisionCollaboration:
    """
    Division of labor: each agent handles one specific task.
    """
    def __init__(self):
        self.researcher = ResearcherAgent()  # researcher
        self.writer = WriterAgent()          # writer
        self.reviewer = ReviewerAgent()      # reviewer

    def process(self, task):
        # Pipeline-style processing
        materials = self.researcher.research(task)
        draft = self.writer.write(task, materials)
        feedback = self.reviewer.review(draft)
        # Revise only if the reviewer asks for it
        if feedback.needs_revision:
            final = self.writer.revise(draft, feedback)
        else:
            final = draft
        return final
Characteristics:
- Simple, intuitive, and easy to implement
- Suits stable tasks with a fixed workflow
- Drawback: no dynamic coordination between agents
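The pipeline above leaves its three agent classes undefined. As a minimal runnable sketch, the version below fills them in with stub agents; the class names, the `Feedback` type, and the returned strings are all invented here for illustration and are not part of OpenClaw.

```python
from dataclasses import dataclass, field


@dataclass
class Feedback:
    """Hypothetical review result used only in this sketch."""
    needs_revision: bool
    notes: list = field(default_factory=list)


class StubResearcher:
    def research(self, task):
        return [f"fact about {task}"]


class StubWriter:
    def write(self, task, materials):
        return f"{task}: " + "; ".join(materials)

    def revise(self, draft, feedback):
        return draft + " [revised: " + ", ".join(feedback.notes) + "]"


class StubReviewer:
    def review(self, draft):
        # Ask for a revision unless the draft has already been revised.
        return Feedback(needs_revision="[revised" not in draft,
                        notes=["tighten intro"])


def run_pipeline(task):
    """Research -> write -> review -> (optional) revise, strictly in order."""
    writer = StubWriter()
    materials = StubResearcher().research(task)
    draft = writer.write(task, materials)
    feedback = StubReviewer().review(draft)
    return writer.revise(draft, feedback) if feedback.needs_revision else draft


print(run_pipeline("MCP"))  # MCP: fact about MCP [revised: tighten intro]
```

Each stage only sees the output of the stage before it, which is what makes this tier easy to reason about and also what makes it rigid.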
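Tier 2: Cooperative Collaboration
Agents assign tasks dynamically and complement one another.
# Cooperative collaboration example
class Collaboration:
    """
    Cooperative collaboration: agents coordinate dynamically.
    """
    def __init__(self):
        self.agents = {
            'researcher': ResearcherAgent(),
            'writer': WriterAgent(),
            'editor': EditorAgent(),
            'fact_checker': FactCheckerAgent()
        }
        self.task_queue = TaskQueue()
        self.message_bus = MessageBus()

    def process(self, task):
        # Decompose the task dynamically
        sub_tasks = self.decompose(task)
        self.task_queue.add(sub_tasks)
        # Drain the queue. This loop is sequential; a real scheduler
        # could dispatch independent sub-tasks in parallel.
        results = []
        while not self.task_queue.empty():
            sub_task = self.task_queue.get()  # do not shadow the `task` argument
            agent = self.select_agent(sub_task)
            result = agent.execute(sub_task)
            results.append(result)
            # Decide follow-up work based on the result
            next_tasks = self.plan_next(sub_task, result)
            self.task_queue.add(next_tasks)
        # Aggregate the results
        return self.aggregate(results)
Characteristics:
- More flexible; can handle complex tasks
- Requires task-scheduling logic
- Agents exchange information with one another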
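Tier 3: Competitive Collaboration
Several agents solve the same task independently, and the best solution is selected.
# Competitive collaboration example
class CompetitiveCollaboration:
    """
    Competitive collaboration: several agents solve independently; pick the best.
    """
    def __init__(self, num_agents=3):
        self.writers = [WriterAgent() for _ in range(num_agents)]
        self.evaluator = EvaluatorAgent()

    def process(self, task):
        # Generate several candidate solutions (sequentially here; the
        # writers are independent, so they could also run in parallel)
        solutions = [writer.write(task) for writer in self.writers]
        # Score each candidate
        scores = [self.evaluator.evaluate(task, s) for s in solutions]
        # Return the highest-scoring solution
        best_idx = scores.index(max(scores))
        return solutions[best_idx]
Characteristics:
- Suits scenarios that call for "brainstorming"
- Higher cost (several agents do the same work)
- Can surface more creative options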
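Because the competing writers are independent, they can run concurrently rather than one after another. The sketch below shows one way to do that with `asyncio.gather`; `write_candidate` and `score` are hypothetical stand-ins for an LLM-backed writer and an evaluator agent, and the scoring rule (shorter is better) is an arbitrary assumption.

```python
import asyncio


async def write_candidate(style: str, task: str) -> str:
    """Hypothetical writer: stands in for one LLM-backed agent."""
    await asyncio.sleep(0.01)  # simulate model latency
    return f"[{style}] {task}"


def score(task: str, solution: str) -> float:
    """Toy evaluator (an assumption): prefer shorter answers."""
    return -len(solution)


async def best_of_n(task: str, styles: list) -> str:
    # gather() runs all writers concurrently and preserves input order.
    solutions = await asyncio.gather(
        *(write_candidate(style, task) for style in styles)
    )
    return max(solutions, key=lambda s: score(task, s))


best = asyncio.run(best_of_n("explain DAGs", ["formal", "casual", "brief"]))
print(best)  # [brief] explain DAGs
```

Total latency is then roughly that of the slowest writer instead of the sum of all writers, although the token cost is still N times that of a single attempt.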
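1.4 Multi-Agent Implementation in OpenClaw
In OpenClaw, multi-agent collaboration is implemented through the SubAgent mechanism:
// OpenClaw multi-agent collaboration example
// Note: top-level `await` needs an ES module or an enclosing async function
const { sessions_spawn } = require('openclaw');
// Create the researcher agent
const researcher = await sessions_spawn({
  runtime: "subagent",
  task: `
    You are a professional technical researcher.
    Task: search for the latest research on "multi-agent systems".
    Output: a structured list of key points.
  `,
  label: "researcher"
});
// Create the writer agent
const writer = await sessions_spawn({
  runtime: "subagent",
  task: `
    You are a professional technical writer.
    Write a technical article based on the material the researcher provides.
    Requirements: clear structure, accessible explanations, code examples included.
  `,
  label: "writer"
});
// Create the reviewer agent
const reviewer = await sessions_spawn({
  runtime: "subagent",
  task: `
    You are a strict technical reviewer.
    Review the article and give specific suggestions for revision.
    Return suggestions only; do not edit the article yourself.
  `,
  label: "reviewer"
});
// Collaboration flow
async function collaborativeWrite(topic) {
  // Step 1: research
  const researchResult = await researcher.send(
    `Research topic: ${topic}`
  );
  // Step 2: write
  const draft = await writer.send(
    `Write an article based on this research material:\n${researchResult}`
  );
  // Step 3: review
  const feedback = await reviewer.send(
    `Review the following article and suggest revisions:\n${draft}`
  );
  // Step 4: revise (the writer session still holds the draft it produced)
  const final = await writer.send(
    `Revise the article according to the review feedback:\n${feedback}`
  );
  return final;
}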
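Part 2: The MCP Protocol in Detail
2.1 What Is MCP?
In this article, MCP stands for Multi-Agent Protocol: a message protocol for communication between the agents of a multi-agent system. It should not be confused with the Model Context Protocol, which shares the abbreviation.
MCP protocol layers:
├─ Transport layer: how messages are delivered
├─ Serialization layer: how data is encoded
├─ Protocol layer: message format and semantics
└─ Application layer: the actual business logic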
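2.2 MCP Message Format
// MCP message structure
// AgentID is not defined in the original; a string alias is assumed here
type AgentID = string;

interface MCPMessage {
  // Unique message identifier
  id: string;
  // Message type
  type: MessageType;
  // Sender
  sender: AgentID;
  // Receiver (optional; omitted for broadcasts)
  receiver?: AgentID;
  // Timestamp (milliseconds since the epoch, as returned by Date.now())
  timestamp: number;
  // Message content
  content: MessageContent;
  // Metadata
  metadata?: {
    correlationId?: string; // correlation ID, used for tracing
    replyTo?: string;       // ID of the message being replied to
    priority?: 'low' | 'normal' | 'high';
    ttl?: number;           // time to live
  };
}

// Message types
enum MessageType {
  REQUEST = 'request',           // request
  RESPONSE = 'response',         // response
  NOTIFICATION = 'notification', // notification
  ERROR = 'error',               // error
  HEARTBEAT = 'heartbeat'        // heartbeat
}

// Message content
interface MessageContent {
  // Operation name
  action: string;
  // Operation parameters
  params: Record<string, any>;
  // Result data
  data?: any;
  // Error information
  error?: {
    code: string;
    message: string;
    details?: any;
  };
}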
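To make the message format concrete, here is a small Python sketch of the same idea: a message that serializes to JSON and a reply that is linked back to its request through a correlation ID. It flattens the `content` and `metadata` objects into top-level fields for brevity, and the `Message` class and its field names are assumptions made for this example rather than part of any library.

```python
import json
import time
import uuid
from dataclasses import asdict, dataclass, field
from typing import Any, Optional


@dataclass
class Message:
    type: str                        # 'request', 'response', 'notification', ...
    sender: str
    action: str
    params: dict = field(default_factory=dict)
    receiver: Optional[str] = None   # None means broadcast
    correlation_id: Optional[str] = None
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: float = field(default_factory=time.time)

    def to_json(self) -> str:
        return json.dumps(asdict(self))

    @classmethod
    def from_json(cls, raw: str) -> "Message":
        return cls(**json.loads(raw))

    def reply(self, data: Any) -> "Message":
        """Build a response addressed to the sender and linked to this request."""
        return Message(type="response", sender=self.receiver or "unknown",
                       receiver=self.sender, action=self.action,
                       params={"data": data}, correlation_id=self.id)


req = Message(type="request", sender="writer", receiver="researcher",
              action="research", params={"topic": "DAG"})
restored = Message.from_json(req.to_json())   # survives a JSON round trip
resp = restored.reply(["point 1"])
print(resp.receiver, resp.correlation_id == req.id)  # writer True
```

The correlation ID is what lets a requester match an incoming response to the request that caused it, even when several requests are in flight.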
2.3 MCP Transport Layer Implementation
// Assumes the Node.js `ws` and `uuid` packages
import WebSocket from 'ws';
import { v4 as uuid } from 'uuid';

// Handle returned by subscribe()
interface Subscription {
  id: string;
}

// MCP transport interface
interface Transport {
  // Connect to the message bus
  connect(): Promise<void>;
  // Disconnect
  disconnect(): Promise<void>;
  // Send a message
  send(message: MCPMessage): Promise<void>;
  // Subscribe to messages
  subscribe(
    filter: MessageFilter,
    handler: (message: MCPMessage) => void
  ): Subscription;
  // Cancel a subscription
  unsubscribe(subscription: Subscription): void;
}

// Message filter
interface MessageFilter {
  type?: MessageType[];
  sender?: AgentID[];
  receiver?: AgentID[];
  action?: string[];
}

// WebSocket transport implementation
class WebSocketTransport implements Transport {
  private ws?: WebSocket;
  private subscriptions = new Map<string, MessageFilter>();
  private handlers = new Map<string, (msg: MCPMessage) => void>();

  // The endpoint goes to the constructor so that connect() matches
  // the parameterless signature declared in Transport.
  constructor(private endpoint: string) {}

  async connect(): Promise<void> {
    const ws = new WebSocket(this.endpoint);
    this.ws = ws;
    ws.on('message', (data) => {
      const message = JSON.parse(data.toString());
      this.dispatch(message);
    });
    return new Promise((resolve, reject) => {
      ws.on('open', () => resolve());
      ws.on('error', reject);
    });
  }

  async disconnect(): Promise<void> {
    this.ws?.close();
  }

  async send(message: MCPMessage): Promise<void> {
    if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
      throw new Error('WebSocket not connected');
    }
    this.ws.send(JSON.stringify(message));
  }

  subscribe(
    filter: MessageFilter,
    handler: (message: MCPMessage) => void
  ): Subscription {
    const id = uuid();
    this.subscriptions.set(id, filter);
    this.handlers.set(id, handler);
    return { id };
  }

  unsubscribe(subscription: Subscription): void {
    this.subscriptions.delete(subscription.id);
    this.handlers.delete(subscription.id);
  }

  private dispatch(message: MCPMessage): void {
    // Iterate over a copy: a handler may unsubscribe during dispatch
    for (const [id, filter] of [...this.subscriptions]) {
      if (this.matchFilter(message, filter)) {
        this.handlers.get(id)?.(message);
      }
    }
  }

  private matchFilter(message: MCPMessage, filter: MessageFilter): boolean {
    if (filter.type && !filter.type.includes(message.type)) return false;
    if (filter.sender && !filter.sender.includes(message.sender)) return false;
    // A broadcast has no receiver, so it never matches a receiver filter
    if (filter.receiver &&
        (message.receiver === undefined || !filter.receiver.includes(message.receiver))) return false;
    if (filter.action && !filter.action.includes(message.content.action)) return false;
    return true;
  }
}
2.4 Agent-to-Agent Communication Example
// Agent communication example (`uuid` is assumed to come from the `uuid` package)
class AgentCommunication {
  private transport: Transport;
  private agentId: AgentID;

  constructor(agentId: AgentID, transport: Transport) {
    this.agentId = agentId;
    this.transport = transport;
  }

  // Send a request and wait for the matching response
  async request(
    receiver: AgentID,
    action: string,
    params: Record<string, any>
  ): Promise<MCPMessage> {
    const message: MCPMessage = {
      id: uuid(),
      type: MessageType.REQUEST,
      sender: this.agentId,
      receiver: receiver,
      timestamp: Date.now(),
      content: {
        action,
        params
      }
    };
    // Send the message
    await this.transport.send(message);
    // Wait for the response
    return this.waitForResponse(message.id);
  }

  // Broadcast a notification
  async notify(action: string, data: any): Promise<void> {
    const message: MCPMessage = {
      id: uuid(),
      type: MessageType.NOTIFICATION,
      sender: this.agentId,
      timestamp: Date.now(),
      content: {
        action,
        params: {},
        data
      }
    };
    await this.transport.send(message);
  }

  // Subscribe to messages addressed to this agent
  subscribe(
    handler: (message: MCPMessage) => void,
    filter?: MessageFilter
  ): Subscription {
    return this.transport.subscribe(
      { ...filter, receiver: [this.agentId] }, // the filter expects an array
      handler
    );
  }

  // Wait for the response whose correlationId matches the request ID
  private waitForResponse(requestId: string, timeoutMs = 30000): Promise<MCPMessage> {
    return new Promise((resolve, reject) => {
      const subscription = this.transport.subscribe(
        { type: [MessageType.RESPONSE, MessageType.ERROR] },
        (message) => {
          // MessageFilter has no metadata field, so match the ID here
          if (message.metadata?.correlationId !== requestId) return;
          clearTimeout(timer);
          this.transport.unsubscribe(subscription);
          resolve(message);
        }
      );
      const timer = setTimeout(() => {
        this.transport.unsubscribe(subscription);
        reject(new Error(`Request ${requestId} timed out`));
      }, timeoutMs);
    });
  }
}
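The request/response pattern above hinges on one idea: remember each pending request under its ID, and complete it when a reply carrying that ID arrives or a timeout fires. The Python sketch below illustrates that idea with `asyncio` futures. `InMemoryBus` and the two handler functions are hypothetical and run in a single process; they are not an OpenClaw API.

```python
import asyncio
import itertools


class InMemoryBus:
    """Hypothetical in-process bus that matches replies to pending requests."""

    def __init__(self):
        self._pending = {}             # request_id -> Future
        self._handlers = {}            # agent_id -> async handler
        self._tasks = set()            # keep references to running deliveries
        self._ids = itertools.count(1)

    def register(self, agent_id, handler):
        self._handlers[agent_id] = handler

    async def request(self, receiver, action, params, timeout=1.0):
        request_id = next(self._ids)
        future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future   # register before delivering
        task = asyncio.create_task(
            self._deliver(receiver, request_id, action, params))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        try:
            return await asyncio.wait_for(future, timeout)
        finally:
            self._pending.pop(request_id, None)

    async def _deliver(self, receiver, request_id, action, params):
        result = await self._handlers[receiver](action, params)
        future = self._pending.get(request_id)
        if future is not None and not future.done():
            future.set_result(result)        # late replies are simply dropped


async def researcher(action, params):
    return {"action": action, "points": [f"about {params['topic']}"]}


async def slow_agent(action, params):
    await asyncio.sleep(0.2)
    return {}


async def demo():
    bus = InMemoryBus()
    bus.register("researcher", researcher)
    bus.register("slow", slow_agent)
    ok = await bus.request("researcher", "research", {"topic": "DAG"})
    try:
        await bus.request("slow", "research", {}, timeout=0.05)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return ok, timed_out


ok, timed_out = asyncio.run(demo())
print(ok, timed_out)
```

Registering the pending entry before the request is delivered means even an immediate reply finds someone waiting for it.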
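2.5 Error Handling and Retry
// Message retry mechanism (`uuid` is assumed to come from the `uuid` package)
class RetryableTransport implements Transport {
  private transport: Transport;
  private maxRetries: number;
  private retryDelay: number;

  constructor(
    transport: Transport,
    maxRetries = 3,
    retryDelay = 1000
  ) {
    this.transport = transport;
    this.maxRetries = maxRetries;
    this.retryDelay = retryDelay;
  }

  async send(message: MCPMessage): Promise<void> {
    let lastError: unknown;
    for (let attempt = 0; attempt < this.maxRetries; attempt++) {
      try {
        await this.transport.send(message);
        return;
      } catch (error) {
        lastError = error;
        // Exponential backoff, skipped after the final attempt
        if (attempt < this.maxRetries - 1) {
          const delay = this.retryDelay * Math.pow(2, attempt);
          await this.sleep(delay);
        }
      }
    }
    throw lastError;
  }

  // Everything other than send() is delegated to the wrapped transport
  connect(): Promise<void> { return this.transport.connect(); }
  disconnect(): Promise<void> { return this.transport.disconnect(); }
  subscribe(
    filter: MessageFilter,
    handler: (message: MCPMessage) => void
  ): Subscription {
    return this.transport.subscribe(filter, handler);
  }
  unsubscribe(subscription: Subscription): void {
    this.transport.unsubscribe(subscription);
  }

  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

// Error codes
enum ErrorCode {
  TIMEOUT = 'TIMEOUT',
  CONNECTION_FAILED = 'CONNECTION_FAILED',
  AGENT_NOT_FOUND = 'AGENT_NOT_FOUND',
  INVALID_MESSAGE = 'INVALID_MESSAGE',
  ACTION_NOT_SUPPORTED = 'ACTION_NOT_SUPPORTED',
  INTERNAL_ERROR = 'INTERNAL_ERROR'
}

// Build an error response
function createErrorResponse(
  originalMessage: MCPMessage,
  code: ErrorCode,
  message: string,
  details?: any
): MCPMessage {
  return {
    id: uuid(),
    type: MessageType.ERROR,
    // A broadcast request has no receiver; 'broker' is a placeholder sender
    sender: originalMessage.receiver ?? 'broker',
    receiver: originalMessage.sender,
    timestamp: Date.now(),
    content: {
      action: 'error',
      params: {},
      error: {
        code,
        message,
        details
      }
    },
    metadata: {
      correlationId: originalMessage.id
    }
  };
}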
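It helps to see the actual wait times that exponential backoff produces. The sketch below computes the delays slept between attempts; the `cap` and the optional jitter are common additions that the transport above does not have, and the function name and defaults are chosen here only for illustration.

```python
import random
from typing import List, Optional


def backoff_delays(max_retries: int, base: float = 1.0, cap: float = 30.0,
                   rng: Optional[random.Random] = None) -> List[float]:
    """Delays slept BETWEEN attempts: one fewer than the number of attempts."""
    delays = []
    for attempt in range(max_retries - 1):
        delay = min(cap, base * 2 ** attempt)   # 1, 2, 4, ... up to the cap
        if rng is not None:
            delay = rng.uniform(0, delay)       # randomize to avoid retry storms
        delays.append(delay)
    return delays


print(backoff_delays(3))             # [1.0, 2.0]
print(backoff_delays(8, cap=30.0))   # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0]
```

With three attempts and a one-second base, a message that never gets through fails after about three seconds of waiting in total; without a cap, each extra attempt doubles the last wait.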
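Part 3: Collaboration Patterns in Depth
3.1 Task Decomposition Patterns
Tree Decomposition
Tree decomposition breaks a large task into a tree of subtasks:
                 [Root task]
                      │
        ┌─────────────┼─────────────┐
        │             │             │
   [Subtask 1]   [Subtask 2]   [Subtask 3]
        │             │
     ┌──┴──┐       ┌──┴──┐
     │     │       │     │
   [1.1] [1.2]   [2.1] [2.2]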
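# Tree decomposition implementation
# (TaskNode, extract_keywords and the `llm` client are assumed to exist elsewhere)
import json

class TreeDecomposer:
    def __init__(self, max_depth=5, max_children=5):
        self.max_depth = max_depth
        self.max_children = max_children

    def decompose(self, task, depth=0):
        """Decompose a task recursively"""
        node = TaskNode(task)
        # Stop once the maximum depth is reached
        if depth >= self.max_depth:
            return node
        # Check whether the task needs further decomposition
        if self.should_decompose(task):
            subtasks = self.split_task(task)
            for subtask in subtasks[:self.max_children]:
                child = self.decompose(subtask, depth + 1)
                node.add_child(child)
        return node

    def should_decompose(self, task):
        """Decide whether the task needs further decomposition"""
        complexity = self.estimate_complexity(task)
        return complexity > 5

    def split_task(self, task):
        """Split the task"""
        # Split on keywords or semantics
        parts = self.semantic_split(task)
        return parts

    def estimate_complexity(self, task):
        """Estimate task complexity"""
        # Simple heuristic based on word count and keyword count.
        # split() counts whitespace-separated words, so text written without
        # spaces (e.g. Chinese) needs a character- or token-based count instead.
        word_count = len(task.split())
        keywords = len(self.extract_keywords(task))
        return (word_count / 100) + (keywords * 2)

    def semantic_split(self, task):
        """Split the task semantically"""
        # An LLM can perform the decomposition
        prompt = f"""
        Break the following task into 2-5 subtasks:
        Task: {task}
        Requirements:
        1. Each subtask must be independently executable
        2. Dependencies between subtasks must be clear
        3. Return JSON: {{"subtasks": ["task 1", "task 2", ...]}}
        """
        # Call the LLM to decompose the task
        response = llm.complete(prompt)
        return json.loads(response)['subtasks']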
Directed Acyclic Graph (DAG) Decomposition
# DAG decomposition: supports more complex dependencies
class DAGDecomposer:
    def __init__(self):
        self.tasks = {}
        self.edges = []  # (parent_id, child_id)

    def add_task(self, task_id, task):
        """Add a task node"""
        self.tasks[task_id] = {
            'task': task,
            'status': 'pending',
            'result': None
        }

    def add_dependency(self, parent_id, child_id):
        """Add a dependency: child_id runs only after parent_id completes"""
        self.edges.append((parent_id, child_id))

    def get_executable_tasks(self):
        """Return the tasks whose dependencies have all completed"""
        completed = {
            tid for tid, info in self.tasks.items()
            if info['status'] == 'completed'
        }
        executable = []
        for task_id, task in self.tasks.items():
            if task['status'] != 'pending':
                continue
            # Check that every parent task has completed
            parents = self.get_parents(task_id)
            if all(p in completed for p in parents):
                executable.append(task_id)
        return executable

    def get_parents(self, task_id):
        """Return the parents of a task"""
        return [p for p, c in self.edges if c == task_id]

    def get_children(self, task_id):
        """Return the children of a task"""
        return [c for p, c in self.edges if p == task_id]

    def execute(self, executor_fn):
        """Execute the DAG wave by wave"""
        while True:
            executable = self.get_executable_tasks()
            if not executable:
                break
            for task_id in executable:
                task = self.tasks[task_id]
                print(f"Executing task: {task_id}")
                result = executor_fn(task['task'])
                task['result'] = result
                task['status'] = 'completed'
        # Tasks still pending can never run: a cycle or a missing parent
        stuck = [tid for tid, info in self.tasks.items() if info['status'] == 'pending']
        if stuck:
            raise ValueError(f"Unresolvable dependencies for tasks: {stuck}")
        return {tid: info['result'] for tid, info in self.tasks.items()}
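Python's standard library (3.9 and later) already ships a scheduler for this kind of dependency graph. The sketch below uses `graphlib.TopologicalSorter` to run tasks wave by wave and to reject cyclic graphs; the task names and the `run_dag` helper are invented for this example.

```python
from graphlib import CycleError, TopologicalSorter


def run_dag(dependencies: dict, executor):
    """dependencies maps each task ID to the set of task IDs it depends on."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()                       # raises CycleError on a cyclic graph
    results, waves = {}, []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # tasks whose dependencies are done
        waves.append(ready)
        for task_id in ready:               # tasks in one wave could run in parallel
            results[task_id] = executor(task_id)
            sorter.done(task_id)
    return results, waves


deps = {"research": set(), "outline": {"research"}, "figures": {"research"},
        "draft": {"outline"}, "review": {"draft", "figures"}}
results, waves = run_dag(deps, executor=str.upper)
print(waves)  # [['research'], ['figures', 'outline'], ['draft'], ['review']]

try:
    run_dag({"a": {"b"}, "b": {"a"}}, executor=str.upper)
    cycle_detected = False
except CycleError:
    cycle_detected = True
print(cycle_detected)  # True
```

Each inner list is a wave of tasks with no dependencies on one another, which is exactly the set a scheduler may hand to several agents at once.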
3.2 State Sharing Patterns
Shared Memory
# Shared memory implementation
import threading
import time
from typing import Any, Callable, Dict, Optional

class SharedMemory:
    """Thread-safe shared memory"""
    def __init__(self):
        self._data: Dict[str, Any] = {}
        self._version: Dict[str, int] = {}
        # One re-entrant lock guards every operation. The original also kept
        # a lock per key, but it was only ever taken while this lock was
        # held, so it added no protection.
        self._lock = threading.RLock()

    def _bump(self, key: str) -> int:
        self._version[key] = self._version.get(key, 0) + 1
        return self._version[key]

    def set(self, key: str, value: Any, ttl: Optional[float] = None):
        """Set a value, optionally expiring after `ttl` seconds"""
        with self._lock:
            self._data[key] = value
            version = self._bump(key)
        if ttl:
            # Pass the version so a stale timer cannot delete a newer value
            timer = threading.Timer(ttl, self._expire, args=[key, version])
            timer.daemon = True
            timer.start()

    def get(self, key: str, default=None):
        """Get a value"""
        with self._lock:
            return self._data.get(key, default)

    def update(self, key: str, updater: Callable[[Any], Any]):
        """Atomic read-modify-write"""
        with self._lock:
            new_value = updater(self._data.get(key))
            self._data[key] = new_value
            self._bump(key)
            return new_value

    def compare_and_set(self, key: str, expected, new_value) -> bool:
        """CAS: set the value only if it currently equals `expected`"""
        with self._lock:
            if self._data.get(key) == expected:
                self._data[key] = new_value
                self._bump(key)
                return True
            return False

    def _expire(self, key: str, version: int):
        """Expire the key if it has not been rewritten since the timer was set"""
        with self._lock:
            if self._version.get(key) == version:
                self._data.pop(key, None)
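A compare-and-set operation is usually wrapped in a retry loop: read the current value, compute the new one, and try again if another writer got there first. The sketch below shows that loop with four threads incrementing a shared counter. `TinyStore` is a stripped-down stand-in written for this example, not the class above.

```python
import threading


class TinyStore:
    """Minimal store with get/compare_and_set (a stand-in for this example)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._data = {}

    def get(self, key, default=None):
        with self._lock:
            return self._data.get(key, default)

    def compare_and_set(self, key, expected, new_value):
        with self._lock:
            if self._data.get(key) != expected:
                return False
            self._data[key] = new_value
            return True


def increment(store, key):
    """Optimistic update: re-read and retry whenever another thread won the race."""
    while True:
        current = store.get(key)
        if store.compare_and_set(key, current, (current or 0) + 1):
            return


def worker(store):
    for _ in range(500):
        increment(store, "tasks_done")


store = TinyStore()
threads = [threading.Thread(target=worker, args=(store,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(store.get("tasks_done"))  # 2000
```

A plain `get` followed by `set` would lose updates under contention; the CAS loop never does, at the price of occasional retries.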
Publish-Subscribe Pattern
# Publish-subscribe implementation
import threading
import time
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Dict, List, Optional

class EventType(Enum):
    TASK_CREATED = 'task_created'
    TASK_COMPLETED = 'task_completed'
    TASK_FAILED = 'task_failed'
    AGENT_STATE_CHANGED = 'agent_state_changed'
    RESOURCE_UPDATED = 'resource_updated'

@dataclass
class Event:
    type: EventType
    source: str
    data: Dict
    timestamp: float

class PubSub:
    """Publish-subscribe system"""
    def __init__(self):
        self._subscribers: Dict[EventType, List[Callable]] = {}
        self._event_history: List[Event] = []
        self._lock = threading.Lock()

    def subscribe(self, event_type: EventType, handler: Callable):
        """Subscribe to an event type"""
        with self._lock:
            self._subscribers.setdefault(event_type, []).append(handler)

    def unsubscribe(self, event_type: EventType, handler: Callable):
        """Cancel a subscription (a no-op if the handler is not registered)"""
        with self._lock:
            handlers = self._subscribers.get(event_type, [])
            if handler in handlers:
                handlers.remove(handler)

    def publish(self, event: Event):
        """Publish an event"""
        with self._lock:
            self._event_history.append(event)
            handlers = list(self._subscribers.get(event.type, []))
        # Call handlers outside the lock so that a handler can publish or
        # subscribe without deadlocking on the non-reentrant lock
        for handler in handlers:
            try:
                handler(event)
            except Exception as e:
                print(f"Event handler error: {e}")

    def get_history(
        self,
        event_type: Optional[EventType] = None,
        since: Optional[float] = None
    ) -> List[Event]:
        """Return a copy of the event history, optionally filtered"""
        with self._lock:
            events = list(self._event_history)
        if event_type is not None:
            events = [e for e in events if e.type == event_type]
        if since is not None:
            events = [e for e in events if e.timestamp >= since]
        return events

# Usage example
class AgentStateManager:
    def __init__(self, pubsub: PubSub, shared_memory: 'SharedMemory'):
        self.pubsub = pubsub
        self.memory = shared_memory
        self._setup_subscriptions()

    def _setup_subscriptions(self):
        """Register event subscriptions"""
        self.pubsub.subscribe(
            EventType.TASK_COMPLETED,
            self.on_task_completed
        )
        self.pubsub.subscribe(
            EventType.RESOURCE_UPDATED,
            self.on_resource_updated
        )

    def update_agent_state(self, agent_id: str, state: str):
        """Update an agent's state and announce the change"""
        self.memory.set(f"agent:{agent_id}:state", state)
        self.pubsub.publish(Event(
            type=EventType.AGENT_STATE_CHANGED,
            source=agent_id,
            data={'state': state},
            timestamp=time.time()
        ))

    def on_task_completed(self, event: Event):
        """Handle task completion"""
        print(f"Agent {event.source} completed task: {event.data}")

    def on_resource_updated(self, event: Event):
        """Handle resource updates"""
        print(f"Resource updated: {event.data}")
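3.3 Consensus Mechanism
When several agents must reach a shared decision, a consensus mechanism is needed.
# Simple consensus mechanism
import asyncio
from dataclasses import dataclass
from typing import Any, List, Optional

@dataclass
class Vote:
    agent_id: str
    decision: str  # 'agree', 'disagree', or 'abstain'
    reason: Optional[str] = None

@dataclass
class Decision:
    status: str  # 'accepted', 'rejected', or 'undecided'
    votes: List[Vote]
    agreement_rate: float
    disagreement_rate: float

class Consensus:
    """Agent consensus system"""
    def __init__(self, agents: List[str], threshold: float = 0.5, message_bus=None):
        self.agents = agents
        self.threshold = threshold
        self.message_bus = message_bus  # assumed to expose send_and_wait()

    async def reach_consensus(
        self,
        proposal: Any,
        voters: Optional[List[str]] = None
    ) -> Decision:
        """Collect votes and decide. Abstentions count toward the total."""
        voters = voters or self.agents
        if not voters:
            return Decision('undecided', [], 0.0, 0.0)
        # Collect the votes concurrently
        votes = list(await asyncio.gather(
            *(self.request_vote(agent_id, proposal) for agent_id in voters)
        ))
        total = len(votes)
        agree_rate = sum(v.decision == 'agree' for v in votes) / total
        disagree_rate = sum(v.decision == 'disagree' for v in votes) / total
        # Strict comparison: with threshold 0.5, a 50/50 split stays undecided
        # instead of being accepted just because "agree" is checked first
        if agree_rate > self.threshold:
            status = 'accepted'
        elif disagree_rate > self.threshold:
            status = 'rejected'
        else:
            status = 'undecided'
        return Decision(status, votes, agree_rate, disagree_rate)

    async def request_vote(
        self,
        agent_id: str,
        proposal: Any
    ) -> Vote:
        """Ask one agent to vote"""
        # Sends a message to the agent and waits for its reply
        response = await self.message_bus.send_and_wait(
            receiver=agent_id,
            action='vote',
            params={'proposal': proposal}
        )
        return Vote(
            agent_id=agent_id,
            decision=response['decision'],
            reason=response.get('reason')
        )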
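The decision rule is easiest to check on a few concrete vote sets. The sketch below isolates the tallying step as a pure function, assuming a strict-majority rule in which abstentions count toward the total; the function name and return shape are choices made for this example.

```python
from collections import Counter


def tally(decisions, threshold=0.5):
    """Return (status, agreement_rate). Abstentions count toward the total."""
    if not decisions:
        return "undecided", 0.0
    counts = Counter(decisions)
    total = len(decisions)
    agree_rate = counts["agree"] / total
    if agree_rate > threshold:
        return "accepted", agree_rate
    if counts["disagree"] / total > threshold:
        return "rejected", agree_rate
    return "undecided", agree_rate


print(tally(["agree", "agree", "disagree"])[0])    # accepted
print(tally(["agree", "disagree"]))                # ('undecided', 0.5)
print(tally(["agree", "abstain", "abstain"])[0])   # undecided
print(tally(["disagree", "disagree", "agree"])[0]) # rejected
```

The second case is the one to watch: with a non-strict comparison and a threshold of 0.5, an even split would be accepted simply because agreement is tested first.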
Part 4: Communication Mechanisms in Depth
4.1 Message Queue Architecture
Message queue architecture:
┌───────────────────────────────────────────┐
│              Message Broker               │
│                                           │
│          ┌───────────────────┐            │
│          │     Exchange      │            │
│          │ (routes messages) │            │
│          └─────────┬─────────┘            │
│                    │                      │
│        ┌───────────┼───────────┐          │
│        │           │           │          │
│    ┌───┴───┐   ┌───┴───┐   ┌───┴───┐      │
│    │Queue A│   │Queue B│   │Queue C│      │
│    └───┬───┘   └───┬───┘   └───┬───┘      │
└────────┼───────────┼───────────┼──────────┘
         ▼           ▼           ▼
   Consumers: Agent1, Agent2, Agent3, Agent4
# Message queue implementation
import fnmatch
import queue
import threading
import time
import uuid
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional

@dataclass
class QueueMessage:
    id: str
    exchange: str
    routing_key: str
    body: bytes
    headers: Dict
    timestamp: float
    delivery_tag: int

class Exchange:
    """Exchange: routes messages to queues"""
    def __init__(self, name: str, exchange_type: str = 'direct'):
        self.name = name
        self.type = exchange_type
        self.bindings: Dict[str, List[str]] = {}  # routing_key -> queue names
        self.queues: Dict[str, 'Queue'] = {}

    def bind(self, q: 'Queue', routing_key: str):
        """Bind a queue (binding the same queue twice has no extra effect)"""
        names = self.bindings.setdefault(routing_key, [])
        if q.name not in names:
            names.append(q.name)
        self.queues[q.name] = q

    def route(self, routing_key: str, message: QueueMessage):
        """Route a message"""
        if self.type == 'direct':
            # Exact match
            targets = list(self.bindings.get(routing_key, []))
        elif self.type == 'topic':
            # Pattern match; each queue receives the message at most once
            targets = []
            for pattern, names in self.bindings.items():
                if self._match_pattern(routing_key, pattern):
                    targets.extend(n for n in names if n not in targets)
        elif self.type == 'fanout':
            # Broadcast to every bound queue
            targets = list(self.queues)
        else:
            raise ValueError(f"Unknown exchange type: {self.type}")
        for name in targets:
            self.queues[name].enqueue(message)

    def _match_pattern(self, routing_key: str, pattern: str) -> bool:
        """Match a routing key against a shell-style pattern"""
        return fnmatch.fnmatchcase(routing_key, pattern)

class Queue:
    """Queue: stores messages"""
    def __init__(self, name: str, max_size: int = 10000):
        self.name = name
        self.max_size = max_size
        self._queue: queue.Queue = queue.Queue(maxsize=max_size)
        self._consumers: List[threading.Thread] = []
        self._running = False

    def enqueue(self, message: QueueMessage):
        """Enqueue a message"""
        try:
            self._queue.put_nowait(message)
        except queue.Full:
            # Queue is full: drop the oldest message to make room
            try:
                self._queue.get_nowait()
                self._queue.put_nowait(message)
            except (queue.Empty, queue.Full):
                pass  # lost a race with another thread; this message is dropped

    def dequeue(self, timeout: Optional[float] = None) -> Optional[QueueMessage]:
        """Dequeue a message, or return None on timeout"""
        try:
            return self._queue.get(timeout=timeout)
        except queue.Empty:
            return None

    def start_consuming(self, callback: Callable[[QueueMessage], None]):
        """Start a consumer thread"""
        self._running = True
        def consumer():
            while self._running:
                message = self.dequeue(timeout=1)
                if message:
                    try:
                        callback(message)
                    except Exception as e:
                        print(f"Error while consuming message: {e}")
        thread = threading.Thread(target=consumer, daemon=True)
        thread.start()
        self._consumers.append(thread)

    def stop_consuming(self):
        """Stop all consumer threads"""
        self._running = False
        for thread in self._consumers:
            thread.join(timeout=1)

class MessageBroker:
    """Message broker"""
    def __init__(self):
        self.exchanges: Dict[str, Exchange] = {}
        self.queues: Dict[str, Queue] = {}
        self._lock = threading.Lock()

    def create_exchange(
        self,
        name: str,
        exchange_type: str = 'direct'
    ) -> Exchange:
        """Create an exchange"""
        with self._lock:
            exchange = Exchange(name, exchange_type)
            self.exchanges[name] = exchange
            return exchange

    def create_queue(self, name: str, max_size: int = 10000) -> Queue:
        """Create a queue"""
        with self._lock:
            queue_obj = Queue(name, max_size)
            self.queues[name] = queue_obj
            return queue_obj

    def bind(self, exchange_name: str, queue_name: str, routing_key: str):
        """Bind a queue to an exchange"""
        exchange = self.exchanges.get(exchange_name)
        queue_obj = self.queues.get(queue_name)
        if exchange is None or queue_obj is None:
            raise ValueError(f"Cannot bind {queue_name!r} to {exchange_name!r}: not found")
        exchange.bind(queue_obj, routing_key)

    def publish(
        self,
        exchange: str,
        routing_key: str,
        body: bytes,
        headers: Optional[Dict] = None
    ):
        """Publish a message"""
        exchange_obj = self.exchanges.get(exchange)
        if not exchange_obj:
            raise ValueError(f"Exchange {exchange} not found")
        message = QueueMessage(
            id=str(uuid.uuid4()),  # `uuid` is a module, not a callable
            exchange=exchange,
            routing_key=routing_key,
            body=body,
            headers=headers or {},
            timestamp=time.time(),
            delivery_tag=0
        )
        exchange_obj.route(routing_key, message)
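Topic routing with shell-style patterns is worth trying on a few keys, because its wildcard rules differ from those of AMQP topic exchanges: with `fnmatch`, `*` matches any run of characters including dots, whereas an AMQP `*` matches exactly one dot-separated word. The sketch below is a standalone illustration; the binding table and queue names are invented.

```python
from fnmatch import fnmatchcase


def route(bindings: dict, routing_key: str) -> list:
    """Return the queues whose binding pattern matches the routing key."""
    matched = []
    for pattern, queues in bindings.items():
        if fnmatchcase(routing_key, pattern):
            # Deliver at most once per queue even if several patterns match
            matched.extend(q for q in queues if q not in matched)
    return matched


bindings = {
    "task.research.*": ["researcher_q"],
    "task.*": ["audit_q"],
    "state.*": ["state_q"],
}
print(route(bindings, "task.research.start"))  # ['researcher_q', 'audit_q']
print(route(bindings, "state.agent.writer"))   # ['state_q']
print(route(bindings, "heartbeat"))            # []
```

Note that `task.*` matches the three-part key `task.research.start`; a router that needs per-word wildcards has to split the key on dots and compare segment by segment.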
4.2 Shared State Management
# Distributed state management
import asyncio
import json
import time
import uuid
from typing import Any, Callable, Dict, Optional

class ConcurrencyError(Exception):
    """Raised when an optimistic update keeps losing the race"""

class DistributedState:
    """Distributed state manager (create it inside a running event loop)"""
    def __init__(self, broker: MessageBroker):
        self.broker = broker
        self.loop = asyncio.get_running_loop()
        self.local_cache: Dict[str, Any] = {}
        self.versions: Dict[str, int] = {}
        self.pending_updates: Dict[str, asyncio.Event] = {}
        # The 'state' exchange must exist before anything is bound to it;
        # a topic exchange is needed for the 'state.*' pattern
        self.broker.create_exchange('state', 'topic')
        self.state_queue = self.broker.create_queue(
            'state_updates',
            max_size=100000
        )
        # Subscribe to state updates
        self.broker.bind('state', 'state_updates', 'state.*')
        self.state_queue.start_consuming(self._handle_state_update)

    async def set(self, key: str, value: Any, ttl: Optional[int] = None):
        """Set a state value"""
        update = {
            'type': 'set',
            'key': key,
            'value': value,
            'timestamp': time.time(),
            'ttl': ttl
        }
        if not await self._publish_and_confirm(key, update):
            raise TimeoutError(f"Update of {key} was not confirmed")

    async def get(self, key: str, default=None) -> Any:
        """Get a state value"""
        # Check the local cache first
        if key in self.local_cache:
            return self.local_cache[key]
        # Otherwise ask the state owner. This assumes some other component
        # answers 'state.get' requests on 'response.<request_id>'.
        request_id = str(uuid.uuid4())
        request = {'type': 'get', 'key': key, 'request_id': request_id}
        response_queue = self.broker.create_queue(f'response_{request_id}')
        self.broker.bind('state', response_queue.name, f'response.{request_id}')
        self.broker.publish(
            exchange='state',
            routing_key='state.get',
            body=json.dumps(request).encode()
        )
        # dequeue() blocks, so run it in a worker thread
        response = await asyncio.to_thread(response_queue.dequeue, 5)
        if response:
            data = json.loads(response.body)
            if 'value' in data:
                self.local_cache[key] = data['value']
                return data['value']
        return default

    async def update(
        self,
        key: str,
        updater: Callable[[Any], Any],
        max_retries: int = 3
    ):
        """Atomic update through versioned compare-and-set"""
        for attempt in range(max_retries):
            current = await self.get(key)
            new_value = updater(current)
            update = {
                'type': 'cas',
                'key': key,
                'value': new_value,
                'expected_version': self._get_version(key),
                'timestamp': time.time()
            }
            if await self._publish_and_confirm(key, update, timeout=2):
                return new_value
            # Back off before retrying
            await asyncio.sleep(0.1 * (attempt + 1))
        raise ConcurrencyError(f"Failed to update {key} after {max_retries} attempts")

    def _handle_state_update(self, message: QueueMessage):
        """Apply a state update (runs on the queue's consumer thread)"""
        data = json.loads(message.body)
        key = data.get('key')
        if data['type'] == 'set':
            self.local_cache[key] = data['value']
            self.versions[key] = self.versions.get(key, 0) + 1
        elif data['type'] == 'cas':
            if data['expected_version'] != self.versions.get(key, 0):
                return  # stale write: no confirmation, so the caller retries
            self.local_cache[key] = data['value']
            self.versions[key] = data['expected_version'] + 1
        else:
            return
        # asyncio.Event is not thread-safe: hand the wake-up to the event loop
        event = self.pending_updates.get(key)
        if event is not None:
            self.loop.call_soon_threadsafe(event.set)

    async def _publish_and_confirm(
        self,
        key: str,
        update: Dict,
        timeout: float = 5
    ) -> bool:
        """Publish an update and wait until it has been applied"""
        # Register the event before publishing so the confirmation cannot be missed
        event = asyncio.Event()
        self.pending_updates[key] = event
        try:
            self.broker.publish(
                exchange='state',
                routing_key=f'state.{key}',
                body=json.dumps(update).encode()
            )
            await asyncio.wait_for(event.wait(), timeout=timeout)
            return True
        except asyncio.TimeoutError:
            return False
        finally:
            self.pending_updates.pop(key, None)

    def _get_version(self, key: str) -> int:
        """Return the current version of a key"""
        return self.versions.get(key, 0)
Part 5: Hands-On: Three Agents Collaborating to Write an Article
5.1 System Architecture
Architecture of the three-agent writing system:
┌───────────────────────────────────────────────────────┐
│              Task manager (Orchestrator)              │
│  1. Receive the task request                          │
│  2. Decompose it into subtasks                        │
│  3. Assign subtasks to the matching agents            │
│  4. Collect and aggregate the results                 │
└───────────────────────────┬───────────────────────────┘
                            │
        ┌───────────────────┼───────────────────┐
        ▼                   ▼                   ▼
┌───────────────┐   ┌───────────────┐   ┌───────────────┐
│  Researcher   │   │    Writer     │   │   Reviewer    │
├───────────────┤   ├───────────────┤   ├───────────────┤
│ • Search      │   │ • Draft       │   │ • Check logic │
│ • Organize    │──▶│ • Polish      │──▶│ • Verify facts│
│ • Summarize   │   │ • Add code    │   │ • Structure   │
└───────┬───────┘   └───────┬───────┘   └───────┬───────┘
        └───────────────────┼───────────────────┘
                            ▼
                  ┌───────────────────┐
                  │   Final output    │
                  │ (complete article)│
                  └───────────────────┘
5.2 Full Implementation
# Three-agent collaborative writing system
import asyncio
import uuid
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from enum import Enum

class TaskStatus(Enum):
    PENDING = 'pending'
    IN_PROGRESS = 'in_progress'
    COMPLETED = 'completed'
    FAILED = 'failed'

@dataclass
class ArticleTask:
    id: str
    topic: str
    target_audience: str
    style: str  # 'technical', 'beginner', 'business'
    required_length: int  # target length in characters
    status: TaskStatus
    research_data: Optional[Dict] = None
    draft: Optional[str] = None
    review_feedback: Optional[Dict] = None
    final_article: Optional[str] = None

@dataclass
class ResearchData:
    sources: List[Dict]     # [{title, url, content}]
    key_points: List[str]
    statistics: List[Dict]  # [{label, value, source}]
    examples: List[Dict]    # [{scenario, description}]

@dataclass
class ReviewFeedback:
    logic_issues: List[str]
    factual_errors: List[str]
    style_suggestions: List[str]
    structural_suggestions: List[str]
    overall_quality: float  # 0-10

class MessageBus:
    """Simple in-memory message bus"""
    def __init__(self):
        self.queues: Dict[str, asyncio.Queue] = {}

    def create_queue(self, name: str):
        if name not in self.queues:
            self.queues[name] = asyncio.Queue()
        return self.queues[name]

    async def send(self, queue_name: str, message: Dict):
        if queue_name not in self.queues:
            self.create_queue(queue_name)
        await self.queues[queue_name].put(message)

    async def receive(self, queue_name: str, timeout: Optional[float] = None) -> Optional[Dict]:
        if queue_name not in self.queues:
            return None
        try:
            return await asyncio.wait_for(
                self.queues[queue_name].get(),
                timeout=timeout
            )
        except asyncio.TimeoutError:
            return None
class ResearcherAgent:
    """Researcher agent"""
    def __init__(self, message_bus: MessageBus):
        self.message_bus = message_bus
        self.agent_id = 'researcher'

    async def research(self, topic: str) -> ResearchData:
        """Research a topic and collect material"""
        print(f"🔍 Researcher starts on: {topic}")
        # Simulate the search
        await asyncio.sleep(2)
        # Produce research data (placeholder values for the demo)
        research_data = ResearchData(
            sources=[
                {
                    'title': 'Multi-Agent Systems: A Comprehensive Survey',
                    'url': 'https://example.com/mas-survey',
                    'content': 'This paper provides a comprehensive overview...'
                },
                {
                    'title': 'Building Effective Agent Teams',
                    'url': 'https://example.com/agent-teams',
                    'content': 'Best practices for organizing AI agents...'
                }
            ],
            key_points=[
                'Multi-agent systems raise overall efficiency through division of labor',
                'Communication protocols are the key infrastructure for collaboration',
                'State sharing and consensus mechanisms ensure consistency',
                'Task decomposition granularity affects system performance'
            ],
            statistics=[
                {'label': 'Task processing speed improvement', 'value': '300%', 'source': 'Internal Study'},
                {'label': 'Error rate reduction', 'value': '75%', 'source': 'Production Data'}
            ],
            examples=[
                {
                    'scenario': 'Automated article generation',
                    'description': 'Researcher + writer + reviewer collaborate to produce content'
                },
                {
                    'scenario': 'Automated code review',
                    'description': 'Analyst + developer + tester collaborate to assure quality'
                }
            ]
        )
        print(f"✅ Research complete: collected {len(research_data.sources)} sources")
        return research_data

    async def handle_message(self, message: Dict):
        """Handle a message from another agent"""
        action = message.get('action')
        if action == 'research':
            topic = message.get('topic')
            result = await self.research(topic)
            await self.message_bus.send('research_results', {
                'status': 'completed',
                'data': {
                    # sources are already plain dicts (a dict has no __dict__)
                    'sources': result.sources,
                    'key_points': result.key_points,
                    'statistics': result.statistics,
                    'examples': result.examples
                }
            })
class WriterAgent:
    """Writer agent"""
    def __init__(self, message_bus: MessageBus):
        self.message_bus = message_bus
        self.agent_id = 'writer'

    async def write(
        self,
        topic: str,
        research_data: ResearchData,
        requirements: Dict
    ) -> str:
        """Write an article from the research material"""
        print(f"✍️ Writer starts drafting: {topic}")
        # Simulate the writing
        await asyncio.sleep(3)
        # Build the article
        article = self._build_article(topic, research_data, requirements)
        print(f"✅ Draft complete: {len(article)} characters")
        return article

    def _build_article(
        self,
        topic: str,
        research_data: ResearchData,
        requirements: Dict
    ) -> str:
        """Assemble the article"""
        parts = []
        # Title
        parts.append(f"# {topic}\n")
        # Abstract
        parts.append("## Abstract\n")
        parts.append("This article examines the design and implementation of multi-agent collaboration systems...")
        parts.append("\n\n")
        # Key points
        parts.append("## Key Points\n")
        for i, point in enumerate(research_data.key_points, 1):
            parts.append(f"{i}. {point}\n")
        parts.append("\n")
        # Statistics
        if research_data.statistics:
            parts.append("## Supporting Data\n")
            for stat in research_data.statistics:
                parts.append(f"- **{stat['label']}**: {stat['value']} (source: {stat['source']})\n")
            parts.append("\n")
        # Case studies
        if research_data.examples:
            parts.append("## Case Studies\n")
            for example in research_data.examples:
                parts.append(f"### {example['scenario']}\n")
                parts.append(f"{example['description']}\n\n")
        # Code example
        parts.append("## Code Example\n")
        parts.append("```python\n")
        parts.append("# Multi-agent collaboration example\n")
        parts.append("agents = [Researcher(), Writer(), Reviewer()]\n")
        parts.append("result = pipeline.execute(task)\n")
        parts.append("```\n")
        return ''.join(parts)

    async def revise(
        self,
        draft: str,
        feedback: ReviewFeedback
    ) -> str:
        """Revise the article according to the feedback"""
        print("✏️ Writer revises the article based on feedback")
        # Simulate the revision
        await asyncio.sleep(1)
        revised = draft + "\n\n---\n## Revision Log\n"
        if feedback.logic_issues:
            revised += "**Logic issues fixed**\n"
        if feedback.factual_errors:
            revised += "**Factual errors corrected**\n"
        if feedback.style_suggestions:
            revised += "**Wording improved**\n"
        return revised
class ReviewerAgent:
    """Reviewer agent"""
    def __init__(self, message_bus: MessageBus):
        self.message_bus = message_bus
        self.agent_id = 'reviewer'

    async def review(self, article: str) -> ReviewFeedback:
        """Review the article"""
        print("👀 Reviewer starts reviewing the article")
        # Simulate the review
        await asyncio.sleep(2)
        # Produce review feedback (placeholder values for the demo)
        feedback = ReviewFeedback(
            logic_issues=[
                "The causal link in the third paragraph needs to be clearer",
                "The sample code lacks error handling"
            ],
            factual_errors=[
                "The statistics need a time range"
            ],
            style_suggestions=[
                "Explain technical terms on first use",
                "Long paragraphs could be split"
            ],
            structural_suggestions=[
                "Consider adding a 'why this is needed' section at the start",
                "The closing summary could be more specific"
            ],
            overall_quality=7.5
        )
        print(f"✅ Review complete: quality score {feedback.overall_quality}/10")
        return feedback

    async def final_check(self, article: str) -> bool:
        """Final check against the publishing criteria"""
        # Minimum length, in characters
        if len(article) < 1000:
            return False
        # Required elements
        has_title = '#' in article
        has_content = len(article.split('\n')) > 10
        return has_title and has_content
class Orchestrator:
    """Task orchestrator"""
    def __init__(self):
        self.message_bus = MessageBus()
        self.researcher = ResearcherAgent(self.message_bus)
        self.writer = WriterAgent(self.message_bus)
        self.reviewer = ReviewerAgent(self.message_bus)

    async def execute(self, task: ArticleTask) -> str:
        """Run the collaborative writing task"""
        print(f"\n🚀 Starting collaborative writing: {task.topic}\n")
        # Stage 1: research
        print("=" * 50)
        print("Stage 1: Research")
        print("=" * 50)
        research_data = await self.researcher.research(task.topic)
        # Stage 2: writing
        print("\n" + "=" * 50)
        print("Stage 2: Writing")
        print("=" * 50)
        draft = await self.writer.write(
            task.topic,
            research_data,
            {
                'audience': task.target_audience,
                'style': task.style,
                'length': task.required_length
            }
        )
        # Stage 3: review
        print("\n" + "=" * 50)
        print("Stage 3: Review")
        print("=" * 50)
        feedback = await self.reviewer.review(draft)
        # Stage 4: revision
        print("\n" + "=" * 50)
        print("Stage 4: Revision")
        print("=" * 50)
        final_article = await self.writer.revise(draft, feedback)
        # Stage 5: final check
        print("\n" + "=" * 50)
        print("Stage 5: Final check")
        print("=" * 50)
        is_ready = await self.reviewer.final_check(final_article)
        if is_ready:
            print("\n✅ Article finished and approved!")
        else:
            print("\n⚠️ Article needs further revision")
        return final_article

# Example run
async def main():
    # Create the task
    task = ArticleTask(
        id=str(uuid.uuid4()),
        topic='Multi-Agent Collaboration Systems in Practice',
        target_audience='AI developers',
        style='technical',
        required_length=5000,
        status=TaskStatus.PENDING
    )
    # Create the orchestrator and run it
    orchestrator = Orchestrator()
    result = await orchestrator.execute(task)
    print(f"\nFinal article length: {len(result)} characters")

if __name__ == '__main__':
    asyncio.run(main())
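5.3 Sample Output
The transcript below shows the shape of the console output. Character counts appear as <N> because they depend on the generated text, and the last status line reads "⚠️ Article needs further revision" instead whenever final_check finds the article shorter than 1000 characters.
🚀 Starting collaborative writing: Multi-Agent Collaboration Systems in Practice
==================================================
Stage 1: Research
==================================================
🔍 Researcher starts on: Multi-Agent Collaboration Systems in Practice
✅ Research complete: collected 2 sources
==================================================
Stage 2: Writing
==================================================
✍️ Writer starts drafting: Multi-Agent Collaboration Systems in Practice
✅ Draft complete: <N> characters
==================================================
Stage 3: Review
==================================================
👀 Reviewer starts reviewing the article
✅ Review complete: quality score 7.5/10
==================================================
Stage 4: Revision
==================================================
✏️ Writer revises the article based on feedback
==================================================
Stage 5: Final check
==================================================
✅ Article finished and approved!
Final article length: <N> characters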
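5.4 Code Structure Walkthrough
Structure of the collaborative writing system:
│
├── Orchestrator
│   ├── Schedules tasks
│   ├── Manages the execution flow
│   └── Aggregates the final result
│
├── ResearcherAgent
│   ├── Searches for and collects material
│   ├── Organizes key information
│   └── Produces structured data
│
├── WriterAgent
│   ├── Interprets the writing requirements
│   ├── Produces the first draft
│   └── Revises based on feedback
│
├── ReviewerAgent
│   ├── Checks logic and facts
│   ├── Assesses article quality
│   └── Suggests improvements
│
└── MessageBus
    ├── Connects the agents
    ├── Carries messages and results
    └── Decouples component communication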
Part 6: Advanced Topics
6.1 Agent Self-Improvement
# Agent self-learning and improvement
# (Task, Result and the helper methods called below are assumed to exist elsewhere)
import time
from typing import Dict, List, Optional

class SelfImprovingAgent:
    """An agent that learns from experience"""
    def __init__(self, base_capabilities: Dict):
        self.capabilities = base_capabilities
        self.performance_history: List[Dict] = []
        self.improvement_threshold = 0.8

    async def execute_with_learning(
        self,
        task: Task,
        context: Dict
    ) -> Result:
        # Execute the task
        result = await self.execute(task, context)
        # Evaluate the performance
        performance = self.evaluate_performance(task, result)
        self.performance_history.append({
            'task': task.type,
            'performance': performance,
            'timestamp': time.time()
        })
        # Learn only when performance falls below the threshold
        if performance < self.improvement_threshold:
            await self.learn_from_mistake(task, result)
        return result

    async def learn_from_mistake(self, task: Task, result: Result):
        """Learn from a mistake"""
        # Analyze what went wrong
        analysis = await self.analyze_mistake(task, result)
        # Obtain the correct approach
        correct_approach = await self.get_correct_approach(
            task.type,
            analysis
        )
        # Update the capability
        self.capabilities[task.type] = correct_approach
        # Store it in memory
        self.add_to_memory(task.type, analysis, correct_approach)
6.2 Dynamic Resource Allocation
# Dynamic resource allocator
# (Agent, Task, LoadBalancer and CapabilityMatcher are assumed to exist elsewhere)
class DynamicResourceAllocator:
    """Allocates agents dynamically according to task requirements"""
    def __init__(self, available_agents: Dict[str, Agent]):
        self.agents = available_agents
        self.load_balancer = LoadBalancer()
        self.capability_matcher = CapabilityMatcher()

    async def allocate(self, task: Task) -> List[Agent]:
        """Allocate suitable agents to a task"""
        # Assess what the task requires
        required_capabilities = self.assess_requirements(task)
        # Select one agent per capability
        selected = []
        for cap in required_capabilities:
            agent = await self.select_best_agent(cap)
            if agent:
                selected.append(agent)
        # Balance the load
        return self.load_balancer.balance(selected)

    async def select_best_agent(self, capability: str) -> Optional[Agent]:
        """Select the best agent for a capability"""
        candidates = [
            agent
            for agent in self.agents.values()
            if capability in agent.capabilities
        ]
        if not candidates:
            return None
        # Pick the least-loaded candidate
        return min(candidates, key=lambda agent: agent.current_load)
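Least-loaded selection only balances work if the load figure is updated as agents are picked. The sketch below shows the selection step on its own, reserving a slot on the chosen agent each time; `AgentInfo` and its fields are hypothetical stand-ins for whatever agent registry is in use.

```python
from dataclasses import dataclass
from typing import Dict, Optional, Set


@dataclass
class AgentInfo:
    """Hypothetical registry entry for one agent."""
    name: str
    capabilities: Set[str]
    current_load: int = 0


def select_best_agent(agents: Dict[str, AgentInfo],
                      capability: str) -> Optional[AgentInfo]:
    candidates = [a for a in agents.values() if capability in a.capabilities]
    if not candidates:
        return None
    chosen = min(candidates, key=lambda a: a.current_load)
    chosen.current_load += 1   # reserve a slot so the next call sees the new load
    return chosen


agents = {
    "w1": AgentInfo("w1", {"write"}, current_load=2),
    "w2": AgentInfo("w2", {"write", "review"}, current_load=0),
}
picks = [select_best_agent(agents, "write").name for _ in range(4)]
print(picks)                                # ['w2', 'w2', 'w1', 'w2']
print(select_best_agent(agents, "translate"))  # None
```

Without the reservation step, every request arriving before the first task finishes would be sent to the same agent.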
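Summary
Key Takeaways
Why multi-agent collaboration is needed
- A single agent has limited capability
- Division of labor improves efficiency and quality
- Specialization leads to better output
MCP protocol
- Standardized message format
- Supports request / response / notification
- Built-in error handling and retry
Collaboration patterns
- Division of labor: pipeline processing
- Cooperative collaboration: dynamic coordination
- Competitive collaboration: several solutions, best one selected
Communication mechanisms
- Message queues decouple components
- Shared state keeps agents consistent
- Publish-subscribe provides loose coupling
Practical tips
- Decompose tasks sensibly
- Match tasks to agent capabilities
- Build in fault tolerance and retries
Coming Up Next
Day 23: Agent Memory and Learning
- Short-term vs. long-term memory
- Accumulating experience and reusing knowledge
- Self-improvement mechanisms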
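This article is part of the "OpenClaw Deep-Dive Series", published automatically every morning at 7:30. Follow the WeChat official account 「人生海海AIGC」 to catch each installment.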