乐于分享
好东西不私藏

QwenPaw源码解析系列(二):深入QwenPawAgent的核心实现

QwenPaw源码解析系列(二):深入QwenPawAgent的核心实现

前言

在上一篇文章中,我们了解了QwenPaw的整体架构设计。今天让我们深入到核心实现,剖析QwenPawAgent是如何实现智能体功能的。

QwenPawAgent的初始化流程

构造函数详解

def __init__(
self,
agent_config: "AgentProfileConfig",
env_context: Optional[str] = None,
mcp_clients: Optional[List[Any]] = None,
memory_manager: BaseMemoryManager | None = None,
context_manager: BaseContextManager | None = None,
request_context: Optional[dict[str, str]] = None,
namesake_strategy: NamesakeStrategy = "skip",
workspace_dir: Path | None = None,
task_tracker: Any | None = None,
plan_notebook: Any | None = None,
):

核心参数说明:

参数 类型 说明
agent_config AgentProfileConfig 智能体配置文件,包含运行配置、语言设置等
memory_manager BaseMemoryManager 记忆管理器,管理长期记忆
context_manager BaseContextManager 上下文管理器,管理会话上下文
mcp_clients List[Any] MCP协议客户端列表
namesake_strategy NamesakeStrategy 同名工具冲突处理策略

初始化步骤

# 1. 创建工具包
toolkit = self._create_toolkit(namesake_strategy=namesake_strategy)

# 2. 注册技能
self._register_skills(toolkit)

# 3. 构建系统提示词
sys_prompt = self._build_sys_prompt()

# 4. 创建模型和格式化器
model, formatter = create_model_and_formatter(agent_id=agent_config.id)

# 5. 初始化父类ReActAgent
super().__init__(
name="Friday",
model=model,
sys_prompt=sys_prompt,
toolkit=toolkit,
memory=InMemoryMemory(),
formatter=formatter,
max_iters=running_config.max_iters,
)

ReAct循环的执行流程

QwenPawAgent基于ReAct(Reasoning + Acting)模式,其核心流程如下:

用户输入 → 推理(reasoning) → 行动(acting) → ... → 回复(reply)
↑ ↓
└────────────────────────┘
(循环直到完成)

推理阶段:_reasoning

async def _reasoning(
self,
tool_choice: Literal["auto", "none", "required"] | None = None,
) -> Msg:
"""推理阶段的核心逻辑"""

# 1. 主动过滤多模态内容
should_strip = (
not get_active_model_supports_multimodal()
or self._model_rejects_media()
)
if should_strip:
self._set_formatter_media_strip(True)

# 2. 调用父类推理(触发ToolGuardMixin拦截)
try:
msg = await super()._reasoning(tool_choice=tool_choice)
except Exception as e:
# 3. 如果失败,尝试过滤媒体后重试
if self._is_bad_request_or_media_error(e):
self._set_formatter_media_strip(True)
msg = await super()._reasoning(tool_choice=tool_choice)

# 4. 自动继续:如果只返回了文本没有工具调用
return await self._auto_continue_if_text_only(msg, tool_choice)

关键点1:多模态支持处理

QwenPaw实现了三层多模态容错机制:

Layer 1: 主动过滤 - 模型不支持多模态时,推理前过滤
Layer 2: 被动重试 - 模型调用失败时,尝试过滤后重试
Layer 3: 能力学习 - 记录模型的多模态能力标志
def _model_rejects_media(self) -> bool:
"""从能力缓存中获取模型是否拒绝媒体内容"""
key = self._get_model_key()
return get_capability_cache().get(key, "rejects_media", False)

关键点2:自动继续机制

当模型只返回文本而没有调用工具时,QwenPaw会主动注入提示:

_AUTO_CONTINUE_HINT_ZH = (
"<system-hint>"
"上轮助手仅文字、未调工具。请结合上下文与 <previous-assistant-tail> "
"(若有)在本轮推理中判断:仍需执行则立刻 tool;已完结则简短收尾。"
"需要操作时勿只输出计划或代码块。"
"</system-hint>"
)

行动阶段:_acting

async def _acting(self, tool_call) -> dict | None:
"""行动阶段:执行工具调用"""

# 1. 修复JSON字符串参数(模型有时会产生这种输出)
if tool_name in self._PLAN_TOOLS_WITH_JSON_ARGS:
self._fix_stringified_json_args(tool_call)

# 2. 检查计划工具门控
nb = getattr(self, "plan_notebook", None)
if nb is not None:
err = check_plan_tool_gate(nb, tool_name)
if err:
# 返回错误信息而非执行工具
return None

# 3. 调用父类行动(触发ToolGuardMixin)
result = await super()._acting(tool_call)

# 4. 如果是计划修订工具,标记计划已变更
if nb is not None and tool_name == "revise_current_plan":
nb._plan_just_mutated = True

工具系统:Toolkit

工具注册

def _create_toolkit(self, namesake_strategy: NamesakeStrategy) -> Toolkit:
"""创建工具包并注册内置工具"""

toolkit = Toolkit()

tool_functions = {
"execute_shell_command": execute_shell_command,
"read_file": read_file,
"write_file": write_file,
"edit_file": edit_file,
"grep_search": grep_search,
"glob_search": glob_search,
"browser_use": browser_use,
"desktop_screenshot": desktop_screenshot,
"view_image": view_image,
"view_video": view_video,
"send_file_to_user": send_file_to_user,
"get_current_time": get_current_time,
"set_user_timezone": set_user_timezone,
"get_token_usage": get_token_usage,
"delegate_external_agent": delegate_external_agent,
"list_agents": list_agents,
"chat_with_agent": chat_with_agent,
"submit_to_agent": submit_to_agent,
"check_agent_task": check_agent_task,
}

# 根据配置注册工具
for tool_name, tool_func in tool_functions.items():
if not enabled_tools.get(tool_name, True): # 默认启用
continue

async_exec = async_execution_tools.get(tool_name, False)
toolkit.register_tool_function(
tool_func,
namesake_strategy=namesake_strategy,
async_execution=async_exec,
)

异步工具支持

部分工具支持异步执行,这对于长时间运行的操作至关重要:

# 检查是否启用了异步工具
has_async_tools = any(
async_execution_tools.get(name, False)
for name in tool_functions
if enabled_tools.get(name, True)
)

if has_async_tools:
# 注册后台任务管理工具
toolkit.register_tool_function(toolkit.view_task)
toolkit.register_tool_function(toolkit.wait_task)
toolkit.register_tool_function(toolkit.cancel_task)

命令系统

QwenPaw实现了系统命令功能,用户可以通过特殊前缀触发命令:

命令检测

def is_command(self, query: str | None) -> bool:
"""检查是否为系统命令"""
if query is None:
return False
return query.startswith('/')

命令处理流程

# 在reply()方法中
if self.command_handler.is_command(query):
logger.info(f"Received command: {query}")
msg = await self.command_handler.handle_command(query)
await self.print(msg)
return msg

内置命令

命令 功能
/compact 压缩上下文
/new 开始新对话
/clear 清除记忆
/status 查看状态

安全机制:ToolGuardMixin

Mixin原理

class ToolGuardMixin:
"""混入类,拦截工具调用进行安全检查"""

async def _acting(self, tool_call) -> dict | None:
# 在这里拦截工具调用
# 检查是否危险
# 如果危险,返回警告而非执行

通过Python的MRO(方法解析顺序),QwenPawAgent的调用链为:

QwenPawAgent._acting()
→ ToolGuardMixin._acting() # 安全拦截
→ ReActAgent._acting() # 实际执行

危险命令检测

ToolGuardMixin会拦截以下危险模式:

rm -rf / – 递归删除根目录
fork bomb – 叉炸弹
reverse shell – 反向shell
访问敏感路径 – ~/.ssh, ~/.aws

回复处理:reply()

async def reply(
self,
msg: Msg | list[Msg] | None = None,
structured_model: Type[BaseModel] | None = None,
) -> Msg:
"""主回复入口"""

# 1. 设置上下文信息
set_current_workspace_dir(self._workspace_dir)
set_current_recent_max_bytes(pruning_config.pruning_recent_msg_max_bytes)
set_current_shell_command_timeout(self._agent_config.running.shell_command_timeout)

# 2. 处理文件块和媒体块
if msg is not None:
await process_file_and_media_blocks_in_message(msg)

# 3. 检查命令
if self.command_handler.is_command(query):
return await self.command_handler.handle_command(query)

# 4. 应用技能配置环境覆盖
with apply_skill_config_env_overrides(workspace_dir, channel_name):
return await super().reply(msg=msg, structured_model=structured_model)

生命周期钩子注册

def _register_hooks(self) -> None:
"""注册生命周期钩子"""

# 引导钩子 - 检查BOOTSTRAP.md
bootstrap_hook = BootstrapHook(
working_dir=working_dir,
language=self._language,
)
self.register_instance_hook(
hook_type="pre_reasoning",
hook_name="bootstrap_hook",
hook=bootstrap_hook.__call__,
)

# 上下文管理器钩子
if self.context_manager is not None:
self.register_instance_hook(hook_type="pre_reply", ...)
self.register_instance_hook(hook_type="pre_reasoning", ...)
self.register_instance_hook(hook_type="post_acting", ...)
self.register_instance_hook(hook_type="post_reply", ...)

总结

QwenPawAgent的核心设计体现了以下原则:

1.
继承与组合:通过Mixin模式灵活扩展功能
2.
容错机制:多模态支持的三层容错设计
3.
安全第一:ToolGuardMixin拦截危险操作
4.
可扩展性:插件化的工具和记忆系统
5.
生命周期管理:完善的钩子机制

在下一篇文章中,我们将深入解析记忆系统(Memory)和上下文管理系统(Context)的实现原理。


往期回顾:

系列一:项目概览与整体架构

下期预告:

记忆系统:BaseMemoryManager与记忆进化机制
上下文管理:Context压缩与Token控制
主动交互:Proactive模块的实现

如果对你有帮助,欢迎点赞在看,我们下期见!