💾 Checkpointer 持久化系统
掌握 LangChain 1.0 的 Checkpointer 机制,实现对话历史持久化和短期记忆管理, 构建有状态的 AI Agent 应用。
📚 什么是 Checkpointer?
Checkpointer 是 LangChain 1.0 的状态持久化机制, 让 Agent 能够记住对话历史,实现真正的短期记忆(Short-term Memory)。
- Thread(线程):一个独立的对话会话,类似邮件的会话视图
- Checkpoint(检查点):Agent 状态的快照,包含完整的消息历史
- Thread ID:唯一标识符,用于区分不同的对话会话
- State(状态):包含 messages、自定义字段等的完整上下文
通过 Checkpointer,Agent 可以访问完整的对话历史,同时保持不同 Thread 之间的隔离。
为什么需要 Checkpointer?
长对话面临以下挑战:
- 上下文窗口限制:完整历史可能超出 LLM 的 token 限制
- 会话隔离:需要区分不同用户或会话的对话
- 状态丢失:Agent 重启后需要恢复对话上下文
- 性能问题:每次请求重新加载所有历史效率低
🏗️ Checkpointer 架构
执行流程说明
| 步骤 | 操作 | 说明 |
|---|---|---|
| 1. 请求到达 | 检查 thread_id | 判断是否为现有对话 |
| 2. 加载状态 | 从 Checkpointer 读取 | 恢复完整的消息历史和自定义状态 |
| 3. Agent 处理 | 使用历史上下文 | 模型可以访问之前的对话内容 |
| 4. 保存状态 | 写入 Checkpointer | 持久化新的消息和状态变化 |
🗂️ Checkpointer 类型对比
| 类型 | 存储位置 | 持久性 | 适用场景 | 安装 |
|---|---|---|---|---|
| InMemorySaver | 应用内存 | ❌ 重启丢失 | 开发测试 | 内置 |
| SqliteSaver | SQLite 文件 | ✅ 文件持久化 | 小型部署、单机应用 | langgraph-checkpoint-sqlite |
| PostgresSaver | PostgreSQL 数据库 | ✅ 数据库持久化 | 生产环境、多实例 | langgraph-checkpoint-postgres |
1. InMemorySaver - 开发测试
"""
InMemorySaver 示例
功能:内存存储,适合开发测试
"""
from langchain.agents import create_agent
from langgraph.checkpoint.memory import InMemorySaver
# 创建带内存持久化的 Agent
agent = create_agent(
model="gpt-4o",
tools=[],
checkpointer=InMemorySaver(), # 内存存储
)
# 第一轮对话
response1 = agent.invoke(
{"messages": [{"role": "user", "content": "你好!我叫张三。"}]},
{"configurable": {"thread_id": "user_001"}} # 指定 Thread ID
)
print(response1["messages"][-1].content)
# 第二轮对话(记住之前的内容)
response2 = agent.invoke(
{"messages": [{"role": "user", "content": "我刚才说我叫什么?"}]},
{"configurable": {"thread_id": "user_001"}} # 相同 Thread ID
)
print(response2["messages"][-1].content) # 输出:"你说你叫张三。"
InMemorySaver 仅将状态存储在应用内存中。一旦应用重启,所有对话历史都会丢失。 仅用于开发和测试环境,生产环境请使用数据库存储。
2. PostgresSaver - 生产环境
# 安装 PostgreSQL Checkpointer
pip install langgraph-checkpoint-postgres
"""
PostgresSaver 示例
功能:生产级数据库持久化
"""
from langchain.agents import create_agent
from langgraph.checkpoint.postgres import PostgresSaver
# PostgreSQL 连接字符串
DB_URI = "postgresql://username:password@localhost:5432/langchain_db"
# 使用上下文管理器
with PostgresSaver.from_conn_string(DB_URI) as checkpointer:
# 首次使用需要初始化表结构
checkpointer.setup()
# 创建 Agent
agent = create_agent(
model="gpt-4o",
tools=[],
checkpointer=checkpointer,
)
# 用户 A 的对话
agent.invoke(
{"messages": [{"role": "user", "content": "我喜欢蓝色"}]},
{"configurable": {"thread_id": "user_A"}}
)
# 用户 B 的对话(完全隔离)
agent.invoke(
{"messages": [{"role": "user", "content": "我喜欢红色"}]},
{"configurable": {"thread_id": "user_B"}}
)
# 用户 A 继续对话(记住之前的内容)
response = agent.invoke(
{"messages": [{"role": "user", "content": "我喜欢什么颜色?"}]},
{"configurable": {"thread_id": "user_A"}}
)
print(response["messages"][-1].content) # 输出:"你喜欢蓝色。"
3. SqliteSaver - 轻量部署
# 安装 SQLite Checkpointer
pip install langgraph-checkpoint-sqlite
"""
SqliteSaver 示例
功能:文件持久化,适合小型部署
"""
from langchain.agents import create_agent
from langgraph.checkpoint.sqlite import SqliteSaver
# 创建 SQLite Checkpointer(存储到文件)
with SqliteSaver.from_conn_string("checkpoints.db") as checkpointer:
checkpointer.setup()
agent = create_agent(
model="gpt-4o",
tools=[],
checkpointer=checkpointer,
)
# 对话会持久化到 checkpoints.db 文件
agent.invoke(
{"messages": [{"role": "user", "content": "你好"}]},
{"configurable": {"thread_id": "session_123"}}
)
🔑 Thread ID 和会话隔离
thread_id 是 Checkpointer 的核心概念,用于区分不同的对话会话。
Thread ID 使用模式
"""
Thread ID 使用示例
功能:演示会话隔离和 Thread ID 设计模式
"""
from langchain.agents import create_agent
from langgraph.checkpoint.memory import InMemorySaver
import uuid
agent = create_agent("gpt-4o", tools=[], checkpointer=InMemorySaver())
# 模式 1:用户 ID 作为 Thread ID(单用户多会话)
user_id = "user_12345"
session_id = "session_001"
thread_id = f"{user_id}_{session_id}"
agent.invoke(
{"messages": [{"role": "user", "content": "帮我创建订单"}]},
{"configurable": {"thread_id": thread_id}}
)
# 模式 2:随机生成 Thread ID(匿名会话)
thread_id = str(uuid.uuid4()) # 生成唯一 ID
agent.invoke(
{"messages": [{"role": "user", "content": "你好"}]},
{"configurable": {"thread_id": thread_id}}
)
# 模式 3:业务实体作为 Thread ID(订单对话)
order_id = "ORDER-2024-001"
thread_id = f"order_{order_id}"
agent.invoke(
{"messages": [{"role": "user", "content": "查询订单状态"}]},
{"configurable": {"thread_id": thread_id}}
)
# 模式 4:多租户隔离
tenant_id = "company_A"
user_id = "user_001"
thread_id = f"{tenant_id}_{user_id}"
agent.invoke(
{"messages": [{"role": "user", "content": "帮我查询账户余额"}]},
{"configurable": {"thread_id": thread_id}}
)
- 唯一性:确保每个 Thread ID 全局唯一
- 可读性:使用有意义的命名,方便调试和追踪
- 分层结构:使用分隔符组合多个标识符(如
tenant_user_session) - 安全性:避免在 Thread ID 中包含敏感信息
- 长度限制:保持合理长度(通常 <128 字符)
📐 自定义状态 Schema
除了消息历史,还可以在状态中存储自定义数据。
"""
自定义状态 Schema 示例
功能:在状态中存储用户信息和偏好设置
"""
from langchain.agents import create_agent, AgentState
from langgraph.checkpoint.memory import InMemorySaver
from langchain.tools import tool, ToolRuntime
# 定义自定义状态类型
class CustomAgentState(AgentState):
user_id: str
user_name: str
preferences: dict
order_history: list
# 定义访问状态的工具
@tool
def get_user_info(runtime: ToolRuntime[CustomAgentState]) -> str:
"""获取用户信息"""
state = runtime.state
user_name = state.get("user_name", "未知用户")
preferences = state.get("preferences", {})
return f"""
用户姓名:{user_name}
主题偏好:{preferences.get('theme', '未设置')}
语言偏好:{preferences.get('language', '未设置')}
"""
@tool
def get_order_history(runtime: ToolRuntime[CustomAgentState]) -> str:
"""查询订单历史"""
orders = runtime.state.get("order_history", [])
if not orders:
return "暂无订单记录"
order_list = "\n".join([f"- {order}" for order in orders])
return f"订单历史:\n{order_list}"
# 创建带自定义状态的 Agent
agent = create_agent(
model="gpt-4o",
tools=[get_user_info, get_order_history],
state_schema=CustomAgentState,
checkpointer=InMemorySaver(),
)
# 调用时提供完整状态
result = agent.invoke(
{
"messages": [{"role": "user", "content": "查询我的信息"}],
"user_id": "user_123",
"user_name": "张三",
"preferences": {
"theme": "dark",
"language": "zh-CN"
},
"order_history": [
"订单#001 - iPhone 15",
"订单#002 - MacBook Pro"
]
},
{"configurable": {"thread_id": "user_123"}}
)
print(result["messages"][-1].content)
自定义状态字段会被持久化到 Checkpointer 中,下次调用时自动恢复。 适合存储:
- 用户身份信息
- 会话配置和偏好
- 业务上下文数据
- 临时计算结果
注意:避免在状态中存储大型对象(如完整文档), 应该存储引用 ID,按需从数据库加载。
✂️ 消息管理策略
长对话会超出 LLM 的上下文窗口限制,需要实施消息管理策略。
1. 消息修剪(Trim Messages)
"""
消息修剪中间件
功能:保留最近的 N 条消息,删除旧消息
"""
from langchain.agents.middleware import before_model
from langchain.messages import RemoveMessage
from langgraph.graph.message import REMOVE_ALL_MESSAGES
from langchain.agents import AgentState, create_agent
from langgraph.checkpoint.memory import InMemorySaver
@before_model
def trim_messages(state: AgentState, runtime) -> dict | None:
"""修剪消息:保留第一条 + 最近 3 条"""
messages = state["messages"]
# 消息数量少于阈值,无需修剪
if len(messages) <= 3:
return None
# 保留第一条消息(通常是 SystemMessage)
first_msg = messages[0]
# 保留最近 3 条消息
recent = messages[-3:] if len(messages) % 2 == 0 else messages[-4:]
# 构建修剪指令
return {
"messages": [
RemoveMessage(id=REMOVE_ALL_MESSAGES), # 删除所有消息
first_msg, # 恢复第一条
*recent # 恢复最近的消息
]
}
# 创建带消息修剪的 Agent
agent = create_agent(
model="gpt-4o",
tools=[],
middleware=[trim_messages],
checkpointer=InMemorySaver(),
)
# 模拟长对话
thread_id = "long_conversation"
for i in range(10):
agent.invoke(
{"messages": [{"role": "user", "content": f"消息 {i+1}"}]},
{"configurable": {"thread_id": thread_id}}
)
# 历史只会保留最近几条消息
2. 消息删除(Delete Messages)
"""
消息删除示例
功能:手动删除特定消息
"""
from langchain.messages import RemoveMessage
from langchain.agents import create_agent
from langgraph.checkpoint.memory import InMemorySaver
agent = create_agent("gpt-4o", tools=[], checkpointer=InMemorySaver())
# 第一轮对话
response1 = agent.invoke(
{"messages": [{"role": "user", "content": "第一条消息"}]},
{"configurable": {"thread_id": "test"}}
)
# 第二轮对话
response2 = agent.invoke(
{"messages": [{"role": "user", "content": "第二条消息"}]},
{"configurable": {"thread_id": "test"}}
)
# 获取当前状态
state = agent.get_state({"configurable": {"thread_id": "test"}})
messages = state.values["messages"]
# 删除前两条消息
if len(messages) > 2:
delete_result = agent.update_state(
{"configurable": {"thread_id": "test"}},
{"messages": [RemoveMessage(id=m.id) for m in messages[:2]]}
)
# 继续对话(前两条消息已被删除)
response3 = agent.invoke(
{"messages": [{"role": "user", "content": "我之前说了什么?"}]},
{"configurable": {"thread_id": "test"}}
)
print(response3["messages"][-1].content) # 模型不会记得前两条
3. 消息摘要(Summarization)
"""
消息摘要中间件
功能:自动总结旧消息,节省 Token
"""
from langchain.agents.middleware import SummarizationMiddleware
from langchain.agents import create_agent
from langgraph.checkpoint.memory import InMemorySaver
agent = create_agent(
model="gpt-4o",
tools=[],
middleware=[
SummarizationMiddleware(
model="gpt-4o-mini", # 使用轻量模型生成摘要
trigger=("tokens", 4000), # 超过 4000 tokens 触发摘要
keep=("messages", 20), # 保留最近 20 条消息
)
],
checkpointer=InMemorySaver(),
system_prompt="你是一个有用的助手。"
)
# 长对话会自动触发摘要
thread_id = "summarization_test"
for i in range(30):
agent.invoke(
{"messages": [{"role": "user", "content": f"这是第 {i+1} 条消息,内容是..."}]},
{"configurable": {"thread_id": thread_id}}
)
| 策略 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 修剪 | 简单、快速 | 丢失历史信息 | 短期对话、不重要的历史 |
| 摘要 | 保留关键信息 | 额外 LLM 调用成本 | 长期对话、需要上下文 |
| 混合 | 灵活、可定制 | 实现复杂 | 企业级应用 |
🔧 在工具中访问和修改状态
读取状态
"""
工具读取状态示例
功能:在工具中访问 Agent 状态
"""
from langchain.tools import tool, ToolRuntime
from langchain.agents import AgentState, create_agent
from langgraph.checkpoint.memory import InMemorySaver
class UserState(AgentState):
user_id: str
subscription_level: str
@tool
def get_user_quota(runtime: ToolRuntime[UserState]) -> str:
"""查询用户配额"""
# 访问状态
user_id = runtime.state.get("user_id", "unknown")
level = runtime.state.get("subscription_level", "free")
# 根据订阅等级返回配额
quotas = {
"free": 10,
"basic": 100,
"premium": 1000,
}
quota = quotas.get(level, 10)
return f"用户 {user_id} 的配额:{quota} 次/月"
agent = create_agent(
model="gpt-4o",
tools=[get_user_quota],
state_schema=UserState,
checkpointer=InMemorySaver(),
)
result = agent.invoke(
{
"messages": [{"role": "user", "content": "查询我的配额"}],
"user_id": "user_123",
"subscription_level": "premium"
},
{"configurable": {"thread_id": "user_123"}}
)
print(result["messages"][-1].content)
修改状态
"""
工具修改状态示例
功能:工具执行后更新状态
"""
from langchain.tools import tool, ToolRuntime
from langgraph.types import Command
from langchain.messages import ToolMessage
from langchain.agents import AgentState, create_agent
from langgraph.checkpoint.memory import InMemorySaver
class OrderState(AgentState):
current_order_id: str | None
order_total: float
@tool
def create_order(
items: list[str],
runtime: ToolRuntime[OrderState]
) -> Command:
"""创建订单并更新状态"""
# 模拟订单创建
order_id = "ORDER-2024-001"
total = len(items) * 99.0 # 简化计算
# 返回 Command 更新状态
return Command(
update={
"current_order_id": order_id,
"order_total": total,
"messages": [
ToolMessage(
content=f"订单创建成功!订单号:{order_id},总金额:¥{total}",
tool_call_id=runtime.tool_call_id
)
]
}
)
@tool
def get_current_order(runtime: ToolRuntime[OrderState]) -> str:
"""查询当前订单"""
order_id = runtime.state.get("current_order_id")
total = runtime.state.get("order_total", 0.0)
if not order_id:
return "暂无进行中的订单"
return f"当前订单:{order_id},金额:¥{total}"
agent = create_agent(
model="gpt-4o",
tools=[create_order, get_current_order],
state_schema=OrderState,
checkpointer=InMemorySaver(),
)
# 创建订单
result1 = agent.invoke(
{
"messages": [{"role": "user", "content": "帮我创建订单,商品:iPhone, AirPods"}],
"current_order_id": None,
"order_total": 0.0
},
{"configurable": {"thread_id": "order_session_001"}}
)
# 查询订单(状态已更新)
result2 = agent.invoke(
{"messages": [{"role": "user", "content": "查询我的订单"}]},
{"configurable": {"thread_id": "order_session_001"}}
)
print(result2["messages"][-1].content)
🎯 完整示例:多用户客服系统
"""
多用户客服系统完整示例
功能:演示 Checkpointer 在实际应用中的使用
"""
from langchain.agents import create_agent, AgentState
from langchain.tools import tool, ToolRuntime
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import Command
from langchain.messages import ToolMessage
from datetime import datetime
# ==================== 自定义状态 ====================
class CustomerServiceState(AgentState):
user_id: str
user_name: str
issue_category: str | None
ticket_id: str | None
satisfaction_score: int | None
# ==================== 工具定义 ====================
@tool
def create_ticket(
category: str,
description: str,
runtime: ToolRuntime[CustomerServiceState]
) -> Command:
"""创建客服工单"""
user_id = runtime.state.get("user_id", "unknown")
ticket_id = f"TICKET-{datetime.now().strftime('%Y%m%d%H%M%S')}"
return Command(
update={
"ticket_id": ticket_id,
"issue_category": category,
"messages": [
ToolMessage(
content=f"工单创建成功!\n工单号:{ticket_id}\n类别:{category}\n问题:{description}",
tool_call_id=runtime.tool_call_id
)
]
}
)
@tool
def get_ticket_status(runtime: ToolRuntime[CustomerServiceState]) -> str:
"""查询工单状态"""
ticket_id = runtime.state.get("ticket_id")
if not ticket_id:
return "您当前没有进行中的工单"
category = runtime.state.get("issue_category", "未分类")
return f"工单号:{ticket_id}\n类别:{category}\n状态:处理中\n预计解决时间:24 小时内"
@tool
def submit_feedback(
score: int,
comment: str,
runtime: ToolRuntime[CustomerServiceState]
) -> Command:
"""提交满意度评分(1-5 分)"""
if not 1 <= score <= 5:
return Command(
update={
"messages": [
ToolMessage(
content="评分必须在 1-5 分之间",
tool_call_id=runtime.tool_call_id
)
]
}
)
return Command(
update={
"satisfaction_score": score,
"messages": [
ToolMessage(
content=f"感谢您的反馈!评分:{score} 分\n评价:{comment}",
tool_call_id=runtime.tool_call_id
)
]
}
)
# ==================== 创建 Agent ====================
customer_service_agent = create_agent(
model="gpt-4o",
tools=[create_ticket, get_ticket_status, submit_feedback],
state_schema=CustomerServiceState,
checkpointer=InMemorySaver(), # 生产环境使用 PostgresSaver
system_prompt="""你是一个专业的客服助手。
职责:
1. 帮助用户创建和查询工单
2. 收集用户反馈和满意度评分
3. 提供友好、专业的服务
工作流程:
- 了解用户问题,使用 create_ticket 创建工单
- 用户查询时,使用 get_ticket_status 查看进度
- 问题解决后,引导用户使用 submit_feedback 评分
注意:
- 始终保持礼貌和耐心
- 记住用户的姓名和问题类别
- 每个用户的对话是独立的"""
)
# ==================== 模拟多用户对话 ====================
def simulate_customer_conversation(user_id: str, user_name: str):
"""模拟单个用户的完整对话流程"""
thread_id = f"customer_{user_id}"
print(f"\n{'='*60}")
print(f"用户:{user_name} (ID: {user_id})")
print(f"{'='*60}\n")
# 初始状态
initial_state = {
"user_id": user_id,
"user_name": user_name,
"issue_category": None,
"ticket_id": None,
"satisfaction_score": None
}
# 第 1 轮:报告问题
print("👤 用户:我的账号登录不了,一直显示密码错误")
response1 = customer_service_agent.invoke(
{**initial_state, "messages": [{"role": "user", "content": "我的账号登录不了,一直显示密码错误"}]},
{"configurable": {"thread_id": thread_id}}
)
print(f"🤖 客服:{response1['messages'][-1].content}\n")
# 第 2 轮:查询进度
print("👤 用户:我的工单处理得怎么样了?")
response2 = customer_service_agent.invoke(
{"messages": [{"role": "user", "content": "我的工单处理得怎么样了?"}]},
{"configurable": {"thread_id": thread_id}}
)
print(f"🤖 客服:{response2['messages'][-1].content}\n")
# 第 3 轮:提交反馈
print("👤 用户:问题已解决,我给 5 分好评!")
response3 = customer_service_agent.invoke(
{"messages": [{"role": "user", "content": "问题已解决,我给 5 分好评,服务态度很棒!"}]},
{"configurable": {"thread_id": thread_id}}
)
print(f"🤖 客服:{response3['messages'][-1].content}\n")
# ==================== 测试 ====================
if __name__ == "__main__":
# 模拟多个用户的独立对话(完全隔离)
simulate_customer_conversation("user_001", "张三")
simulate_customer_conversation("user_002", "李四")
simulate_customer_conversation("user_003", "王五")
print("\n" + "="*60)
print("多用户对话完成!每个用户的状态完全隔离。")
print("="*60)
❓ 常见问题
Q1: Checkpointer 会自动处理消息限制吗?
不会。Checkpointer 只负责持久化状态,不会自动修剪消息。
你需要使用中间件(如 SummarizationMiddleware 或自定义 @before_model 钩子)
来管理消息历史。
Q2: 不同 Checkpointer 之间可以迁移吗?
可以,但需要手动处理。所有 Checkpointer 都使用相同的状态格式, 你可以读取一个 Checkpointer 的状态并写入另一个。例如:
# 从 InMemorySaver 迁移到 PostgresSaver
old_checkpointer = InMemorySaver()
new_checkpointer = PostgresSaver.from_conn_string(DB_URI)
# 读取旧状态
state = old_checkpointer.get({"configurable": {"thread_id": "thread_001"}})
# 写入新 Checkpointer
new_checkpointer.put(
{"configurable": {"thread_id": "thread_001"}},
state.checkpoint,
state.metadata
)
Q3: Thread ID 可以修改吗?
Thread ID 在 Checkpointer 中是不可变的。如果需要"合并"对话, 你需要创建新的 Thread ID,并手动将旧对话的消息复制过去。
Q4: Checkpointer 的性能如何?
性能取决于后端存储:
- InMemorySaver:极快,但不持久化
- SqliteSaver:中等,适合小型应用
- PostgresSaver:取决于数据库配置,生产级应用需要索引优化
建议为 thread_id 和 checkpoint_id 创建索引以提升查询性能。
Q5: 如何清理旧的对话历史?
# 使用 PostgresSaver 定期清理旧数据
import psycopg2
from datetime import datetime, timedelta
# 删除 30 天前的检查点
conn = psycopg2.connect(DB_URI)
cursor = conn.cursor()
cutoff_date = datetime.now() - timedelta(days=30)
cursor.execute("""
DELETE FROM checkpoints
WHERE created_at < %s
""", (cutoff_date,))
conn.commit()
cursor.close()
conn.close()
✨ 最佳实践
1. 选择合适的 Checkpointer
| 环境 | 推荐方案 | 理由 |
|---|---|---|
| 本地开发 | InMemorySaver | 快速迭代,无需配置 |
| 测试环境 | SqliteSaver | 持久化,便于调试 |
| 生产环境 | PostgresSaver | 高可用、可扩展 |
2. Thread ID 命名规范
- 用户会话:
user_{user_id}_{session_id} - 业务实体:
{entity_type}_{entity_id} - 多租户:
{tenant_id}_{user_id} - 临时会话:
temp_{uuid}
3. 消息管理策略
- 对话 < 10 轮:无需特殊处理
- 对话 10-50 轮:使用消息修剪
- 对话 > 50 轮:使用消息摘要
- 关键业务对话:同时保留原始历史到外部存储
4. 状态设计原则
- 最小化:只存储必要的信息
- 引用优先:存储 ID 而非完整对象
- 可序列化:确保所有字段可以 JSON 序列化
- 版本控制:考虑状态 Schema 的演进
5. 监控和日志
# 监控 Checkpointer 使用情况
from langchain.agents.middleware import after_agent
import logging
logger = logging.getLogger(__name__)
@after_agent
def log_checkpoint_stats(state, runtime):
"""记录检查点统计信息"""
message_count = len(state["messages"])
thread_id = runtime.config.get("configurable", {}).get("thread_id")
logger.info(f"Thread {thread_id}: {message_count} messages saved")
# 警告:消息数量过多
if message_count > 100:
logger.warning(f"Thread {thread_id} has {message_count} messages - consider cleanup")
return None