检查点 API

CheckpointSaver 与状态持久化

概述

检查点(Checkpointer)用于保存和恢复 LangGraph 应用的状态,支持时间旅行和断点续传。

graph TD
    A[BaseCheckpointSaver] --> B[MemorySaver]
    A --> C[SqliteSaver]
    A --> D[PostgresCheckpointSaver]
    A --> E[RedisCheckpointSaver]
    A --> F[AzureCheckpointSaver]

    A --> G[使用场景]
    G --> H[LangGraph 状态]
    G --> I[对话历史]
    G --> J[长时间运行任务]

    style A fill:#e1f5fe
    style B fill:#c8e6c9

基类

BaseCheckpointSaver

检查点保存器抽象基类。

from langchain_core.checkpoint import BaseCheckpointSaver
from langchain_core.runnables import RunnableConfig

class BaseCheckpointSaver(ABC):
    """检查点保存器基类"""

    @abstractmethod
    def get(
        self,
        config: RunnableConfig,
    ) -> Optional[Checkpoint]:
        """
        获取检查点

        Args:
            config: 运行配置,需包含 configurable.thread_id

        Returns:
            检查点数据(如果不存在返回 None)
        """

    @abstractmethod
    async def aget(
        self,
        config: RunnableConfig,
    ) -> Optional[Checkpoint]:
        """异步获取检查点"""

    @abstractmethod
    def put(
        self,
        config: RunnableConfig,
        checkpoint: Checkpoint,
    ) -> RunnableConfig:
        """
        保存检查点

        Args:
            config: 运行配置
            checkpoint: 检查点数据

        Returns:
            更新后的配置(包含新检查点 ID)
        """

    @abstractmethod
    async def aput(
        self,
        config: RunnableConfig,
        checkpoint: Checkpoint,
    ) -> RunnableConfig:
        """异步保存检查点"""

    def list(
        self,
        config: Optional[RunnableConfig],
        *,
        limit: int = 10,
        before: Optional[RunnableConfig] = None,
    ) -> Iterator[CheckpointTuple]:
        """
        列出检查点

        Args:
            config: 基础配置
            limit: 返回数量限制
            before: 列出此配置之前的检查点

        Yields:
            检查点元组 (config, checkpoint)
        """

    async def alist(
        self,
        config: Optional[RunnableConfig],
        *,
        limit: int = 10,
        before: Optional[RunnableConfig] = None,
    ) -> AsyncIterator[CheckpointTuple]:
        """异步列出检查点"""

    # 可选方法
    def get_tuple(
        self,
        config: RunnableConfig,
    ) -> Optional[CheckpointTuple]:
        """
        获取检查点元组(包含父检查点)

        Returns:
            (config, checkpoint, metadata, parent_config)
        """

Checkpoint 数据结构

from typing import TypedDict

class Checkpoint(TypedDict, total=False):
    """检查点数据"""
    # 用户定义的状态字段
    # 例如: {"messages": [...], "step_count": 5}

class CheckpointMetadata(TypedDict, total=False):
    """检查点元数据"""
    source: str
    """检查点来源"""
    step: int
    """步骤编号"""
    writes: List[Any]
    """写入记录"""
    parents: dict
    """父检查点映射"""

实现类

MemorySaver

内存检查点保存器(开发/测试用)。

from langgraph.checkpoint.memory import MemorySaver

class MemorySaver(BaseCheckpointSaver):
    """内存检查点保存器"""

    storage: Dict[str, Dict[str, Checkpoint]]
    """存储: {thread_id: {checkpoint_id: checkpoint}}"""

    def __init__(self):
        """初始化(使用字典存储)"""

使用示例

python
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph

# 创建检查点保存器
memory = MemorySaver()

# 编译图时传入
app = workflow.compile(checkpointer=memory)

# 使用(指定 thread_id)
config = {"configurable": {"thread_id": "conversation_123"}}
result = app.invoke({"messages": ["你好"]}, config=config)

# 后续执行会从检查点恢复
result = app.invoke({"messages": ["我刚才问了什么?"]}, config=config)

SqliteSaver

SQLite 检查点保存器(本地持久化)。

from langchain.checkpoint.sqlite import SqliteSaver

class SqliteSaver(BaseCheckpointSaver):
    """SQLite 检查点保存器"""

    conn: sqlite3.Connection
    """SQLite 连接"""

    def __init__(
        self,
        conn: Union[str, sqlite3.Connection],
    ):
        """
        初始化

        Args:
            conn: 数据库文件路径或连接
        """

使用示例

python
from langchain.checkpoint.sqlite import SqliteSaver

# 方式1: 使用文件路径
memory = SqliteSaver.from_conn_string("checkpoints.db")

# 方式2: 使用连接
import sqlite3
conn = sqlite3.connect("checkpoints.db")
memory = SqliteSaver(conn)

# 使用
app = workflow.compile(checkpointer=memory)

PostgresCheckpointSaver

PostgreSQL 检查点保存器。

from langchain.checkpoint.postgres import PostgresCheckpointSaver

class PostgresCheckpointSaver(BaseCheckpointSaver):
    """PostgreSQL 检查点保存器"""

    def __init__(
        self,
        conn: Union[str, AsyncConnection],
    ):
        """
        初始化

        Args:
            conn: 连接字符串或异步连接
                格式: "postgresql://user:pass@host:port/db"
        """

使用示例

python
from langchain.checkpoint.postgres import PostgresCheckpointSaver

memory = PostgresCheckpointSaver.from_conn_string(
    "postgresql://user:pass@localhost:5432/langchain"
)

app = workflow.compile(checkpointer=memory)

RedisCheckpointSaver

Redis 检查点保存器。

from langchain.checkpoint.redis import RedisCheckpointSaver

class RedisCheckpointSaver(BaseCheckpointSaver):
    """Redis 检查点保存器"""

    def __init__(
        self,
        conn: Optional[Redis] = None,
        client_kwargs: Optional[dict] = None,
    ):
        """
        初始化

        Args:
            conn: Redis 客户端
            client_kwargs: 客户端参数
        """

使用示例

python
from langchain.checkpoint.redis import RedisCheckpointSaver

memory = RedisCheckpointSaver(
    client_kwargs={
        "host": "localhost",
        "port": 6379,
        "db": 0
    }
)

app = workflow.compile(checkpointer=memory)

AzureCheckpointSaver

Azure Blob Storage 检查点保存器。

from langchain.checkpoint.azure import AzureCheckpointSaver

class AzureCheckpointSaver(BaseCheckpointSaver):
    """Azure 检查点保存器"""

    def __init__(
        self,
        container_name: str,
        credential: Any,
        blob_name_prefix: str = "",
    ):
        """
        初始化

        Args:
            container_name: 容器名称
            credential: Azure 凭据
            blob_name_prefix: Blob 前缀
        """

使用示例

python
# ========== 示例1: 使用 MemorySaver ==========
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator

class GraphState(TypedDict):
    messages: Annotated[list, operator.add]
    step_count: int

def node_a(state: GraphState):
    return {"messages": ["A 执行"], "step_count": state.get("step_count", 0) + 1}

# 构建图
workflow = StateGraph(GraphState)
workflow.add_node("node_a", node_a)
workflow.set_entry_point("node_a")
workflow.add_edge("node_a", END)

# 使用检查点
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)

# 执行(指定 thread_id)
config = {"configurable": {"thread_id": "thread_1"}}
result = app.invoke({"messages": [], "step_count": 0}, config=config)
print(f"Step count: {result['step_count']}")  # 1

# 第二次执行(状态会保留)
result = app.invoke({"messages": ["继续"]}, config=config)
print(f"Step count: {result['step_count']}")  # 2

# ========== 示例2: 查看历史状态 ==========
state_history = app.get_state(config)
print(f"当前状态: {state_history.values}")

# 列出所有检查点
for checkpoint in app.list(config):
    print(f"Checkpoint: {checkpoint}")

# ========== 示例3: 时间旅行 ==========
# 回到之前的检查点
checkpoint_list = list(app.list(config))
if len(checkpoint_list) > 1:
    # 获取前一个检查点
    prev_checkpoint = checkpoint_list[-2]
    # 从该点重新执行
    app.replay(prev_checkpoint.config)

# ========== 示例4: 使用 SqliteSaver ==========
from langchain.checkpoint.sqlite import SqliteSaver

memory = SqliteSaver.from_conn_string("checkpoints.db")
app = workflow.compile(checkpointer=memory)

# ========== 示例5: 多线程隔离 ==========
# 不同 thread_id 的状态是隔离的
config1 = {"configurable": {"thread_id": "user_123"}}
config2 = {"configurable": {"thread_id": "user_456"}}

result1 = app.invoke({"messages": ["用户1消息"]}, config=config1)
result2 = app.invoke({"messages": ["用户2消息"]}, config=config2)

# 两个线程的状态独立

# ========== 示例6: 与 RunnableWithMessageHistory 结合 ==========
from langchain_core.runnables.history import RunnableWithMessageHistory

# 使用 MemorySaver 作为历史存储
from langgraph.checkpoint.memory import MemorySaver

def get_history(session_id: str):
    # 返回该 session 的检查点
    return MemorySaver()

with_history = RunnableWithMessageHistory(
    chain,
    get_history,
    input_messages_key="input",
    history_messages_key="history"
)

相关 API