检查点 API
CheckpointSaver 与状态持久化
概述
检查点(Checkpointer)用于保存和恢复 LangGraph 应用的状态,支持时间旅行和断点续传。
graph TD
A[BaseCheckpointSaver] --> B[MemorySaver]
A --> C[SqliteSaver]
A --> D[PostgresCheckpointSaver]
A --> E[RedisCheckpointSaver]
A --> F[AzureCheckpointSaver]
A --> G[使用场景]
G --> H[LangGraph 状态]
G --> I[对话历史]
G --> J[长时间运行任务]
style A fill:#e1f5fe
style B fill:#c8e6c9
基类
BaseCheckpointSaver
检查点保存器抽象基类。
from langchain_core.checkpoint import BaseCheckpointSaver
from langchain_core.runnables import RunnableConfig
class BaseCheckpointSaver(ABC):
"""检查点保存器基类"""
@abstractmethod
def get(
self,
config: RunnableConfig,
) -> Optional[Checkpoint]:
"""
获取检查点
Args:
config: 运行配置,需包含 configurable.thread_id
Returns:
检查点数据(如果不存在返回 None)
"""
@abstractmethod
async def aget(
self,
config: RunnableConfig,
) -> Optional[Checkpoint]:
"""异步获取检查点"""
@abstractmethod
def put(
self,
config: RunnableConfig,
checkpoint: Checkpoint,
) -> RunnableConfig:
"""
保存检查点
Args:
config: 运行配置
checkpoint: 检查点数据
Returns:
更新后的配置(包含新检查点 ID)
"""
@abstractmethod
async def aput(
self,
config: RunnableConfig,
checkpoint: Checkpoint,
) -> RunnableConfig:
"""异步保存检查点"""
def list(
self,
config: Optional[RunnableConfig],
*,
limit: int = 10,
before: Optional[RunnableConfig] = None,
) -> Iterator[CheckpointTuple]:
"""
列出检查点
Args:
config: 基础配置
limit: 返回数量限制
before: 列出此配置之前的检查点
Yields:
检查点元组 (config, checkpoint)
"""
async def alist(
self,
config: Optional[RunnableConfig],
*,
limit: int = 10,
before: Optional[RunnableConfig] = None,
) -> AsyncIterator[CheckpointTuple]:
"""异步列出检查点"""
# 可选方法
def get_tuple(
self,
config: RunnableConfig,
) -> Optional[CheckpointTuple]:
"""
获取检查点元组(包含父检查点)
Returns:
(config, checkpoint, metadata, parent_config)
"""
Checkpoint 数据结构
from typing import TypedDict
class Checkpoint(TypedDict, total=False):
"""检查点数据"""
# 用户定义的状态字段
# 例如: {"messages": [...], "step_count": 5}
class CheckpointMetadata(TypedDict, total=False):
"""检查点元数据"""
source: str
"""检查点来源"""
step: int
"""步骤编号"""
writes: List[Any]
"""写入记录"""
parents: dict
"""父检查点映射"""
实现类
MemorySaver
内存检查点保存器(开发/测试用)。
from langgraph.checkpoint.memory import MemorySaver
class MemorySaver(BaseCheckpointSaver):
"""内存检查点保存器"""
storage: Dict[str, Dict[str, Checkpoint]]
"""存储: {thread_id: {checkpoint_id: checkpoint}}"""
def __init__(self):
"""初始化(使用字典存储)"""
使用示例
python
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph
# 创建检查点保存器
memory = MemorySaver()
# 编译图时传入
app = workflow.compile(checkpointer=memory)
# 使用(指定 thread_id)
config = {"configurable": {"thread_id": "conversation_123"}}
result = app.invoke({"messages": ["你好"]}, config=config)
# 后续执行会从检查点恢复
result = app.invoke({"messages": ["我刚才问了什么?"]}, config=config)
SqliteSaver
SQLite 检查点保存器(本地持久化)。
from langchain.checkpoint.sqlite import SqliteSaver
class SqliteSaver(BaseCheckpointSaver):
"""SQLite 检查点保存器"""
conn: sqlite3.Connection
"""SQLite 连接"""
def __init__(
self,
conn: Union[str, sqlite3.Connection],
):
"""
初始化
Args:
conn: 数据库文件路径或连接
"""
使用示例
python
from langchain.checkpoint.sqlite import SqliteSaver
# 方式1: 使用文件路径
memory = SqliteSaver.from_conn_string("checkpoints.db")
# 方式2: 使用连接
import sqlite3
conn = sqlite3.connect("checkpoints.db")
memory = SqliteSaver(conn)
# 使用
app = workflow.compile(checkpointer=memory)
PostgresCheckpointSaver
PostgreSQL 检查点保存器。
from langchain.checkpoint.postgres import PostgresCheckpointSaver
class PostgresCheckpointSaver(BaseCheckpointSaver):
"""PostgreSQL 检查点保存器"""
def __init__(
self,
conn: Union[str, AsyncConnection],
):
"""
初始化
Args:
conn: 连接字符串或异步连接
格式: "postgresql://user:pass@host:port/db"
"""
使用示例
python
from langchain.checkpoint.postgres import PostgresCheckpointSaver
memory = PostgresCheckpointSaver.from_conn_string(
"postgresql://user:pass@localhost:5432/langchain"
)
app = workflow.compile(checkpointer=memory)
RedisCheckpointSaver
Redis 检查点保存器。
from langchain.checkpoint.redis import RedisCheckpointSaver
class RedisCheckpointSaver(BaseCheckpointSaver):
"""Redis 检查点保存器"""
def __init__(
self,
conn: Optional[Redis] = None,
client_kwargs: Optional[dict] = None,
):
"""
初始化
Args:
conn: Redis 客户端
client_kwargs: 客户端参数
"""
使用示例
python
from langchain.checkpoint.redis import RedisCheckpointSaver
memory = RedisCheckpointSaver(
client_kwargs={
"host": "localhost",
"port": 6379,
"db": 0
}
)
app = workflow.compile(checkpointer=memory)
AzureCheckpointSaver
Azure Blob Storage 检查点保存器。
from langchain.checkpoint.azure import AzureCheckpointSaver
class AzureCheckpointSaver(BaseCheckpointSaver):
"""Azure 检查点保存器"""
def __init__(
self,
container_name: str,
credential: Any,
blob_name_prefix: str = "",
):
"""
初始化
Args:
container_name: 容器名称
credential: Azure 凭据
blob_name_prefix: Blob 前缀
"""
使用示例
python
# ========== 示例1: 使用 MemorySaver ==========
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
class GraphState(TypedDict):
messages: Annotated[list, operator.add]
step_count: int
def node_a(state: GraphState):
return {"messages": ["A 执行"], "step_count": state.get("step_count", 0) + 1}
# 构建图
workflow = StateGraph(GraphState)
workflow.add_node("node_a", node_a)
workflow.set_entry_point("node_a")
workflow.add_edge("node_a", END)
# 使用检查点
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)
# 执行(指定 thread_id)
config = {"configurable": {"thread_id": "thread_1"}}
result = app.invoke({"messages": [], "step_count": 0}, config=config)
print(f"Step count: {result['step_count']}") # 1
# 第二次执行(状态会保留)
result = app.invoke({"messages": ["继续"]}, config=config)
print(f"Step count: {result['step_count']}") # 2
# ========== 示例2: 查看历史状态 ==========
state_history = app.get_state(config)
print(f"当前状态: {state_history.values}")
# 列出所有检查点
for checkpoint in app.list(config):
print(f"Checkpoint: {checkpoint}")
# ========== 示例3: 时间旅行 ==========
# 回到之前的检查点
checkpoint_list = list(app.list(config))
if len(checkpoint_list) > 1:
# 获取前一个检查点
prev_checkpoint = checkpoint_list[-2]
# 从该点重新执行
app.replay(prev_checkpoint.config)
# ========== 示例4: 使用 SqliteSaver ==========
from langchain.checkpoint.sqlite import SqliteSaver
memory = SqliteSaver.from_conn_string("checkpoints.db")
app = workflow.compile(checkpointer=memory)
# ========== 示例5: 多线程隔离 ==========
# 不同 thread_id 的状态是隔离的
config1 = {"configurable": {"thread_id": "user_123"}}
config2 = {"configurable": {"thread_id": "user_456"}}
result1 = app.invoke({"messages": ["用户1消息"]}, config=config1)
result2 = app.invoke({"messages": ["用户2消息"]}, config=config2)
# 两个线程的状态独立
# ========== 示例6: 与 RunnableWithMessageHistory 结合 ==========
from langchain_core.runnables.history import RunnableWithMessageHistory
# 使用 MemorySaver 作为历史存储
from langgraph.checkpoint.memory import MemorySaver
def get_history(session_id: str):
# 返回该 session 的检查点
return MemorySaver()
with_history = RunnableWithMessageHistory(
chain,
get_history,
input_messages_key="input",
history_messages_key="history"
)