开篇:
Evasion SubAgents 是一个用来做免杀实验的工具箱,通过它来学习免杀skill的编写
项目架构总览

一、四层架构设计:从命令到知识的完整链路
这个项目的本质不是传统意义上的独立程序,而是一套元工作流系统:
evasion-agent-teams/├── .claude-plugin/│ └── plugin.json # 插件元数据配置├── agents/ # 代理层:定义角色边界│ ├── research-agent.md # 技术研究代理│ ├── loadergen-agent.md # Loader生成代理│ ├── evasion-agent.md # 免杀集成代理│ └── c2-evasion-agent.md # C2免杀代理(最成熟)├── skills/ # 技能层:定义执行流程│ ├── research/SKILL.md # 搜索分析技能│ ├── loader_generate/SKILL.md # Loader生成技能│ ├── evasion_integrate/SKILL.md # 免杀集成技能│ └── c2_evasion/SKILL.md # C2免杀技能├── commands/ # 命令层:用户入口│ ├── research.md│ ├── loader_generate.md│ ├── evasion_integrate.md│ └── c2_evasion.md├── lib/│ └── knowledge_manager.py # 知识库管理器(核心代码)├── knowledge-base/ # 知识库层:结构化存储│ ├── evasion_techniques.json # 免杀技术库│ ├── loader_techniques.json # Loader组件库│ └── scenarios.json # 已验证场景库├── hooks/│ └── langfuse_hook.py # 可观测性钩子└── output/ # 输出目录二、三层抽象:Agent/Command/Skill的职责分离
2.1 Agent层
Agent层的核心是角色边界和权限控制。以c2-evasion-agent.md为例:
# C2 Evasion Agent## 角色定位你是专门负责C2框架源码分析和免杀修改的安全专家。## 权限范围允许:- 直接修改源码文件- 读取YARA/Sigma规则- 分析二进制特征- 执行编译验证禁止:- 执行恶意代码- 破坏性修改- 未经分析直接修改## 执行原则1. 先找规则,再决定改什么2. 逐条分析,动态生成任务3. 优先编译选项,后考虑源码重构4. 每条规则产出独立分析文档关键设计点:Agent不是提供知识,而是定义人格边界和动作权限。
2.2 Command层
Command层将复杂功能封装成产品化的命令入口。以loader_generate.md为例:
# Command: loader_generate## 功能描述从loader知识库生成shellcode loader## 参数说明- [count] 生成数量(默认1)- --shellcode <path> 指定shellcode文件- --executor <type> 执行方式(callback/createRemoteThread等)- --complexity <level> 复杂度(simple/medium/complex)- --language <lang> 编程语言(c/rust/go)## 输出结构output/└── loader_<timestamp>/ ├── source/ # 生成的源码 ├── build/ # 编译产物 └── metadata.json # 生成参数记录## 调用Agentloadergen-agent设计优势:用户接口与内部逻辑解耦,命令文档即产品帮助页。
2.3 Skill层
Skill层是真正的执行工艺说明书。以c2_evasion/SKILL.md为例:
# Skill: C2 Evasion## 执行流程### Phase 1: 识别C2组件1. 扫描项目目录结构2. 识别网络通信模块3. 标记核心功能文件输出:c2_components.md### Phase 2: 搜索检测规则1. 搜索YARA规则(GitHub/官方仓库)2. 搜索Sigma规则3. 搜索网络检测规则输出:detection_rules/目录### Phase 3: 逐条规则分析对每条检测规则:1. 提取检测特征2. 定位源码对应位置3. 分析修改可行性4. 生成修改建议输出:rule_analysis/<rule_name>.md### Phase 4: 二进制资产分析1. 分析十六进制模式2. 识别硬编码字符串3. 查找API调用序列输出:binary_analysis.md### Phase 5: 敏感字符串搜索1. 搜索IOC特征2. 搜索签名相关3. 搜索调试特征输出:sensitive_strings.md### Phase 6: 修改和验证1. 根据分析生成修改任务2. 实施源码修改3. 编译验证4. 功能测试输出:modified_source/ + build_results.md核心价值:将复杂任务拆解为可追溯、可审计的阶段化流程。
三、知识库设计:从碎片化到结构化
3.1 知识库结构设计
知识库不是存储大段文本,而是将知识原子化、组件化:
免杀技术库
{"techniques": [ {"id": "T001","name": "API Hashing","evasion_type": "api_obfuscation","description": "通过哈希值动态解析API,避免导入表暴露敏感函数","code_template": "/* C语言API哈希实现模板 */","apis": ["LoadLibrary", "GetProcAddress"],"complexity": "medium","references": ["GitHub链接"],"last_updated": "2025-01-01" } ]}Evasion类型分类:
api_obfuscation- API混淆 string_obfuscation- 字符串混淆 memory_evasion- 内存规避 execution_evasion- 执行规避 anti_analysis- 反分析 amsi_etw_bypass- AMSI/ETW绕过 unhooking- 脱钩
Loader组件库
{"storage_methods": [ {"id": "sto_001","name": "File Embedding","description": "将shellcode嵌入资源段","code_snippet": "/* 资源段嵌入代码 */" } ],"memory_allocators": [ {"id": "mem_001","name": "VirtualAlloc","description": "使用VirtualAlloc分配可执行内存","apis": ["VirtualAlloc", "VirtualProtect"] } ],"data_copiers": [ {"id": "cpy_001","name": "RtlMoveMemory","description": "使用RtlMoveMemory复制shellcode" } ],"executors": [ {"id": "exe_001","name": "CreateThread","description": "使用CreateThread执行shellcode" } ]}设计精髓:将完整Loader拆解为可组合的原子组件,实现:
可复用性 可筛选性 可组合性 可统计性
3.2 知识治理的核心
lib/knowledge_manager.py是项目中最接近传统程序的部分:
#!/usr/bin/env python3# -*- coding: utf-8 -*-"""知识库管理器 - 负责知识的增删改查、去重、组合和导出"""import jsonimport osfrom typing import Dict, List, Optional, Anyfrom difflib import SequenceMatcherfrom pathlib import PathclassKnowledgeManager: """知识库管理器类"""def__init__(self, base_dir: str = "knowledge-base"): """ 初始化知识库管理器Args:base_dir: 知识库基础目录路径 """self.base_dir = Path(base_dir)self.evasion_file = self.base_dir / "evasion_techniques.json"self.loader_file = self.base_dir / "loader_techniques.json"self.scenarios_file = self.base_dir / "scenarios.json" # 初始化知识库文件(如果不存在)self._initialize_knowledge_bases()def_initialize_knowledge_bases(self): """初始化知识库文件,创建默认schema"""default_schemas = {self.evasion_file: {"techniques": []},self.loader_file: {"storage_methods": [],"memory_allocators": [],"data_copiers": [],"executors": [] },self.scenarios_file: {"scenarios": []} }for file_path, schema in default_schemas.items():if not file_path.exists(): file_path.parent.mkdir(parents=True, exist_ok=True) with open(file_path, 'w', encoding='utf-8') as f: json.dump(schema, f, indent=2, ensure_ascii=False) def add_evasion_technique(self, technique: Dict[str, Any]) -> bool:""" 添加免杀技术到知识库 Args: technique: 技术字典,包含id, name, evasion_type等字段 Returns: bool: 添加成功返回True """# 检查是否重复(基于相似度)ifself._is_duplicate_technique(technique):print(f"注意: 检测到相似技术: {technique['name']}")returnFalse# 读取现有数据 with open(self.evasion_file, 'r', encoding='utf-8') as f: data = json.load(f)# 添加新技术 data["techniques"].append(technique)# 写回文件 with open(self.evasion_file, 'w', encoding='utf-8') as f: json.dump(data, f, indent=2, ensure_ascii=False)print(f"已添加技术: {technique['name']}")returnTrue def _is_duplicate_technique(self, new_technique: Dict) -> bool:""" 检测技术是否重复(多维度相似度检查) 检查维度: 1. 名称相似度 2. 关键词重叠度 3. API重叠度 4. 来源重叠 Args: new_technique: 新技术字典 Returns: bool: 如果重复返回True """ with open(self.evasion_file, 'r', encoding='utf-8') as f: data = json.load(f)for existing in data["techniques"]: similarity_score = 0# 1. 名称相似度检查 name_similarity = SequenceMatcher( None, new_technique['name'].lower(), existing['name'].lower() ).ratio()if name_similarity > 0.9:returnTrue similarity_score += name_similarity * 0.3# 2. 关键词重叠检查if'keywords' in new_technique and'keywords' in existing: keyword_overlap = len( set(new_technique['keywords']) & set(existing['keywords']) ) / max(len(new_technique['keywords']), 1) similarity_score += keyword_overlap * 0.3# 3. API重叠检查if'apis' in new_technique and'apis' in existing: api_overlap = len( set(new_technique['apis']) & set(existing['apis']) ) / max(len(new_technique['apis']), 1) similarity_score += api_overlap * 0.3# 4. 来源重叠检查if'source' in new_technique and'source' in existing:if new_technique['source'] == existing['source']: similarity_score += 0.1# 如果综合相似度超过阈值,视为重复if similarity_score > 0.7:returnTruereturnFalse def get_loader_components(self, component_type: str) -> List[Dict]:""" 获取指定类型的Loader组件 Args: component_type: 组件类型 (storage_methods/memory_allocators/data_copiers/executors) Returns: List[Dict]: 组件列表 """ with open(self.loader_file, 'r', encoding='utf-8') as f: data = json.load(f)return data.get(component_type, []) def generate_random_loader(self, count: int = 1) -> List[Dict]:""" 随机生成Loader组合 Args: count: 生成数量 Returns: List[Dict]: Loader配置列表 """ import random loaders = []for _ in range(count): loader = {"storage": random.choice(self.get_loader_components("storage_methods")),"allocator": random.choice(self.get_loader_components("memory_allocators")),"copier": random.choice(self.get_loader_components("data_copiers")),"executor": random.choice(self.get_loader_components("executors")) } loaders.append(loader)return loaders def record_scenario(self, scenario: Dict[str, Any]):""" 记录已验证的场景组合 Args: scenario: 场景字典,包含components, result, timestamp等 """ with open(self.scenarios_file, 'r', encoding='utf-8') as f: data = json.load(f) data["scenarios"].append(scenario) with open(self.scenarios_file, 'w', encoding='utf-8') as f: json.dump(data, f, indent=2, ensure_ascii=False) def get_stats(self) -> Dict[str, Any]:""" 获取知识库统计信息 Returns: Dict: 统计数据 """ stats = {}# 统计免杀技术 with open(self.evasion_file, 'r', encoding='utf-8') as f: data = json.load(f) stats["evasion_techniques"] = len(data["techniques"])# 按类型统计 type_count = {}for tech in data["techniques"]: tech_type = tech.get("evasion_type", "unknown") type_count[tech_type] = type_count.get(tech_type, 0) + 1 stats["by_type"] = type_count# 统计Loader组件 with open(self.loader_file, 'r', encoding='utf-8') as f: data = json.load(f) stats["loader_components"] = {"storage_methods": len(data.get("storage_methods", [])),"memory_allocators": len(data.get("memory_allocators", [])),"data_copiers": len(data.get("data_copiers", [])),"executors": len(data.get("executors", [])) }# 统计场景 with open(self.scenarios_file, 'r', encoding='utf-8') as f: data = json.load(f) stats["verified_scenarios"] = len(data.get("scenarios", []))return stats关键功能亮点:
多维度去重逻辑:不仅检查名称完全匹配,还计算名称相似度、关键词重叠、API重叠、来源重叠 结构化查询:支持按类型、复杂度、API等维度筛选 场景记录:记录已验证的组合,避免重复劳动 统计分析:提供知识库使用情况统计
设计哲学:安全知识库最怕的不是少,而是越积越乱。
四、规则驱动的修改流程
4.1 六阶段处理流程

整个项目最成熟、最像真正工作流系统的部分。
4.2 核心设计原则
## C2 Evasion Agent 核心原则### 禁止行为- 在分析前预先建立任务- 在未看到规则前假设应该改什么- 凭感觉乱改源码- 一上来就重构源码### 推荐做法1. 先搜索检测规则(YARA/Sigma/Network)2. 逐条规则分析检测特征3. 定位源码对应位置4. 优先考虑编译选项和构建配置5. 最后才修改源码6. 每条规则产出独立分析文档这背后的思维:将高不确定性任务改造成证据驱动、步骤可追溯、输出可审计的流程。
Phase 3示例:逐条规则分析
# 伪代码示例:规则分析流程def analyze_detection_rule(rule_file: str) -> Dict:""" 分析单条检测规则 步骤: 1. 解析规则文件(YARA/Sigma) 2. 提取检测特征(字符串/正则/API序列) 3. 在源码中搜索匹配位置 4. 评估修改难度和影响 5. 生成修改建议 返回: { "rule_name": "CobaltStrike_Beacon", "detection_features": [ {"type": "string", "value": "\\x5c\\x4d\\x5c\\x52"}, {"type": "api", "value": "InternetOpenA"} ], "source_locations": [ {"file": "beacon.c", "line": 123, "feature": "M\\R字符串"} ], "modification_suggestions": [ {"type": "string_encryption", "priority": "high"}, {"type": "api_dynamic_resolve", "priority": "medium"} ] } """ pass五、可观测性的保障
5.1 Tracing
多代理系统面临的核心挑战:调不通、查不到、复盘不了。
hooks/langfuse_hook.py解决了官方对subagent调用链支持不足的问题:
#!/usr/bin/env python3# -*- coding: utf-8 -*-"""Langfuse Hook - 为Claude Code多代理系统提供可观测性功能:1. 捕获主会话和子代理调用2. 记录tool_use和tool_result3. 生成trace上传到Langfuse4. 支持本地状态管理和失败恢复"""import jsonimport hashlibimport fcntlfrom pathlib import Pathfrom langfuse import Langfusefrom typing import Dict, Any, OptionalclassLangfuseHook: """Langfuse钩子类"""def__init__(self, project_dir: str = "."): """ 初始化Langfuse钩子Args:project_dir: 项目目录 """self.project_dir = Path(project_dir)self.state_file = self.project_dir / ".langfuse_state.json"self.lock_file = self.project_dir / ".langfuse.lock" # 初始化Langfuse客户端self.langfuse = Langfuse(public_key=os.getenv("LANGFUSE_PUBLIC_KEY"),secret_key=os.getenv("LANGFUSE_SECRET_KEY"),host=os.getenv("LANGFUSE_HOST", "https://cloud.langfuse.com") )defprocess_transcript(self, transcript_path: str): """ 处理ClaudeCode对话记录 步骤: 1. 读取增量内容(基于上次处理位置) 2. 识别消息类型: - 主会话消息 - tool_use事件 - tool_result事件 - subagent调用事件 3. 生成trace并上传 4. 更新本地状态Args:transcript_path: 对话记录文件路径 """ # 获取文件锁(防止并发冲突)withopen(self.lock_file, 'w') aslock:fcntl.flock(lock.fileno(), fcntl.LOCK_EX)try: # 读取上次状态last_position = self._read_state() # 读取增量内容withopen(transcript_path, 'r') asf:f.seek(last_position)new_content = f.read()current_position = f.tell() # 解析新增消息messages = self._parse_messages(new_content) # 处理每条消息formsginmessages:ifmsg['type'] == 'subagent_call':self._trace_subagent_call(msg)elifmsg['type'] == 'tool_use':self._trace_tool_use(msg)elifmsg['type'] == 'tool_result':self._trace_tool_result(msg) # 更新状态self._update_state(current_position)finally:fcntl.flock(lock.fileno(), fcntl.LOCK_UN)def_trace_subagent_call(self, msg: Dict[str, Any]): """ 记录子代理调用Args:msg: 子代理调用消息 """ # 生成traceID(基于内容哈希)trace_id = hashlib.sha256(json.dumps(msg, sort_keys=True).encode() ).hexdigest()[:16] # 创建Langfusetracetrace = self.langfuse.trace(id=trace_id,name=f"SubAgent: {msg['agent_name']}", input=msg['prompt'], metadata={ "agent_type": msg['agent_name'], "tools_used": msg.get('tools', []), "duration": msg.get('duration_ms') } ) # 记录span(子代理执行) span = trace.span( name="subagent_execution", input=msg['prompt'], output=msg.get('result') ) # 结束span span.end() def _truncate_large_content(self, content: str, max_length: int = 10000): """ 截断大内容(防止超出Langfuse限制) Args: content: 原始内容 max_length: 最大长度 Returns: str: 截断后的内容""" if len(content) > max_length: return content[:max_length] + f"... (truncated, total {len(content)} chars)" return content def fail_open(self, error: Exception): """ 失败开放策略:记录错误但不中断主流程 Args:error: 异常对象""" print(f"注意: Langfuse hook error (non-blocking): {error}") # 不抛出异常,保证主流程继续关键设计点:
本地状态管理:记录上次处理位置,支持增量处理 文件锁机制:防止并发冲突 截断和哈希:处理大内容,生成唯一trace ID Fail-Open策略:监控失败不影响主流程
工程意义:一旦系统涉及多代理、长链路、外部搜索、知识库写入、源码修改,没有tracing几乎不可能稳定维护。
六、暴露的问题与改进方向
6.1 问题一:产品逻辑仍停留在Prompt层
README看起来强大,但真正落到程序层面的主要是:
知识库管理器 Tracing钩子
而关键能力仍依赖Claude Code"照着Prompt自己做":
如何真正生成C/C++/Rust代码 如何真正执行编译流水线 如何将知识库组合映射到源码模板 如何验证输出产物一致性
本质:更像带强约束的人工编排框架,而非deterministic pipeline。
6.2 问题二:知识库Schema漂移
// 问题示例:字段命名不统一{"techniques": [ {"id": "T001","evasion_type": "api_obfuscation", // 有的用这个"type": "api_obfuscation", // 有的用这个(重复) ... } ],"loader_components": {"storage_methods": [ {"id": "embedded", ...}, // 有的用描述性ID {"id": "sto_009", ...}, // 有的用编号ID ... ] }}长期风险:Schema漂移会导致Agent对字段理解依赖上下文猜测,而非明确约定。
6.3 问题三:产品化超前于工程收口
README写得像成熟产品,但实际稳定存在的只有:
.claude配置层 JSON知识库 Python管理脚本 Langfuse钩子
差距:离"真正稳固的插件产品"还有距离。
七、四大学习点

7.1 用三层抽象管理Prompt系统
┌─────────────────────────────────────────┐│ Commands(用户入口) ││ - 产品化命令接口 │└─────────────────────────────────────────┘ ↓┌─────────────────────────────────────────┐│ Agents(角色定义) ││ - 人格边界 + 权限控制 │└─────────────────────────────────────────┘ ↓┌─────────────────────────────────────────┐│ Skills(执行工艺) ││ - 分阶段SOP + 输出规范 │└─────────────────────────────────────────┘可迁移场景:代码审计、应急响应、威胁狩猎、自动化报告。
7.2 把易碎知识固化成结构化知识库
反模式:知识飘在对话上下文里,会话结束一切归零。
正模式:
// 知识库沉淀{"techniques": [...], // 发现的模式"components": [...], // 提取的组件"scenarios": [...] // 验证的组合}7.3 让高风险任务变成阶段化流程
C2 Evasion的Phase设计本质:将高不确定性智能体任务改造成接近流程引擎模式。
不确定性任务 → 分阶段拆解 → 每阶段明确输入输出 → 可审计追溯
7.4 考虑Tracing

常见错误路径:
在多数系统生命周期中,问题往往出现在这样的循环里:功能开发完成立即上线,初期运行一切正常;直到出现异常或故障时,团队才发现缺乏足够的监控与观测数据,无法快速定位根因。由于事后补充监控涉及代码改动、环境部署和数据回填,过程既耗时又复杂,最终导致问题解决滞后、排障成本居高不下。
正确路径(本项目):
在系统的设计阶段就预先集成 Tracing 能力,是构建可观测性体系的关键起点。将追踪逻辑嵌入架构设计中,而不是事后补充,可在后续开发和上线阶段自然沉淀全链路数据。当应用进入生产环境时,每一次调用、延迟与异常都能被完整记录和关联。这样,当出现性能瓶颈或故障时,团队无需重现现场,就能依靠 Tracing 信息快速回溯调用路径、定位根因,实现真正的“问题可追溯、治理可量化”。
总结:
Evasion SubAgents项目的真正价值不在于某个具体规避技巧,而在于它已经在尝试回答一个更大的问题:
当安全研究开始被Agent化之后,我们究竟应该如何组织Prompt、知识、工具和执行记录?





夜雨聆风