第 0 章 引子:为什么 LLM 需要一个"记忆层"?
大语言模型有两个先天限制:
无状态:每次 API 调用之间不共享任何信息,所有"上下文"必须显式写进 messages数组;上下文窗口有限:即便是 200K token 的模型,把所有历史塞进去也会导致延迟、成本、注意力稀释一起恶化。
最朴素的解法是"把对话写进向量库,查询时召回 Top-K",这就是早期的 RAG-style memory。但实践中很快暴露出几个新问题:
粒度太粗:一条用户消息可能同时包含"姓名"、"偏好"、"日程",向量召回时只能整段返回,无法精确利用; 结构化缺失:用户说过"我喜欢川菜"和"我讨厌香菜",应该作为可独立查询、可独立删除的两条事实,而不是绑在同一段文本里; 时间线丢失:对话是有顺序的,"昨天用户改主意了"这种信息必须保留顺序; 多租户隔离:一个 Agent 通常要同时服务多个用户/会话,记忆必须可按维度切割; 模型可替换:今天用 OpenAI,明天换 Bedrock,记忆不能跟着丢。
MemMachine 的设计哲学,就是模拟人类的认知分层来解决这些问题:
┌────────────────────────────────────────────────────────────┐
│ 人类大脑 │
├────────────────────────────────────────────────────────────┤
│ 工作记忆 : 正在想的事 (秒级) │
│ 情景记忆 : 发生过什么 (按时间,可回放) │
│ 语义记忆 : 我知道什么 (脱离时间的事实/常识) │
└────────────────────────────────────────────────────────────┘
│
▼
┌────────────────────────────────────────────────────────────┐
│ MemMachine │
├────────────────────────────────────────────────────────────┤
│ Working Memory : 当前会话内存(短期上下文) │
│ Episodic Memory : 对话/事件流(时间序列,可检索) │
│ Semantic Memory : 用户画像(LLM 抽取的结构化事实) │
└────────────────────────────────────────────────────────────┘
接下来的所有章节,都是围绕这三种记忆,以及围绕它们的存储、检索、生命周期管理展开的。
第 1 章 30 秒看懂 MemMachine
1.1 一张图看懂
flowchart TB
subgraph Client["客户端层"]
A1[Python SDK]
A2[TS SDK]
A3[REST]
A4[MCP]
end
subgraph API["API 层 FastAPI"]
B1[/projects/]
B2[/memories/]
B3[/metrics, /health/]
end
subgraph Core["核心服务层"]
C1[MemMachine 主类]
C2[EpisodicMemoryManager]
C3[SemanticSessionManager]
C4[RetrievalAgent]
C5[ResourceManager]
end
subgraph EpiMem["EpisodicMemory 单实例"]
D1[ShortTermMemory<br/>滚动摘要]
D2[LongTermMemory<br/>声明式记忆]
D3[EventMemory<br/>v0.3.x 新增]
end
subgraph Infra["存储与模型"]
E1[(EpisodeStore<br/>PostgreSQL/SQLite)]
E2[(VectorGraphStore<br/>Neo4j/NebulaGraph)]
E3[(VectorStore<br/>Qdrant/SQLite/sqlite-vec)]
E4[(SemanticStorage<br/>pgvector/Neo4j)]
E5[LLM / Embedder / Reranker]
end
Client --> API --> Core
C1 --> C2 --> EpiMem
C1 --> C3 --> E4
C1 --> C4
D1 -.摘要.-> E5
D2 --> E2
D3 --> E3
D3 --> E1
C1 --> E1
C5 --> E5
1.2 关键模块的源码位置
packages/server/src/memmachine_server/
├── main/memmachine.py # 总入口 MemMachine 主类
├── episodic_memory/
│ ├── episodic_memory.py # 单会话情景记忆 (STM+LTM 编排)
│ ├── episodic_memory_manager.py # 多会话/LRU 缓存
│ ├── short_term_memory/ # deque + 滚动摘要
│ ├── long_term_memory/ # 基于 VectorGraphStore 的 LTM
│ ├── declarative_memory/ # LTM 的底层:派生 + 嵌入
│ └── event_memory/ # v0.3.x 新一代 LTM
│ ├── event_memory.py
│ ├── segmenter/ # Event → Segment
│ ├── deriver/ # Segment → Derivative
│ └── segment_store/ # Segment 持久化
├── semantic_memory/
│ ├── semantic_memory.py # 后台抽取服务
│ ├── semantic_ingestion.py # 单 set 的处理循环
│ ├── semantic_llm.py # LLM 抽取/巩固调用
│ ├── cluster_manager.py # 相邻语义特征聚类
│ └── util/semantic_prompt_template.py # Profile 提示词
├── retrieval_agent/
│ ├── service_locator.py # 工厂函数
│ └── agents/
│ ├── tool_select_agent.py # LLM 路由
│ ├── coq_agent.py # Chain-of-Query 迭代式
│ ├── split_query_agent.py # 多实体并发查询
│ └── memmachine_retriever.py# 直接查 LTM
├── common/
│ ├── vector_store/ # 向量库统一抽象
│ ├── vector_graph_store/ # 图+向量统一抽象
│ ├── episode_store/ # 原始 Episode 持久化
│ ├── embedder/, reranker/, language_model/
│ ├── filter/ # FilterExpr DSL
│ └── metrics_factory/ # Prometheus
└── server/ # FastAPI + MCP HTTP/Stdio
后续所有章节都会回链到具体文件与行号。
第 2 章 核心数据模型
理解 MemMachine 之前,先认清下面几个数据类型。
2.1 Episode:一切的起点
所有写入 MemMachine 的最小单位都是 Episode。它定义在 packages/server/src/memmachine_server/common/episode_store/ 之下:
classEpisode:
uid: str # 唯一标识符
content: str # 消息内容
session_key: str # 所属会话,例如 "my_org/my_project"
producer_id: str # 内容产生者,例如 "alice"
producer_role: str # 角色:user / assistant / system
produced_for_id: str # 内容接收者,例如 "travel_agent"
episode_type: EpisodeType # MESSAGE / TEXT / ...
content_type: ContentType # MESSAGE / TEXT / ...
sequence_num: int # 会话内的递增序号
created_at: datetime
metadata: dict # 业务自定义元数据
filterable_metadata: dict # 用于检索过滤的元数据子集
Episode 与一个 OpenAI/Anthropic 消息几乎一一对应;但相比裸消息,它额外携带了 session_key、filterable_metadata 与 sequence_num,这些字段是后续多租户隔离、按维度过滤、跨会话排序的关键。
2.2 Segment / Derivative:事件记忆里的派生单元
EventMemory 引入了"事件 → 片段 → 派生"的三级抽象,源码:packages/server/src/memmachine_server/episodic_memory/event_memory/data_types.py:96-188。
classEvent:
uuid: UUID
timestamp: datetime
context: Context # ProducerContext | NullContext
blocks: list[Block] # 一条事件可包含多个 Block
classSegment:# Event 的最小可检索片段
uuid: UUID
event_uuid: UUID
index: int # 在 event.blocks 中的位置
offset: int # 在 block 内的切分偏移
timestamp: datetime
block: Block
classDerivative:# Segment 派生出的可向量化文本
uuid: UUID
segment_uuid: UUID
block: Block # 通常是 TextBlock
为什么要分这么细?
Block:一个 Block 代表一种内容类型(文本/JSON/HTML),不同 Block 类型对应不同的下游处理。当前主要实现是 TextBlock。Segment:把一条事件按"语义边界"切成小块,让向量召回更精准。文本场景下,由 TextSegmenter(packages/server/src/memmachine_server/episodic_memory/event_memory/segmenter/text_segmenter.py)使用 LangChain 的RecursiveCharacterTextSplitter在中文/英文标点上递归切分。Derivative:在 Segment 之上再做派生(如"格式化为 'producer: text'"、"按句子切分")。由 WholeTextDeriver/SentenceTextDeriver(packages/server/src/memmachine_server/episodic_memory/event_memory/deriver/text_deriver.py:47-80)产出。
真正写进向量库的是 Derivative,不是 Event 本身。一条 Event 可能展开为 5 个 Segment、10 个 Derivative,从而把召回粒度精细到句子级别。
2.3 SemanticFeature:语义记忆的最小单元
# packages/server/src/memmachine_server/semantic_memory/semantic_model.py:70-103
classSemanticFeature:
set_id: SetIdT # 所属"集合"(典型为 org/project 或 user)
category: str # 分类,如 "profile_prompt"
tag: str # 顶级标签,如 "Demographic Information"
feature_name: str # 二级特征名,如 "name"
value: str # 实际值,如 "张三"
metadata: Metadata # 包含 id、citations(关联 Episode UID)
注意它是 三元组 + 引用:(tag, feature_name, value) 描述事实,metadata.citations 反向链回原始 Episode,便于做"为什么记忆里有这条?"的可追溯审计。
2.4 三层命名空间
MemMachine 使用三层结构隔离记忆:
Organization (org_id)
└── Project (project_id)
└── Session (session_id)
└── 具体的 Episode / Feature
源码里它们被合并成一个字符串:
session_key = f"{org_id}/{project_id}"# 形式由配置决定
session_key 是所有底层存储的过滤主键。多租户场景下:
org_id用来隔离组织(一般对应公司/客户);project_id用来隔离产品/应用(不同 Agent 互不可见);metadata.user_id/metadata.agent_id/metadata.session_id在同一个(org, project)内做更细粒度切分。
提示:SDK 里
Project.memory(user_id=..., agent_id=..., session_id=...)调用,最终都会把这些字段写到 Episode 的metadata和filterable_metadata,从而保证后续检索可以按它们过滤。
第 3 章 写入路径:一条消息进来之后发生了什么
这是 MemMachine 工作流里最关键的一段代码。理解它,你就理解了 80% 的 MemMachine。
3.1 入口:MemMachine.add_episodes()
代码位置:packages/server/src/memmachine_server/main/memmachine.py:649-703。删掉日志、计数器之后的核心逻辑:
asyncdefadd_episodes(
self,
session_data,
episode_entries,
*,
target_memories=ALL_MEMORY_TYPES,
) -> list[EpisodeIdT]:
# 1) 原始 Episode 永久落盘(关系型/嵌入式存储)
episode_storage = await self._resources.get_episode_storage()
episodes = await episode_storage.add_episodes(
session_data.session_key, episode_entries,
)
tasks = []
# 2) 情景记忆(STM + LTM/EventMemory)
if MemoryType.Episodic in target_memories:
asyncwith episodic_memory_manager.open_or_create_episodic_memory(...) as ep:
tasks.append(ep.add_memory_episodes(episodes))
# 3) 语义记忆(异步排队,等待后台抽取)
if MemoryType.Semantic in target_memories:
tasks.append(
semantic_session_manager.add_message(
episodes=episodes, session_data=session_data,
)
)
await asyncio.gather(*tasks) # 步骤 2、3 并发执行
return [e.uid for e in episodes]
划重点:
始终先落 EpisodeStore:哪怕你显式禁用了 Episodic/Semantic,原始事件也已经持久化了,这是事后审计与回溯的基础; Episodic / Semantic 写入并发:通过 asyncio.gather,不会互相阻塞;Semantic 不是同步抽取: add_message只是把 Episode 加入"待抽取"队列,真正的 LLM 调用发生在SemanticService的后台任务里(见 3.5 节)。
3.2 Episodic 路径:EpisodicMemory.add_memory_episodes()
源码:packages/server/src/memmachine_server/episodic_memory/episodic_memory.py:208-242。
asyncdefadd_memory_episodes(self, episodes: list[Episode]) -> None:
# 把 metadata 中所有"标量"提取到 filterable_metadata,
# 这是后续按用户/会话/标签过滤的关键
for episode in episodes:
if episode.metadata isnotNoneand episode.filterable_metadata isNone:
episode.filterable_metadata = {
k: v for k, v in episode.metadata.items()
if isinstance(v, get_args(PropertyValue))
}
tasks: list[Coroutine] = []
if self._short_term_memory:
tasks.append(self._short_term_memory.add_episodes(episodes))
if self._long_term_memory:
tasks.append(self._long_term_memory.add_episodes(episodes))
await asyncio.gather(*tasks)
注意 STM 与 LTM 是并发写入 的。它们彼此独立、互不依赖;这意味着你可以单独开启 STM(最便宜,只有内存 + 偶尔的 LLM 摘要)或单独开启 LTM(重一些,需要向量库与图库),也可以同时启用。
3.3 ShortTermMemory:deque + 滚动摘要
源码:packages/server/src/memmachine_server/episodic_memory/short_term_memory/short_term_memory.py。
它的逻辑非常直观——一个带容量的双端队列:
# packages/server/src/memmachine_server/episodic_memory/short_term_memory/short_term_memory.py:187-209
asyncdefadd_episodes(self, episodes: list[Episode]) -> bool:
asyncwith self._lock.write_lock():
self._memory.extend(episodes)
self._current_episode_count += len(episodes)
self._current_message_len += sum(len(e.content) for e in episodes)
full = await self._is_full()
if full:
await self._do_evict()
return full
当总长度 = 已存 episode 字符数 + 摘要字符数 超过 message_capacity(默认 64000,单位为字符)时,触发 eviction:
# packages/server/src/memmachine_server/episodic_memory/short_term_memory/short_term_memory.py:222-240
asyncdef_do_evict(self) -> None:
# 1) 先把已经被摘要过的旧 episode 直接淘汰
while len(self._memory) > self._current_episode_count andawait self._is_full():
self._current_message_len -= len(self._memory[0].content)
self._memory.popleft()
# 2) 还是满,就把所有 episode 异步压缩成一份新摘要
if len(self._memory) > 0andawait self._is_full():
result = list(self._memory)
self._current_episode_count = 0
await self._consolidator.summarize(result)
摘要的提示词模板由 packages/server/src/memmachine_server/common/configuration/default_episode_summary_system_prompt.txt 等文件提供,包含"保留实体/事件/未决问题"等关键约束。
为什么用滚动摘要而不是滑动窗口?
单纯丢弃旧 Episode 会导致长会话失忆; 全部塞入上下文会爆 token; 滚动摘要做"压缩 + 增量重写",让短期记忆始终保持一个有界但语义完整的视图。
3.4 LongTermMemory:声明式记忆 + 图存储
LTM 的核心抽象是 DeclarativeMemory:把 Episode 派生为可向量召回的 Derivative,再落到一个同时具备图能力与向量能力的存储(默认 Neo4j)。
源码:packages/server/src/memmachine_server/episodic_memory/declarative_memory/declarative_memory.py:99-211。
asyncdefadd_episodes(self, episodes: Iterable[Episode]) -> None:
episodes = sorted(episodes, key=lambda e: (e.timestamp, e.uid))
# 1) Episode → Node:保留原始字段 + 业务可过滤字段
episode_nodes = [Node(uid=e.uid, properties={...}) for e in episodes]
# 2) 并发派生 Derivative(默认 1 episode → N derivative)
episodes_derivatives = await asyncio.gather(
*[self._derive_derivatives(e) for e in episodes]
)
derivatives = [d for lst in episodes_derivatives for d in lst]
# 3) 批量嵌入 Derivative 文本
derivative_embeddings = await self._embedder.ingest_embed(
[d.content for d in derivatives],
)
# 4) 写入两类节点
derivative_nodes = [
Node(uid=d.uid, properties={...},
embeddings={emb_name: (embedding, similarity_metric)})
for d, embedding in zip(derivatives, derivative_embeddings)
]
await asyncio.gather(
self._vector_graph_store.add_nodes(self._episode_collection, episode_nodes),
self._vector_graph_store.add_nodes(self._derivative_collection, derivative_nodes),
)
# 5) 建立 (Derivative)-[DERIVED_FROM]->(Episode) 关系
await self._vector_graph_store.add_edges(
relation=self._derived_from_relation,
source_collection=self._derivative_collection,
target_collection=self._episode_collection,
edges=derivative_episode_edges,
)
派生逻辑见 _derive_derivatives:
# packages/server/src/memmachine_server/episodic_memory/declarative_memory/declarative_memory.py:213-272
match episode.content_type:
case ContentType.MESSAGE:
ifnot self._message_sentence_chunking:
return [Derivative(content=f"{episode.source}: {episode.content}", ...)]
sentences = extract_sentences(episode.content)
return [
Derivative(content=f"{episode.source}: {sentence}", ...)
for sentence in sentences
]
case ContentType.TEXT:
return [Derivative(content=episode.content, ...)]
两个值得关注的设计:
**每条 derivative 文本前会拼接 {producer}:**。这是给向量模型"加 context"的一种廉价手法,让 "alice: 我喜欢川菜" 与 "bob: 我讨厌川菜" 在嵌入空间天然分开。**可选 message_sentence_chunking**:开启后,按句子切分嵌入;适合"一条消息里夹杂多种事实"的场景。
最终的 Neo4j 图结构如下:
(:Episode {uid, content, producer_id, ...})
▲
│ [DERIVED_FROM]
│
(:Derivative {uid, content, embedding, ...})
向量索引建立在 Derivative 上;命中之后通过 DERIVED_FROM 反向找回 Episode。
3.5 EventMemory:v0.3.x 引入的新一代情景记忆
EventMemory 与 LongTermMemory 在功能上等价(都是"长期、可检索的情景记忆"),但实现思路不同。它的核心在于:
把"图能力"从存储底座中剥离(不再强依赖 Neo4j); 用通用的 VectorStore(Qdrant / SQLite-USearch / SQLite-vec)做向量召回;用独立的 SegmentStore维护片段邻接关系,实现"以 seed 为中心扩展前后文"。
源码:packages/server/src/memmachine_server/episodic_memory/event_memory/event_memory.py:204-307。
asyncdef_encode_events(self, events: Iterable[Event]) -> None:
events = list(events); self._validate_events(events)
# 阶段 1:Segmenter 把 Event 切成 Segment
segment_lists = await asyncio.gather(
*(self._segmenter.segment(event) for event in events)
)
segments = [s for lst in segment_lists for s in lst]
# 阶段 2:Deriver 从 Segment 派生 Derivative
derivative_lists = await asyncio.gather(
*(self._deriver.derive(seg) for seg in segments)
)
segments_to_derivatives = dict(zip(segments, derivative_lists))
derivatives = [d for lst in derivative_lists for d in lst]
# 阶段 3:批量嵌入
derivative_texts = [EventMemory._extract_text(d.block) for d in derivatives]
derivative_embeddings = await self._embedder.ingest_embed(derivative_texts)
# 阶段 4:写 SegmentStore(保留 Segment 之间的邻接顺序)
await self._segment_store_partition.add_segments({
seg: [d.uuid for d in ds] for seg, ds in segments_to_derivatives.items()
})
# 阶段 5:写 VectorStore(按 Derivative 维度)
derivative_records = [EventMemory._build_derivative_record(d, e)
for d, e in zip(derivatives, derivative_embeddings)]
if derivative_records:
await self._vector_store_collection.upsert(records=derivative_records)
更进一步:
五个阶段都在 Prometheus 上以 histogram 形式上报(标签 phase=segmentation/derivation/embedding/segment_store/vector_store),便于运维定位性能瓶颈;_validate_events会拒绝带保留字段的事件(_segment_uuid、_timestamp是 EventMemory 占用的保留字段);写入 derivative 时同时写两类属性:系统属性( _segment_uuid、_timestamp)与用户自定义属性(直接落到向量库 record 的 properties)。
EventMemory vs LongTermMemory 怎么选?
想用 Neo4j 一体化做"图遍历 + 向量召回",选 LTM; 想跑在轻量级单机(SQLite/sqlite-vec),或想把向量库换成 Qdrant 集群、不引入图数据库,选 EventMemory; 两者可以并存,但同一会话通常二选一即可。
3.6 SemanticMemory:把对话蒸馏成"用户画像"
写入路径里最有意思的一段是语义记忆。它不是同步抽取,而是依赖一个后台轮询任务:
# packages/server/src/memmachine_server/semantic_memory/semantic_memory.py:783-836
asyncdef_background_ingestion_task(self) -> None:
ingestion_service = IngestionService(
params=IngestionService.Params(
semantic_storage=self._semantic_storage,
resource_retriever=self._set_id_resource,
history_store=self._episode_storage,
max_features_per_update=self._max_features_per_update,
),
)
backoff_sec = self._background_ingestion_interval_sec
whilenot self._is_shutting_down:
# 1) 找出"有未抽取消息 / 距上次抽取已经超时"的 set
dirty_sets = [
s asyncfor s in self._semantic_storage.get_history_set_ids(
min_uningested_messages=self._feature_update_message_limit,
older_than=datetime.now(tz=UTC) - self._feature_time_limit,
)
]
if len(dirty_sets) == 0:
await self._interruptible_sleep(self._background_ingestion_interval_sec)
continue
# 2) 批量处理,失败则指数退避
try:
await ingestion_service.process_set_ids(dirty_sets)
except Exception:
had_errors = True
...
# 3) 清理已被消化的历史行
purged = await self._semantic_storage.purge_ingested_rows(dirty_sets)
每个 set 的处理逻辑:
# packages/server/src/memmachine_server/semantic_memory/semantic_ingestion.py:122-278
asyncdef_process_single_set(self, set_id: str) -> None:
resources = await self._resource_retriever(set_id) # 拿 LLM / Embedder / Category
history_ids = [h asyncfor h in self._semantic_storage.get_history_messages(
set_ids=[set_id], limit=5, is_ingested=False,
)]
... # 加载原始 Episode
asyncdefprocess_semantic_type(semantic_category):
for message in messages:
features = [...] # 已有特征作为"旧 profile"喂给 LLM
commands = await llm_feature_update(
features=features,
message_content=message.content,
model=resources.language_model,
update_prompt=semantic_category.prompt.update_prompt,
)
await self._apply_commands(commands=commands, ...)
await asyncio.gather(*[
process_semantic_type(t) for t in resources.semantic_categories
])
await self._semantic_storage.mark_messages_ingested(...)
await self._consolidate_set_memories_if_applicable(set_id=set_id, resources=resources)
LLM 调用本身在 llm_feature_update 中(packages/server/src/memmachine_server/semantic_memory/semantic_llm.py:69-101),输出格式严格约束为:
{ "commands": [
{"command": "add", "tag": "Demographic Information", "feature": "name", "value": "张三"},
{"command": "delete", "tag": "Hobbies & Interests", "feature": "smoker"}
]}
为什么要走"命令"而不是"覆盖式更新"?
命令易于撤销与回放; 命令天然支持并发:多个 category 各自独立产出命令; delete命令使得"用户改主意"这种场景可以优雅落地:旧值删除 + 新值新增。
Profile 抽取提示词的完整骨架在 packages/server/src/memmachine_server/semantic_memory/util/semantic_prompt_template.py:6-153。它把语义记忆设计成两级 K-V:
tag (大类,如 "Demographic Information")
└─ feature (具体特征名,如 "name")
└─ value (值,如 "张三")
预置的 6 个 category(profile_prompt / coding_prompt / writing_assistant_prompt / financial_analyst_prompt / health_assistant_prompt / crm_prompt)都基于这套骨架定制(见 packages/server/src/memmachine_server/server/prompt/default_prompts.py:19-26)。
最后一步是巩固(consolidation):当语义特征积累到阈值(默认 20),就调用 llm_consolidate_features 把"重复 / 关联 / 矛盾"的特征合并、改写、删除。提示词在同一文件的 build_consolidation_prompt,思路类似神经网络的"长时程巩固"——把"原始记忆矿石 → 纯净的记忆颗粒 → 分箱 → 合金记忆"。
3.7 写入路径全景图
sequenceDiagram
participant C as Client
participant API as FastAPI Router
participant MM as MemMachine 主类
participant ES as EpisodeStore
participant EMM as EpisodicMemoryManager
participant STM as ShortTermMemory
participant LTM as LongTermMemory / EventMemory
participant SSM as SemanticSessionManager
participant BG as SemanticService 后台任务
participant LLM as LLM (抽取)
participant VG as VectorGraphStore / VectorStore
C->>API: POST /api/v2/memories
API->>MM: add_episodes(session, entries)
MM->>ES: add_episodes() # 原始持久化
par 并发
MM->>EMM: open_or_create_episodic_memory()
EMM->>STM: add_episodes()
EMM->>LTM: add_episodes()
LTM->>VG: add_nodes + add_edges
and
MM->>SSM: add_message() # 仅入队
end
MM-->>API: episode_ids
API-->>C: 200 OK
Note over BG: 每 N 秒轮询一次
BG->>SSM: get_history_messages()
BG->>LLM: llm_feature_update()
LLM-->>BG: SemanticCommand[]
BG->>SSM: apply_commands() (写 pgvector)
BG->>LLM: llm_consolidate_features()
到这里,我们已经看完了"写"的完整故事。接下来是"读"。
第 4 章 读取路径:一次查询是如何被回答的
4.1 入口:MemMachine.query_search()
源码:packages/server/src/memmachine_server/main/memmachine.py:916-986。
asyncdefquery_search(self, session_data, *, target_memories, query, limit, expand_context,
score_threshold, search_filter, agent_mode=False, ...) -> SearchResponse:
property_filter = parse_filter(search_filter) if search_filter elseNone
episodic_task = None
semantic_task = None
if MemoryType.Episodic in target_memories:
retrieval_agent = await self._get_retrieval_agent() if agent_mode elseNone
episodic_task = asyncio.create_task(self._search_episodic_memory(
session_data=session_data, query=query, limit=limit, ...,
retrieval_agent=retrieval_agent,
))
if MemoryType.Semantic in target_memories:
semantic_session = await self._resources.get_semantic_session_manager()
asyncdef_collect_semantic_results():
return [f asyncfor f in semantic_session.search(
message=query, session_data=session_data, ..., search_filter=property_filter,
)]
semantic_task = asyncio.create_task(_collect_semantic_results())
return MemMachine.SearchResponse(
episodic_memory=await episodic_task if episodic_task elseNone,
semantic_memory=await semantic_task if semantic_task elseNone,
)
要点:
情景与语义查询并发:通过两个 asyncio.create_task;agent_mode=True时启用 RetrievalAgent(第 5 章详述);search_filter是 DSL 字符串:parse_filter把它解析成FilterExpr(见 6.4 节)。
4.2 Episodic 查询的双路径
源码:packages/server/src/memmachine_server/main/memmachine.py:711-767。
asyncwith episodic_memory_manager.open_or_create_episodic_memory(...) as episodic_session:
if retrieval_agent isNoneor episodic_session.long_term_memory isNone:
# 普通路径:直接打底层 query_memory
response = await episodic_session.query_memory(
query=query, limit=limit, expand_context=expand_context,
score_threshold=score_threshold, property_filter=search_filter,
)
else:
# Agent 路径:由 RetrievalAgent 编排 LLM + 多个子代理
response = await self._query_episodic_with_retrieval_agent(
episodic_session=episodic_session, retrieval_agent=retrieval_agent, ...,
)
return response
4.3 普通路径:EpisodicMemory.query_memory()
源码:packages/server/src/memmachine_server/episodic_memory/episodic_memory.py:352-476。简化后的核心:
asyncdefquery_memory(self, query, *, limit=None, expand_context=0,
score_threshold=-inf, property_filter=None,
mode=QueryMode.BOTH) -> QueryResponse | None:
search_limit = limit if limit isnotNoneelse20
# 1) STM + LTM 并发查询
session_result, scored_long_episodes = await asyncio.gather(
self._query_short_term_memory(query=query, limit=search_limit, ...),
self._query_long_term_memory(query=query, limit=search_limit,
expand_context=expand_context, ...),
)
short_episode, short_summary = session_result
# 2) 去重:STM 优先(命中的 LTM 如果 uid 已经在 STM,则跳过)
episode_uid_set = {e.uid for e in short_episode}
unique_scored_long_episodes = []
for score, episode in scored_long_episodes:
if episode.uid notin episode_uid_set:
episode_uid_set.add(episode.uid)
unique_scored_long_episodes.append((score, episode))
# 3) 组装 QueryResponse(短期 + 长期 + 摘要)
return EpisodicMemory.QueryResponse(
short_term_memory=ShortTermMemoryResponse(
episodes=[...], episode_summary=[short_summary],
),
long_term_memory=LongTermMemoryResponse(
episodes=[EpisodeResponse(score=score, **e.model_dump())
for score, e in unique_scored_long_episodes],
),
)
注意几个细节:
STM 优先:同一条 Episode 如果 STM 已经包含,就不会再从 LTM 返回。这避免了"近期会话里说过的话被重复列出"。 QueryMode 支持三种模式: BOTH/LONG_TERM_ONLY/SHORT_TERM_ONLY,方便调用方做"只看长期 / 只看短期"。STM 不仅返回 episode,还返回 summary:当用户问"刚刚我们聊了啥",summary 就是天然的答案。
4.4 EventMemory 的查询流水线
如果情景记忆走 EventMemory,调用链最终落到 EventMemory.query(),源码:packages/server/src/memmachine_server/episodic_memory/event_memory/event_memory.py:344-524。它分四个阶段:
flowchart LR
Q[query: str] --> E1[阶段 1: search_embed]
E1 --> E2[阶段 2: VectorStore.query]
E2 --> S1[抽取 seed_segment_uuids]
S1 --> E3[阶段 3: get_segment_contexts]
E3 --> E4[阶段 4: 评分 + 排序]
E4 --> R[ScoredSegmentContext 列表]
代码骨架:
asyncdef_query(self, query, *, vector_search_limit, expand_context,
property_filter, format_options) -> QueryResult:
# 阶段 1: 嵌入查询
query_embedding = (await self._embedder.search_embed([query]))[0]
# 阶段 2: 在 derivative 集合上做向量召回
[query_result] = await self._vector_store_collection.query(
query_vectors=[query_embedding], limit=vector_search_limit,
property_filter=collection_filter, return_vector=False, return_properties=True,
)
# 多个 derivative 可能指向同一个 segment,按 segment 去重
seed_embedding_scores: dict[UUID, float] = {}
for match in query_result.matches:
segment_uuid = UUID(str(match.record.properties[_SEGMENT_UUID_FIELD_NAME]))
if segment_uuid notin seed_embedding_scores:
seed_embedding_scores[segment_uuid] = match.score
seed_segment_uuids = list(seed_embedding_scores)
# 阶段 3: 以 seed 为中心向前/向后扩展上下文
max_backward_segments = expand_context // 3
max_forward_segments = expand_context - max_backward_segments
segment_contexts_by_seed = await self._segment_store_partition.get_segment_contexts(
seed_segment_uuids=seed_segment_uuids,
max_backward_segments=max_backward_segments,
max_forward_segments=max_forward_segments,
property_filter=property_filter,
)
# 阶段 4: 评分。无 reranker 用 embedding 分数,有 reranker 用语义重排
if self._reranker isNone:
scores = [seed_embedding_scores[u] for u in kept_seed_segment_uuids]
else:
scores = await self._score_segment_contexts(query, segment_contexts, format_options)
# 排序后返回 ScoredSegmentContext 列表
return QueryResult(scored_segment_contexts=[...])
两个值得展开的点:
expand_context1:2 倾斜:源码里max_backward_segments = expand_context // 3,剩下的给前向。原因是大多数对话场景下,后文比前文更能解释当前段(用户提到的"它"通常指代后面才出现的实体)。property_filter的字段名转换:用户写filter_dict={"user_id": "alice"},MemMachine 内部会把user_id翻译为_user_id或m.user_id(依赖于是否为系统字段),具体在_to_vector_record_property方法里。这套规则使得"用户元数据"和"系统元数据"在同一个向量库 record 上共存而不冲突。
4.5 Semantic 查询:向量召回 + 跨 set 过滤
源码:packages/server/src/memmachine_server/semantic_memory/semantic_memory.py:169-200。
asyncdef_set_id_search(self, *, set_id, embedding, min_distance=None, limit=30,
load_citations=False, filter_expr=None) -> AsyncIterator[SemanticFeature]:
filter_expr = _with_has_set_ids(set_ids=[set_id], filter_expr=filter_expr)
asyncfor feature in self._semantic_storage.get_feature_set(
filter_expr=filter_expr, page_size=limit,
vector_search_opts=SemanticStorage.VectorSearchOpts(
query_embedding=np.array(embedding), min_distance=min_distance,
),
load_citations=load_citations,
):
yield feature
特点:
语义查询走的是pgvector(默认)或 Neo4j 的向量索引; 一次查询可以跨多个 set_id(用merge_async_iterators合并多个 set 的结果);load_citations=True时,返回的特征会带上原始 Episode UID,便于上层做"引用展示"。
4.6 读取路径全景图
sequenceDiagram
participant C as Client
participant MM as MemMachine
participant EMM as EpisodicMemoryManager
participant EM as EpisodicMemory
participant STM as ShortTermMemory
participant LTM as LongTermMemory / EventMemory
participant RA as RetrievalAgent (可选)
participant SSM as SemanticSessionManager
participant Emb as Embedder
participant Rerank as Reranker
C->>MM: query_search(query, agent_mode=?)
par 情景检索
MM->>EMM: open_or_create()
EMM->>EM: query_memory(query)
alt agent_mode=False
par STM/LTM 并发
EM->>STM: get_short_term_memory_context()
EM->>LTM: search_scored()
LTM->>Emb: search_embed()
LTM->>Rerank: rerank()
end
EM-->>MM: QueryResponse(STM+LTM)
else agent_mode=True
EM->>RA: do_query()
RA-->>EM: list[Episode]
EM-->>MM: QueryResponse
end
and 语义检索
MM->>SSM: search(query)
SSM->>Emb: search_embed()
SSM-->>MM: list[SemanticFeature]
end
MM-->>C: SearchResponse(episodic + semantic)
第 5 章 RetrievalAgent:Agent Lightning 思路下的智能检索
RetrievalAgent 是 MemMachine 在 v0.3.x 里引入的一个新功能:把"检索"本身视为一个 Agent,而不是一次确定性的向量查询。它的灵感来自 Agent Lightning: Train ANY AI Agents with Reinforcement Learning(arXiv: 2508.03680,见仓库 README.md 的 BibTeX)。
5.1 角色卡:三个 child agent 加一个路由器
源码:packages/server/src/memmachine_server/retrieval_agent/service_locator.py:17-57。
defcreate_retrieval_agent(*, model, reranker, agent_name="ToolSelectAgent"):
memory_agent = MemMachineAgent(...)
if agent_name == memory_agent.agent_name:
return memory_agent
coq_agent = ChainOfQueryAgent(...)
split_agent = SplitQueryAgent(...)
if agent_name == coq_agent.agent_name:
return coq_agent
if agent_name == split_agent.agent_name:
return split_agent
return ToolSelectAgent(AgentToolBaseParam(
model=model,
children_tools=[split_agent, coq_agent, memory_agent],
extra_params={"default_tool_name": coq_agent.agent_name},
reranker=reranker,
))
四种 agent 的分工:
MemMachineAgent | EpisodicMemory.query_memory(LONG_TERM_ONLY) | ||||
SplitQueryAgent | |||||
ChainOfQueryAgent | |||||
ToolSelectAgent |
5.2 ToolSelectAgent:用 LLM 做路由
核心提示词(节选自 packages/server/src/memmachine_server/retrieval_agent/agents/tool_select_agent.py:21-81):
You are a tool router. ...
GOAL
- Choose exactly one of: {coq}, {split_query}, {memory_retrieval}
- Output NONE only when the query type cannot be determined...
MECHANISM
1) Validate input
2) Classify the query type:
A) MULTI-HOP -> {coq}
B) SINGLE-HOP MULTI ENTITY -> {split_query}
C) SINGLE-HOP / DIRECT -> {memory_retrieval}
它在调用链上的角色:
flowchart LR
Q[原始 query] --> TS[ToolSelectAgent.LLM]
TS -- 多跳 --> COQ[ChainOfQueryAgent]
TS -- 多实体 --> SQ[SplitQueryAgent]
TS -- 单跳 --> MM[MemMachineAgent]
TS -- 无法判定 --> DEF[default_tool = COQ]
do_query 实现见 packages/server/src/memmachine_server/retrieval_agent/agents/tool_select_agent.py:179-197,会把所选 agent 的名字写回 perf_metrics["selected_tool"],方便观测。
5.3 ChainOfQueryAgent:迭代式查询
ChainOfQuery 的关键创新是用 单个 LLM 调用 同时完成"充分性判断 + 查询重写"。看核心提示词(packages/server/src/memmachine_server/retrieval_agent/agents/coq_agent.py:25-125)的输出契约:
{
"is_sufficient": true | false,
"evidence_indices": [0, 2, 5],
"new_query": "...",
"confidence_score": 0.0 ~ 1.0
}
调用流程:
attempt = 0
while attempt < max_attempts:
docs = memmachine_agent.do_query(current_query)
rsp = LLM(COMBINED_SUFFICIENCY_AND_REWRITE_PROMPT,
original_query, used_queries, retrieved_docs=docs)
if rsp.is_sufficient and rsp.confidence_score >= 0.8:
break
current_query = rsp.new_query
attempt += 1
return aggregated_docs
实践中你会看到这种回放:
[CoQ #0] query="Marie Curie 的丈夫从事什么领域?" -> 不充分
[CoQ #1] query="Pierre Curie 的研究领域" -> 充分,置信 0.92
这正是 Agent Lightning 论文中的"小模型迭代式 RAG"思路。
5.4 SplitQueryAgent:单跳多实体并发
当 ToolSelectAgent 判定 query 是"单跳但包含多个独立实体"(如 Compare GDP of Japan and South Korea)时,会路由到 SplitQueryAgent。它会用 LLM 把 query 拆成多个子查询并并发执行,再用 reranker 合并去重。源码在 packages/server/src/memmachine_server/retrieval_agent/agents/split_query_agent.py。
5.5 MemMachineAgent:直查 LTM
最便宜的 agent,定义见 packages/server/src/memmachine_server/retrieval_agent/agents/memmachine_retriever.py:47-91。它直接调用 EpisodicMemory.query_memory(mode=LONG_TERM_ONLY),不做任何重写、不调任何额外 LLM。
asyncdefdo_query(self, policy, query):
query_response = await query.memory.query_memory(
query=query.query, limit=query.limit,
expand_context=query.expand_context,
score_threshold=query.score_threshold,
property_filter=query.property_filter,
mode=EpisodicMemory.QueryMode.LONG_TERM_ONLY,
)
return [Episode(...) for episode in query_response.long_term_memory.episodes], perf
5.6 Reranker 在 RetrievalAgent 中的作用
AgentToolBase._do_rerank 是所有 child agent 在返回之前的最后一道工序(packages/server/src/memmachine_server/retrieval_agent/common/agent_api.py:97-137):
asyncdef_do_rerank(self, query, episodes):
if query.limit <= 0:
return sorted(episodes, key=lambda x: x.created_at)
if len(episodes) <= query.limit or self._reranker isNone:
return sorted(episodes[:query.limit], key=lambda x: x.created_at)
contents = [episodes_to_string([e]) for e in episodes]
scores = await self._reranker.score(query.query, contents) # 带重试 + 限流
result = sorted(zip(episodes, scores), key=lambda x: x[1], reverse=True)
return sorted([r[0] for r in result[:query.limit]], key=lambda x: x.created_at)
排序策略很务实:
先按重排分数挑 Top-K,保证质量; 再按 created_at升序输出,让上层把"语义最相关但时间正确"的 Episode 串起来;重排出错且是限流时,自动 await sleep(5)后重试,最多 60 次。
可选的 reranker(在 packages/server/src/memmachine_server/common/reranker/ 下):
identity:不改变顺序,作为占位;bm25:纯文本匹配;rrf-hybrid:Reciprocal Rank Fusion 把 embedding 分与 BM25 分融合;cohere:Cohere Rerank API;amazon-bedrock:AWS Bedrock Rerank API。
第 6 章 存储抽象:MemMachine 如何在不同后端间自由切换
MemMachine 的"可移植性"来自三个核心抽象:EpisodeStore、VectorStore、VectorGraphStore。它们各自承担不同的职责。
6.1 三类存储抽象
flowchart LR
EM[EpisodicMemory<br/>EventMemory<br/>SemanticMemory] --> ES[EpisodeStore]
EM --> VS[VectorStore]
EM --> VGS[VectorGraphStore]
EM --> SS[SemanticStorage]
ES --> ESQL[(PostgreSQL)]
ES --> ESSQLite[(SQLite)]
VS --> Qdrant[(Qdrant)]
VS --> USearch[(SQLite + USearch/hnswlib)]
VS --> SVec[(SQLite-vec)]
VGS --> Neo[(Neo4j)]
VGS --> Nebula[(NebulaGraph)]
SS --> PG[(PostgreSQL + pgvector)]
SS --> N4SS[(Neo4j)]
EpisodeStore | |||
VectorStore | |||
VectorGraphStore | |||
SemanticStorage |
6.2 VectorStore 抽象
源码:packages/server/src/memmachine_server/common/vector_store/vector_store.py。
核心接口(伪代码):
classVectorStore:
asyncdefcreate_collection(namespace: str, name: str,
config: VectorStoreCollectionConfig) -> VectorStoreCollection
async def get_collection(...) -> VectorStoreCollection | None
async def drop_collection(...) -> None
class VectorStoreCollection:
asyncdefupsert(records: list[Record]) -> None
async def query(query_vectors, *, limit, property_filter,
return_vector, return_properties) -> list[QueryResult]
async def get(uuids) -> list[Record]
async def delete(uuids) -> None
约束:
每个 Collection 在创建时固定 dimensions、similarity_metric、indexed_properties_schema。换嵌入模型要么用新 Collection,要么用别名机制。命名约束 [a-z0-9_]+、单段 ≤ 32 字节,是为了和 PostgreSQL 表名、Neo4j label、Qdrant collection 名等都兼容。Record.properties同时支持系统属性(如_segment_uuid)与用户属性(直接进入 record)。EventMemory 利用这一点把"切片元数据"和"业务元数据"放在一张表里。
6.3 VectorGraphStore 抽象
源码:packages/server/src/memmachine_server/common/vector_graph_store/。
这一抽象的特点是:节点既能向量检索,又能图遍历。Neo4j 的 vector index + Cypher 是天然实现;NebulaGraph 5.2+ 也内置了向量类型。
它的主要接口可以理解为 VectorStore 的超集:
add_nodes / add_edges:写节点与关系;query_nodes:向量召回 + 属性过滤;traverse:从命中的节点出发做图遍历(如沿[DERIVED_FROM]拿回 Episode)。
这就是为什么 LongTermMemory 用一个抽象解决了"召回 + 关联",不需要再调用 EpisodeStore。
6.4 FilterExpr:跨存储的过滤 DSL
源码:packages/server/src/memmachine_server/common/filter/filter_parser.py。
用户写:
user_id = "alice" AND (tag IN ("flight", "hotel") OR session_id = "s001")
被解析成 FilterExpr 树:
And(
left=Comparison(field="user_id", op="=", value="alice"),
right=Or(
left=In(field="tag", values=["flight", "hotel"]),
right=Comparison(field="session_id", op="=", value="s001"),
)
)
然后由每种存储实现各自把这棵树翻译成原生过滤:
WHERE | |
Filter | |
WHERE 子句 | |
WHERE |
这样调用方写一份 filter 字符串,无论后端怎么切换都能工作。
6.5 公共组件总览
packages/server/src/memmachine_server/common/ 下的组件构成了 MemMachine 的"标准件库":
embedder/ | ||
language_model/ | generate_response / generate_parsed_response | |
reranker/ | ||
metrics_factory/ | ||
resource_manager/ | ||
session_manager/ | ||
episode_store/ |
ResourceManager 是这套组件的"中央集线器":MemMachine 主类里所有 await self._resources.get_embedder(...) 都从它出。
第 7 章 工程细节:决定生产可用的关键
7.1 实例 LRU 缓存
每个 (org_id, project_id) 都需要一个 EpisodicMemory 实例(持有 STM 内存、LTM 连接),频繁创建/销毁会拖慢响应。源码:packages/server/src/memmachine_server/episodic_memory/instance_lru_cache.py 与 episodic_memory_manager.py。
# packages/server/src/memmachine_server/episodic_memory/episodic_memory_manager.py:43-52
classEpisodicMemoryManagerParams(BaseModel):
instance_cache_size: int = 100# 同时驻留的最大实例数
max_life_time: int = 600# 空闲多久后回收(秒)
...
机制要点:
引用计数:通过 release_ref跟踪有多少协程正在访问,正在用的不淘汰;后台清理协程:每 2 秒扫一次,把空闲超时的实例 close 掉; session-level 读写锁:保证同一会话同时只有一个 create 在跑。
这套机制让你可以同时服务上万会话,而 Neo4j / Qdrant 的连接池不会爆。
7.2 全异步 IO 模型
整个 MemMachine 服务端是**纯 async def**:
asyncio.gather用来 fan-out(写入 STM/LTM/Semantic 并发、STM/LTM 并发查询);asyncio.TaskGroup用在 ingestion 时收集 episode(Python 3.12+);所有外部 IO(数据库、嵌入、LLM)都是协程友好的库; rw_locks.AsyncRWLock实现了非阻塞读写锁(一写多读)。
理解这一点之后就不会惊讶为什么 MemMachine 在单 worker 下也能撑住几千 QPS——FastAPI 默认 single-process + uvicorn 已经够了。
7.3 多 Worker 部署
packages/server/src/memmachine_server/server/app.py:100-144 给了如何起多 worker:
workers_env = os.getenv("MEMMACHINE_WORKERS")
if workers_env:
workers = int(workers_env)
else:
workers = 1# 容器化环境默认 1,避免误用宿主 CPU 数
uvicorn.run("memmachine_server.server.app:app",
host=config.server.host, port=config.server.port,
workers=workers, access_log=True, log_level=...)
提醒:
**不要无脑设 os.cpu_count()**:容器里这个值经常是宿主机的核数,不是 cgroup 限额;多 worker 共用同一份 Postgres/Neo4j 后端:每个 worker 自己起一个 Python 进程,不共享内存中的 LRU 缓存,因此实际容量是 workers × instance_cache_size。
7.4 会话删除的异步队列
源码:packages/server/src/memmachine_server/main/memmachine.py:309-359。删除流程是软删除 + 异步清理:
asyncdefdelete_session(self, session_data) -> None:
await session_manager.update_session_status(
session_key=session_data.session_key,
status=SessionDataManager.SessionStatus.Deleted, # 先打软删除标
)
self._deletion_queue.put_nowait(session_data) # 进入清理队列
后台 _delete_session_worker 依次:
删 EpisodeStore 里的所有 episode(分批 EPISODE_DELETE_BATCH_SIZE=1000);调用 _cleanup_semantic_history清掉对应的语义引用;删 episodic memory; 删 semantic memory; 最后把 session 行从 session 管理器移除。
为什么要异步?
大会话动辄 10 万 episode,同步删会让 API 超时; 删除失败时实例仍然是"软删除"状态,可以重试; 服务重启时通过 get_sessions_by_status(Deleted)还能把未完成的清理恢复出来(见start()内的key_to_session钩子)。
7.5 配置合并:分阶段使用不同模型
源码:packages/server/src/memmachine_server/main/memmachine.py:420-460。
_with_default_episodic_memory_conf 把"全局默认"和"用户 per-session 配置"做深度合并。这使得:
全局可以指定 default_long_term_memory_embedder = openai_large、default_long_term_memory_reranker = cohere;单会话可以覆盖: {"long_term_memory": {"embedder": "bedrock_titan"}};三类 LLM( agent/answer/judge)可以各自使用不同模型,节省成本(便宜模型做路由 + 重写,贵模型只在最后一步)。
7.6 可观测性
MemMachine 内嵌 Prometheus 指标,几个最有用的指标:
Ingestion_latency | |
query_latency | |
Ingestion_countquery_count | |
event_memory_encode_events_phase_seconds{phase=...} | |
event_memory_query_phase_seconds{phase=...} |
直接挂 Grafana 即可。配合 event_memory_* 系列指标,你可以一眼看出"是嵌入慢还是 segment_store 慢"。
第 8 章 端到端实战:把所有知识拼成一张图
8.1 写入:memory.add("我喜欢靠窗的座位") 究竟做了多少事?
client = MemMachineClient(base_url="http://localhost:8080")
project = client.get_or_create_project(org_id="acme", project_id="travel")
memory = project.memory(user_id="alice", agent_id="travel_agent", session_id="s001")
memory.add("我喜欢靠窗的座位,最好是商务舱。", metadata={"category": "travel"})
服务端发生了:
REST 解析 → AddMemoriesSpec→EpisodeEntry;路由到 MemMachine 主类 → add_episodes(session_data, episode_entries);EpisodeStore 写入 → 一行 episodes落库,返回新分配的uid;并发开启两个任务: 把 category=travel拷贝到filterable_metadata;并发跑 STM 与 LTM: STM:append 到 deque;若总长度超阈值,弹出旧 episode + 异步走滚动摘要; LTM/EventMemory:把消息切成 segment / derivative,嵌入后写 Neo4j 或 VectorStore; EpisodicMemory.add_memory_episodes: SemanticSessionManager.add_message:仅把 episode_id 写入"待抽取"表; API 立即返回 episode_ids; 若干秒后,后台的 _background_ingestion_task扫到该 set "脏了",触发:拉取 5 条未抽取 episode; 对每个语义 category(如 profile_prompt、crm_prompt),调 LLM 产出 SemanticCommand:[{"command":"add", "tag":"Travel Preferences",
"feature":"seat_preference", "value":"User prefers window seat in business class"}]apply commands → 嵌入 → 写 pgvector,并把这条 episode 标记为 is_ingested=True;若特征数 ≥ 20,触发巩固 LLM 调用,合并/删除冗余特征。
8.2 读取:memory.search("用户的座位偏好?")
results = memory.search("用户的座位偏好?")
print(results.content.episodic_memory.long_term_memory.episodes[0].content)
# → "我喜欢靠窗的座位,最好是商务舱。"
print(results.content.semantic_memory[0])
# → SemanticFeature(tag="Travel Preferences", feature_name="seat_preference",
# value="User prefers window seat in business class", ...)
服务端:
MemMachine.query_search(session_data, query=...)并发开两个 task;Episodic task: 打开 EpisodicMemory 实例(从 LRU 缓存); 普通模式:并发查 STM + LTM;STM 优先去重;按嵌入分或 reranker 分数排序; Agent 模式: ToolSelectAgent用 LLM 判定"这是单跳直接查询" → 路由到MemMachineAgent→ 直查 LTM;Semantic task: 嵌入 query; 在 pgvector 上做向量召回(按 set_id 过滤、 min_distance阈值);返回 SemanticFeature列表(含citations回链到原 Episode);合并成 SearchResponse返回。
8.3 跟踪建议
当你想看清楚一条请求的实际行为:
# 1) 提高 server 日志级别
export MEMMACHINE_LOG_LEVEL=DEBUG
# 2) 实时看 EventMemory 五阶段时序
curl http://localhost:8080/metrics | grep event_memory_
# 3) 在 client 端开 timing
import time; t=time.time(); memory.search("..."); print(time.time()-t)
EventMemory 的 encode_events timing: segmentation=... derivation=... ... 日志非常直观,定位瓶颈一目了然。
第 9 章 把 MemMachine 接到你的 Agent
这里只给出"原理视角的决策表"。具体的 SDK / REST / MCP 接入示例可在 MemMachine 仓库的
README.md与examples/目录中找到。
| Python SDK 内嵌 | memmachine-client;与 LangChain / LangGraph / CrewAI / LlamaIndex 都有现成 wrapper | |
| REST API | /api/v2/memories/*;注意 agent_mode 参数选 RetrievalAgent 与否 | |
| MCP Server | memmachine-mcp-stdio 或 mcp-http,工具签名即记忆 CRUD | |
| Function Calling Tool | add / search 包装成 tool spec;让 LLM 自己决定何时调用 | |
| 隐式注入(Sidecar/Hook) | memory.search(query) 的结果拼到 system prompt |
原理层面上要选对方式,只需要回答两个问题:
你希望 LLM 主动决定何时记忆 / 何时回忆吗? 是 → 用 Function Calling Tool 或 MCP; 否 → 用 SDK / REST 在应用代码里"埋点"。 你能容忍多少端到端延迟? <100ms 的本地 Agent → SDK + 嵌入式 SQLite 后端; 多租户云服务 → REST + Qdrant/Neo4j; 离线分析 → 全部走 REST,启用 RetrievalAgent + 大模型重排。
附录 A:源码路径速查表
packages/server/src/memmachine_server/server/app.py | |
packages/server/src/memmachine_server/server/api_v2/router.py | |
packages/server/src/memmachine_server/main/memmachine.py | |
packages/server/src/memmachine_server/episodic_memory/episodic_memory.py | |
packages/server/src/memmachine_server/episodic_memory/episodic_memory_manager.py | |
packages/server/src/memmachine_server/episodic_memory/short_term_memory/short_term_memory.py | |
packages/server/src/memmachine_server/episodic_memory/long_term_memory/long_term_memory.py | |
packages/server/src/memmachine_server/episodic_memory/declarative_memory/declarative_memory.py | |
packages/server/src/memmachine_server/episodic_memory/event_memory/event_memory.py | |
packages/server/src/memmachine_server/episodic_memory/event_memory/data_types.py | |
packages/server/src/memmachine_server/episodic_memory/event_memory/segmenter/text_segmenter.py | |
packages/server/src/memmachine_server/episodic_memory/event_memory/deriver/text_deriver.py | |
packages/server/src/memmachine_server/semantic_memory/semantic_memory.py | |
packages/server/src/memmachine_server/semantic_memory/semantic_ingestion.py | |
packages/server/src/memmachine_server/semantic_memory/semantic_llm.py | |
packages/server/src/memmachine_server/semantic_memory/util/semantic_prompt_template.py | |
packages/server/src/memmachine_server/server/prompt/default_prompts.py | |
packages/server/src/memmachine_server/retrieval_agent/service_locator.py | |
packages/server/src/memmachine_server/retrieval_agent/agents/coq_agent.py | |
packages/server/src/memmachine_server/retrieval_agent/agents/tool_select_agent.py | |
packages/server/src/memmachine_server/retrieval_agent/agents/memmachine_retriever.py | |
packages/server/src/memmachine_server/retrieval_agent/common/agent_api.py | |
packages/server/src/memmachine_server/common/vector_store/vector_store.py | |
packages/server/src/memmachine_server/common/filter/filter_parser.py | |
packages/server/src/memmachine_server/semantic_memory/cluster_manager.py |
附录 B:术语对照
tag/feature/value) | ||
(org_id, project_id) 或某个 user | ||
结语
如果只用一句话总结 MemMachine 的设计:
它把"AI 长期记忆"建模成多层的、可分离的、可观测的存储抽象,再用 LLM 把对话蒸馏成结构化事实,最后用 Agent 化的检索代理把找回来的内容做精准编排。
它不是简单的"向量库套一层 API",而是把人类认知科学(工作 / 情景 / 语义记忆)与现代 LLM 工程实践(异步抽取、Agent Lightning、可插拔后端)有机融合的工程作品。
夜雨聆风