存储后端 API

BaseStore 与键值存储

概述

存储后端提供键值存储功能,用于存储历史记录、文档缓存等数据。

graph TD
    A[BaseStore] --> B[InMemoryStore]
    A --> C[RedisStore]
    A --> D[LocalFileStore]
    A --> E[UpstashStore]

    A --> F[特定用途]
    F --> G[BaseChatMessageHistory]
    G --> H[InMemoryChatMessageHistory]

    style A fill:#e1f5fe
    style B fill:#c8e6c9

基类

BaseStore

键值存储抽象基类。

from langchain_core.stores import BaseStore

class BaseStore(Generic[K, V], ABC):
    """Abstract base class for key-value stores.

    NOTE(review): upstream ``langchain_core.stores.BaseStore`` appears to
    declare ``mget``/``mset``/``mdelete``/``yield_keys`` as its abstract
    methods -- confirm this listing against the installed version.

    Type Parameters:
        K: Key type.
        V: Value type.
    """

    @abstractmethod
    async def aget(
        self,
        key: K,
    ) -> Optional[V]:
        """
        Asynchronously get a value.

        Args:
            key: The key to look up.

        Returns:
            The value, or None if the key does not exist.
        """

    @abstractmethod
    async def aset(
        self,
        key: K,
        value: V,
    ) -> None:
        """
        Asynchronously set a value.

        Args:
            key: The key.
            value: The value.
        """

    @abstractmethod
    async def adelete(
        self,
        key: K,
    ) -> None:
        """
        Asynchronously delete a value.

        Args:
            key: The key.
        """

    @abstractmethod
    async def alist(
        self,
        prefix: Optional[str] = None,
    ) -> AsyncIterator[K]:
        """
        Asynchronously list keys.

        Args:
            prefix: Key-prefix filter.

        Yields:
            Keys.
        """

    # Synchronous methods (by default they call the async methods)
    def get(
        self,
        key: K,
    ) -> Optional[V]:
        """Synchronously get a value."""

    def set(
        self,
        key: K,
        value: V,
    ) -> None:
        """Synchronously set a value."""

    def delete(
        self,
        key: K,
    ) -> None:
        """Synchronously delete a value."""

    def list(
        self,
        prefix: Optional[str] = None,
    ) -> Iterator[K]:
        """Synchronously list keys."""

    async def amget(
        self,
        keys: Sequence[K],
    ) -> List[Optional[V]]:
        """
        Get several values in one call.

        Args:
            keys: List of keys.

        Returns:
            List of values.
        """

    async def amset(
        self,
        key_value_pairs: Sequence[Tuple[K, V]],
    ) -> None:
        """
        Set several values in one call.

        Args:
            key_value_pairs: List of (key, value) pairs.
        """

    async def amdelete(
        self,
        keys: Sequence[K],
    ) -> None:
        """
        Delete several values in one call.

        Args:
            keys: List of keys.
        """

BaseChatMessageHistory

聊天历史存储基类。

from langchain_core.chat_history import BaseChatMessageHistory

class BaseChatMessageHistory(ABC):
    """Abstract base class for chat message history storage."""

    @property
    @abstractmethod
    def messages(self) -> List[BaseMessage]:
        """Return the list of stored messages."""

    @abstractmethod
    async def aadd_messages(
        self,
        messages: Sequence[BaseMessage],
    ) -> None:
        """
        Asynchronously add messages.

        Args:
            messages: List of messages to add.
        """

    @abstractmethod
    async def aclear(self) -> None:
        """Asynchronously clear the history."""

    # Synchronous methods
    def add_message(
        self,
        message: BaseMessage,
    ) -> None:
        """
        Add a single message.

        Listed here because the usage examples in this document call
        ``history.add_message(...)``.

        Args:
            message: The message to add.
        """

    def add_messages(
        self,
        messages: Sequence[BaseMessage],
    ) -> None:
        """
        Add messages.

        Args:
            messages: List of messages to add.
        """

    def clear(self) -> None:
        """Clear the history."""

实现类

InMemoryStore

内存键值存储。

from langchain_core.stores import InMemoryStore

class InMemoryStore(BaseStore[str, Any]):
    """In-memory key-value store."""

    def __init__(self) -> None:
        """Initialize the store (backed by a dictionary)."""

使用示例

python
from langchain_core.stores import InMemoryStore

store = InMemoryStore()

# 设置值
await store.aset("key1", "value1")
store.set("key2", "value2")

# 获取值
value = await store.aget("key1")
value = store.get("key2")

# 批量操作
await store.amset([
    ("key3", "value3"),
    ("key4", "value4")
])

values = await store.amget(["key1", "key2"])

# 列出键
async for key in store.alist():
    print(key)

# 删除
await store.adelete("key1")
store.delete("key2")

InMemoryChatMessageHistory

内存聊天历史。

from langchain_core.chat_history import InMemoryChatMessageHistory

class InMemoryChatMessageHistory(BaseChatMessageHistory):
    """In-memory chat message history."""

    def __init__(self) -> None:
        """Initialize the history."""

使用示例

python
from langchain_core.chat_history import InMemoryChatMessageHistory
from langchain_core.messages import HumanMessage, AIMessage

history = InMemoryChatMessageHistory()

# Add one human/AI exchange
greeting_exchange = [
    HumanMessage(content="你好"),
    AIMessage(content="你好!有什么可以帮助你的?"),
]
history.add_messages(greeting_exchange)

# Print every stored message as "<type>: <content>"
for msg in history.messages:
    print(f"{msg.type}: {msg.content}")

# Clear the history
history.clear()

RedisStore

Redis 键值存储。

from langchain_community.stores import RedisStore

class RedisStore(BaseStore[str, str]):
    """Redis key-value store.

    NOTE(review): upstream langchain_community appears to export
    ``RedisStore`` from ``langchain_community.storage`` with
    ``client``/``redis_url``/``namespace`` parameters -- confirm this
    signature against the installed version.
    """

    def __init__(
        self,
        redis_client: Optional[Redis] = None,
        client_kwargs: Optional[dict] = None,
        prefix: str = "store:",
        ttl: Optional[int] = None,
    ):
        """
        Initialize the Redis store.

        Args:
            redis_client: Custom Redis client.
            client_kwargs: Client parameters
                - host: Host
                - port: Port
                - db: Database
                - password: Password
            prefix: Key prefix.
            ttl: Expiration time in seconds.
        """

使用示例

python
from langchain_community.stores import RedisStore

# 方式1: 使用客户端参数
store = RedisStore(
    client_kwargs={
        "host": "localhost",
        "port": 6379,
        "db": 0
    },
    prefix="myapp:",
    ttl=3600  # 1小时过期
)

# 方式2: 使用自定义客户端
import redis

client = redis.Redis(host="localhost", port=6379, db=0)
store = RedisStore(redis_client=client)

# 使用
await store.aset("user:123", "数据")
value = await store.aget("user:123")

LocalFileStore

本地文件存储。

from langchain.storage import LocalFileStore

class LocalFileStore(BaseStore[str, str]):
    """Local file-based store."""

    def __init__(
        self,
        root_path: str,
    ):
        """
        Initialize the store.

        Args:
            root_path: Path of the root directory.
        """

使用示例

python
from langchain_stores import LocalFileStore

store = LocalFileStore("./data/store")

# 设置值(自动序列化)
await store.aset("key1", "value1")

# 获取值(自动反序列化)
value = await store.aget("key1")

# 列出键
async for key in store.alist():
    print(key)

UpstashStore

Upstash Redis 兼容存储。

from langchain_community.stores import UpstashStore

class UpstashStore(BaseStore[str, Any]):
    """Upstash store.

    NOTE(review): upstream langchain_community appears to export this
    class as ``UpstashRedisStore`` -- confirm the name and import path.
    """

    def __init__(
        self,
        client: Any = None,
        UpstashClient: Any = None,
        upstash_client: Any = None,
    ):
        """
        Initialize the store.

        Args:
            client: Upstash client.
            UpstashClient: Not described in the original listing;
                presumably an alternative way to pass the client -- TODO confirm.
            upstash_client: Not described in the original listing;
                presumably an alternative way to pass the client -- TODO confirm.
        """

使用示例

python
from langchain_community.stores import UpstashStore
from upstash import Redis

# 创建客户端
client = Redis(url="your-upstash-url", token="your-token")
store = UpstashStore(client=client)

# 使用
await store.aset("key", "value")
value = await store.aget("key")

ChatMessageHistory 实现

RedisChatMessageHistory

from langchain_community.chat_message_histories import RedisChatMessageHistory
from langchain_core.messages import HumanMessage

# Create the history object for session "user_123" on the given Redis URL
history = RedisChatMessageHistory(
    session_id="user_123",
    url="redis://localhost:6379/0",
    key_prefix="chat_history:"
)

# Add one human message to this session's history
history.add_message(HumanMessage(content="你好"))

PostgresChatMessageHistory

from langchain_community.chat_message_histories import PostgresChatMessageHistory

# Create the history object for session "user_123", pointing it at the given
# PostgreSQL connection string and table name.
history = PostgresChatMessageHistory(
    connection_string="postgresql://user:pass@localhost/db",
    session_id="user_123",
    table_name="chat_history"
)

MongoDBChatMessageHistory

from langchain_community.chat_message_histories import MongoDBChatMessageHistory

# Create the history object for session "user_123", pointing it at the given
# MongoDB connection string, database name and collection name.
history = MongoDBChatMessageHistory(
    connection_string="mongodb://localhost:27017",
    session_id="user_123",
    database_name="chat_db",
    collection_name="histories"
)

DynamoDBChatMessageHistory

from langchain_community.chat_message_histories import DynamoDBChatMessageHistory

# Create the history object for session "user_123" using the "ChatHistory" table.
# endpoint_url points at a localhost endpoint (presumably a local DynamoDB
# instance for development -- confirm).
history = DynamoDBChatMessageHistory(
    table_name="ChatHistory",
    session_id="user_123",
    endpoint_url="http://localhost:8000"
)

使用示例

python
# ========== 示例1: InMemoryStore 基础使用 ==========
from langchain_core.stores import InMemoryStore

store = InMemoryStore()

# 设置和获取
await store.aset("user:123", {"name": "Alice", "age": 30})
user_data = await store.aget("user:123")

# ========== 示例2: 批量操作 ==========
# 批量设置
await store.amset([
    ("key1", "value1"),
    ("key2", "value2"),
    ("key3", "value3")
])

# 批量获取
values = await store.amget(["key1", "key2", "key3"])

# 批量删除
await store.amdelete(["key1", "key2"])

# ========== 示例3: 带前缀列出 ==========
async for key in store.alist(prefix="user:"):
    print(key)

# ========== Example 4: managing chat history ==========
from langchain_core.chat_history import InMemoryChatMessageHistory
from langchain_core.messages import HumanMessage, AIMessage

history = InMemoryChatMessageHistory()

# Add a two-turn conversation
conversation = [
    HumanMessage(content="什么是 Python?"),
    AIMessage(content="Python 是一种编程语言"),
    HumanMessage(content="它有什么特点?"),
    AIMessage(content="Python 简洁易学、功能强大"),
]
history.add_messages(conversation)

# Show the history
for msg in history.messages:
    print(f"{msg.type}: {msg.content}")

# Clear
history.clear()

# ========== Example 5: persisting chat history to Redis ==========
from langchain_community.chat_message_histories import RedisChatMessageHistory

def get_history(session_id: str):
    """Create the Redis chat history object for one session.

    Args:
        session_id: Identifier of the session whose history is wanted.

    Returns:
        A RedisChatMessageHistory constructed with ``session_id`` and the
        local Redis URL.
    """
    redis_url = "redis://localhost:6379/0"
    session_history = RedisChatMessageHistory(session_id=session_id, url=redis_url)
    return session_history

history = get_history("user_123")
history.add_message(HumanMessage(content="你好"))

# ========== Example 6: custom Store ==========
from langchain_core.stores import BaseStore

class CustomStore(BaseStore[str, str]):
    """Minimal dict-backed store implementing the four async primitives."""

    def __init__(self):
        # All entries live in this plain dict.
        self._data = {}

    async def aget(self, key: str):
        """Return the value stored under ``key``, or None if it is absent."""
        return self._data.get(key)

    async def aset(self, key: str, value: str):
        """Store ``value`` under ``key``, overwriting any previous value."""
        self._data[key] = value

    async def adelete(self, key: str):
        """Remove ``key``; a missing key is silently ignored."""
        self._data.pop(key, None)

    async def alist(self, prefix=None):
        """Yield stored keys, optionally only those starting with ``prefix``.

        This has to be an async generator (it uses ``yield``): callers
        consume it with ``async for``, and an ``async def`` that merely
        returns a plain iterator hands them a coroutine, which ``async for``
        rejects with a TypeError.
        """
        # Iterate over a snapshot so that writes made while the consumer is
        # suspended between items cannot invalidate the iteration.
        for key in list(self._data):
            if not prefix or key.startswith(prefix):
                yield key

# ========== 示例7: 与 RunnableWithMessageHistory 集成 ==========
from langchain_core.runnables.history import RunnableWithMessageHistory

chain = prompt | llm

wrapped_chain = RunnableWithMessageHistory(
    runnable=chain,
    get_session_history=lambda session_id: get_history(session_id),
    input_messages_key="input",
    history_messages_key="history"
)

result = await wrapped_chain.ainvoke(
    {"input": "我刚才问了什么?"},
    config={"configurable": {"session_id": "user_123"}}
)

相关 API