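LangGraph Framework Source Code Analysis Series: Checkpoint
1. Series Overview
LangGraph, released by the LangChain team, is a fundamental upgrade over LangChain's earlier "chain" paradigm. It models an agent's execution flow as a directed graph (cycles allowed), so loops, conditional branches, and state rollback come naturally. It stands out for the clarity of its state management and the completeness of its human-in-the-loop support, and it integrates deeply with AI observability tools such as LangSmith and Langfuse. Among today's agent frameworks, LangGraph is one of the most watched mainstream options alongside AutoGen, CrewAI, and LlamaIndex Workflows; thanks to its more mature engineering design and deeper ecosystem integration, it is often regarded as a step ahead when it comes to shipping complex business scenarios.
This series draws on the author's hands-on work with the framework and on ongoing study of it. It is updated irregularly, dissects the framework's design philosophy, and shares lessons learned in production.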
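2. Core Concepts of Checkpoint
2.1 What Is a Checkpoint?
Checkpoint is LangGraph's persistence layer: it saves a snapshot of the graph state at every superstep of execution. Think of it as a game save: progress is stored at key points so that you can pause, resume, or go back at any time.
Core value:
- Human-in-the-loop: lets a human step in and make decisions mid-run
- Memory: keeps context across multiple interactions
- Fault recovery: resumes execution from the point of failure
- Time travel: rewinds to any historical state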
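2.2 Directory Structure
LangGraph uses a monorepo to manage the core library together with the checkpoint implementations, and the checkpoint-related packages are layered by responsibility: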
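| Package | Role |
|---|---|
| `checkpoint` | Base interface package: `BaseCheckpointSaver`, core data structures, `InMemorySaver`, serializers |
| `checkpoint-postgres` | PostgreSQL persistence implementation |
| `checkpoint-sqlite` | SQLite persistence implementation |
| `checkpoint-conformance` | Conformance tests for checkpointer implementations |
| `langgraph` | Core library (graph API and the Pregel runtime) |
| `prebuilt` | Prebuilt components such as `create_react_agent` and `ToolNode` |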
The checkpoint packages are laid out as follows (`checkpoint-postgres` and `checkpoint-sqlite` are siblings of `checkpoint` under `libs/`):
```text
libs/
├── checkpoint/                  # base interface package
│   └── langgraph/checkpoint/
│       ├── base/                # BaseCheckpointSaver, core data structures
│       ├── memory/              # InMemorySaver (in-memory implementation)
│       └── serde/               # serialization protocol
│           ├── base.py          # SerializerProtocol interface
│           ├── jsonplus.py      # extended JSON serialization
│           └── encrypted.py     # encrypted serialization
├── checkpoint-postgres/         # PostgreSQL implementation
└── checkpoint-sqlite/           # SQLite implementation
```
3. Core Data Structures
3.1 CheckpointTuple
`CheckpointTuple` is a `NamedTuple` that bundles everything recorded for a single snapshot:
```python
# libs/checkpoint/langgraph/checkpoint/base/__init__.py:112
class CheckpointTuple(NamedTuple):
    config: RunnableConfig                            # LangChain run configuration
    checkpoint: Checkpoint                            # the state snapshot
    metadata: CheckpointMetadata                      # metadata
    parent_config: RunnableConfig | None = None       # config of the parent checkpoint
    pending_writes: list[PendingWrite] | None = None  # writes not yet applied
```
3.2 Checkpoint
The state object itself; the data lives in channels:
```python
# libs/checkpoint/langgraph/checkpoint/base/__init__.py:65
class Checkpoint(TypedDict):
    v: int                                     # format version
    id: str                                    # unique, monotonically increasing ID (UUID6)
    ts: str                                    # ISO 8601 timestamp
    channel_values: dict[str, Any]             # snapshot of channel values
    channel_versions: ChannelVersions          # current version of each channel
    versions_seen: dict[str, ChannelVersions]  # channel versions each node has seen
    updated_channels: list[str] | None         # channels updated in this step
```
3.3 CheckpointMetadata
```python
# libs/checkpoint/langgraph/checkpoint/base/__init__.py:35
class CheckpointMetadata(TypedDict):
    source: str    # "input" | "loop" | "update" | "fork"
    step: int      # step number
    parents: dict  # mapping to parent checkpoint IDs
    run_id: str    # run ID
```
4. Key Mechanisms in Detail
4.1 Thread: State Isolation
A thread isolates state in multi-tenant scenarios through a set of unique identifiers:
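| Field | Meaning |
|---|---|
| `thread_id` | Identifies one conversation or session; every checkpoint belongs to exactly one thread |
| `checkpoint_ns` | Checkpoint namespace: the empty string `""` for the root graph, non-empty for subgraphs |
| `checkpoint_id` | Identifies one specific checkpoint in the thread; when omitted, the latest checkpoint is used |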
```python
# Basic config: thread only (resolves to the latest checkpoint)
config_with_thread = {"configurable": {"thread_id": "1"}}

# Full config: pins one specific checkpoint
config_with_checkpoint = {
    "configurable": {
        "thread_id": "1",
        "checkpoint_ns": "",
        "checkpoint_id": "0c62ca34-ac19-445d-bbb0-5b4984975b2a",
    }
}
```
4.2 Serialization Protocol (Serde)
`JsonPlusSerializer` uses a layered serialization strategy:
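| Input | Type tag | Encoding |
|---|---|---|
| `None` | `"null"` | empty payload `b""` |
| `bytes` | `"bytes"` | stored as-is |
| `bytearray` | `"bytearray"` | stored as-is |
| Any other object | `"msgpack"` | `ormsgpack.packb` |
| msgpack fails and `pickle_fallback` is enabled | `"pickle"` | `pickle.dumps` |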
The core method, `dumps_typed`, works like this:
```python
def dumps_typed(self, obj: Any) -> tuple[str, bytes]:
    if obj is None:
        return "null", b""
    elif isinstance(obj, bytes):
        return "bytes", obj
    elif isinstance(obj, bytearray):
        return "bytearray", obj
    else:
        try:
            # Prefer msgpack; fall back to pickle if that fails
            return "msgpack", ormsgpack.packb(obj)
        except Exception:
            if self.pickle_fallback:
                return "pickle", pickle.dumps(obj)
            raise
```
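The strategy can be tried out without installing anything. The sketch below is a hypothetical `TypedSerializerSketch` that swaps msgpack for the standard-library `json` module (an assumption made only to keep the example self-contained) and adds a matching `loads_typed`, so values round-trip through their type tag:

```python
import json
import pickle
from typing import Any


class TypedSerializerSketch:
    """Hypothetical stand-in for JsonPlusSerializer; json replaces msgpack here."""

    def __init__(self, pickle_fallback: bool = False) -> None:
        self.pickle_fallback = pickle_fallback

    def dumps_typed(self, obj: Any) -> tuple[str, bytes]:
        if obj is None:
            return "null", b""
        if isinstance(obj, bytes):
            return "bytes", obj
        if isinstance(obj, bytearray):
            return "bytearray", bytes(obj)  # normalized to bytes for storage
        try:
            return "json", json.dumps(obj).encode()
        except TypeError:  # json cannot encode e.g. sets or custom classes
            if self.pickle_fallback:
                return "pickle", pickle.dumps(obj)
            raise

    def loads_typed(self, data: tuple[str, bytes]) -> Any:
        type_, payload = data  # the tag decides which decoder to use
        if type_ == "null":
            return None
        if type_ == "bytes":
            return payload
        if type_ == "bytearray":
            return bytearray(payload)
        if type_ == "json":
            return json.loads(payload)
        if type_ == "pickle":
            return pickle.loads(payload)
        raise ValueError(f"unknown type tag: {type_}")


serde = TypedSerializerSketch(pickle_fallback=True)
print(serde.dumps_typed({"a": [1, 2]}))  # ('json', b'{"a": [1, 2]}')
print(serde.dumps_typed({1, 2})[0])  # pickle
print(serde.loads_typed(serde.dumps_typed({1, 2})))  # {1, 2}
```

Only enable the pickle fallback for trusted data, since unpickling can execute arbitrary code.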
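4.3 Pending Writes: Fault-Tolerant Recovery
Writes can be thought of as an operation log that carries each node's results; depending on the scenario they come in different types with corresponding values. Pending writes provide real-time persistence at the task level (a task being the execution of one node in the graph):
- When: written as soon as each task finishes (via the `PregelRunner.commit` callback)
- Purpose: if one task fails, the results of the tasks that already finished are not lost
- Relation to checkpoints: `put_writes` is a fine-grained write, while `put` is a global snapshot

Storage layout: each entry holds the ID of the task that produced it, a channel name, and the value. Because a task can write to several channels, one task may produce multiple writes: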
```python
PendingWrite = tuple[str, str, Any]  # (task_id, channel_name, value)
```
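A minimal sketch of why task-level writes matter on resume, assuming a simplified rule: a task whose writes were persisted counts as finished and is not re-run, while a task that left an error or interrupt marker runs again. The channel-name strings and the helper `split_tasks_on_resume` are assumptions for illustration, not LangGraph's code:

```python
from typing import Any

PendingWrite = tuple[str, str, Any]  # (task_id, channel_name, value)

# Stand-ins for the special channel names; the exact strings are assumptions.
ERROR = "__error__"
INTERRUPT = "__interrupt__"


def split_tasks_on_resume(
    task_ids: list[str], pending_writes: list[PendingWrite]
) -> tuple[dict[str, list[tuple[str, Any]]], list[str]]:
    """Reuse the writes of finished tasks; return the tasks that must run again."""
    failed = {t for t, channel, _ in pending_writes if channel in (ERROR, INTERRUPT)}
    finished: dict[str, list[tuple[str, Any]]] = {}
    for task_id, channel, value in pending_writes:
        if task_id in failed:
            continue  # a failed or interrupted task does not count as finished
        finished.setdefault(task_id, []).append((channel, value))
    to_run = [t for t in task_ids if t not in finished]
    return finished, to_run


writes = [
    ("task-a", "messages", "hi"),
    ("task-a", "counter", 1),  # one task may produce several writes
    ("task-b", ERROR, "boom"),
]
finished, to_run = split_tasks_on_resume(["task-a", "task-b", "task-c"], writes)
print(finished)  # {'task-a': [('messages', 'hi'), ('counter', 1)]}
print(to_run)  # ['task-b', 'task-c']
```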
Write types (defined in `_constants.py`):
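| Constant | Meaning |
|---|---|
| `ERROR` | Records the exception raised by a failed task |
| `INTERRUPT` | Records an interrupt raised by a task, so the run can be resumed later |
| `RESUME` | Carries the value used to resume a task after an interrupt |
| `PUSH` | Records a `Send` packet returned by a node or edge (a dynamically pushed task) |
| `NO_WRITES` | Marks a task that finished without writing to any channel |
| `INPUT` | Records the values passed as input to the graph |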
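4.4 Channels: The Carriers of State
A channel is LangGraph's basic unit of state. Each state field maps to a channel, which defines how the value is stored, updated, and persisted. Channels are the core abstraction of the Pregel computation model, and three fields work together to provide incremental computation, dependency tracking, and concurrency control: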
```text
┌──────────────────┐     ┌──────────────────┐     ┌──────────────────┐
│  channel_values  │     │ channel_versions │     │  versions_seen   │
│  storage layer   │ ←── │  version layer   │ ←── │ dependency layer │
│   "what it is"   │     │ "which version"  │     │  "who saw what"  │
└──────────────────┘     └──────────────────┘     └──────────────────┘
```
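channel_values: the state storage layer
The supported channel types are listed below; each type behaves differently when state is updated. Plain state fields we define ourselves, such as int, str, float, or dict, are treated as `LastValue`: when a node returns, the returned value overwrites that field in the current state.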
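| Channel | Update behavior |
|---|---|
| `LastValue` | Keeps the most recent value; accepts at most one write per superstep |
| `LastValueAfterFinish` | Like `LastValue`, but the value becomes available and readable only after `finish()` is called |
| `AnyValue` | Keeps the last value received, assuming all values written in the same step are equal |
| `EphemeralValue` | Holds the value written in the previous step only, then clears it |
| `UntrackedValue` | Keeps the last value but is not saved in checkpoints |
| `Topic` | Pub/sub channel that collects every value written to it, optionally accumulating across steps |
| `BinaryOperatorAggregate` | Folds every write into the current value with a reducer, e.g. `Annotated[list, operator.add]` |
| `NamedBarrierValue` | Becomes available only after every named writer has written |
| `NamedBarrierValueAfterFinish` | Like `NamedBarrierValue`, but available only after `finish()` is called |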
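To see how the update semantics differ, here is a hypothetical sketch of two of these channels. `LastValueSketch` and `BinaryOperatorAggregateSketch` are simplified stand-ins, not the LangGraph classes; both receive all writes made in one superstep at once, which is why a `LastValue`-style channel can detect conflicting writes:

```python
import operator
from typing import Any, Callable, Sequence


class InvalidUpdate(Exception):
    """Raised when a channel receives writes it cannot accept."""


class LastValueSketch:
    """Keeps only the latest value; at most one write per superstep."""

    def __init__(self) -> None:
        self.value: Any = None

    def update(self, values: Sequence[Any]) -> bool:
        if not values:
            return False  # nothing written this step
        if len(values) > 1:
            raise InvalidUpdate("LastValue can receive only one value per step")
        self.value = values[-1]
        return True


class BinaryOperatorAggregateSketch:
    """Folds every write into the current value with a reducer."""

    def __init__(self, op: Callable[[Any, Any], Any], initial: Any) -> None:
        self.op = op
        self.value = initial

    def update(self, values: Sequence[Any]) -> bool:
        for v in values:
            self.value = self.op(self.value, v)
        return bool(values)


counter = LastValueSketch()
counter.update([5])
messages = BinaryOperatorAggregateSketch(operator.add, [])
messages.update([["hi"], ["there"]])  # two nodes wrote in the same step
print(counter.value, messages.value)  # 5 ['hi', 'there']
try:
    counter.update([1, 2])  # two concurrent writes to a LastValue channel
except InvalidUpdate as exc:
    print("rejected:", exc)
```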
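channel_versions: the version control layer
The checkpoint keeps a monotonically increasing version number for every channel (an auto-incrementing integer by default); this is the basis of Pregel's dependency tracking:
```python
{
    "messages": 3,  # updated 3 times
    "counter": 5,   # updated 5 times
}
```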
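What it is for:
- Change detection: comparing `checkpoint_previous_versions` with `channel_versions` shows which channels changed in the current superstep (a Pregel concept; think of it as executing one batch of tasks)
- Global ordering: `checkpoint_id` together with the version numbers establishes a global order of events, which keeps state consistent in distributed environments
- Concurrency safety: because version numbers only ever increase, they can serve as an optimistic-lock check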
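versions_seen: the dependency awareness layer
For every node, this records the channel versions the node saw the last time it ran; it is the key to re-running a node only when its inputs have changed:
```python
{
    "node_A": {"messages": 2, "counter": 4},  # state seen when node_A ran
    "node_B": {"messages": 3, "counter": 4},  # state seen when node_B ran
}
```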
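What it is for:
- Node scheduling: `prepare_next_tasks` compares `versions_seen[node]` with the current `channel_versions` and triggers the node if a channel's version has moved on
- Idempotency: even if a node is scheduled several times, it can be safely skipped as long as its input versions are unchanged (in combination with caching)
- Fine-grained dependencies: a node can subscribe to specific channels only, and updates to channels it does not subscribe to do not trigger it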
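The scheduling rule can be sketched as follows, assuming each node declares the channels that trigger it. `should_run` and `mark_seen` are hypothetical helpers that compare and record versions in the way the text describes `prepare_next_tasks`; they are not its actual code. The data reuses the `versions_seen` example above:

```python
def should_run(node: str, triggers: list[str], channel_versions: dict[str, int],
               versions_seen: dict[str, dict[str, int]]) -> bool:
    """Run the node if a subscribed channel is newer than what the node last saw."""
    seen = versions_seen.get(node, {})
    return any(channel_versions.get(ch, 0) > seen.get(ch, 0) for ch in triggers)


def mark_seen(node: str, triggers: list[str], channel_versions: dict[str, int],
              versions_seen: dict[str, dict[str, int]]) -> None:
    """After the node runs, remember the versions it consumed."""
    versions_seen.setdefault(node, {}).update(
        {ch: channel_versions.get(ch, 0) for ch in triggers}
    )


channel_versions = {"messages": 3, "counter": 5}
versions_seen = {
    "node_A": {"messages": 2, "counter": 4},
    "node_B": {"messages": 3, "counter": 4},
}
triggers = {"node_A": ["messages"], "node_B": ["messages"]}  # both subscribe to messages only

print({n: should_run(n, t, channel_versions, versions_seen) for n, t in triggers.items()})
# {'node_A': True, 'node_B': False}: counter moved 4→5, but node_B does not subscribe to it
mark_seen("node_A", triggers["node_A"], channel_versions, versions_seen)
print(should_run("node_A", triggers["node_A"], channel_versions, versions_seen))  # False
```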
How they cooperate in the Pregel loop
```text
Superstep N
├── tick() prepares tasks
│   └── compares versions_seen[node] vs channel_versions
│       └── if messages went 2→3, triggers the nodes subscribed to it
├── runner.tick() runs the nodes
│   └── nodes read channel_values (guaranteed to be the latest versions)
└── after_tick() commits
    ├── apply_writes() updates channel_values
    ├── get_next_version() bumps channel_versions
    └── versions_seen[executed_nodes] = current version snapshot
```
This design lets LangGraph support arbitrarily complex state dependencies while still recomputing only the nodes that need it; it is the cornerstone of LangGraph's "incremental graph computation".
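5. Interface Design
5.1 BaseCheckpointSaver
Every checkpoint saver is built around the following methods (each also has an async counterpart, such as `aput` and `aget_tuple`):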
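| Method | Purpose |
|---|---|
| `put` | Store a checkpoint together with its config and metadata |
| `put_writes` | Store intermediate (pending) writes linked to a checkpoint |
| `get_tuple` | Fetch the `CheckpointTuple` for a config (the latest one if no `checkpoint_id` is given) |
| `list` | List the checkpoints matching a config, with optional `before`, `limit`, and `filter` |
| `delete_thread` | Delete all checkpoints and writes belonging to a thread |
| `get_next_version` | Produce the next version number for a channel |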
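5.2 When Each Method Is Called: A Source Walkthrough
Knowing when each checkpoint method is called is the key to understanding how state flows through LangGraph. The sections below walk through the source to show what triggers each method. "Source" here always means the `libs/langgraph/langgraph/pregel` module: in a sense, the entire checkpoint architecture is designed for Pregel. As LangGraph's core execution engine, Pregel is very complex and will get a dedicated article later in the series.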
put
Call sites at a glance:
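| Scenario | Location | Notes |
|---|---|---|
| After user input is written | `_loop.py:721-723` | `source`: `input` |
| After a superstep completes | `_loop.py:563` | `source`: `loop` |
| Manual state update | `main.py:1576-1862` | `source`: `update`, or `fork`/`input`; via `update_state` |
| Exit on exception | `_loop.py:830` | |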
Execution flow at the source level:
```python
# Taking after_tick in _loop.py as an example
def after_tick(self) -> None:
    # Collect the writes of every task in this step
    writes = [w for t in self.tasks.values() for w in t.writes]
    # Apply the writes to the channels (the global sync point)
    self.updated_channels = apply_writes(
        self.checkpoint, self.channels, self.tasks.values(), ...
    )
    # Clear pending_writes so they are not applied twice
    self.checkpoint_pending_writes.clear()
    # ★ Key step: save the global checkpoint
    self._put_checkpoint({"source": "loop"})
```
Design note: `put` is the global synchronization point and fires only at superstep boundaries, which keeps the state consistent. `put_writes` is called frequently to record intermediate results, while full global snapshots are taken only at these key points.
put_writes
Call sites at a glance:
| Scenario | Location | Notes |
|---|---|---|
| Task finishes | `_runner.py:434-459` | |
| Processing a `Command` input | `_loop.py:667` | |
| Manual state update | `main.py:1632-1877` | via `update_state` |
The core execution path:
```python
# 1. Runner layer: callback after a task finishes (the high-frequency trigger).
#    self.put_writes holds a weak reference to the loop's put_writes,
#    so self.put_writes() dereferences it before the actual call.
def commit(self, task: PregelExecutableTask, exception):
    if exception is None:
        # Normal completion: persist the task's results immediately
        self.put_writes()(task.id, task.writes)
    elif isinstance(exception, GraphInterrupt):
        # Interrupt: save the interrupt info for a later resume
        writes = [(INTERRUPT, exception.args[0])]
        self.put_writes()(task.id, writes)
    else:
        # Failure: save the error
        task.writes.append((ERROR, exception))
        self.put_writes()(task.id, task.writes)


# 2. Loop layer: hand the writes to the persistence layer
def put_writes(self, task_id: str, writes: WritesT) -> None:
    # Cache in memory alongside the current checkpoint
    self.checkpoint_pending_writes.extend((task_id, c, v) for c, v in writes)
    # Persist asynchronously (does not block execution); config and
    # writes_to_save are prepared in code omitted from this excerpt
    if self.durability != "exit" and self.checkpointer_put_writes:
        self.submit(self.checkpointer_put_writes, config, writes_to_save, task_id)
```
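Key design: `put_writes` is a fine-grained, high-frequency write that fires every time a task finishes. It provides:
- Fault isolation: the failure of one task does not affect the results of the others
- Immediate recording: a task's output is recorded as soon as the task finishes instead of at the end of the step (it is applied to the channels only in `after_tick`)
- Performance: asynchronous submission does not block the main loop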
get_tuple
Call sites:
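| Scenario | Location | Notes |
|---|---|---|
| Graph execution starts | `_loop.py:1085/1264` | Restores the saved state before the run |
| Querying the current state | `main.py:1280/1324` | Builds a `StateSnapshot` for the user to inspect |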
The restore logic in the source:
```python
# Restore logic in SyncPregelLoop.__enter__
def __enter__(self):
    saved = self.checkpointer.get_tuple(self.checkpoint_config)
    if saved is None:
        # First run: create an empty checkpoint
        saved = CheckpointTuple(
            self.checkpoint_config,
            empty_checkpoint(),  # v=4, id=uuid6, channel_values={}
            {"step": -2},
            None,
            [],
        )
    # Restore channel state from the checkpoint
    self.channels, self.managed = channels_from_checkpoint(
        self.specs, saved.checkpoint
    )
    # Restore pending_writes (the key to resuming from a breakpoint)
    self.checkpoint_pending_writes = [
        (str(tid), k, v) for tid, k, v in (saved.pending_writes or [])
    ]
```
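Core capability: `get_tuple` is the single entry point for restoring state. It supports:
- Resuming: restores `pending_writes` so that unfinished tasks can continue
- Time travel: a `checkpoint_id` selects a historical version
- Initialization: a new thread automatically gets an empty initial state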
list
The only call site:
```python
# main.py:1379 - fetch the state history (time travel)
def get_state_history(self, config, ..., limit=None):
    # Query all checkpoints of this thread
    for checkpoint_tuple in list(
        checkpointer.list(config, before=before, limit=limit, filter=filter)
    ):
        yield self._prepare_state_snapshot(checkpoint_tuple.config, checkpoint_tuple)
```
Typical use cases:
```python
# 1. Inspect the full execution trace
history = list(graph.get_state_history(config))
for state in reversed(history):  # oldest to newest
    print(f"Step {state.metadata['step']}: {state.metadata['source']}")

# 2. Roll back to the previous step (undo)
history = list(graph.get_state_history(config, limit=2))
if len(history) > 1:
    previous = history[1]  # the state before the latest one
    graph.update_state(previous.config, None, as_node="__copy__")

# 3. Fork a new branch from an intermediate state
history = list(graph.get_state_history(config))  # re-fetch: example 2 used limit=2
checkpoint = history[5]  # assumes at least 6 checkpoints exist
# update_state creates the fork in the same thread and returns its config
fork_config = graph.update_state(checkpoint.config, {"strategy": "B"}, as_node="__copy__")
graph.invoke(None, fork_config)  # continue running from the fork
```
5.3 Call Timing Overview
```text
graph.invoke(input, config)
        │
        ▼
┌────────────────────────────────────────────────────────────┐
│ get_tuple(config)                                          │
│ ├── hit: restore checkpoint + pending_writes               │
│ └── miss: create an empty checkpoint                       │
└────────────────────────────────────────────────────────────┘
        │
        ▼
┌────────────────────────────────────────────────────────────┐
│ _first() handles the input                                 │
│ ├── map_input() → apply_writes()                           │
│ └── ★ put({"source": "input"})  ← first global snapshot    │
└────────────────────────────────────────────────────────────┘
        │
        ▼
Superstep Loop
├── tick() prepares tasks
├── runner.tick() runs tasks
│   ├── Task A done → commit() → ★ put_writes(A)  ← task-level writes
│   ├── Task B done → commit() → ★ put_writes(B)
│   └── Task C done → commit() → ★ put_writes(C)
└── after_tick()
    ├── apply_writes()                ← global sync
    └── ★ put({"source": "loop"})     ← per-step global snapshot

graph.get_state_history(config)
└── ★ list(config)  ← query history (separate flow)
```
Key point: `put_writes` is a high-frequency, fine-grained real-time record, while `put` is a low-frequency, global state snapshot.
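6. PostgreSQL Implementation in Detail
With the core interfaces and data structures covered, an implementation comes down to a sensible design that fits the characteristics of the chosen storage backend.
6.1 Table Design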
```sql
-- Migration tracking table
CREATE TABLE checkpoint_migrations (
    v INTEGER PRIMARY KEY
);

-- Main checkpoint table (metadata + non-blob data)
CREATE TABLE checkpoints (
    thread_id TEXT NOT NULL,
    checkpoint_ns TEXT NOT NULL DEFAULT '',
    checkpoint_id TEXT NOT NULL,
    parent_checkpoint_id TEXT,
    type TEXT,
    checkpoint JSONB NOT NULL,
    metadata JSONB NOT NULL DEFAULT '{}',
    PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id)
);

-- Binary object table
CREATE TABLE checkpoint_blobs (
    thread_id TEXT NOT NULL,
    checkpoint_ns TEXT NOT NULL DEFAULT '',
    channel TEXT NOT NULL,
    version TEXT NOT NULL,
    type TEXT NOT NULL,
    blob BYTEA,
    PRIMARY KEY (thread_id, checkpoint_ns, channel, version)
);

-- Writes table
CREATE TABLE checkpoint_writes (
    thread_id TEXT NOT NULL,
    checkpoint_ns TEXT NOT NULL DEFAULT '',
    checkpoint_id TEXT NOT NULL,
    task_id TEXT NOT NULL,
    idx INTEGER NOT NULL,
    channel TEXT NOT NULL,
    type TEXT,
    blob BYTEA NOT NULL,
    task_path TEXT NOT NULL DEFAULT '',
    PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id, task_id, idx)
);
```
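Design points:
- `checkpoints`: non-binary snapshot data and metadata
- `checkpoint_blobs`: serialized custom objects (stored as binary)
- `checkpoint_writes`: the writes recorded against a single checkpoint, one row per task write, keyed by `task_id` and `idx`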
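The design can be tried locally with the standard-library `sqlite3` module. This sketch keeps only `checkpoints` (trimmed to a JSON column holding `channel_versions`) and `checkpoint_blobs`, and its `ON CONFLICT ... DO UPDATE` clauses are assumptions rather than the exact LangGraph SQL. It shows two points: a retried write does not create duplicate rows, and a checkpoint whose channel did not change simply points at the blob version already stored:

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE checkpoints (
    thread_id TEXT NOT NULL,
    checkpoint_ns TEXT NOT NULL DEFAULT '',
    checkpoint_id TEXT NOT NULL,
    checkpoint TEXT NOT NULL,  -- JSON text here, JSONB in PostgreSQL
    PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id)
);
CREATE TABLE checkpoint_blobs (
    thread_id TEXT NOT NULL,
    checkpoint_ns TEXT NOT NULL DEFAULT '',
    channel TEXT NOT NULL,
    version TEXT NOT NULL,
    type TEXT NOT NULL,
    blob BLOB,  -- BYTEA in PostgreSQL
    PRIMARY KEY (thread_id, checkpoint_ns, channel, version)
);
""")

UPSERT_BLOB = """
INSERT INTO checkpoint_blobs VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT (thread_id, checkpoint_ns, channel, version)
DO UPDATE SET type = excluded.type, blob = excluded.blob
"""
UPSERT_CHECKPOINT = """
INSERT INTO checkpoints VALUES (?, ?, ?, ?)
ON CONFLICT (thread_id, checkpoint_ns, checkpoint_id)
DO UPDATE SET checkpoint = excluded.checkpoint
"""


def save(thread_id, checkpoint_id, channel_versions, new_blobs):
    """Write only the blobs listed in new_blobs, then the checkpoint row."""
    for channel, version, value in new_blobs:
        conn.execute(UPSERT_BLOB, (thread_id, "", channel, version, "json", json.dumps(value).encode()))
    doc = json.dumps({"channel_versions": channel_versions})
    conn.execute(UPSERT_CHECKPOINT, (thread_id, "", checkpoint_id, doc))


def load(thread_id, checkpoint_id):
    """Rebuild channel values by joining channel_versions against the blobs table."""
    (doc,) = conn.execute(
        "SELECT checkpoint FROM checkpoints WHERE thread_id = ? AND checkpoint_ns = '' AND checkpoint_id = ?",
        (thread_id, checkpoint_id),
    ).fetchone()
    values = {}
    for channel, version in json.loads(doc)["channel_versions"].items():
        (blob,) = conn.execute(
            "SELECT blob FROM checkpoint_blobs WHERE thread_id = ? AND checkpoint_ns = '' "
            "AND channel = ? AND version = ?",
            (thread_id, channel, version),
        ).fetchone()
        values[channel] = json.loads(blob)
    return values


save("t1", "0001", {"messages": "1", "plan": "1"}, [("messages", "1", ["hi"]), ("plan", "1", {"steps": ["a", "b"]})])
save("t1", "0002", {"messages": "2", "plan": "1"}, [("messages", "2", ["hi", "there"])])  # plan unchanged
save("t1", "0002", {"messages": "2", "plan": "1"}, [("messages", "2", ["hi", "there"])])  # retried put
print(conn.execute("SELECT COUNT(*) FROM checkpoint_blobs").fetchone()[0])  # 3
print(load("t1", "0002"))  # {'messages': ['hi', 'there'], 'plan': {'steps': ['a', 'b']}}
```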
6.2 How put Is Implemented
```python
def put(self, config, checkpoint, metadata, new_versions):
    # Extract the key fields from config
    thread_id = config["configurable"]["thread_id"]
    checkpoint_ns = config["configurable"].get("checkpoint_ns", "")
    # Build the returned config (carries the new checkpoint_id)
    next_config = {
        "configurable": {
            "thread_id": thread_id,
            "checkpoint_ns": checkpoint_ns,
            "checkpoint_id": checkpoint["id"],
        }
    }
    # 1. Split off the non-primitive values (blobs). Pop them from a copy:
    #    popping from the dict being iterated would raise RuntimeError,
    #    and the caller's checkpoint should not be mutated
    checkpoint_copy = {**checkpoint, "channel_values": dict(checkpoint["channel_values"])}
    blob_values = {}
    for k, v in checkpoint["channel_values"].items():
        if v is None or isinstance(v, (str, int, float, bool)):
            continue  # primitives stay inline in the checkpoint JSON
        blob_values[k] = checkpoint_copy["channel_values"].pop(k)
    # 2. Write blobs to the blobs table (UPSERT), only for channels with a
    #    new version; unchanged blobs are already stored under their old version
    #    (cur is the database cursor, opened in code omitted from this excerpt)
    for k, ver in new_versions.items():
        if k not in blob_values:
            continue
        type_, blob = self.serde.dumps_typed(blob_values[k])
        cur.execute(UPSERT_CHECKPOINT_BLOBS_SQL, (...))
    # 3. Write the checkpoints table (UPSERT), using checkpoint_copy
    cur.execute(UPSERT_CHECKPOINTS_SQL, (...))
    return next_config
```
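Implementation details:
- UPSERT semantics: `ON CONFLICT ... DO UPDATE` makes the writes idempotent
- Data separation: JSON metadata and binary blobs are stored separately, which makes querying and compression easier
- Versioning: `checkpoint_blobs` uses `(thread_id, checkpoint_ns, channel, version)` as its composite primary key, so multiple versions of a channel coexist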
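The splitting step in `put` can be isolated into a pure function. This is a hypothetical sketch (`split_channel_values` is not a LangGraph name): primitives stay inline in the checkpoint JSON, other values become blobs, and only blobs whose channel received a new version in `new_versions` are written:

```python
from typing import Any


def split_channel_values(
    channel_values: dict[str, Any], new_versions: dict[str, str]
) -> tuple[dict[str, Any], dict[str, tuple[str, Any]]]:
    """Return (inline values, {channel: (version, value)} blobs to write)."""
    inline: dict[str, Any] = {}
    blobs: dict[str, Any] = {}
    for k, v in channel_values.items():
        if v is None or isinstance(v, (str, int, float, bool)):
            inline[k] = v  # stays in the checkpoint JSON
        else:
            blobs[k] = v  # candidate for checkpoint_blobs
    # Only channels that got a new version this step need a new blob row
    to_write = {k: (ver, blobs[k]) for k, ver in new_versions.items() if k in blobs}
    return inline, to_write


values = {"counter": 5, "messages": ["hi"], "docs": {"a": 1}}
inline, to_write = split_channel_values(values, {"messages": "3", "counter": "6"})
print(inline)  # {'counter': 5}
print(to_write)  # {'messages': ('3', ['hi'])}  (docs is unchanged, so no new blob)
```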
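7. Summary
Through carefully designed interfaces and a flexible extension mechanism, LangGraph Checkpoint provides a solid state-management foundation for building reliable LLM applications:
- Storage abstraction: interfaces are separated from implementations, supporting many backends such as in-memory, SQLite, and PostgreSQL
- Two-tier persistence: `put_writes` (task-level operation log) plus `put` (global snapshot) balance performance and consistency
- Version tracking: channel versions enable dependency-aware incremental updates
- Fault recovery: pending writes plus checkpoints make it possible to resume from a breakpoint

From simple chat applications to complex multi-agent workflows, Checkpoint provides robust state management.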
夜雨聆风