
前言
上一篇我们搭了一个能"自主使用工具"的 Agent。但如果你真正跑起来就会发现三个问题:
- 失忆——每次对话都是新的,Agent 完全不记得上一轮你说了什么
- 失控——Agent 决定调什么工具、你完全无法干预,万一它准备删库呢?
- 发呆——Agent 在"思考"时,用户只能干等一个最终结果
本文解决这三个问题:持久化(Persistence)让 Agent 拥有记忆、人机协作(Human-in-the-Loop)让关键操作必须人类审批、流式输出(Streaming)让用户实时看到思考过程。
其中持久化是核心——后面两个能力都依赖它。
一、持久化:给 Agent 装上记忆
1.1 问题复现
用第一篇的代码跑两轮对话:
# 第一轮
result = app.invoke({"messages": [HumanMessage(content="我在北京")]})
# -> "好的,你在北京。有什么我可以帮你的?"
# 第二轮
result = app.invoke({"messages": [HumanMessage(content="这里天气怎么样?")]})
# -> "请问你在哪个城市?" <- 忘了!
为什么?因为每次 invoke 都是全新的 State,上一轮的消息完全丢失了。
1.2 Checkpointer:一行代码加记忆
LangGraph 的解决方案叫 Checkpointer——在每个节点执行完后,自动把 State 存一份快照。
from langgraph.checkpoint.memory import InMemorySaver
# 创建检查点存储器
checkpointer = InMemorySaver()
# 编译时传入
app = workflow.compile(checkpointer=checkpointer)
就这一行。但调用时需要多传一个 thread_id:
config = {"configurable": {"thread_id": "user-001"}}
# 第一轮
result = app.invoke(
{"messages": [HumanMessage(content="我在北京")]},
config=config
)
# -> "好的,你在北京。"
# 第二轮(同一个 thread_id)
result = app.invoke(
{"messages": [HumanMessage(content="这里天气怎么样?")]},
config=config
)
# -> "北京今天晴天,32 C" <- 记住了!
thread_id = 对话线程的唯一标识。同一个 thread_id 下的消息会自动累积,就像微信聊天记录一样。
1.3 发生了什么?
加了 Checkpointer 后,每次节点执行完,LangGraph 会自动存快照:
执行 agent 节点 -> 存快照(checkpoint_1)
执行 tools 节点 -> 存快照(checkpoint_2)
执行 agent 节点 -> 存快照(checkpoint_3)
...
下次同一个 thread_id 再调用时,会先加载最新的快照,然后在此基础上继续。
Thread "user-001" 的时间线:
第一轮对话:
checkpoint_0 (空) -> checkpoint_1 (agent推理) -> checkpoint_2 (最终回答)
第二轮对话:
从 checkpoint_2 继续 -> checkpoint_3 -> checkpoint_4 -> checkpoint_5
所有历史消息都存在 State 里,Agent 自然就"记住"了之前的对话。
1.4 三种 Checkpointer
| Checkpointer | 适用场景 | 特点 |
|---|
| InMemorySaver | 开发调试 | 存在内存,进程结束就没了 |
| SqliteSaver | 本地应用 | 存在 SQLite 文件,重启不丢 |
| PostgresSaver | 生产环境 | 存在 PostgreSQL,支持高并发 |
生产代码示例:
from langgraph.checkpoint.sqlite import SqliteSaver
# 数据存到文件,重启应用也不丢
with SqliteSaver.from_conn_string("checkpoints.db") as checkpointer:
app = workflow.compile(checkpointer=checkpointer)
# 即使程序重启,user-001 的对话历史还在
1.5 State 操作三件套
Checkpointer 不只是"自动存档",它还提供了三个强大的 API:
get_state:查看当前状态
state = app.get_state(config)
print(state.values["messages"]) # 当前所有消息
print(state.next) # 下一步要执行的节点(空=已完成)
get_state_history:时间旅行
# 获取所有历史快照
for snapshot in app.get_state_history(config):
print(f"Step {snapshot.metadata['step']}: {snapshot.next}")
print(f" 消息数: {len(snapshot.values['messages'])}")
print(f" 时间: {snapshot.created_at}")
这就像 Git 的 git log——你能看到 Agent 每一步的状态变化,方便调试。
update_state:手动修改状态
# 假设 Agent 理解错了,你想手动纠正
app.update_state(
config,
values={"messages": [HumanMessage(content="我改主意了,查深圳的天气")]},
as_node="agent" # 假装这条消息是从 agent 节点产生的
)
# 继续执行(从修改后的状态继续)
result = app.invoke(None, config)
update_state 的典型场景:手动纠正 Agent 的错误理解、注入系统指令、调试时跳过某些步骤。
1.6 Memory Store:跨对话的长期记忆
Checkpointer 解决的是同一对话内的记忆(短期记忆)。但如果用户今天聊完关了,明天开新对话又得重新自我介绍?
这时需要 Memory Store——跨对话的长期记忆。
from langgraph.store.memory import InMemoryStore
# 创建 Store
store = InMemoryStore()
# 编译时同时传入 checkpointer 和 store
app = workflow.compile(checkpointer=checkpointer, store=store)
Store 的数据结构是 namespace + key -> value:
# 存储用户偏好(跨对话共享)
store.put(
namespace=("user", "user-001", "preferences"),
key="language",
value={"setting": "中文", "updated": "2024-06-20"}
)
# 在任何对话中检索
items = store.search(
namespace_prefix=("user", "user-001"),
query="用户住在哪里", # 支持语义搜索!
limit=3
)
1.7 Checkpointer vs Store 总结
| 维度 | Checkpointer | Store |
|---|
| 比喻 | 聊天记录 | 通讯录备注 |
| 作用域 | 单个 thread(对话) | 跨所有 thread |
| 存什么 | 完整对话状态快照 | 结构化知识片段 |
| 谁写入 | 自动(每步存档) | 手动(代码里 put) |
| 典型用途 | 多轮对话、断点恢复 | 用户画像、偏好、长期事实 |

Agent 记忆体系
------------------------------------------------
Checkpointer(短期记忆)
[Thread A] [Thread B] [Thread C]
对话1记录 对话2记录 对话3记录
Store(长期记忆)
[用户画像 | 偏好设置 | 历史摘要]
所有对话共享
------------------------------------------------
1.8 三类记忆模式
设计 Agent 记忆时,可以参考认知科学的三类记忆:
| 记忆类型 | 含义 | Agent 中的实现 |
|---|
| 语义记忆 | 关于用户/世界的事实 | Store 中存用户画像、偏好 |
| 情景记忆 | 过去做过什么 | Store 中存历史对话摘要 |
| 程序记忆 | 该怎么做事 | Store 中存规则、SOP |
一个"聪明"的 Agent 会同时运用三类记忆:知道用户叫阿太住深圳(语义)、记得上次帮用户查过酒店选了希尔顿(情景)、知道查酒店时要先问日期和预算(程序)。
二、Human-in-the-Loop:让人类掌握决定权
2.1 为什么需要人机协作
想象你给 Agent 配了一个"发邮件"工具。用户说"帮我给老板发封请假邮件",Agent 自动写好内容、填上老板邮箱、直接发了——万一内容写得不对呢?发出去可就收不回来了。
这就需要 Human-in-the-Loop:在关键操作前暂停,等人类确认后再继续。
2.2 interrupt:动态断点
LangGraph 提供了 interrupt() 函数,可以在任何节点中暂停执行:
from langgraph.types import interrupt, Command
@tool
def send_email(to: str, subject: str, body: str) -> str:
"""发送邮件给指定收件人"""
# 发送前暂停,等人类确认
approval = interrupt({
"question": "确认发送以下邮件?",
"to": to,
"subject": subject,
"body": body
})
if approval == "yes":
smtp.send(to, subject, body)
return "邮件已发送"
else:
return "已取消发送"
2.3 完整流程
config = {"configurable": {"thread_id": "email-task-001"}}
# 第一次调用:Agent 决定调 send_email -> 触发 interrupt -> 暂停
result = app.invoke(
{"messages": [HumanMessage(content="帮我给老板发封请假邮件")]},
config=config
)
# 查看 Agent 想做什么
state = app.get_state(config)
print(state.tasks[0].interrupts[0].value)
# -> {"question": "确认发送以下邮件?", "to": "boss@..."}
# 人类审核后,决定批准
result = app.invoke(
Command(resume="yes"), # 把答案传回去
config=config
)
# -> "邮件已发送"
整个过程可视化:
用户: "帮我发封请假邮件"
|
v
[Agent] -> LLM 决定调 send_email
|
v
[Tools] -> send_email 内部触发 interrupt()
|
v
PAUSE! 等待人类确认...(状态已存档)
人类: "yes"
|
v
[Tools] -> 继续执行,真正发邮件
|
v
[Agent] -> "邮件已发送"
|
v
END

2.4 关键点
- interrupt 依赖 Checkpointer——暂停时状态必须存档,否则恢复不了
- 暂停期间可以关机——状态存在数据库里,明天回来继续都行
- 人类可以修改——不只是 yes/no,可以返回修改后的内容
2.5 静态断点(简单场景)
如果你只想在某个节点前固定暂停,不需要写 interrupt:
# 在 tools 节点执行前暂停(所有工具调用都要确认)
app = workflow.compile(
checkpointer=checkpointer,
interrupt_before=["tools"]
)
适合开发调试时使用。
三、Streaming:实时看到 Agent 的思考
3.1 为什么需要流式
Agent 做复杂任务时可能需要几十秒。如果只能干等最终结果,用户体验很差。流式输出让你实时看到每一步的进展。
3.2 三种流式模式
# 模式一:values -- 每步输出完整状态
for chunk in app.stream(input, config, stream_mode="values"):
print(chunk["messages"][-1].content)
# 模式二:updates -- 只输出每步的增量变化
for chunk in app.stream(input, config, stream_mode="updates"):
for node_name, update in chunk.items():
print(f"[{node_name}] 产生了新消息")
# 模式三:messages -- Token 级流式(逐字输出)
for msg, metadata in app.stream(input, config, stream_mode="messages"):
if msg.content:
print(msg.content, end="", flush=True)
3.3 三种模式对比
| 模式 | 粒度 | 适用场景 |
|---|
| values | 每个节点完成后 | 调试、看完整状态变化 |
| updates | 每个节点的增量 | 后端日志、进度追踪 |
| messages | 每个 Token | 前端展示打字效果 |
3.4 实用模式:进度通知
for chunk in app.stream(input, config, stream_mode="updates"):
if "agent" in chunk:
msg = chunk["agent"]["messages"][-1]
if msg.tool_calls:
print(f"正在调用: {msg.tool_calls[0]['name']}...")
elif msg.content:
print(f"{msg.content}")
elif "tools" in chunk:
print("工具执行完毕")
输出效果:
正在调用: get_weather...
工具执行完毕
北京今天晴天,32 C,适合户外活动。
四、总结
| 概念 | 一句话解释 | 关键 API |
|---|
| Checkpointer | 自动存档对话状态,实现多轮记忆 | InMemorySaver / SqliteSaver |
| thread_id | 对话线程标识,同 ID = 同一对话 | config["configurable"]["thread_id"] |
| get_state | 查看当前状态快照 | app.get_state(config) |
| get_state_history | 查看所有历史快照 | app.get_state_history(config) |
| update_state | 手动修改状态 | app.update_state(config, values) |
| Store | 跨对话长期记忆 | InMemoryStore + store.put/search |
| interrupt | 暂停执行,等待人类输入 | from langgraph.types import interrupt |
| Command | 恢复被暂停的执行 | Command(resume=...) |
| stream_mode | 流式输出模式 | "values" / "updates" / "messages" |
从 Demo 到生产的路径
| 第一篇 ✅ | 本篇 ✅ | 下一篇 → |
|---|
| Agent 能跑 | 记得住、可控、实时 | 多 Agent 协作 |
| StateGraph | Checkpointer + Store | SubGraph |
| bind_tools | interrupt + Command | 多 Agent 通信 |
| ReAct Loop | Streaming | Supervisor 模式 |
参考资料
- LangGraph Persistence 概念文档
- LangGraph Memory 概念文档
- LangGraph Human-in-the-Loop 文档
- LangGraph Streaming How-to
- LangGraph Persistence Tutorial
- LangGraph Human-in-the-Loop Tutorial