QwenPaw源码解析系列(六):安全防护体系深度解析
前言
作为个人AI助手,安全是QwenPaw最重视的方面之一。本篇文章将深入解析QwenPaw的多层安全防护体系,包括ToolGuardMixin、命令守卫、文件访问控制等核心机制。
安全防护架构概览
QwenPaw采用了纵深防御的安全架构:
┌─────────────────────────────────────────────────────────┐
│ 安全防护层 │
├─────────────────────────────────────────────────────────┤
│ Layer 1: 工具守卫 (ToolGuardMixin) │
│ Layer 2: 命令白名单 (ShellCommandValidator) │
│ Layer 3: 文件访问控制 (FileAccessGuard) │
│ Layer 4: 技能安全扫描 (SkillSecurityScanner) │
│ Layer 5: API认证 (APIAuthenticator) │
└─────────────────────────────────────────────────────────┘
ToolGuardMixin:工具守卫
Mixin实现原理
class ToolGuardMixin:
"""
工具守卫混入类,拦截所有工具调用进行安全检查
通过Python的MRO(方法解析顺序)实现:
QwenPawAgent → ToolGuardMixin → ReActAgent
"""
async def _acting(self, tool_call: dict) -> dict | None:
"""
拦截工具调用,执行安全检查
Args:
tool_call: 工具调用请求
Returns:
None表示拦截(阻止执行),返回dict表示放行
"""
tool_name = tool_call.get("name", "")
tool_args = tool_call.get("arguments", {})
# 1. 执行安全检查
check_result = await self._security_check(tool_name, tool_args)
if not check_result.allowed:
logger.warning(
f"Tool '{tool_name}' blocked by security policy: "
f"{check_result.reason}"
)
return self._blocked_response(tool_call, check_result.reason)
# 2. 通过检查,传递给父类执行
return await super()._acting(tool_call)
安全检查流程
async def _security_check(
    self,
    tool_name: str,
    tool_args: dict,
) -> SecurityCheckResult:
    """Run the allow-list check, then the one check specific to the tool.

    Args:
        tool_name: Name of the tool being invoked.
        tool_args: Arguments of the tool call.

    Returns:
        A denying result from the first failing check, otherwise an
        allowing ``SecurityCheckResult``.
    """
    # Gate 1: the tool itself has to be on the allowed list.
    if not self._is_tool_allowed(tool_name):
        denial = f"Tool '{tool_name}' is not in the allowed list"
        return SecurityCheckResult(allowed=False, reason=denial)

    # Gate 2: the tool names are distinct, so at most one
    # argument-level check applies to any given call.
    if tool_name == "execute_shell_command":
        verdict = await self._check_shell_command(tool_args)
    elif tool_name in ("read_file", "write_file", "edit_file"):
        verdict = self._check_file_access(tool_args.get("path", ""))
    elif tool_name == "browser_use":
        verdict = self._check_browser_action(tool_args)
    else:
        # No tool-specific check for this tool.
        return SecurityCheckResult(allowed=True)

    if verdict.allowed:
        return SecurityCheckResult(allowed=True)
    return verdict
Shell命令守卫
命令验证器
class ShellCommandValidator:
    """Shell command validator that detects dangerous command patterns."""

    # (regex, description) pairs. Each regex is searched case-insensitively
    # anywhere in the stripped command.
    DANGEROUS_PATTERNS = [
        # Recursive delete. NOTE(review): the first pattern matches any
        # absolute path after "rm -rf", not only "/" itself.
        (r"rm\s+-rf\s+/", "Recursive root delete"),
        (r"rm\s+-rf\s+\*\s*$", "Recursive delete in root"),
        # Fork bomb. Matches the canonical ":(){ :|:& };:" with optional
        # whitespace; the previous pattern required a trailing "|:&&" and so
        # never matched the real thing.
        (r":\(\)\s*\{\s*:\s*\|\s*:\s*&\s*\}\s*;\s*:", "Fork bomb"),
        (r"fork\(\)\s*\{", "Fork pattern"),
        # Reverse shells
        (r"bash\s+-i\s+>\s*/dev/tcp/", "Reverse shell (bash)"),
        (r"nc\s+-[el]", "Reverse shell (netcat)"),
        (r"python.*-c.*socket", "Python reverse shell"),
        # Data destruction
        (r">\s*/dev/sd[a-z]", "Direct disk write"),
        (r"dd\s+if=.*of=/dev/", "Direct device write"),
        # System tampering
        (r"chmod\s+777\s+/etc", "Permission escalation"),
        (r"sudo\s+su\s*$", "Privilege escalation"),
    ]

    @classmethod
    def validate(cls, command: str) -> "ValidationResult":
        """Validate a shell command against the dangerous-pattern list.

        Args:
            command: Command to validate.

        Returns:
            ``ValidationResult(safe=False, ...)`` for the first failing check
            (dangerous pattern, length limit, suspicious pipes, in that
            order), otherwise ``ValidationResult(safe=True)``.
        """
        # Normalise the command.
        normalized = command.strip()

        # Dangerous patterns: the first hit wins.
        for pattern, description in cls.DANGEROUS_PATTERNS:
            if re.search(pattern, normalized, re.IGNORECASE):
                return ValidationResult(
                    safe=False,
                    reason=description,
                    pattern=pattern,
                )

        # Length limit.
        # NOTE(review): MAX_COMMAND_LENGTH and _has_suspicious_pipes are not
        # defined in this excerpt - confirm they exist on the full class.
        if len(normalized) > cls.MAX_COMMAND_LENGTH:
            return ValidationResult(
                safe=False,
                reason=f"Command too long (max {cls.MAX_COMMAND_LENGTH})",
            )

        # Pipes and redirections.
        if cls._has_suspicious_pipes(normalized):
            return ValidationResult(
                safe=False,
                reason="Suspicious pipe patterns detected",
            )

        return ValidationResult(safe=True)
白名单机制
class ShellCommandValidator:
    """Shell command validator: whitelist extension."""

    # Whitelist patterns (allowed commands), matched at the start of the
    # stripped command.
    ALLOWED_PATTERNS = [
        r"^git\s+",     # Git commands
        r"^pip\s+",     # pip package management
        r"^npm\s+",     # npm package management
        r"^curl\s+-s",  # silent curl
        r"^ls\s+",      # directory listing
        r"^cat\s+",     # file viewing
        r"^grep\s+",    # text search
        r"^find\s+",    # file search
    ]

    @classmethod
    def validate_with_whitelist(
        cls,
        command: str,
        enabled: bool = True,
    ) -> "ValidationResult":
        """Validate a command, optionally enforcing the whitelist.

        Args:
            command: Command to validate.
            enabled: When False, only ``cls.validate`` is applied. When True,
                the command must also match one of ``ALLOWED_PATTERNS``.

        Returns:
            The result of ``cls.validate`` for whitelisted commands (or when
            the whitelist is disabled); an unsafe result for anything that is
            not whitelisted.
        """
        # NOTE(review): cls.validate is not defined in this excerpt; it is
        # expected to come from the main part of the class.
        result = cls.validate(command)
        if not enabled:
            return result

        # Match on the stripped command so leading whitespace cannot make a
        # whitelisted command look unlisted.
        candidate = command.strip()
        if any(re.match(pattern, candidate) for pattern in cls.ALLOWED_PATTERNS):
            # Whitelisted commands still have to pass the basic checks.
            return result

        # Not whitelisted: always rejected. Previously such a command was
        # accepted whenever the basic checks passed, so enabling the
        # whitelist changed nothing but the error text.
        if not result.safe:
            result.reason = f"Not in whitelist. {result.reason}"
            return result
        return ValidationResult(safe=False, reason="Not in whitelist")
超时控制
class ShellCommandGuard:
"""Shell命令执行守卫"""
def __init__(self, default_timeout: float = 30.0):
self.default_timeout = default_timeout
self.timeout_map = {
"git": 60.0, # Git可能需要较长时间
"pip": 120.0, # pip安装可能需要较长时间
"npm": 120.0, # npm安装可能需要较长时间
"curl": 30.0,
}
def get_timeout(self, command: str) -> float:
"""根据命令类型返回合适的超时时间"""
for cmd_type, timeout in self.timeout_map.items():
if command.strip().startswith(cmd_type):
return timeout
return self.default_timeout
async def execute(
self,
command: str,
timeout: float | None = None,
) -> CommandResult:
"""安全执行Shell命令"""
# 验证命令
validation = ShellCommandValidator.validate(command)
if not validation.safe:
return CommandResult(
success=False,
error=f"Command blocked: {validation.reason}",
blocked=True,
)
# 获取超时时间
if timeout is None:
timeout = self.get_timeout(command)
# 执行命令(带超时控制)
try:
result = await asyncio.wait_for(
self._run_command(command),
timeout=timeout,
)
return result
except asyncio.TimeoutError:
return CommandResult(
success=False,
error=f"Command timed out after {timeout}s",
timed_out=True,
)
文件访问控制
敏感路径保护
class FileAccessGuard:
"""文件访问守卫,限制对敏感路径的访问"""
# 敏感路径列表
SENSITIVE_PATHS = [
# SSH密钥
"~/.ssh",
"~/.ssh/*",
# 云凭据
"~/.aws",
"~/.gcloud",
"~/.azure",
# 加密货币
"~/.bitcoin",
"~/.ethereum",
# 系统配置
"/etc/shadow",
"/etc/sudoers",
"/etc/passwd",
# 其他
"~/.gnupg",
"~/.private",
]
# 访问模式
READ_PATTERNS = ["read", "view", "cat", "show"]
WRITE_PATTERNS = ["write", "edit", "create", "delete", "rm"]
@classmethod
def check_access(
cls,
path: str,
mode: str = "read",
) -> AccessCheckResult:
"""
检查文件访问权限
Args:
path: 文件路径
mode: 访问模式 (read/write)
Returns:
AccessCheckResult: 访问检查结果
"""
# 展开路径
expanded_path = os.path.expanduser(os.path.expandvars(path))
abs_path = os.path.abspath(expanded_path)
# 检查是否为敏感路径
for sensitive in cls.SENSITIVE_PATHS:
expanded_sensitive = os.path.abspath(
os.path.expanduser(os.path.expandvars(sensitive))
)
if abs_path.startswith(expanded_sensitive + os.sep):
return AccessCheckResult(
allowed=False,
reason=f"Access to sensitive path denied: {sensitive}",
requires_override=True,
)
# 检查写权限
if mode in cls.WRITE_PATTERNS:
if cls._is_protected_file(abs_path):
return AccessCheckResult(
allowed=False,
reason="Cannot modify protected file",
requires_override=True,
)
# 检查路径遍历攻击
if ".." in path:
return AccessCheckResult(
allowed=False,
reason="Path traversal detected",
)
return AccessCheckResult(allowed=True)
路径遍历检测
class FileAccessGuard:
    """File access guard extension: path traversal detection."""

    @classmethod
    def _check_path_traversal(cls, path: str) -> bool:
        """Report whether ``path`` resolves outside the working directory.

        The path is resolved with ``os.path.realpath``, which collapses
        ``..`` components and follows symlinks, and is then compared with the
        resolved current working directory on a component boundary.

        Args:
            path: Path to check; relative paths are taken relative to the
                current working directory.

        Returns:
            True when the resolved path is neither the working directory nor
            below it, or when resolution fails; False otherwise.
        """
        try:
            base = os.path.realpath(os.getcwd())
            target = os.path.realpath(path)
        except (OSError, ValueError):
            # Fail closed: a path that cannot be resolved (missing working
            # directory, embedded NUL, ...) is treated as a traversal.
            return True

        if target == base:
            return False
        # Compare with a trailing separator so that "/work/app-evil" is not
        # accepted as being inside "/work/app".
        return not target.startswith(base.rstrip(os.sep) + os.sep)
技能安全扫描
扫描器实现
class SkillSecurityScanner:
"""技能安全扫描器"""
def __init__(self):
self.scanners = [
PromptInjectionScanner(),
CommandInjectionScanner(),
SecretScanner(),
DataExfiltrationScanner(),
]
async def scan(self, skill_dir: Path) -> ScanResult:
"""
扫描技能目录
Args:
skill_dir: 技能目录路径
Returns:
ScanResult: 扫描结果
"""
issues = []
# 扫描所有Python文件
for py_file in skill_dir.rglob("*.py"):
file_issues = await self._scan_file(py_file)
issues.extend(file_issues)
# 扫描配置文件
config_file = skill_dir / "config.json"
if config_file.exists():
config_issues = await self._scan_config(config_file)
issues.extend(config_issues)
return ScanResult(
skill_name=skill_dir.name,
issues=issues,
safe=len([i for i in issues if i.severity == "critical"]) == 0,
)
async def _scan_file(self, file_path: Path) -> list[SecurityIssue]:
"""扫描单个文件"""
issues = []
try:
content = file_path.read_text()
for scanner in self.scanners:
found = scanner.scan(content)
for match in found:
issues.append(SecurityIssue(
type=scanner.name,
severity=scanner.get_severity(match),
file=file_path.relative_to(file_path.parent.parent),
line=match.get("line", 0),
description=match.get("description", ""),
match=match.get("match", ""),
))
except Exception as e:
logger.warning(f"Failed to scan {file_path}: {e}")
return issues
提示词注入检测
class PromptInjectionScanner:
    """Detector for prompt-injection phrases."""

    # NOTE(review): SkillSecurityScanner._scan_file reads ``scanner.name``
    # and calls ``scanner.get_severity(match)``; neither is defined in this
    # excerpt - confirm they are provided elsewhere.

    # Every pattern starts at a word boundary so that it cannot fire in the
    # middle of another word (for example "react as" for "act as").
    SUSPICIOUS_PATTERNS = [
        # Instructions to ignore earlier context
        r"\bignore\s+(all\s+)?previous\s+(instructions?|directions?)",
        r"\bdisregard\s+(your\s+)?(instructions?|system\s+prompt)",
        r"\bforget\s+(everything|all\s+previous)\s+you\s+(were|told|learned)",
        # Role-play attacks
        r"\byou\s+are\s+now\s+",
        r"\bpretend\s+you\s+are\s+",
        r"\bact\s+as\s+",
        # Instruction override
        r"\b(instead|rather)\s+of\s+(following|obeying)",
        r"\b(without|ignore)\s+(any|my)\s+(rules?|constraints?)",
    ]

    def scan(self, content: str) -> list[dict]:
        """Find prompt-injection phrases in ``content``.

        Args:
            content: Text to scan; matching is case-insensitive.

        Returns:
            One dict per match with the keys ``type``, ``match``, ``line``
            (1-based) and ``description``, grouped by pattern in the order
            of ``SUSPICIOUS_PATTERNS``.
        """
        findings = []
        for pattern in self.SUSPICIOUS_PATTERNS:
            for match in re.finditer(pattern, content, re.IGNORECASE):
                text = match.group()
                findings.append({
                    "type": "prompt_injection",
                    "match": text,
                    # Count newlines in place rather than slicing a copy of
                    # the content for every match.
                    "line": content.count("\n", 0, match.start()) + 1,
                    "description": f"Potential prompt injection: {text[:50]}",
                })
        return findings
硬编码密钥检测
class SecretScanner:
    """Detector for hard-coded keys and other sensitive values."""

    # NOTE(review): SkillSecurityScanner._scan_file reads ``scanner.name``
    # and calls ``scanner.get_severity(match)``; neither is defined in this
    # excerpt - confirm they are provided elsewhere.

    SECRET_PATTERNS = {
        "aws_access_key": r"AKIA[0-9A-Z]{16}",
        # Exactly 40 key characters that are not part of a longer run, so a
        # long base64 blob is not reported once per 40-character window.
        # "=" is allowed just before the value so "KEY=<secret>" still hits.
        "aws_secret_key": r"(?<![A-Za-z0-9/+])[A-Za-z0-9/+=]{40}(?![A-Za-z0-9/+=])",
        "github_token": r"github[_-]?token[=-]?['\"]?([a-zA-Z0-9_]{35,40})['\"]?",
        "openai_key": r"sk-[a-zA-Z0-9]{48}",
        "slack_token": r"xox[baprs]-[0-9]{10,12}-[0-9]{10,12}[a-zA-Z0-9-]*",
        # The algorithm label is optional: PKCS#8 keys use a bare
        # "BEGIN PRIVATE KEY" / "BEGIN ENCRYPTED PRIVATE KEY" header.
        "private_key": r"-----BEGIN\s+(?:(RSA|DSA|EC|OPENSSH|ENCRYPTED)\s+)?PRIVATE\s+KEY-----",
    }

    def scan(self, content: str) -> list[dict]:
        """Find hard-coded secrets in ``content``.

        Args:
            content: Text to scan.

        Returns:
            One dict per match with the keys ``type``, ``secret_type``,
            ``match`` (masked), ``line`` (1-based) and ``description``,
            grouped by pattern in the order of ``SECRET_PATTERNS``.
        """
        findings = []
        for secret_type, pattern in self.SECRET_PATTERNS.items():
            for match in re.finditer(pattern, content):
                findings.append({
                    "type": "hardcoded_secret",
                    "secret_type": secret_type,
                    # Only the masked form is stored in the finding.
                    "match": self._mask_secret(match.group()),
                    "line": content.count("\n", 0, match.start()) + 1,
                    "description": f"Potential {secret_type} found",
                })
        return findings

    @staticmethod
    def _mask_secret(secret: str) -> str:
        """Mask ``secret``, keeping only its first and last four characters.

        Values of eight characters or fewer are masked completely.
        """
        if len(secret) <= 8:
            return "*" * len(secret)
        return secret[:4] + "*" * (len(secret) - 8) + secret[-4:]
Web认证
API认证器
class APIAuthenticator:
"""API认证器"""
def __init__(self):
self.users: dict[str, User] = {}
self.tokens: dict[str, TokenInfo] = {}
self.token_expiry = 3600 # 1小时
async def authenticate(
self,
username: str,
password: str,
) -> str | None:
"""验证用户凭证,返回token"""
user = self.users.get(username)
if not user:
return None
if not self._verify_password(password, user.password_hash):
return None
# 生成token
token = self._generate_token()
self.tokens[token] = TokenInfo(
user_id=username,
created_at=datetime.now(),
expires_at=datetime.now() + timedelta(seconds=self.token_expiry),
)
return token
def verify_token(self, token: str) -> bool:
"""验证token有效性"""
if token not in self.tokens:
return False
token_info = self.tokens[token]
if datetime.now() > token_info.expires_at:
del self.tokens[token]
return False
return True
安全配置
配置项
# constant.py
class SecurityConfig:
    """Central security settings."""

    # --- Shell commands ---------------------------------------------------
    SHELL_TIMEOUT_DEFAULT = 30.0   # default timeout
    SHELL_TIMEOUT_MAX = 300.0      # maximum timeout
    ALLOW_WHITELIST_ONLY = False   # whether only whitelisted commands are allowed

    # --- File access ------------------------------------------------------
    ALLOWED_FILE_EXTENSIONS = [
        ".txt",
        ".md",
        ".py",
        ".js",
        ".json",
        ".csv",
        ".xml",
        ".yaml",
        ".yml",
    ]
    BLOCKED_FILE_EXTENSIONS = [
        ".exe",
        ".sh",
        ".bat",
        ".ps1",
    ]

    # --- Skills -----------------------------------------------------------
    SKILLS_SCAN_REQUIRED = True    # a scan is mandatory before installation
    SKILLS_ALLOW_NETWORK = True    # network requests are allowed
总结
QwenPaw的安全防护体系设计原则:纵深防御(五层防护逐层把关)、先检查后执行(工具调用在执行前统一拦截)、敏感资源默认拒绝(危险命令与敏感路径直接拦截)、第三方技能先扫描后安装。
通过这套完善的安全防护体系,QwenPaw在提供强大功能的同时,也能有效保护用户的数据和系统安全。
往期回顾:
下期预告:
如果对你有帮助,欢迎点赞、在看!
夜雨聆风