相关的系列文章:
lead_agent 模块是 DeerFlow 的主代理引擎,负责根据用户请求创建、配置并返回一个可执行的 LangGraph Agent,具体的代码位置:deer-flow\backend\packages\harness\deerflow\agents\lead_agent
本文是从源码中梳理出和记忆相关的内容。
01 代码文件
核心记忆系统 (deerflow/agents/memory/),各个文件的实现的功能:
__init__.py:模块入口updater.py:LLM记忆更新(提取事实、去重、原子写入)queue.py:记忆更新队列(防抖、线程级去重)prompt.py:记忆更新的 Prompt 模板storage.py:文件存储(按用户隔离)message_processing.py:消息预处理/过滤summarization_hook.py:摘要Hook
记忆中间件:
agents/middlewares/memory_middleware.py:拦截用户消息和 AI 回复,触发队列更新
记忆配置:
config/memory_config.py:memory 配置项的 Pydantic 模型
02 用于记忆生成的文本内容
代码位置:deerflow\agents\memory\message_processing.py
函数:filter_messages_for_memory
对话结束后,filter_messages_for_memory() 从全部消息中:
保留:用户输入(human) + AI 最终回复(ai,无 tool_calls)
剔除:中间工具调用(tool)、工具结果
剥离:<uploaded_files>...</uploaded_files> 内容(上传文件路径不写入记忆)
例子:
输入: [Human("我想用Python"), ToolCall(...), ToolResult(...), AIMessage("好的")]
输出: [Human("我想用Python"), AIMessage("好的")]
补充:
LangGraph的消息有几种类型:
human:用户消息
ai:AI回复(可能带tool_calls)
tool:工具执行结果
AIMessage 可能有 tool_calls 字段表示调用了工具
下面是具体的代码实现,我加上了一些注释:
def filter_messages_for_memory(messages: list[Any]) -> list[Any]:"""Keep only user inputs and final assistant responses for memory updates."""filtered = [] # 最终保留的消息列表skip_next_ai = False # 标记:是否跳过下一个AI消息,处理的情况:用户只上传了文件(没有任何文字),此时紧跟的 AI 回复是"收到文件",这条回复不应该被记入记忆。for msg in messages:msg_type = getattr(msg, "type", None) # 获取消息类型if msg_type == "human": # 用户消息content_str = extract_message_text(msg) # 提取消息文本if "<uploaded_files>" in content_str:# 用户消息中是否包含上传文件标记stripped = _UPLOAD_BLOCK_RE.sub("", content_str).strip() # 去掉上传标签if not stripped: # 去掉上传标签之后内容为空,此时 AI回复的内容则跳过skip_next_ai = Truecontinue# 如果用户消息有上传文件,但也有文字,则去掉上传标签,剩下的内容需要进行记忆的生成clean_msg = copy(msg) # 浅拷贝,不修改原消息对象clean_msg.content = strippedfiltered.append(clean_msg)skip_next_ai = Falseelse:filtered.append(msg)skip_next_ai = Falseelif msg_type == "ai": # AI回复tool_calls = getattr(msg, "tool_calls", None) # 获取工具调用列表if not tool_calls: # tool_calls 为空/None/[] 时为 False,进入 ifif skip_next_ai:skip_next_ai = Falsecontinuefiltered.append(msg)return filtered
下面是一些例子:
示例1:正常对话输入: [Human("我想学Python"), AIMessage("很好")]输出: [Human("我想学Python"), AIMessage("很好")]示例2:用户只上传了文件输入: [Human("<uploaded_files>xxx.pdf</uploaded_files>"), AIMessage("收到文件")]处理:- Human: 去掉标签后内容为空 → skip_next_ai=True → 不加入- AIMessage: 没有 tool_calls,但 skip_next_ai=True → 跳过,不加入输出: []示例3:用户上传文件 + 文字输入: [Human("<uploaded_files>x.pdf</uploaded_files> 我想问..."), AIMessage("好的")]处理:- Human: 去掉标签剩"我想问..." → clean_msg → 加入; skip_next_ai=False- AIMessage: 没有 tool_calls → 直接加入输出: [Human("我想问..."), AIMessage("好的")]示例4:工具调用后的 AI 回复输入: [Human("查下天气"), AIMessage("好的", tool_calls=[...]), ToolResult("晴天"), AIMessage("今天是晴天")]处理:- Human → 加入- AIMessage(tool_calls) → 有 tool_calls → 不加入- ToolResult → 不是 human/ai → 不进入分支 → 不加入- AIMessage(无tool_calls) → 加入输出: [Human("查下天气"), AIMessage("今天是晴天")]
03 构建prompt
代码位置:deerflow\agents\memory\updater.py
函数:_prepare_update_prompt
def _prepare_update_prompt(self, messages, agent_name, correction_detected, reinforcement_detected, user_id=None):# 1. 加载当前 memorycurrent_memory = get_memory_data(agent_name, user_id=user_id)# 2. 把对话列表格式化成文本conversation_text = format_conversation_for_update(messages)# 3. 构建纠错/强化提示correction_hint = self._build_correction_hint(correction_detected, reinforcement_detected)# 4. 用模板拼出完整 promptprompt = MEMORY_UPDATE_PROMPT.format(current_memory=json.dumps(current_memory, indent=2), # 当前记忆状态conversation=conversation_text, # 新对话内容correction_hint=correction_hint, # 可选的纠错提示)return current_memory, prompt
提示词模板:
MEMORY_UPDATE_PROMPT = """You are a memory management system. Your task is to analyze a conversation and update the user's memory profile.Current Memory State:<current_memory>{current_memory}</current_memory>New Conversation to Process:<conversation>{conversation}</conversation>Instructions:1. Analyze the conversation for important information about the user2. Extract relevant facts, preferences, and context with specific details (numbers, names, technologies)3. Update the memory sections as needed following the detailed length guidelines belowBefore extracting facts, perform a structured reflection on the conversation:1. Error/Retry Detection: Did the agent encounter errors, require retries, or produce incorrect results?If yes, record the root cause and correct approach as a high-confidence fact with category "correction".2. User Correction Detection: Did the user correct the agent's direction, understanding, or output?If yes, record the correct interpretation or approach as a high-confidence fact with category "correction".Include what went wrong in "sourceError" only when category is "correction" and the mistake is explicit in the conversation.3. Project Constraint Discovery: Were any project-specific constraints discovered during the conversation?If yes, record them as facts with the most appropriate category and confidence.{correction_hint}Memory Section Guidelines:**User Context** (Current state - concise summaries):- workContext: Professional role, company, key projects, main technologies (2-3 sentences)Example: Core contributor, project names with metrics (16k+ stars), technical stack- personalContext: Languages, communication preferences, key interests (1-2 sentences)Example: Bilingual capabilities, specific interest areas, expertise domains- topOfMind: Multiple ongoing focus areas and priorities (3-5 sentences, detailed paragraph)Example: Primary project work, parallel technical investigations, ongoing learning/trackingInclude: Active implementation work, troubleshooting issues, market/research interestsNote: This captures SEVERAL concurrent focus areas, not just one task**History** (Temporal context - rich paragraphs):- recentMonths: Detailed summary of recent activities (4-6 sentences or 1-2 paragraphs)Timeline: Last 1-3 months of interactionsInclude: Technologies explored, projects worked on, problems solved, interests demonstrated- earlierContext: Important historical patterns (3-5 sentences or 1 paragraph)Timeline: 3-12 months agoInclude: Past projects, learning journeys, established patterns- longTermBackground: Persistent background and foundational context (2-4 sentences)Timeline: Overall/foundational informationInclude: Core expertise, longstanding interests, fundamental working style**Facts Extraction**:- Extract specific, quantifiable details (e.g., "16k+ GitHub stars", "200+ datasets")- Include proper nouns (company names, project names, technology names)- Preserve technical terminology and version numbers- Categories:* preference: Tools, styles, approaches user prefers/dislikes* knowledge: Specific expertise, technologies mastered, domain knowledge* context: Background facts (job title, projects, locations, languages)* behavior: Working patterns, communication habits, problem-solving approaches* goal: Stated objectives, learning targets, project ambitions* correction: Explicit agent mistakes or user corrections, including the correct approach- Confidence levels:* 0.9-1.0: Explicitly stated facts ("I work on X", "My role is Y")* 0.7-0.8: Strongly implied from actions/discussions* 0.5-0.6: Inferred patterns (use sparingly, only for clear patterns)**What Goes Where**:- workContext: Current job, active projects, primary tech stack- personalContext: Languages, personality, interests outside direct work tasks- topOfMind: Multiple ongoing priorities and focus areas user cares about recently (gets updated most frequently)Should capture 3-5 concurrent themes: main work, side explorations, learning/tracking interests- recentMonths: Detailed account of recent technical explorations and work- earlierContext: Patterns from slightly older interactions still relevant- longTermBackground: Unchanging foundational facts about the user**Multilingual Content**:- Preserve original language for proper nouns and company names- Keep technical terms in their original form (DeepSeek, LangGraph, etc.)- Note language capabilities in personalContextOutput Format (JSON):{{"user": {{"workContext": {{ "summary": "...", "shouldUpdate": true/false }},"personalContext": {{ "summary": "...", "shouldUpdate": true/false }},"topOfMind": {{ "summary": "...", "shouldUpdate": true/false }}}},"history": {{"recentMonths": {{ "summary": "...", "shouldUpdate": true/false }},"earlierContext": {{ "summary": "...", "shouldUpdate": true/false }},"longTermBackground": {{ "summary": "...", "shouldUpdate": true/false }}}},"newFacts": [{{ "content": "...", "category": "preference|knowledge|context|behavior|goal|correction", "confidence": 0.0-1.0 }}],"factsToRemove": ["fact_id_1", "fact_id_2"]}}Important Rules:- Only set shouldUpdate=true if there's meaningful new information- Follow length guidelines: workContext/personalContext are concise (1-3 sentences), topOfMind and history sections are detailed (paragraphs)- Include specific metrics, version numbers, and proper nouns in facts- Only add facts that are clearly stated (0.9+) or strongly implied (0.7+)- Use category "correction" for explicit agent mistakes or user corrections; assign confidence >= 0.95 when the correction is explicit- Include "sourceError" only for explicit correction facts when the prior mistake or wrong approach is clearly stated; omit it otherwise- Remove facts that are contradicted by new information- When updating topOfMind, integrate new focus areas while removing completed/abandoned onesKeep 3-5 concurrent focus themes that are still active and relevant- For history sections, integrate new information chronologically into appropriate time period- Preserve technical accuracy - keep exact names of technologies, companies, projects- Focus on information useful for future interactions and personalization- IMPORTANT: Do NOT record file upload events in memory. Uploaded files aresession-specific and ephemeral — they will not be accessible in future sessions.Recording upload events causes confusion in subsequent conversations.Return ONLY valid JSON, no explanation or markdown."""
对应的中文翻译版本:
你是记忆管理系统,你的任务是分析对话并更新用户的记忆档案当前记忆状态:<current_memory>{current_memory}</current_memory>待处理的新对话:<conversation>{conversation}</conversation>说明:1. 分析对话,提取关于用户的重要信息2. 提取相关的事实、偏好和上下文,包括具体细节(数字、名称、技术)3. 根据以下详细的篇幅指南更新记忆各部分在提取事实之前,请对对话进行结构化反思:1. 错误/重试检测:代理是否遇到错误、需要重试或产生了错误结果?如果是,将根本原因和正确方法记录为高置信度事实,类别为"纠正(correction)"。2. 用户纠正检测:用户是否纠正了代理的方向、理解或输出?如果是,将正确的理解或方法记录为高置信度事实,类别为"纠正(correction)"。仅当类别为"纠正"且错误明确时,在"sourceError"中记录出了什么问题。3. 项目约束发现:是否在对话中发现了任何项目特定的约束?如果是,将其作为事实记录在最合适的类别下,并附上适当的置信度。{correction_hint}记忆部分指南:**用户上下文**(当前状态 - 简洁摘要):- workContext:专业角色、公司、主要项目、主要技术栈(2-3句话)示例:核心贡献者,项目名称及指标(16k+ stars),技术栈- personalContext:语言、沟通偏好、关键兴趣(1-2句话)示例:双语能力、特定兴趣领域、专业知识- topOfMind:多个正在进行的重点和优先级(3-5句话,详细段落)示例:主要项目工作、并行技术调查、正在学习/跟踪的内容包括:活跃的实现工作、故障排除问题、市场/研究兴趣注意:这应捕捉多个并发的主题,而不仅仅是一个任务**历史记录**(时间上下文 - 详细段落):- recentMonths:近期活动的详细摘要(4-6句话或1-2段)时间线:最近1-3个月的互动包括:探索的技术、从事的项目、解决的问题、展示的兴趣- earlierContext:重要的历史模式(3-5句话或1段)时间线:3-12个月前包括:过去项目、学习历程、已建立的模式- longTermBackground:持久的背景和基础上下文(2-4句话)时间线:整体/基础信息包括:核心专业知识、长期兴趣、基本工作风格**事实提取**:- 提取具体可量化的细节(例如:"16k+ GitHub stars"、"200+ 数据集")- 包含专有名词(公司名称、项目名称、技术名称)- 保留技术术语和版本号- 类别:* preference(偏好):用户喜欢/不喜欢的工具、风格、方法* knowledge(知识):特定专业知识、掌握的技术、领域知识* context(上下文):背景事实(职位、项目、地点、语言)* behavior(行为):工作模式、沟通习惯、解决问题的方法* goal(目标):既定目标、学习目标、项目志向* correction(纠正):明确的代理错误或用户纠正,包括正确方法- 置信度级别:* 0.9-1.0:明确陈述的事实("我从事X工作"、"我的角色是Y")* 0.7-0.8:强烈暗示的(从行动/讨论中推断)* 0.5-0.6:推断的模式(谨慎使用,仅用于明确的模式)**内容放置位置**:- workContext:当前工作、活跃项目、主要技术栈- personalContext:语言、个性、直接工作之外的兴趣- topOfMind:用户最近关心的多个正在进行的优先级和焦点(更新最频繁)应捕捉3-5个并发主题:主要工作、附带探索、学习/跟踪兴趣- recentMonths:近期技术探索和工作的详细记录- earlierContext:仍相关联的稍旧交互中的模式- longTermBackground:不变的基础事实关于用户**多语言内容**:- 保留专有名词和公司名称的原始语言- 技术术语保持原样(DeepSeek、LangGraph 等)- 在 personalContext 中注明语言能力输出格式(JSON):{{"user": {{"workContext": {{ "summary": "...", "shouldUpdate": true/false }},"personalContext": {{ "summary": "...", "shouldUpdate": true/false }},"topOfMind": {{ "summary": "...", "shouldUpdate": true/false }}}},"history": {{"recentMonths": {{ "summary": "...", "shouldUpdate": true/false }},"earlierContext": {{ "summary": "...", "shouldUpdate": true/false }},"longTermBackground": {{ "summary": "...", "shouldUpdate": true/false }}}},"newFacts": [{{ "content": "...", "category": "preference|knowledge|context|behavior|goal|correction", "confidence": 0.0-1.0 }}],"factsToRemove": ["fact_id_1", "fact_id_2"]}}重要规则:- 仅在有有意义的新信息时设置 shouldUpdate=true- 遵循篇幅指南:workContext/personalContext 应简洁(1-3句话),topOfMind 和 history 部分应详细(段落)- 在事实中包含具体的指标、版本号和专有名词- 仅添加明确陈述(0.9+)或强烈暗示(0.7+)的事实- 对于明确的代理错误或用户纠正,使用类别"correction";当纠正明确时,置信度赋值 >= 0.95- 仅当类别为"correction"且prior错误或错误方法明确陈述时,才包含"sourceError";否则省略- 移除被新信息矛盾的事实- 更新 topOfMind 时,在移除已完成/已放弃的主题的同时,整合新的焦点主题保持3-5个仍活跃且相关的并发焦点主题- 对于 history 部分,将新信息按时间顺序整合到适当的时间段- 保持技术准确性 - 保留技术、公司、项目的准确名称- 聚焦对未来互动和个人化有用的信息- 重要:不要在记忆中记录文件上传事件。上传的文件是会话特定的、临时的——在未来的会话中无法访问。记录上传事件会在后续对话中造成混淆。仅返回有效的 JSON,不做任何解释或添加 markdown 格式"""
构建纠错/强化提示函数实现_build_correction_hint:
def _build_correction_hint(self,correction_detected: bool,reinforcement_detected: bool,) -> str:"""Build optional prompt hints for correction and reinforcement signals."""correction_hint = ""if correction_detected:correction_hint = ("IMPORTANT: Explicit correction signals were detected in this conversation. ""Pay special attention to what the agent got wrong, what the user corrected, ""and record the correct approach as a fact with category "'"correction" and confidence >= 0.95 when appropriate.')if reinforcement_detected:reinforcement_hint = ("IMPORTANT: Positive reinforcement signals were detected in this conversation. ""The user explicitly confirmed the agent's approach was correct or helpful. ""Record the confirmed approach, style, or preference as a fact with category "'"preference" or "behavior" and confidence >= 0.9 when appropriate.')correction_hint = (correction_hint + "\n" + reinforcement_hint).strip() if correction_hint else reinforcement_hintreturn correction_hint# 改成中文的提示词def _build_correction_hint_zh(self,correction_detected: bool = False,reinforcement_detected: bool = False,) -> str:"""构建修正提示(中文版)"""correction_hint = ""if correction_detected:correction_hint = ("重要提示:对话中检测到明确的纠正信号。请特别注意代理出错的内容、用户纠正了什么,""并在适当情况下将正确的方法记录为类别为 correction 的事实,置信度 >= 0.95。")if reinforcement_detected:reinforcement_hint = ("重要提示:对话中检测到正向强化信号。用户明确确认了代理的方法是正确的或有帮助的,""请在适当情况下将确认的方法、风格或偏好记录为类别为 preference 或 behavior 的事实,置信度 >= 0.9。")correction_hint = (correction_hint + "\n" + reinforcement_hint).strip() if correction_hint else reinforcement_hintreturn correction_hint
_build_correction_hint作用:当用户在对话中纠正AI错误或确认AI做对了时,通知LLM重点记忆这些信息。
两个检测信号是布尔值来自于 message_processing.py 中的正则匹配:
_CORRECTION_PATTERNS = (re.compile(r"\bthat(?:'s| is) (?:wrong|incorrect)\b", re.IGNORECASE),re.compile(r"\byou misunderstood\b", re.IGNORECASE),re.compile(r"\btry again\b", re.IGNORECASE),re.compile(r"\bredo\b", re.IGNORECASE),re.compile(r"不对"),re.compile(r"你理解错了"),re.compile(r"你理解有误"),re.compile(r"重试"),re.compile(r"重新来"),re.compile(r"换一种"),re.compile(r"改用"),)_REINFORCEMENT_PATTERNS = (re.compile(r"\byes[,.]?\s+(?:exactly|perfect|that(?:'s| is) (?:right|correct|it))\b", re.IGNORECASE),re.compile(r"\bperfect(?:[.!?]|$)", re.IGNORECASE),re.compile(r"\bexactly\s+(?:right|correct)\b", re.IGNORECASE),re.compile(r"\bthat(?:'s| is)\s+(?:exactly\s+)?(?:right|correct|what i (?:wanted|needed|meant))\b", re.IGNORECASE),re.compile(r"\bkeep\s+(?:doing\s+)?that\b", re.IGNORECASE),re.compile(r"\bjust\s+(?:like\s+)?(?:that|this)\b", re.IGNORECASE),re.compile(r"\bthis is (?:great|helpful)\b(?:[.!?]|$)", re.IGNORECASE),re.compile(r"\bthis is what i wanted\b(?:[.!?]|$)", re.IGNORECASE),re.compile(r"对[,,]?\s*就是这样(?:[。!?!?.]|$)"),re.compile(r"完全正确(?:[。!?!?.]|$)"),re.compile(r"(?:对[,,]?\s*)?就是这个意思(?:[。!?!?.]|$)"),re.compile(r"正是我想要的(?:[。!?!?.]|$)"),re.compile(r"继续保持(?:[。!?!?.]|$)"),)def detect_correction(messages: list[Any]) -> bool:"""Detect explicit user corrections in recent conversation turns."""recent_user_msgs = [msg for msg in messages[-6:] if getattr(msg, "type", None) == "human"]for msg in recent_user_msgs:content = extract_message_text(msg).strip()if content and any(pattern.search(content) for pattern in _CORRECTION_PATTERNS):return Truereturn Falsedef detect_reinforcement(messages: list[Any]) -> bool:"""Detect explicit positive reinforcement signals in recent conversation turns."""recent_user_msgs = [msg for msg in messages[-6:] if getattr(msg, "type", None) == "human"]for msg in recent_user_msgs:content = extract_message_text(msg).strip()if content and any(pattern.search(content) for pattern in _REINFORCEMENT_PATTERNS):return Truereturn False
correction_detected:用户说"不对"、"你理解错了"、"try again" 等, 用户在纠正 AI 的错误
reinforcement_detected:用户说"perfect"、"正是我想要的"、"exactly right" 等, 用户在确认 AI 做对了
为什么要单独给提示?普通的对话更新 prompt 没有这部分内容。加入这个 hint 后,LLM 会格外注意记录两类高优先级信息:---correction_detected 时(纠正)拼入 Prompt 的内容:IMPORTANT: Explicit correction signals were detected in this conversation.Pay special attention to what the agent got wrong, what the user corrected,and record the correct approach as a fact with category "correction" and confidence >= 0.95 when appropriate.效果: LLM 会在返回的 JSON 中加入 category: "correction" 的 fact,并标记 confidence >= 0.95。例子:User: 不对,我说的是用 Go 不是 PythonAssistant: 抱歉,我理解错了→ memory.json 写入:{ "content": "用户想用 Go 而非 Python", "category": "correction", "confidence": 0.95, "sourceError": "AI误以为用户要用Python" }这样下次 AI 就不会重蹈覆辙。---reinforcement_detected 时(正向确认)拼入 Prompt 的内容:IMPORTANT: Positive reinforcement signals were detected in this conversation.The user explicitly confirmed the agent's approach was correct or helpful.Record the confirmed approach, style, or preference as a fact with category "preference" or "behavior" and confidence >= 0.9 when appropriate.效果: LLM 会将用户满意的操作记为 category: "preference" 或 "behavior",confidence 给到 0.9+。例子:User: perfect,这正是我想要的Assistant: 很高兴帮到你→ memory.json 写入:{ "content": "用户对简洁直接的代码示例满意", "category": "preference", "confidence": 0.9 }---两者都检测到时两个 hint 会合并成一段话同时生效,LLM 优先记录纠正信息(confidence 更高)。---
04 基于LLM生成记忆
代码位置:deerflow\agents\memory\updater.py
函数:_do_update_memory_sync
model = self._get_model()response = model.invoke(prompt, config={"run_name": "memory_agent"})
05 记忆合并保存
代码位置:deerflow\agents\memory\updater.py
函数:_finalize_update
def _finalize_update(self,current_memory: dict[str, Any],response_content: Any,thread_id: str | None,agent_name: str | None,user_id: str | None = None,) -> bool:"""Parse the model response, apply updates, and persist memory."""# 1. 解析 LLM 返回的 JSONupdate_data = _parse_memory_update_response(response_content)# Deep-copy before in-place mutation so a subsequent save() failure# cannot corrupt the still-cached original object reference.updated_memory = self._apply_updates(copy.deepcopy(current_memory), update_data, thread_id)updated_memory = _strip_upload_mentions_from_memory(updated_memory)return get_memory_storage().save(updated_memory, agent_name, user_id=user_id)'''# 1. 解析 LLM 返回的 JSONupdate_data = _parse_memory_update_response(response_content)# 2. 合并到现有 memoryupdated_memory = self._apply_updates(current_memory, update_data, thread_id)# 3. 写入文件get_memory_storage().save(updated_memory, agent_name, user_id=user_id)'''
_finalize_update这个函数做三件事:解析 --> 合并 --> 持久化
_parse_memory_update_response(response_content)
函数的作用:从LLM返回的响应内容中,找到并解析出第一个合法的记忆更新JSON对象
函数具体实现代码如下所示:
def _parse_memory_update_response(response_content: Any) -> dict[str, Any]:"""Parse the first valid memory-update JSON object from an LLM response.Some providers may wrap JSON in thinking traces, prose, or markdown fenceseven when prompted to return JSON only. This parser accepts safelyextractable JSON objects but does not repair truncated or malformed JSON."""response_text = _extract_text(response_content).strip()decoder = json.JSONDecoder()for match in re.finditer(r"\{", response_text):try:parsed, _end = decoder.raw_decode(response_text[match.start() :])except json.JSONDecodeError:continueif isinstance(parsed, dict) and _REQUIRED_MEMORY_UPDATE_TOP_LEVEL_KEYS.issubset(parsed):return _normalize_memory_update_data(parsed)raise json.JSONDecodeError("No valid memory update JSON object found", response_text, 0)
总结:
这个函数的设计哲学:防御性解析--LLM的输出是不可控的,可能包含thinking痕迹、markdown包装、前缀说明文字等。这个函数通过"找到所有 { 位置,逐个尝试解析,找到第一个包含必需 key 的字典"的方式,在混乱的 LLM 输出中鲁棒地提取出目标JSON,而不会因为前导垃圾文本直接失败。
_apply_updates
函数的作用:将LLM生成的记忆更新数据(update_data)合并到当前的记忆结构(current_memory)中,返回更新后的完整记忆数据。
函数具体实现代码如下所示:
def _apply_updates(self,current_memory: dict[str, Any], # 从存储加载的当前记忆数据update_data: dict[str, Any], # LLM 解析出的更新指令(包含 user、history、newFacts、factsToRemove)thread_id: str | None = None, # 线程 ID,用于标记新 fact 的来源) -> dict[str, Any]:"""Apply LLM-generated updates to memory.Args:current_memory: Current memory data.update_data: Updates from LLM.thread_id: Optional thread ID for tracking.Returns:Updated memory data."""# 获取记忆配置和当前 UTC 时间(ISO 8601 格式带 Z 后缀),用于给更新的字段打上时间戳。config = get_memory_config()now = utc_now_iso_z()# 1、Update user sections'''更新user下的三个字段:- 遍历 workContext、personalContext、topOfMind- 只有当 shouldUpdate == true 且 summary 存在时,才真正更新- 更新内容:覆盖 summary 并记录 updatedAt 时间戳'''user_updates = update_data.get("user", {})for section in ["workContext", "personalContext", "topOfMind"]:section_data = user_updates.get(section, {})if section_data.get("shouldUpdate") and section_data.get("summary"):current_memory["user"][section] = {"summary": section_data["summary"],"updatedAt": now,}# 2、Update history sections'''更新 history 下的三个字段(逻辑与 user 部分相同)- 遍历 recentMonths、earlierContext、longTermBackground- 只有 shouldUpdate 和 summary 都存在时才更新'''history_updates = update_data.get("history", {})for section in ["recentMonths", "earlierContext", "longTermBackground"]:section_data = history_updates.get(section, {})if section_data.get("shouldUpdate") and section_data.get("summary"):current_memory["history"][section] = {"summary": section_data["summary"],"updatedAt": now,}# 3、Remove facts'''删除被标记要移除的 fact- 将 factsToRemove 列表转换为集合(O(1) 查找)- 列表推导式过滤掉 id 在待删除集合中的 fact'''facts_to_remove = set(update_data.get("factsToRemove", []))if facts_to_remove:current_memory["facts"] = [f for f in current_memory.get("facts", []) if f.get("id") not in facts_to_remove]# 4、Add new facts'''构建已有 fact 的内容去重集合:- _fact_content_key将 fact 内容 strip 后转小写,返回去重用的 key- 用集合推导式提取当前记忆中所有 fact 的去重 key- 用于后续检测新 fact 是否与已有 fact 内容重复遍历 LLM 生成的新 fact 列表,逐条处理:- 只有 confidence >= fact_confidence_threshold(默认 0.7)的 fact 才会被采纳- 低于阈值的 fact 被忽略去重检查:- 内容必须是字符串,非字符串跳过- 去除首尾空白后,计算去重 key- 如果该 key 已在 existing_fact_keys 中,说明是重复 fact,跳过(不添加)构造新的 fact 条目:- 生成 8 位十六进制 UUID 作为唯一 ID- 使用标准化后的内容、LLM 提供的 category(默认 "context")、置信度- 记录创建时间和来源(thread_id 或 "unknown")可选字段 sourceError 处理:- 如果 LLM 返回了 sourceError(记录错误来源),且非空,就附加到 fact 条目中- 这是在记忆 prompt 中要求 LLM 记录"纠正类 fact"的错误来源时使用的'''existing_fact_keys = {fact_key for fact_key in (_fact_content_key(fact.get("content")) for fact in current_memory.get("facts", [])) if fact_key is not None}new_facts = update_data.get("newFacts", [])for fact in new_facts:confidence = fact.get("confidence", 0.5)if confidence >= config.fact_confidence_threshold:raw_content = fact.get("content", "")if not isinstance(raw_content, str):continuenormalized_content = raw_content.strip()fact_key = _fact_content_key(normalized_content)if fact_key is not None and fact_key in existing_fact_keys:continuefact_entry = {"id": f"fact_{uuid.uuid4().hex[:8]}","content": normalized_content,"category": fact.get("category", "context"),"confidence": confidence,"createdAt": now,"source": thread_id or "unknown",}source_error = fact.get("sourceError")if isinstance(source_error, str):normalized_source_error = source_error.strip()if normalized_source_error:fact_entry["sourceError"] = normalized_source_errorcurrent_memory["facts"].append(fact_entry)if fact_key is not None:existing_fact_keys.add(fact_key) # 将新 fact 追加到记忆列表,并更新去重集合(防止同一轮添加多个重复的 fact)# 5、Enforce max facts limit'''强制执行 fact 数量上限:- 如果 fact 总数超过 max_facts(默认 100),按 confidence 降序排序- 只保留前 max_facts 条(置信度最高的)'''if len(current_memory["facts"]) > config.max_facts:# Sort by confidence and keep top onescurrent_memory["facts"] = sorted(current_memory["facts"],key=lambda f: f.get("confidence", 0),reverse=True,)[: config.max_facts]return current_memory
_strip_upload_mentions_from_memory
函数的作用:从记忆的所有summary和fact中删除关于文件上传事件的句子。这是因为上传的文件是“会话级别”的临时资源,如果把“用户上传了某个文件”这件事记录到长期记忆中,未来的对话中agent可能会尝试去搜索一个早已不存在的文件路径。
函数具体实现代码如下所示:
_UPLOAD_SENTENCE_RE = re.compile(r"[^.!?]*\b(?:"r"upload(?:ed|ing)?(?:\s+\w+){0,3}\s+(?:file|files?|document|documents?|attachment|attachments?)"r"|file\s+upload"r"|/mnt/user-data/uploads/"r"|<uploaded_files>"r")[^.!?]*[.!?]?\s*",re.IGNORECASE,)def _strip_upload_mentions_from_memory(memory_data: dict[str, Any]) -> dict[str, Any]:"""Remove sentences about file uploads from all memory summaries and facts.Uploaded files are session-scoped; persisting upload events in long-termmemory causes the agent to search for non-existent files in future sessions."""# Scrub summaries in user/history sectionsfor section in ("user", "history"):section_data = memory_data.get(section, {})for _key, val in section_data.items():if isinstance(val, dict) and "summary" in val:cleaned = _UPLOAD_SENTENCE_RE.sub("", val["summary"]).strip()cleaned = re.sub(r" +", " ", cleaned)val["summary"] = cleaned# Also remove any facts that describe upload eventsfacts = memory_data.get("facts", [])if facts:memory_data["facts"] = [f for f in facts if not _UPLOAD_SENTENCE_RE.search(f.get("content", ""))]return memory_data
get_memory_storage().save(updated_memory, agent_name, user_id=user_id)
函数作用:将记忆数据原子写入磁盘文件,并同步更新内存换粗。它采用了写前复制+临时文件+rename的模式来实现原子持久化,避免写入失败时污染原有的记忆文件
save函数具体的代码如下所示:
def save(self, memory_data: dict[str, Any], agent_name: str | None = None, *, user_id: str | None = None) -> bool:"""Save memory data to file and update cache."""file_path = self._get_memory_file_path(agent_name, user_id=user_id)cache_key = self._cache_key(agent_name, user_id=user_id)try:file_path.parent.mkdir(parents=True, exist_ok=True)# Shallow-copy before adding lastUpdated so the caller's dict is not# mutated as a side-effect, and the cache reference is not silently# updated before the file write succeeds.memory_data = {**memory_data, "lastUpdated": utc_now_iso_z()}temp_path = file_path.with_suffix(f".{uuid.uuid4().hex}.tmp")with open(temp_path, "w", encoding="utf-8") as f:json.dump(memory_data, f, indent=2, ensure_ascii=False)temp_path.replace(file_path)try:mtime = file_path.stat().st_mtimeexcept OSError:mtime = Nonewith self._cache_lock:self._memory_cache[cache_key] = (memory_data, mtime)logger.info("Memory saved to %s", file_path)return Trueexcept OSError as e:logger.error("Failed to save memory file: %s", e)return False
06 示例
输入:
def save(self, memory_data: dict[str, Any], agent_name: str | None = None, *, user_id: str | None = None) -> bool:"""Save memory data to file and update cache."""file_path = self._get_memory_file_path(agent_name, user_id=user_id)cache_key = self._cache_key(agent_name, user_id=user_id)try:file_path.parent.mkdir(parents=True, exist_ok=True)# Shallow-copy before adding lastUpdated so the caller's dict is not# mutated as a side-effect, and the cache reference is not silently# updated before the file write succeeds.memory_data = {**memory_data, "lastUpdated": utc_now_iso_z()}temp_path = file_path.with_suffix(f".{uuid.uuid4().hex}.tmp")with open(temp_path, "w", encoding="utf-8") as f:json.dump(memory_data, f, indent=2, ensure_ascii=False)temp_path.replace(file_path)try:mtime = file_path.stat().st_mtimeexcept OSError:mtime = Nonewith self._cache_lock:self._memory_cache[cache_key] = (memory_data, mtime)logger.info("Memory saved to %s", file_path)return Trueexcept OSError as e:logger.error("Failed to save memory file: %s", e)return False
萃取的记忆:
{"version": "1.0","lastUpdated": "2026-06-11 16:47:59","user": {"workContext": {"summary": "张三是一名Python后端开发工程师,在阿里巴巴工作。目前正在开发一个主智能体项目,使用FastAPI框架,主要技术栈包括Python、LangChain和向量数据库。","updatedAt": "2026-06-11 16:47:59"},"personalContext": {"summary": "中文母语者,偏好使用结构化输出的方式处理技术问题。","updatedAt": "2026-06-11 16:47:59"},"topOfMind": {"summary": "目前专注于主智能体项目的开发,该项目使用FastAPI框架构建,技术栈为Python、LangChain和向量数据库。智能体需要具备自动分解复杂任务并调用各种工具执行的能力。用户对结构化输出方式有明确偏好,但需要确保技术方案的正确理解。","updatedAt": "2026-06-11 16:47:59"}},"history": {"recentMonths": {"summary": "近期正在积极开发主智能体项目,探索LangChain框架在复杂任务分解和工具调用方面的应用。技术选择上明确使用FastAPI作为Web框架,Python作为主要编程语言,并集成向量数据库支持。","updatedAt": "2026-06-11 16:47:59"},"earlierContext": {"summary": "","updatedAt": ""},"longTermBackground": {"summary": "具备Python后端开发的专业背景,在阿里巴巴担任开发工程师职位。","updatedAt": "2026-06-11 16:47:59"}},"facts": [{"id": "fact_a4cb8a0a","content": "用户姓名为张三","category": "context","confidence": 0.95,"createdAt": "2026-06-11 16:47:59","source": "memory_extraction"},{"id": "fact_2290e804","content": "用户职业为Python后端开发工程师","category": "context","confidence": 0.95,"createdAt": "2026-06-11 16:47:59","source": "memory_extraction"},{"id": "fact_b20506fe","content": "用户在阿里巴巴工作","category": "context","confidence": 0.95,"createdAt": "2026-06-11 16:47:59","source": "memory_extraction"},{"id": "fact_3922954b","content": "用户正在开发主智能体项目,使用FastAPI框架","category": "context","confidence": 0.95,"createdAt": "2026-06-11 16:47:59","source": "memory_extraction"},{"id": "fact_5850ea10","content": "用户技术栈包括Python、LangChain和向量数据库","category": "knowledge","confidence": 0.95,"createdAt": "2026-06-11 16:47:59","source": "memory_extraction"},{"id": "fact_09afbe91","content": "用户偏好使用结构化输出的方式处理技术问题","category": "preference","confidence": 0.9,"createdAt": "2026-06-11 16:47:59","source": "memory_extraction"},{"id": "fact_ff0c3979","content": "用户纠正了助理对LangChain ReAct模式和pydantic模型的技术理解","category": "correction","confidence": 0.95,"createdAt": "2026-06-11 16:47:59","source": "memory_extraction","sourceError": "助理错误地建议使用LangChain的ReAct模式配合工具调用和pydantic模型实现结构化输出"}]}
夜雨聆风