QwenPaw源码解析系列(五):ACP协议与多智能体通信
前言
多智能体协作是QwenPaw的核心能力之一,而ACP(Agent Communication Protocol)则是实现这一能力的关键协议。今天让我们深入解析ACP协议的设计与实现。
ACP协议概述
ACP是QwenPaw设计的智能体间通信协议,支持:
┌─────────┐ ┌─────────┐
│ Agent A │ ←──── ACP ────→ │ Agent B │
└─────────┘ └─────────┘
↑ ↑
└─────────── 共享上下文 ───────────┘
ACP核心组件
ACPService:通信服务
class ACPService:
"""ACP通信服务,负责管理智能体间的通信"""
def __init__(self, agent_id: str):
self.agent_id = agent_id
self.connections: dict[str, ACPConnection] = {}
self.handlers: dict[str, Handler] = {}
async def start(self) -> None:
"""启动ACP服务"""
await self._init_server()
await self._load_message_handlers()
async def connect(self, target: str) -> ACPConnection:
"""连接到目标智能体"""
if target not in self.connections:
conn = await self._establish_connection(target)
self.connections[target] = conn
return self.connections[target]
async def send_message(
self,
target: str,
message: ACPMessage,
) -> None:
"""发送消息到目标智能体"""
conn = await self.connect(target)
await conn.send(message)
async def broadcast(
self,
message: ACPMessage,
targets: list[str] | None = None,
) -> None:
"""广播消息"""
if targets is None:
targets = list(self.connections.keys())
for target in targets:
await self.send_message(target, message)
ACPMessage:消息格式
@dataclass
class ACPMessage:
"""ACP协议消息格式"""
id: str # 消息唯一标识
type: ACPMessageType # 消息类型
source: str # 发送方智能体ID
target: str # 接收方智能体ID
content: dict # 消息内容
metadata: ACPMetadata # 元数据
timestamp: datetime # 时间戳
signature: str # 签名(用于认证)
class ACPMessageType(Enum):
"""消息类型枚举"""
REQUEST = "request" # 请求
RESPONSE = "response" # 响应
NOTIFICATION = "notification" # 通知
HEARTBEAT = "heartbeat" # 心跳
ERROR = "error" # 错误
DELEGATE = "delegate" # 委托任务
RESULT = "result" # 结果返回
消息类型详解
# 1. 任务委托消息
@dataclass
class DelegateMessage(ACPMessage):
"""任务委托消息"""
def __post_init__(self):
self.type = ACPMessageType.DELEGATE
content: dict = field(default_factory=dict)
@property
def task_id(self) -> str:
return self.content.get("task_id", "")
@property
def task_description(self) -> str:
return self.content.get("description", "")
@property
def parameters(self) -> dict:
return self.content.get("parameters", {})
@property
def callback_url(self) -> str | None:
return self.content.get("callback_url")
QwenPawACPAgent:ACP服务器
class QwenPawACPAgent:
"""ACP协议智能体服务器"""
def __init__(
self,
agent: QwenPawAgent,
port: int = 8080,
host: str = "0.0.0.0",
):
self.agent = agent
self.server = ACPWebSocketServer(host, port)
self.permissions = PermissionManager()
async def run(self) -> None:
"""启动ACP服务器"""
await self.server.start()
self.server.on_message(self._handle_message)
self.server.on_connect(self._handle_connect)
self.server.on_disconnect(self._handle_disconnect)
# 注册处理器
await self._register_handlers()
# 启动心跳
asyncio.create_task(self._heartbeat_loop())
async def _handle_message(
self,
conn: ACPConnection,
message: ACPMessage,
) -> None:
"""处理接收到的消息"""
# 1. 验证权限
if not await self.permissions.check(conn, message):
await self._send_error(conn, "Permission denied")
return
# 2. 分发到处理器
handler = self.handlers.get(message.type)
if handler:
response = await handler(self.agent, message)
await conn.send(response)
else:
await self._send_error(conn, f"Unknown message type: {message.type}")
async def _handle_delegate(
self,
agent: QwenPawAgent,
message: ACPMessage,
) -> ACPMessage:
"""处理任务委托"""
delegate_msg = DelegateMessage(**message.__dict__)
# 创建任务
task = Task(
id=delegate_msg.task_id,
description=delegate_msg.task_description,
parameters=delegate_msg.parameters,
callback_url=delegate_msg.callback_url,
)
# 执行任务
result = await agent.reply(
Msg(
role="user",
content=f"执行任务: {delegate_msg.task_description}\n"
f"参数: {delegate_msg.parameters}",
)
)
# 构建结果消息
return ACPMessage(
id=generate_message_id(),
type=ACPMessageType.RESULT,
source=self.agent.agent_id,
target=delegate_msg.source,
content={
"task_id": delegate_msg.task_id,
"status": "completed",
"result": result.get_text_content(),
},
metadata=ACPMetadata(),
timestamp=datetime.now(),
)
权限管理系统
class PermissionManager:
"""ACP权限管理器"""
def __init__(self):
self.policies: dict[str, Policy] = {}
async def check(
self,
conn: ACPConnection,
message: ACPMessage,
) -> bool:
"""检查操作权限"""
# 1. 检查连接是否已认证
if not conn.authenticated:
return await self._authenticate(conn)
# 2. 检查消息类型权限
policy = self._get_policy(conn.agent_id)
return policy.can_send(message.type)
async def _authenticate(self, conn: ACPConnection) -> bool:
"""认证连接"""
challenge = self._generate_challenge()
await conn.send(ACPMessage(
type=ACPMessageType.REQUEST,
content={"action": "authenticate", "challenge": challenge},
))
response = await conn.recv()
# 验证签名
return self._verify_signature(response, conn.agent_id)
任务委托流程
委托方视角
class Agent:
async def delegate_task(
self,
target_agent: str,
task_description: str,
parameters: dict,
) -> str:
"""委托任务给其他智能体"""
# 1. 生成任务ID
task_id = generate_task_id()
# 2. 发送委托消息
message = DelegateMessage(
id=generate_message_id(),
type=ACPMessageType.DELEGATE,
source=self.agent_id,
target=target_agent,
content={
"task_id": task_id,
"description": task_description,
"parameters": parameters,
"callback_url": self.get_callback_url(),
},
metadata=ACPMetadata(priority="normal"),
timestamp=datetime.now(),
)
await self.acp_service.send_message(target_agent, message)
# 3. 记录任务状态
self.pending_tasks[task_id] = {
"description": task_description,
"target": target_agent,
"status": "pending",
"start_time": datetime.now(),
}
return task_id
async def check_task_status(self, task_id: str) -> TaskStatus:
"""检查任务状态"""
if task_id in self.pending_tasks:
return self.pending_tasks[task_id]
return self.completed_tasks.get(task_id)
执行方视角
class QwenPawACPAgent:
async def _handle_delegate(
self,
agent: QwenPawAgent,
message: ACPMessage,
) -> ACPMessage:
"""处理委托任务"""
delegate_msg = DelegateMessage(**message.__dict__)
# 1. 记录任务
self.active_tasks[delegate_msg.task_id] = {
"description": delegate_msg.task_description,
"source": delegate_msg.source,
"status": "running",
}
# 2. 在后台执行任务(不阻塞)
asyncio.create_task(self._execute_task(delegate_msg))
# 3. 返回确认
return ACPMessage(
type=ACPMessageType.RESPONSE,
content={
"task_id": delegate_msg.task_id,
"status": "accepted",
},
)
async def _execute_task(self, delegate_msg: DelegateMessage) -> None:
"""执行委托任务"""
try:
# 执行任务
result = await self.agent.reply(
Msg(role="user", content=delegate_msg.task_description)
)
# 发送结果
result_msg = ACPMessage(
type=ACPMessageType.RESULT,
target=delegate_msg.source,
content={
"task_id": delegate_msg.task_id,
"status": "completed",
"result": result.get_text_content(),
},
)
await self.acp_service.send_message(
delegate_msg.source,
result_msg,
)
except Exception as e:
# 发送错误
error_msg = ACPMessage(
type=ACPMessageType.ERROR,
target=delegate_msg.source,
content={
"task_id": delegate_msg.task_id,
"error": str(e),
},
)
await self.acp_service.send_message(
delegate_msg.source,
error_msg,
)
跨实例通信
QwenPaw支持跨实例的智能体通信:
class ACPClient:
"""ACP客户端,用于连接到远程智能体"""
def __init__(
self,
server_url: str,
agent_id: str,
credentials: ACPcredentials,
):
self.server_url = server_url
self.agent_id = agent_id
self.credentials = credentials
self.ws: WebSocket = None
async def connect(self) -> None:
"""连接到ACP服务器"""
self.ws = await websockets.connect(
f"{self.server_url}/acp",
extra_headers={
"Authorization": f"Bearer {self.credentials.token}",
},
)
async def send_direct_message(
self,
target: str,
content: dict,
) -> None:
"""发送直接消息"""
message = ACPMessage(
id=generate_message_id(),
type=ACPMessageType.REQUEST,
source=self.agent_id,
target=target,
content=content,
metadata=ACPMetadata(),
timestamp=datetime.now(),
signature=self._sign_message(content),
)
await self.ws.send(message.to_json())
async def receive(self) -> ACPMessage:
"""接收消息"""
data = await self.ws.recv()
return ACPMessage.from_json(data)
心跳与健康检查
class ACPService:
async def _heartbeat_loop(self) -> None:
"""心跳循环"""
while self.running:
# 发送心跳
for conn_id, conn in self.connections.items():
try:
await conn.send(ACPMessage(
type=ACPMessageType.HEARTBEAT,
content={"timestamp": datetime.now().isoformat()},
))
except Exception:
# 连接断开,移除
self._remove_connection(conn_id)
await asyncio.sleep(self.heartbeat_interval)
async def _health_check(self) -> dict:
"""健康检查"""
return {
"agent_id": self.agent_id,
"connections": len(self.connections),
"pending_tasks": len(self.active_tasks),
"timestamp": datetime.now().isoformat(),
}
错误处理与重连
class ACPConnection:
"""ACP连接管理"""
async def send_with_retry(
self,
message: ACPMessage,
max_retries: int = 3,
) -> None:
"""带重试的消息发送"""
for attempt in range(max_retries):
try:
await self.ws.send(message.to_json())
return
except Exception as e:
if attempt == max_retries - 1:
raise
await asyncio.sleep(2 ** attempt) # 指数退避
await self.reconnect()
async def reconnect(self) -> None:
"""重连"""
if not self._should_reconnect():
return
self._close()
self.ws = await websockets.connect(
self.url,
extra_headers=self.headers,
)
使用示例
# 委托任务给其他智能体
async def example():
agent = QwenPawAgent(config)
# 委托数据分析任务
task_id = await agent.delegate_task(
target_agent="data_analyst@server2",
task_description="分析这份销售数据,生成报告",
parameters={
"file_path": "/data/sales.csv",
"report_format": "markdown",
},
)
# 异步等待结果
while True:
status = await agent.check_task_status(task_id)
if status["status"] == "completed":
print(f"任务完成,结果: {status['result']}")
break
elif status["status"] == "failed":
print(f"任务失败: {status['error']}")
break
await asyncio.sleep(1)
总结
ACP协议的核心设计原则:
通过ACP协议,QwenPaw实现了多智能体之间的无缝协作,无论是同实例还是跨实例通信。
往期回顾:
下期预告:
如果对你有帮助,欢迎点赞、在看!
夜雨聆风