QwenPaw源码解析系列(二):深入QwenPawAgent的核心实现
前言
在上一篇文章中,我们了解了QwenPaw的整体架构设计。今天让我们深入到核心实现,剖析QwenPawAgent是如何实现智能体功能的。
QwenPawAgent的初始化流程
构造函数详解
# Constructor signature of QwenPawAgent; the body is not shown in this excerpt.
# Every parameter except agent_config has a default value.
def __init__(
self,
agent_config: "AgentProfileConfig",
env_context: Optional[str] = None,
mcp_clients: Optional[List[Any]] = None,
memory_manager: BaseMemoryManager | None = None,
context_manager: BaseContextManager | None = None,
request_context: Optional[dict[str, str]] = None,
namesake_strategy: NamesakeStrategy = "skip",
workspace_dir: Path | None = None,
task_tracker: Any | None = None,
plan_notebook: Any | None = None,
):
核心参数说明:
| 参数 | 类型 | 说明 |
|---|---|---|
| agent_config | AgentProfileConfig | 智能体配置文件,包含运行配置、语言设置等 |
| memory_manager | BaseMemoryManager | 记忆管理器,管理长期记忆 |
| context_manager | BaseContextManager | 上下文管理器,管理会话上下文 |
| mcp_clients | List[Any] | MCP协议客户端列表 |
| namesake_strategy | NamesakeStrategy | 同名工具冲突处理策略 |
初始化步骤
# 1. Create the toolkit
toolkit = self._create_toolkit(namesake_strategy=namesake_strategy)
# 2. Register skills on the toolkit
self._register_skills(toolkit)
# 3. Build the system prompt
sys_prompt = self._build_sys_prompt()
# 4. Create the model and its formatter
model, formatter = create_model_and_formatter(agent_id=agent_config.id)
# 5. Initialize the parent class ReActAgent
# NOTE(review): running_config is not bound anywhere in this excerpt —
# presumably derived from agent_config; confirm against the full source.
super().__init__(
name="Friday",
model=model,
sys_prompt=sys_prompt,
toolkit=toolkit,
memory=InMemoryMemory(),
formatter=formatter,
max_iters=running_config.max_iters,
)
ReAct循环的执行流程
QwenPawAgent基于ReAct(Reasoning + Acting)模式,其核心流程如下:
用户输入 → 推理(reasoning) → 行动(acting) → ... → 回复(reply)
             ↑                   ↓
             └───────────────────┘
                (循环直到完成)
推理阶段:_reasoning
async def _reasoning(
    self,
    tool_choice: Literal["auto", "none", "required"] | None = None,
) -> Msg:
    """Run one reasoning step, falling back to media-stripped input.

    Args:
        tool_choice: Tool-choice mode forwarded unchanged to the parent
            ``_reasoning`` and to the auto-continue step.

    Returns:
        The message produced by ``_auto_continue_if_text_only``.

    Raises:
        Exception: Any error from the parent ``_reasoning`` that is not
            classified as a bad-request/media error, and any error raised
            by the single retry.
    """
    # 1. Proactively strip media when the active model does not support
    # multimodal input or is already known to reject media.
    if (
        not get_active_model_supports_multimodal()
        or self._model_rejects_media()
    ):
        self._set_formatter_media_strip(True)

    # 2. Delegate to the parent implementation.
    try:
        msg = await super()._reasoning(tool_choice=tool_choice)
    except Exception as e:
        # 3. Only bad-request/media errors are worth a retry. Anything else
        # must propagate: swallowing it would leave `msg` unbound and turn
        # the real failure into an UnboundLocalError below.
        if not self._is_bad_request_or_media_error(e):
            raise
        self._set_formatter_media_strip(True)
        msg = await super()._reasoning(tool_choice=tool_choice)

    # 4. Auto-continue when the model returned text only, without a tool call.
    return await self._auto_continue_if_text_only(msg, tool_choice)
关键点1:多模态支持处理
QwenPaw实现了三层多模态容错机制:
Layer 1: 主动过滤 - 模型不支持多模态时,推理前过滤
Layer 2: 被动重试 - 模型调用失败时,尝试过滤后重试
Layer 3: 能力学习 - 记录模型的多模态能力标志
def _model_rejects_media(self) -> bool:
    """Look up the "rejects_media" flag for this model in the capability cache.

    ``False`` is passed to the cache lookup as the third argument
    (presumably the fallback when no flag is recorded — confirm against the
    cache API).
    """
    model_key = self._get_model_key()
    capability_cache = get_capability_cache()
    return capability_cache.get(model_key, "rejects_media", False)
关键点2:自动继续机制
当模型只返回文本而没有调用工具时,QwenPaw会主动注入提示:
# System hint text for the auto-continue mechanism: it tells the model that
# the previous assistant turn was text-only with no tool call, and asks it to
# either call a tool now or wrap up briefly. The text is in Chinese
# (presumably what the _ZH suffix denotes).
_AUTO_CONTINUE_HINT_ZH = (
"<system-hint>"
"上轮助手仅文字、未调工具。请结合上下文与 <previous-assistant-tail> "
"(若有)在本轮推理中判断:仍需执行则立刻 tool;已完结则简短收尾。"
"需要操作时勿只输出计划或代码块。"
"</system-hint>"
)
行动阶段:_acting
async def _acting(self, tool_call) -> dict | None:
    """Execute one tool call, applying plan-tool fixes and gating first.

    Args:
        tool_call: The tool-use block emitted by the model. Assumed to be a
            mapping with a ``"name"`` key — TODO confirm against the
            framework's tool-call type.

    Returns:
        Whatever the parent ``_acting`` returns, or ``None`` when the plan
        gate rejects the call.
    """
    # `tool_name` was used below without ever being bound; derive it from
    # the tool call itself.
    tool_name = tool_call.get("name", "")

    # 1. Repair arguments the model emitted as stringified JSON, for the
    # plan tools listed in _PLAN_TOOLS_WITH_JSON_ARGS.
    if tool_name in self._PLAN_TOOLS_WITH_JSON_ARGS:
        self._fix_stringified_json_args(tool_call)

    # 2. Plan-tool gate: only checked when a plan notebook is attached.
    nb = getattr(self, "plan_notebook", None)
    if nb is not None:
        err = check_plan_tool_gate(nb, tool_name)
        if err:
            # The tool is not executed.
            # NOTE(review): this excerpt does not show how `err` is reported
            # back to the model — confirm against the full source.
            return None

    # 3. Delegate to the parent implementation.
    result = await super()._acting(tool_call)

    # 4. Flag the notebook after a plan revision.
    if nb is not None and tool_name == "revise_current_plan":
        nb._plan_just_mutated = True

    # Propagate the parent's result; previously it was computed and then
    # dropped, so this method always returned None.
    return result
工具系统:Toolkit
工具注册
def _create_toolkit(self, namesake_strategy: NamesakeStrategy) -> Toolkit:
    """Create a toolkit and register the enabled built-in tools.

    Args:
        namesake_strategy: Forwarded to every ``register_tool_function``
            call.

    Returns:
        The populated toolkit.
    """
    toolkit = Toolkit()
    tool_functions = {
        "execute_shell_command": execute_shell_command,
        "read_file": read_file,
        "write_file": write_file,
        "edit_file": edit_file,
        "grep_search": grep_search,
        "glob_search": glob_search,
        "browser_use": browser_use,
        "desktop_screenshot": desktop_screenshot,
        "view_image": view_image,
        "view_video": view_video,
        "send_file_to_user": send_file_to_user,
        "get_current_time": get_current_time,
        "set_user_timezone": set_user_timezone,
        "get_token_usage": get_token_usage,
        "delegate_external_agent": delegate_external_agent,
        "list_agents": list_agents,
        "chat_with_agent": chat_with_agent,
        "submit_to_agent": submit_to_agent,
        "check_agent_task": check_agent_task,
    }

    # NOTE(review): enabled_tools and async_execution_tools are not bound in
    # this excerpt — presumably per-agent configuration mappings; confirm.
    for tool_name, tool_func in tool_functions.items():
        # A tool missing from enabled_tools counts as enabled.
        if not enabled_tools.get(tool_name, True):
            continue
        # A tool missing from async_execution_tools runs synchronously.
        async_exec = async_execution_tools.get(tool_name, False)
        toolkit.register_tool_function(
            tool_func,
            namesake_strategy=namesake_strategy,
            async_execution=async_exec,
        )

    # The caller assigns this method's result and passes it on, so the
    # toolkit must actually be returned (the excerpt fell off the end).
    return toolkit
异步工具支持
部分工具支持异步执行,这对于长时间运行的操作至关重要:
# True when at least one enabled tool is configured for async execution.
has_async_tools = any(
    enabled_tools.get(name, True) and async_execution_tools.get(name, False)
    for name in tool_functions
)
if has_async_tools:
    # Register the background-task management tools, in the same order.
    for helper_name in ("view_task", "wait_task", "cancel_task"):
        toolkit.register_tool_function(getattr(toolkit, helper_name))
命令系统
QwenPaw实现了系统命令功能,用户可以通过特殊前缀触发命令:
命令检测
def is_command(self, query: str | None) -> bool:
"""检查是否为系统命令"""
if query is None:
return False
return query.startswith('/')
命令处理流程
# 在reply()方法中
if self.command_handler.is_command(query):
logger.info(f"Received command: {query}")
msg = await self.command_handler.handle_command(query)
await self.print(msg)
return msg
内置命令
| 命令 | 功能 |
|---|---|
| /compact | 压缩上下文 |
| /new | 开始新对话 |
| /clear | 清除记忆 |
| /status | 查看状态 |
安全机制:ToolGuardMixin
Mixin原理
class ToolGuardMixin:
"""混入类,拦截工具调用进行安全检查"""
async def _acting(self, tool_call) -> dict | None:
# 在这里拦截工具调用
# 检查是否危险
# 如果危险,返回警告而非执行
通过Python的MRO(方法解析顺序),QwenPawAgent的调用链为:
QwenPawAgent._acting()
→ ToolGuardMixin._acting() # 安全拦截
→ ReActAgent._acting() # 实际执行
危险命令检测
ToolGuardMixin会拦截以下危险模式:
- rm -rf / – 递归删除根目录
- fork bomb – 叉炸弹
- reverse shell – 反向shell
- ~/.ssh, ~/.aws 等敏感路径
回复处理:reply()
async def reply(
    self,
    msg: Msg | list[Msg] | None = None,
    structured_model: Type[BaseModel] | None = None,
) -> Msg:
    """Main reply entry point.

    Sets per-request context, pre-processes file/media blocks, handles
    system commands, and otherwise delegates to the parent ``reply`` under
    the skill-config environment overrides.

    NOTE(review): ``query``, ``pruning_config``, ``workspace_dir`` and
    ``channel_name`` are not bound in this excerpt — confirm against the
    full source.
    """
    # 1. Publish context information for this request.
    set_current_workspace_dir(self._workspace_dir)
    set_current_recent_max_bytes(
        pruning_config.pruning_recent_msg_max_bytes,
    )
    set_current_shell_command_timeout(
        self._agent_config.running.shell_command_timeout,
    )

    # 2. Process file and media blocks when a message was supplied.
    if msg is not None:
        await process_file_and_media_blocks_in_message(msg)

    # 3. A system command is handled directly and its result returned.
    if self.command_handler.is_command(query):
        return await self.command_handler.handle_command(query)

    # 4. Run the parent reply with the skill-config env overrides applied.
    with apply_skill_config_env_overrides(workspace_dir, channel_name):
        return await super().reply(
            msg=msg,
            structured_model=structured_model,
        )
生命周期钩子注册
def _register_hooks(self) -> None:
"""Register lifecycle hooks"""
# Bootstrap hook - checks BOOTSTRAP.md
# NOTE(review): working_dir is not bound in this excerpt — confirm its source.
bootstrap_hook = BootstrapHook(
working_dir=working_dir,
language=self._language,
)
self.register_instance_hook(
hook_type="pre_reasoning",
hook_name="bootstrap_hook",
hook=bootstrap_hook.__call__,
)
# Context manager hooks: registered only when a context manager is set.
# The `...` below are placeholders from the article for the elided
# hook_name/hook arguments; this is not runnable as written.
if self.context_manager is not None:
self.register_instance_hook(hook_type="pre_reply", ...)
self.register_instance_hook(hook_type="pre_reasoning", ...)
self.register_instance_hook(hook_type="post_acting", ...)
self.register_instance_hook(hook_type="post_reply", ...)
总结
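QwenPawAgent的核心设计体现了以下原则:
- 容错优先:三层多模态容错机制(主动过滤、被动重试、能力学习)
- 主动推进:模型只返回文本时,通过自动继续机制注入提示
- 按需装配:工具根据配置注册,并支持异步执行
- 安全内建:通过ToolGuardMixin在工具执行前进行拦截
- 易于扩展:通过生命周期钩子接入引导与上下文管理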
在下一篇文章中,我们将深入解析记忆系统(Memory)和上下文管理系统(Context)的实现原理。
往期回顾:
下期预告:
如果对你有帮助,欢迎点赞、在看,我们下期见!
夜雨聆风