乐于分享
好东西不私藏

Langgraph框架源码解析系列——Checkpoint篇

Langgraph框架源码解析系列——Checkpoint篇

一、系列概述概述

LangGraph 由 LangChain 团队推出,是对早期 LangChain “链式”范式的根本性升级。它将 Agent 的执行流程建模为有向图(甚至带环图),天然支持循环、条件分支、状态回溯等复杂控制流,在状态管理的清晰度与人机协作(Human-in-the-loop)的支持完善度上表现突出,并可与 LangSmith、Langfuse 等 AI 可观测性工具深度集成。 在当前 Agent 框架竞争格局中,LangGraph 与 AutoGen、CrewAI、LlamaIndex Workflows 并列为最受关注的几个主流选择,但凭借更成熟的工程化设计和更深厚的生态整合,往往被认为在落地复杂业务场景时领先一档。 

这个系列主要基于作者目前工作中基于该框架的一些实践和学习进行输出,持续性不定期更新,剖析框架设计理念,并分享一些生产实践心得。


二、Checkpoint 核心概念

2.1 什么是 Checkpoint?

Checkpoint 是 LangGraph 的持久化层机制,在图执行的每个 superstep 保存状态快照。类比游戏存档功能:在关键节点保存进度,支持随时暂停、恢复或回溯。

核心价值:

  • 人机交互:支持人工介入和决策
  • 状态记忆:在多次交互间保持上下文
  • 故障恢复:从失败点继续执行
  • 时间旅行:回溯到任意历史状态

2.2 目录结构

LangGraph 采用 monorepo 架构管理核心库与Checkpoint实现,Checkpoint 相关模块按职责分层:

模块
版本
描述
checkpoint
4.0.1
检查点保存器的基础接口定义
checkpoint-postgres
3.0.4
PostgreSQL 检查点实现
checkpoint-sqlite
3.0.3
SQLite 检查点实现
checkpoint-conformance
验证自定义 checkpointer 是否符合规范
langgraph
1.0.10
核心框架,构建有状态、多角色 LLM 应用
prebuilt
1.0.8
高级 API,简化 Agent 创建

其中checkpoint目录结构如下:

ounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(linelibs/checkpoint/               # 基础接口包├── langgraph/checkpoint/│   ├── base/                  # BaseCheckpointSaver、核心数据结构│   ├── memory/                # InMemorySaver(内存实现)│   └── serde/                 # 序列化协议│       ├── base.py            # SerializerProtocol 接口│       ├── jsonplus.py        # JSON 扩展序列化│       └── encrypted.py       # 加密序列化├── checkpoint-postgres/       # PostgreSQL 持久化实现└── checkpoint-sqlite/         # SQLite 持久化实现

三、核心数据结构

3.1 CheckpointTuple

CheckpointTuple 是 NamedTuple 对象,记录单次运行快照的完整数据:

ounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(line# libs/checkpoint/langgraph/checkpoint/base/__init__.py:112class CheckpointTuple(NamedTuple):    config: RunnableConfig              # LangChain 执行配置    checkpoint: Checkpoint              # 状态信息    metadata: CheckpointMetadata        # 元数据    parent_config: RunnableConfig | None = None  # 父级配置    pending_writes: list[PendingWrite] | None = None  # 待处理writes

3.2 Checkpoint

状态存储对象,数据存储在 channel(通道)中:

ounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(line# libs/checkpoint/langgraph/checkpoint/base/__init__.py:65class Checkpoint(TypedDict):    v: int                              # 格式版本    idstr                             # 唯一且单调递增的 ID(UUID6)    ts: str                             # ISO 8601 时间戳    channel_values: dict[strAny]      # 通道值快照    channel_versions: ChannelVersions   # 各通道版本    versions_seen: dict[str, ChannelVersions]  # 各节点看到的版本    updated_channels: list[str] | None  # 本次更新的通道

3.3 CheckpointMetadata

ounter(lineounter(lineounter(lineounter(lineounter(lineounter(line# libs/checkpoint/langgraph/checkpoint/base/__init__.py:35class CheckpointMetadata(TypedDict):    source: str      # "input" | "loop" | "update" | "fork"    stepint        # 步骤号    parents: dict    # 父 checkpoint ID 映射    run_id: str      # 运行 ID

四、关键机制详解

4.1 Thread:状态隔离机制

Thread 通过唯一标识实现多租户场景下的状态隔离:

字段
说明
必选
thread_id
线程唯一标识
checkpoint_ns
命名空间,多 Agent 模式下隔离空间
checkpoint_id
特定检查点标识,支持历史恢复
ounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(line# 基础配置config_with_thread = {    "configurable": {        "thread_id""1"    }}# 完整配置config_with_checkpoint = {    "configurable": {        "thread_id""1",        "checkpoint_ns""",        "checkpoint_id""0c62ca34-ac19-445d-bbb0-5b4984975b2a"    }}

4.2 序列化协议(Serde)

JsonPlusSerializer 采用分层序列化策略:

数据类型
序列化方式
理由
None "null"

 + 空字节
快速路径
bytes/bytearray
原样存储
避免编码损耗
基础类型(int/float/str/bool)
内联存储
减少存储开销
复杂对象
msgpack(优先)/ pickle(回退)
性能与兼容性平衡

核心方法 dumps_typed 的实现逻辑:

ounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(linedef dumps_typed(self, obj: Any) -> tuple[strbytes]:    if obj is None:        return "null"b""    elif isinstance(obj, bytes):        return "bytes", obj    elif isinstance(obj, bytearray):        return "bytearray", obj    else:        try:            # 优先使用 msgpack,失败回退到 pickle            return "msgpack", ormsgpack.packb(obj)        except Exception:            if self.pickle_fallback:                return "pickle", pickle.dumps(obj)            raise

4.3 Pending Writes:容错恢复机制

Writes可以看作是一种带节点运行结果的操作日志,根据不同场景有不同的类型以及对应的值,Pending Writes 实现Task级别(也就是Graph里的节点)实时持久化

  • 触发时机:每个 Task 执行完成立即写入(通过 PregelRunner.commit 回调)
  • 作用:确保单个 Task 失败时,其他已完成 Task 的结果不丢失
  • 与 Checkpoint 关系put_writes 是细粒度写入,put 是全局快照

存储结构:包含当前节点执行的唯一id、channel名和对应值,一次任务基于channel可能会有多个Writes供写入

ounter(linePendingWrite = tuple[strstrAny]  # (task_id, channel_name, value)

Write 类型(源码定义于 _constants.py):

类型
触发场景
语义
ERROR
Task 异常
记录异常信息,终止后续执行
INTERRUPT
人机交互中断
暂停流程,等待人工输入
RESUME
中断恢复
携带恢复数据继续执行
PUSH Send

 指令
动态触发子图/节点
NO_WRITES
Task 无输出
占位标记,区分未执行与无输出
INPUT
初始输入
图启动时的初始状态

4.4 通道(Channel):状态载体

Channel 是 LangGraph中承载状态的基本单元。每个状态字段对应一个 Channel,它定义了如何存储如何更新以及如何持久化状态值。 Channel 是 Pregel 图计算模型的核心抽象,通过三字段协作机制实现增量计算、依赖追踪和并发控制:

ounter(lineounter(lineounter(lineounter(lineounter(line┌─────────────────┐     ┌──────────────────┐     ┌─────────────────┐│ channel_values  │     │ channel_versions │     │ versions_seen   ││   状态存储层     │ ←── │    版本控制层     │ ←── │   依赖感知层     ││  "数据是什么"   │     │  "数据第几版"     │     │  "节点看过哪版"  │└─────────────────┘     └──────────────────┘     └─────────────────┘

channel_values —— 状态存储层

下面是支持的Channel类型,不同类型对应到状态更新时有不同行为,通常我们自己定义的基础数据结构状态字段,如:int、str、float、dict等会被认为是LastValue,也就是在Graph的节点返回时覆盖当前state该字段值。

Channel 类型
核心功能
更新行为
典型使用场景
LastValue
存储单个值,每步只保留最后一个
仅允许单值写入,多写报错
普通状态字段(计数器、配置)
LastValueAfterFinish
存储单个值,finish() 后才可用
同 LastValue,但需 finish() 后才可读
Defer 节点,延迟可见
AnyValue
存储单个值,假设多写值相等
取最后一个值,允许多写
幂等更新场景
EphemeralValue
存储临时值,Step 后自动清空
空更新时清空值
临时中间结果
UntrackedValue
存储单个值,不持久化
取最后一个值
敏感信息、临时缓存
Topic
消息列表,支持累积/清空模式
展平追加消息列表
消息历史、事件流
BinaryOperatorAggregate
二元操作累积(如累加)
通过 operator 累积值,支持覆盖
累加器、计数器
NamedBarrierValue
同步屏障,等待指定名称集合
收集指定名称,齐全后才可用
Join 点、多入边等待
NamedBarrierValueAfterFinish
延迟屏障,finish() 后才可用
同 NamedBarrierValue
Defer 节点的 Join 点

channel_versions —— 版本控制层

Checkpoint中为每个Channel维护单调递增的版本号(默认整数自增),是 Pregel 依赖追踪的基础:

ounter(lineounter(lineounter(lineounter(line{    "messages": 3,   # 更新 3 次    "counter": 5     # 更新 5 次}

核心作用

  • 增量检测:对比 checkpoint_previous_versions 与 channel_versions,识别本次 Superstep(超步,pregel概念,理解为执行一批次的Task) 哪些通道发生变化
  • 全局排序checkpoint_id 与版本号共同构成全局时序,确保分布式环境下的状态一致性
  • 并发安全:版本号单调递增,天然具备乐观锁特性

versions_seen —— 依赖感知层

记录每个节点上次执行时看到的版本快照,是实现”仅当输入变化时才重新执行节点”的关键:

ounter(lineounter(lineounter(lineounter(line{    "node_A": {"messages": 2, "counter": 4},  # node_A 执行时看到的状态    "node_B": {"messages": 3, "counter": 4}   # node_B 执行时看到的状态}

核心作用

  • 节点调度prepare_next_tasks 对比 versions_seen[node] 与当前 channel_versions,若通道版本更新则触发节点执行
  • 幂等保证:即使节点被多次调度,只要输入版本未变,可安全跳过(配合缓存机制)
  • 细粒度依赖:支持节点仅订阅特定通道,未订阅通道的更新不触发重新执行

在 Pregel 循环中的协作

ounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineSuperstep N├── tick() 准备任务│   └── 对比 versions_seen[node] vs channel_versions│       └── 若 messages: 23,触发订阅该通道的节点├── runner.tick() 执行节点│   └── 节点读取 channel_values(版本已保证是最新的)└── after_tick() 提交    ├── apply_writes() 更新 channel_values    ├── get_next_version() 递增 channel_versions    └── 更新 versions_seen[executed_nodes] = 当前版本快照

这种设计让 LangGraph 在支持任意复杂状态依赖的同时,仍能高效地仅重算必要的节点,是实现”增量图计算”的基石。


五、接口设计

5.1 BaseCheckpointSaver

所有 Checkpoint 存储器必须实现以下接口:

方法
描述
必实现
put
存储 Checkpoint 和 pending writes
put_writes
存储与检查点关联的 pending writes
get_tuple
根据配置获取 CheckpointTuple
list
列出符合条件的 CheckpointTuple
delete_thread
删除 thread 关联的所有数据
get_next_version
生成通道新版本号(默认自增整数)

5.2 方法调用时机与源码剖析

理解 Checkpoint 各方法的调用时机,是掌握 LangGraph 状态流转的关键。以下结合源码,深入解析每个方法的触发场景。这里说的源码指的都是libs/langgraph/langgraph/pregel模块,可以说整个checkpoint架构都是为了pregel而设计,pregel作为整个langgraph的核心流程引擎,代码非常的复杂,后续会有单独的一篇做详细介绍。

put

调用场景一览:

触发时机
源码位置
Source 标记
实际作用
用户输入写入后 _loop.py:721-723 input
保存用户初始输入状态
Superstep 完成后 _loop.py:563 loop
保存每步执行后的完整状态
手动更新状态 main.py:1576-1862 update

/fork/input
update_state

 调用时
执行异常退出 _loop.py:830
继承当前
确保异常状态不丢失

源码级执行流程:

ounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(line# 以 _loop.py 中的 after_tick 为例def after_tick(self) -> None:    # 收集本 step 所有任务的writes    writes = [w for t in self.tasks.values() for w in t.writes]    # 记录checkpoint和writes(全局同步点)    self.updated_channels = apply_writes(        self.checkpoint, self.channels, self.tasks.values(), ...    )    # 清空 pending_writes,防止重复应用    self.checkpoint_pending_writes.clear()    # ★ 关键:保存全局 checkpoint    self._put_checkpoint({"source""loop"})

设计要点:put 是全局同步点,只在 Superstep 边界触发,确保状态一致性。频繁调用 put_writes 记录中间结果,但只在关键节点做全局快照。


put_writes

调用场景一览:

触发时机
源码位置
调用方式
关键作用
Task 执行完成 _runner.py:434-459
回调触发
实时保存任务输出
Command 输入处理 _loop.py:667
直接调用
处理人机交互指令
手动状态更新 main.py:1632-1877
直接调用
update_state

 内部写入

核心执行路径解析:

ounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(line# 1. Runner 层:任务完成后回调(高频触发点)def commit(self, task: PregelExecutableTask, exception):    if exception is None:        # 正常完成:立即写入任务结果        self.put_writes()(task.id, task.writes)    elif isinstance(exception, GraphInterrupt):        # 中断场景:保存中断信息供后续恢复        writes = [(INTERRUPT, exception.args[0])]        self.put_writes()(task.id, writes)    else:        # 异常场景:保存错误信息        task.writes.append((ERROR, exception))        self.put_writes()(task.id, task.writes)# 2. Loop 层:异步提交到持久层def put_writes(self, task_id: str, writes: WritesT) -> None:    # 内存缓存(供本 Step 后续任务读取)    self.checkpoint_pending_writes.extend((task_id, c, v) for c, v in writes)    # 异步持久化(不阻塞执行)    if self.durability != "exit" and self.checkpointer_put_writes:        self.submit(self.checkpointer_put_writes, config, writes_to_save, task_id)

关键设计:put_writes 是细粒度、高频的写入,每个 Task 完成即触发。它实现了:

  • 故障隔离:单个 Task 失败不影响其他 Task 结果
  • 实时可见:Task 输出立即可被后续 Task 读取
  • 性能优化:异步提交不阻塞主循环

get_tuple

调用场景:

触发时机
源码位置
恢复策略
图执行启动 _loop.py:1085/1264
加载 thread 最新状态或创建空状态
查询当前状态 main.py:1280/1324
返回 StateSnapshot 供用户查看

恢复逻辑源码:

ounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(line# SyncPregelLoop.__enter__ 中的恢复逻辑def __enter__(self):    saved = self.checkpointer.get_tuple(self.checkpoint_config)    if saved is None:        # 首次执行:创建空 checkpoint        saved = CheckpointTuple(            self.checkpoint_config,            empty_checkpoint(),  # v=4, id=uuid6, channel_values={}            {"step": -2},            None,            []        )    # 从 checkpoint 恢复通道状态    self.channels, self.managed = channels_from_checkpoint(        self.specs, saved.checkpoint    )    # 恢复 pending_writes(断点续传的关键)    self.checkpoint_pending_writes = [        (str(tid), k, v) for tid, k, v in (saved.pending_writes or [])    ]

核心能力:get_tuple 是状态恢复的唯一入口,支持:

  • 断点续传:恢复 pending_writes 继续未完成的 Task
  • 历史回溯:通过 checkpoint_id 指定历史版本
  • 空状态初始化:新 thread 自动创建初始状态

list

唯一调用场景:

ounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(line# main.py:1379 - 获取状态历史(时间旅行)def get_state_history(self, config, ..., limit=None):    # 查询该 thread 的所有 checkpoint    for checkpoint_tuple in list(        checkpointer.list(config, before=before, limit=limit, filter=filter)    ):        yield self._prepare_state_snapshot(checkpoint_tuple.config, checkpoint_tuple)

典型应用场景:

ounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(line# 1. 查看完整执行轨迹history = list(graph.get_state_history(config))for state in reversed(history):  # 从旧到新    print(f"Step {state.metadata['step']}: {state.metadata['source']}")# 2. 回滚到指定步骤(Undo 操作)history = list(graph.get_state_history(config, limit=2))if len(history) > 1:    previous = history[1]  # 上一个状态    graph.update_state(previous.config, None, as_node="__copy__")# 3. 从中间状态 Fork 新分支checkpoint = history[5]new_thread = {"thread_id""fork-1"}graph.update_state(checkpoint.config, {"strategy""B"}, as_node="__copy__")

5.3 调用时机总览图

ounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(linegraph.invoke(input, config)    │    ▼┌─────────────────────────────────────────────────────────────┐│  get_tuple(config)                                          ││  ├── 命中:恢复 checkpoint + pending_writes                 ││  └── 未命中:创建空 checkpoint                               │└─────────────────────────────────────────────────────────────┘    │    ▼┌─────────────────────────────────────────────────────────────┐│ _first() 处理输入                                           ││  ├── map_input() → apply_writes()                           ││  └── ★ put({"source": "input"})  ← 首次全局快照              │└─────────────────────────────────────────────────────────────┘    │    ▼Superstep Loop├── tick() 准备任务├── runner.tick() 执行任务│   ├── Task A 完成 → commit() → ★ put_writes(A)  ← 任务级writes│   ├── Task B 完成 → commit() → ★ put_writes(B)│   └── Task C 完成 → commit() → ★ put_writes(C)└── after_tick()    ├── apply_writes()  ← 全局同步    └── ★ put({"source": "loop"})  ← Step 全局快照graph.get_state_history(config)    └── ★ list(config)  ← 查询历史(独立流程)

关键:put_writes 是高频、细粒度的实时记录,put 是低频、全局的状态快照。


六、PostgreSQL 实现剖析

了解了核心接口以及数据结构之后,无非就是基于对应存储载体的特点进行合理设计

6.1 表结构设计

ounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(line-- 迁移记录表CREATE TABLE checkpoint_migrations (    v INTEGER PRIMARY KEY);-- 主 checkpoint 表(元数据 + 非对象结构数据)CREATE TABLE checkpoints (    thread_id TEXT NOT NULL,    checkpoint_ns TEXT NOT NULL DEFAULT '',    checkpoint_id TEXT NOT NULL,    parent_checkpoint_id TEXT,    type TEXT,    checkpoint JSONB NOT NULL,    metadata JSONB NOT NULL DEFAULT '{}',    PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id));-- 二进制对象存储表CREATE TABLE checkpoint_blobs (    thread_id TEXT NOT NULL,    checkpoint_ns TEXT NOT NULL DEFAULT '',    channel TEXT NOT NULL,    version TEXT NOT NULL,    type TEXT NOT NULL,    blob BYTEA,    PRIMARY KEY (thread_id, checkpoint_ns, channel, version));-- writes记录表CREATE TABLE checkpoint_writes (    thread_id TEXT NOT NULL,    checkpoint_ns TEXT NOT NULL DEFAULT '',    checkpoint_id TEXT NOT NULL,    task_id TEXT NOT NULL,    idx INTEGER NOT NULL,    channel TEXT NOT NULL,    type TEXT,    blob BYTEA NOT NULL,    task_path TEXT NOT NULL DEFAULT '',    PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id, task_id, idx));

设计要点:

  • checkpoints:非二进制快照数据与元数据
  • checkpoint_blobs:序列化后的自定义对象(二进制存储)
  • checkpoint_writes:单次快照过程中的所有操作记录

6.2 put 接口实现逻辑

ounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(lineounter(linedef put(self, config, checkpoint, metadata, new_versions):    # 从 config 提取关键字段    thread_id = config["configurable"]["thread_id"]    checkpoint_ns = config["configurable"].get("checkpoint_ns""")    # 构造返回配置(包含新生成的 checkpoint_id)    next_config = {        "configurable": {            "thread_id": thread_id,            "checkpoint_ns": checkpoint_ns,            "checkpoint_id": checkpoint["id"],        }    }    # 1. 分离二进制数据(Blob)    blob_values = {}    for k, v in checkpoint["channel_values"].items():        if v is None or isinstance(v, (strintfloatbool)):            continue        blob_values[k] = checkpoint["channel_values"].pop(k)    # 2. 如果存在blob_values,则写入 blobs 表(UPSERT)    for k, v in blob_values.items():        ver = new_versions[k]        type_, blob = self.serde.dumps_typed(v)        cur.execute(UPSERT_CHECKPOINT_BLOBS_SQL, (...))    # 3. 写入 checkpoints 表(UPSERT)    cur.execute(UPSERT_CHECKPOINTS_SQL, (...))    return next_config

实现细节:

  • UPSERT 语义ON CONFLICT ... DO UPDATE 实现幂等写入
  • 数据分离:JSON 元数据与二进制 Blob 分开存储,便于查询和压缩
  • 版本控制checkpoint_blobs 以 (thread_id, checkpoint_ns, channel, version) 为联合主键,支持多版本共存

七、总结

LangGraph Checkpoint 通过精心设计的接口与灵活的扩展机制,为构建可靠的 LLM 应用提供了坚实的状态管理基础:

  1. 存储抽象:接口与实现分离,支持各种存储载体:内存/SQLite/PostgreSQL 等
  2. 双层持久化put_writes(任务操作日志)+ put(全局快照),平衡性能与一致性
  3. 版本追踪:Channel 版本机制实现依赖感知的增量更新
  4. 容错恢复:Pending Writes + Checkpoint 实现断点续传

无论是简单的对话应用,还是复杂的多 Agent 工作流,Checkpoint 都能提供强大的状态管理能力。

本站文章均为手工撰写未经允许谢绝转载:夜雨聆风 » Langgraph框架源码解析系列——Checkpoint篇

猜你喜欢

  • 暂无文章