乐于分享
好东西不私藏

QwenPaw源码解析系列(六):安全防护体系深度解析

QwenPaw源码解析系列(六):安全防护体系深度解析

前言

作为个人AI助手,安全是QwenPaw最重视的方面之一。本篇文章将深入解析QwenPaw的多层安全防护体系,包括ToolGuardMixin、命令守卫、文件访问控制等核心机制。

安全防护架构概览

QwenPaw采用了纵深防御的安全架构:

┌─────────────────────────────────────────────────────────┐
│ 安全防护层 │
├─────────────────────────────────────────────────────────┤
│ Layer 1: 工具守卫 (ToolGuardMixin) │
│ Layer 2: 命令白名单 (ShellCommandValidator) │
│ Layer 3: 文件访问控制 (FileAccessGuard) │
│ Layer 4: 技能安全扫描 (SkillSecurityScanner) │
│ Layer 5: API认证 (APIAuthenticator) │
└─────────────────────────────────────────────────────────┘

ToolGuardMixin:工具守卫

Mixin实现原理

class ToolGuardMixin:
"""
工具守卫混入类,拦截所有工具调用进行安全检查

通过Python的MRO(方法解析顺序)实现:
QwenPawAgent → ToolGuardMixin → ReActAgent
"""


async def _acting(self, tool_call: dict) -> dict | None:
"""
拦截工具调用,执行安全检查

Args:
tool_call: 工具调用请求

Returns:
None表示拦截(阻止执行),返回dict表示放行
"""

tool_name = tool_call.get("name", "")
tool_args = tool_call.get("arguments", {})

# 1. 执行安全检查
check_result = await self._security_check(tool_name, tool_args)

if not check_result.allowed:
logger.warning(
f"Tool '{tool_name}' blocked by security policy: "
f"{check_result.reason}"
)
return self._blocked_response(tool_call, check_result.reason)

# 2. 通过检查,传递给父类执行
return await super()._acting(tool_call)

安全检查流程

async def _security_check(
self,
tool_name: str,
tool_args: dict,
) -> SecurityCheckResult:
"""执行安全检查"""

# 1. 检查工具是否在允许列表中
if not self._is_tool_allowed(tool_name):
return SecurityCheckResult(
allowed=False,
reason=f"Tool '{tool_name}' is not in the allowed list"
)

# 2. 工具特定的参数检查
if tool_name == "execute_shell_command":
result = await self._check_shell_command(tool_args)
if not result.allowed:
return result

# 3. 文件访问检查
if tool_name in ["read_file", "write_file", "edit_file"]:
result = self._check_file_access(tool_args.get("path", ""))
if not result.allowed:
return result

# 4. 浏览器操作检查
if tool_name == "browser_use":
result = self._check_browser_action(tool_args)
if not result.allowed:
return result

return SecurityCheckResult(allowed=True)

Shell命令守卫

命令验证器

class ShellCommandValidator:
"""Shell命令验证器,检测危险命令模式"""

# 危险命令模式
DANGEROUS_PATTERNS = [
# 递归删除
(r"rm\s+-rf\s+/", "Recursive root delete"),
(r"rm\s+-rf\s+\*\s*$", "Recursive delete in root"),

# Fork炸弹
(r":\(\)\{\s*:\|:\s*&\s*\};:\|:&\s*&", "Fork bomb"),
(r"fork\(\)\s*\{", "Fork pattern"),

# 反向shell
(r"bash\s+-i\s+>\s*/dev/tcp/", "Reverse shell (bash)"),
(r"nc\s+-[el]", "Reverse shell (netcat)"),
(r"python.*-c.*socket", "Python reverse shell"),

# 数据破坏
(r">\s*/dev/sd[a-z]", "Direct disk write"),
(r"dd\s+if=.*of=/dev/", "Direct device write"),

# 篡改系统
(r"chmod\s+777\s+/etc", "Permission escalation"),
(r"sudo\s+su\s*$", "Privilege escalation"),
]

@classmethod
def validate(cls, command: str) -> ValidationResult:
"""
验证Shell命令安全性

Args:
command: 待验证的命令

Returns:
ValidationResult: 验证结果
"""

# 标准化命令
normalized = command.strip()

# 检查危险模式
for pattern, description in cls.DANGEROUS_PATTERNS:
if re.search(pattern, normalized, re.IGNORECASE):
return ValidationResult(
safe=False,
reason=description,
pattern=pattern,
)

# 检查命令长度限制
if len(normalized) > cls.MAX_COMMAND_LENGTH:
return ValidationResult(
safe=False,
reason=f"Command too long (max {cls.MAX_COMMAND_LENGTH})",
)

# 检查管道和重定向
if cls._has_suspicious_pipes(normalized):
return ValidationResult(
safe=False,
reason="Suspicious pipe patterns detected",
)

return ValidationResult(safe=True)

白名单机制

class ShellCommandValidator:
"""Shell命令验证器扩展白名单支持"""

# 白名单模式(允许的命令)
ALLOWED_PATTERNS = [
r"^git\s+", # Git命令
r"^pip\s+", # pip包管理
r"^npm\s+", # npm包管理
r"^curl\s+-s", # 安全的curl
r"^ls\s+", # 目录列表
r"^cat\s+", # 文件查看
r"^grep\s+", # 文本搜索
r"^find\s+", # 文件搜索
]

@classmethod
def validate_with_whitelist(
cls,
command: str,
enabled: bool = True,
) -> ValidationResult:
"""使用白名单验证"""
if not enabled:
return cls.validate(command)

# 检查是否匹配白名单
for pattern in cls.ALLOWED_PATTERNS:
if re.match(pattern, command):
# 白名单匹配,但仍需基础检查
return cls.validate(command)

# 不在白名单中,执行完整检查
result = cls.validate(command)
if not result.safe:
result.reason = f"Not in whitelist. {result.reason}"

return result

超时控制

class ShellCommandGuard:
"""Shell命令执行守卫"""

def __init__(self, default_timeout: float = 30.0):
self.default_timeout = default_timeout
self.timeout_map = {
"git": 60.0, # Git可能需要较长时间
"pip": 120.0, # pip安装可能需要较长时间
"npm": 120.0, # npm安装可能需要较长时间
"curl": 30.0,
}

def get_timeout(self, command: str) -> float:
"""根据命令类型返回合适的超时时间"""
for cmd_type, timeout in self.timeout_map.items():
if command.strip().startswith(cmd_type):
return timeout
return self.default_timeout

async def execute(
self,
command: str,
timeout: float | None = None,
) -> CommandResult:
"""安全执行Shell命令"""
# 验证命令
validation = ShellCommandValidator.validate(command)
if not validation.safe:
return CommandResult(
success=False,
error=f"Command blocked: {validation.reason}",
blocked=True,
)

# 获取超时时间
if timeout is None:
timeout = self.get_timeout(command)

# 执行命令(带超时控制)
try:
result = await asyncio.wait_for(
self._run_command(command),
timeout=timeout,
)
return result
except asyncio.TimeoutError:
return CommandResult(
success=False,
error=f"Command timed out after {timeout}s",
timed_out=True,
)

文件访问控制

敏感路径保护

class FileAccessGuard:
"""文件访问守卫,限制对敏感路径的访问"""

# 敏感路径列表
SENSITIVE_PATHS = [
# SSH密钥
"~/.ssh",
"~/.ssh/*",
# 云凭据
"~/.aws",
"~/.gcloud",
"~/.azure",
# 加密货币
"~/.bitcoin",
"~/.ethereum",
# 系统配置
"/etc/shadow",
"/etc/sudoers",
"/etc/passwd",
# 其他
"~/.gnupg",
"~/.private",
]

# 访问模式
READ_PATTERNS = ["read", "view", "cat", "show"]
WRITE_PATTERNS = ["write", "edit", "create", "delete", "rm"]

@classmethod
def check_access(
cls,
path: str,
mode: str = "read",
) -> AccessCheckResult:
"""
检查文件访问权限

Args:
path: 文件路径
mode: 访问模式 (read/write)

Returns:
AccessCheckResult: 访问检查结果
"""

# 展开路径
expanded_path = os.path.expanduser(os.path.expandvars(path))
abs_path = os.path.abspath(expanded_path)

# 检查是否为敏感路径
for sensitive in cls.SENSITIVE_PATHS:
expanded_sensitive = os.path.abspath(
os.path.expanduser(os.path.expandvars(sensitive))
)
if abs_path.startswith(expanded_sensitive + os.sep):
return AccessCheckResult(
allowed=False,
reason=f"Access to sensitive path denied: {sensitive}",
requires_override=True,
)

# 检查写权限
if mode in cls.WRITE_PATTERNS:
if cls._is_protected_file(abs_path):
return AccessCheckResult(
allowed=False,
reason="Cannot modify protected file",
requires_override=True,
)

# 检查路径遍历攻击
if ".." in path:
return AccessCheckResult(
allowed=False,
reason="Path traversal detected",
)

return AccessCheckResult(allowed=True)

路径遍历检测

class FileAccessGuard:
"""文件访问守卫扩展:路径遍历检测"""

@classmethod
def _check_path_traversal(cls, path: str) -> bool:
"""检测路径遍历攻击"""
# 规范化路径
normalized = os.path.normpath(path)

# 检查".."模式
if ".." in normalized:
parts = normalized.split(os.sep)
for i, part in enumerate(parts):
if part == "..":
# 检查".."前是否有实际目录
before = parts[:i]
if before: # 如果有前置路径,检查是否越界
return True

# 检查symlink
try:
abs_path = os.path.abspath(path)
real_path = os.path.realpath(path)
if not real_path.startswith(os.getcwd()):
return True
except Exception:
pass

return False

技能安全扫描

扫描器实现

class SkillSecurityScanner:
"""技能安全扫描器"""

def __init__(self):
self.scanners = [
PromptInjectionScanner(),
CommandInjectionScanner(),
SecretScanner(),
DataExfiltrationScanner(),
]

async def scan(self, skill_dir: Path) -> ScanResult:
"""
扫描技能目录

Args:
skill_dir: 技能目录路径

Returns:
ScanResult: 扫描结果
"""

issues = []

# 扫描所有Python文件
for py_file in skill_dir.rglob("*.py"):
file_issues = await self._scan_file(py_file)
issues.extend(file_issues)

# 扫描配置文件
config_file = skill_dir / "config.json"
if config_file.exists():
config_issues = await self._scan_config(config_file)
issues.extend(config_issues)

return ScanResult(
skill_name=skill_dir.name,
issues=issues,
safe=len([i for i in issues if i.severity == "critical"]) == 0,
)

async def _scan_file(self, file_path: Path) -> list[SecurityIssue]:
"""扫描单个文件"""
issues = []

try:
content = file_path.read_text()

for scanner in self.scanners:
found = scanner.scan(content)
for match in found:
issues.append(SecurityIssue(
type=scanner.name,
severity=scanner.get_severity(match),
file=file_path.relative_to(file_path.parent.parent),
line=match.get("line", 0),
description=match.get("description", ""),
match=match.get("match", ""),
))

except Exception as e:
logger.warning(f"Failed to scan {file_path}: {e}")

return issues

提示词注入检测

class PromptInjectionScanner:
"""提示词注入检测器"""

SUSPICIOUS_PATTERNS = [
# 忽略指令
r"ignore\s+(all\s+)?previous\s+(instructions?|directions?)",
r"disregard\s+(your\s+)?(instructions?|system\s+prompt)",
r"forget\s+(everything|all\s+previous)\s+you\s+(were|told|learned)",

# 角色扮演攻击
r"you\s+are\s+now\s+",
r"pretend\s+you\s+are\s+",
r"act\s+as\s+",

# 指令覆盖
r"(instead|rather)\s+of\s+(following|obeying)",
r"(without|ignore)\s+(any|my)\s+(rules?|constraints?)",
]

def scan(self, content: str) -> list[dict]:
"""检测提示词注入"""
findings = []

for pattern in self.SUSPICIOUS_PATTERNS:
matches = re.finditer(pattern, content, re.IGNORECASE)
for match in matches:
findings.append({
"type": "prompt_injection",
"match": match.group(),
"line": content[:match.start()].count('\n') + 1,
"description": f"Potential prompt injection: {match.group()[:50]}",
})

return findings

硬编码密钥检测

class SecretScanner:
"""密钥和敏感信息检测器"""

SECRET_PATTERNS = {
"aws_access_key": r"AKIA[0-9A-Z]{16}",
"aws_secret_key": r"[A-Za-z0-9/+=]{40}",
"github_token": r"github[_-]?token[=-]?['\"]?([a-zA-Z0-9_]{35,40})['\"]?",
"openai_key": r"sk-[a-zA-Z0-9]{48}",
"slack_token": r"xox[baprs]-[0-9]{10,12}-[0-9]{10,12}[a-zA-Z0-9-]*",
"private_key": r"-----BEGIN\s+(RSA|DSA|EC|OPENSSH)\s+PRIVATE\s+KEY-----",
}

def scan(self, content: str) -> list[dict]:
"""检测硬编码密钥"""
findings = []

for secret_type, pattern in self.SECRET_PATTERNS.items():
matches = re.finditer(pattern, content)
for match in matches:
# 掩码显示
masked = self._mask_secret(match.group())

findings.append({
"type": "hardcoded_secret",
"secret_type": secret_type,
"match": masked,
"line": content[:match.start()].count('\n') + 1,
"description": f"Potential {secret_type} found",
})

return findings

@staticmethod
def _mask_secret(secret: str) -> str:
"""掩码密钥,只显示首尾字符"""
if len(secret) <= 8:
return "*" * len(secret)
return secret[:4] + "*" * (len(secret) - 8) + secret[-4:]

Web认证

API认证器

class APIAuthenticator:
"""API认证器"""

def __init__(self):
self.users: dict[str, User] = {}
self.tokens: dict[str, TokenInfo] = {}
self.token_expiry = 3600 # 1小时

async def authenticate(
self,
username: str,
password: str,
) -> str | None:
"""验证用户凭证,返回token"""
user = self.users.get(username)
if not user:
return None

if not self._verify_password(password, user.password_hash):
return None

# 生成token
token = self._generate_token()
self.tokens[token] = TokenInfo(
user_id=username,
created_at=datetime.now(),
expires_at=datetime.now() + timedelta(seconds=self.token_expiry),
)

return token

def verify_token(self, token: str) -> bool:
"""验证token有效性"""
if token not in self.tokens:
return False

token_info = self.tokens[token]
if datetime.now() > token_info.expires_at:
del self.tokens[token]
return False

return True

安全配置

配置项

# constant.py
class SecurityConfig:
"""安全配置"""

# Shell命令配置
SHELL_TIMEOUT_DEFAULT = 30.0 # 默认超时
SHELL_TIMEOUT_MAX = 300.0 # 最大超时
ALLOW_WHITELIST_ONLY = False # 是否仅允许白名单命令

# 文件访问配置
ALLOWED_FILE_EXTENSIONS = [
".txt", ".md", ".py", ".js", ".json",
".csv", ".xml", ".yaml", ".yml",
]
BLOCKED_FILE_EXTENSIONS = [
".exe", ".sh", ".bat", ".ps1",
]

# 技能安全
SKILLS_SCAN_REQUIRED = True # 安装前必须扫描
SKILLS_ALLOW_NETWORK = True # 允许网络请求

总结

QwenPaw的安全防护体系设计原则:

1.
纵深防御:多层安全检查,即使一层被绕过也有其他层保护
2.
默认拒绝:未明确允许的都被视为禁止
3.
最小权限:只授予完成任务所需的最小权限
4.
可审计:所有操作都有日志记录
5.
可配置:安全策略可根据需要调整

通过这套完善的安全防护体系,QwenPaw在提供强大功能的同时,也能有效保护用户的数据和系统安全。


往期回顾:

系列一:项目概览与整体架构
系列二:QwenPawAgent核心实现
系列三:记忆系统设计与实现
系列四:Skills系统与扩展机制
系列五:ACP协议与多智能体通信

下期预告:

配置系统:AgentProfile与工作目录
控制台:Web界面与API
部署与扩展:Docker与云端部署

如果对你有帮助,欢迎点赞在看