存储后端 API
BaseStore 与键值存储
概述
存储后端提供键值存储功能,用于存储历史记录、文档缓存等数据。
graph TD
A[BaseStore] --> B[InMemoryStore]
A --> C[RedisStore]
A --> D[LocalFileStore]
A --> E[UpstashRedisByteStore]
F["特定用途:聊天历史"] --> G[BaseChatMessageHistory]
G --> H[InMemoryChatMessageHistory]
style A fill:#e1f5fe
style B fill:#c8e6c9
基类
BaseStore
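键值存储抽象基类。
注意:`langchain_core.stores.BaseStore` 实际定义的抽象方法是批量接口 `mget` / `mset` / `mdelete` / `yield_keys`(对应的异步版本为 `amget` / `amset` / `amdelete` / `ayield_keys`)。下面列出的单键方法 `aget` / `aset` / `adelete` / `alist` 及其同步版本并不属于该基类,使用前请对照所安装版本的 API 文档确认。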
from langchain_core.stores import BaseStore
class BaseStore(Generic[K, V], ABC):
"""键值存储基类
Type Parameters:
K: 键类型
V: 值类型
"""
@abstractmethod
async def aget(
self,
key: K,
) -> Optional[V]:
"""
异步获取值
Args:
key: 键
Returns:
值(如果不存在返回 None)
"""
@abstractmethod
async def aset(
self,
key: K,
value: V,
) -> None:
"""
异步设置值
Args:
key: 键
value: 值
"""
@abstractmethod
async def adelete(
self,
key: K,
) -> None:
"""
异步删除值
Args:
key: 键
"""
@abstractmethod
async def alist(
self,
prefix: Optional[str] = None,
) -> AsyncIterator[K]:
"""
异步列出键
Args:
prefix: 键前缀过滤
Yields:
键
"""
# 同步方法(默认调用异步方法)
def get(
self,
key: K,
) -> Optional[V]:
"""同步获取值"""
def set(
self,
key: K,
value: V,
) -> None:
"""同步设置值"""
def delete(
self,
key: K,
) -> None:
"""同步删除值"""
def list(
self,
prefix: Optional[str] = None,
) -> Iterator[K]:
"""同步列出键"""
async def amget(
self,
keys: Sequence[K],
) -> List[Optional[V]]:
"""
批量获取
Args:
keys: 键列表
Returns:
值列表
"""
async def amset(
self,
key_value_pairs: Sequence[Tuple[K, V]],
) -> None:
"""
批量设置
Args:
key_value_pairs: (键, 值) 列表
"""
async def amdelete(
self,
keys: Sequence[K],
) -> None:
"""
批量删除
Args:
keys: 键列表
"""
BaseChatMessageHistory
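聊天历史存储基类。
注意:在 `langchain_core` 的实际实现中,只有 `clear()` 是抽象方法;`aadd_messages` / `aclear` 带有默认实现(在执行器中调用对应的同步方法),并非必须重写,`messages` 可以是普通属性或 property。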
from langchain_core.chat_history import BaseChatMessageHistory
class BaseChatMessageHistory(ABC):
"""聊天历史基类"""
@property
@abstractmethod
def messages(self) -> List[BaseMessage]:
"""获取消息列表"""
@abstractmethod
async def aadd_messages(
self,
messages: Sequence[BaseMessage],
) -> None:
"""
异步添加消息
Args:
messages: 消息列表
"""
@abstractmethod
async def aclear(self) -> None:
"""异步清空历史"""
# 同步方法
def add_messages(
self,
messages: Sequence[BaseMessage],
) -> None:
"""添加消息"""
def clear(self) -> None:
"""清空历史"""
实现类
InMemoryStore
内存键值存储。
from langchain_core.stores import InMemoryStore
class InMemoryStore(BaseStore[str, Any]):
"""内存键值存储"""
def __init__(self) -> None:
"""初始化(使用字典存储)"""
使用示例
python
from langchain_core.stores import InMemoryStore
store = InMemoryStore()
# 设置值
await store.aset("key1", "value1")
store.set("key2", "value2")
# 获取值
value = await store.aget("key1")
value = store.get("key2")
# 批量操作
await store.amset([
("key3", "value3"),
("key4", "value4")
])
values = await store.amget(["key1", "key2"])
# 列出键
async for key in store.alist():
print(key)
# 删除
await store.adelete("key1")
store.delete("key2")
InMemoryChatMessageHistory
内存聊天历史。
from langchain_core.chat_history import InMemoryChatMessageHistory
class InMemoryChatMessageHistory(BaseChatMessageHistory):
"""内存聊天历史"""
def __init__(self) -> None:
"""初始化"""
使用示例
python
from langchain_core.chat_history import InMemoryChatMessageHistory
from langchain_core.messages import HumanMessage, AIMessage
history = InMemoryChatMessageHistory()
# 添加消息
history.add_messages([
HumanMessage(content="你好"),
AIMessage(content="你好!有什么可以帮助你的?")
])
# 查看消息
for msg in history.messages:
print(f"{msg.type}: {msg.content}")
# 清空历史
history.clear()
RedisStore
Redis 键值存储。
from langchain_community.storage import RedisStore
class RedisStore(BaseStore[str, bytes]):
"""Redis 键值存储(值为 bytes)"""
def __init__(
self,
*,
client: Any = None,
redis_url: Optional[str] = None,
client_kwargs: Optional[dict] = None,
ttl: Optional[int] = None,
namespace: Optional[str] = None,
) -> None:
"""
初始化 Redis 存储
Args:
client: 已创建的 Redis 客户端(不能与 redis_url / client_kwargs 同时提供)
redis_url: Redis 连接 URL,例如 redis://localhost:6379/0
client_kwargs: 与 redis_url 一起传给 Redis 客户端的额外参数
ttl: 过期时间(秒)
namespace: 键的命名空间前缀
"""
使用示例
python
from langchain_community.storage import RedisStore
# 方式1: 使用连接 URL
store = RedisStore(
redis_url="redis://localhost:6379/0",
namespace="myapp",
ttl=3600 # 1小时过期
)
# 方式2: 使用自定义客户端
import redis
client = redis.Redis(host="localhost", port=6379, db=0)
store = RedisStore(client=client)
# 使用
await store.aset("user:123", "数据")
value = await store.aget("user:123")
LocalFileStore
本地文件存储。
from langchain.storage import LocalFileStore
class LocalFileStore(BaseStore[str, bytes]):
"""本地文件存储"""
def __init__(
self,
root_path: str,
):
"""
初始化
Args:
root_path: 根目录路径
"""
使用示例
python
from langchain.storage import LocalFileStore
store = LocalFileStore("./data/store")
# 设置值(自动序列化)
await store.aset("key1", "value1")
# 获取值(自动反序列化)
value = await store.aget("key1")
# 列出键
async for key in store.alist():
print(key)
UpstashRedisByteStore
Upstash Redis 键值存储(值为 bytes;另有值为 str 的 UpstashRedisStore)。
from langchain_community.storage import UpstashRedisByteStore
class UpstashRedisByteStore(BaseStore[str, bytes]):
"""Upstash 存储"""
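def __init__(
self,
*,
client: Any = None,
url: Optional[str] = None,
token: Optional[str] = None,
ttl: Optional[int] = None,
namespace: Optional[str] = None,
) -> None:
"""
初始化
Args:
client: 已创建的 Upstash Redis 客户端(不能与 url / token 同时提供)
url: Upstash Redis REST URL
token: Upstash Redis REST token
ttl: 过期时间(秒)
namespace: 键的命名空间前缀
"""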
使用示例
python
from langchain_community.storage import UpstashRedisByteStore
from upstash_redis import Redis
# 创建客户端
client = Redis(url="your-upstash-url", token="your-token")
store = UpstashRedisByteStore(client=client)
# 使用
await store.aset("key", "value")
value = await store.aget("key")
ChatMessageHistory 实现
RedisChatMessageHistory
from langchain_community.chat_message_histories import RedisChatMessageHistory
from langchain_core.messages import HumanMessage
history = RedisChatMessageHistory(
session_id="user_123",
url="redis://localhost:6379/0",
key_prefix="chat_history:"
)
history.add_message(HumanMessage(content="你好"))
PostgresChatMessageHistory
from langchain_community.chat_message_histories import PostgresChatMessageHistory
history = PostgresChatMessageHistory(
connection_string="postgresql://user:pass@localhost/db",
session_id="user_123",
table_name="chat_history"
)
MongoDBChatMessageHistory
from langchain_community.chat_message_histories import MongoDBChatMessageHistory
history = MongoDBChatMessageHistory(
connection_string="mongodb://localhost:27017",
session_id="user_123",
database_name="chat_db",
collection_name="histories"
)
DynamoDBChatMessageHistory
from langchain_community.chat_message_histories import DynamoDBChatMessageHistory
history = DynamoDBChatMessageHistory(
table_name="ChatHistory",
session_id="user_123",
endpoint_url="http://localhost:8000"
)
使用示例
python
# ========== 示例1: InMemoryStore 基础使用 ==========
from langchain_core.stores import InMemoryStore
store = InMemoryStore()
# 设置和获取
await store.aset("user:123", {"name": "Alice", "age": 30})
user_data = await store.aget("user:123")
# ========== 示例2: 批量操作 ==========
# 批量设置
await store.amset([
("key1", "value1"),
("key2", "value2"),
("key3", "value3")
])
# 批量获取
values = await store.amget(["key1", "key2", "key3"])
# 批量删除
await store.amdelete(["key1", "key2"])
# ========== 示例3: 带前缀列出 ==========
async for key in store.alist(prefix="user:"):
print(key)
# ========== 示例4: 聊天历史管理 ==========
from langchain_core.chat_history import InMemoryChatMessageHistory
from langchain_core.messages import HumanMessage, AIMessage
history = InMemoryChatMessageHistory()
# 添加对话
history.add_messages([
HumanMessage(content="什么是 Python?"),
AIMessage(content="Python 是一种编程语言"),
HumanMessage(content="它有什么特点?"),
AIMessage(content="Python 简洁易学、功能强大")
])
# 查看历史
for msg in history.messages:
print(f"{msg.type}: {msg.content}")
# 清空
history.clear()
# ========== 示例5: 持久化聊天历史到 Redis ==========
from langchain_community.chat_message_histories import RedisChatMessageHistory
# 为每个会话创建历史
def get_history(session_id: str):
return RedisChatMessageHistory(
session_id=session_id,
url="redis://localhost:6379/0"
)
history = get_history("user_123")
history.add_message(HumanMessage(content="你好"))
# ========== 示例6: 自定义 Store ==========
from langchain_core.stores import BaseStore
class CustomStore(BaseStore[str, str]):
def __init__(self):
self._data = {}
async def aget(self, key: str):
return self._data.get(key)
async def aset(self, key: str, value: str):
self._data[key] = value
async def adelete(self, key: str):
self._data.pop(key, None)
async def alist(self, prefix=None):
# 必须写成异步生成器(使用 yield),调用方才能用 `async for` 迭代
for k in list(self._data):
if not prefix or k.startswith(prefix):
yield k
# ========== 示例7: 与 RunnableWithMessageHistory 集成 ==========
from langchain_core.runnables.history import RunnableWithMessageHistory
# 假设 prompt(包含 "input" 变量和 "history" 消息占位符)与 llm 已在别处定义
chain = prompt | llm
wrapped_chain = RunnableWithMessageHistory(
runnable=chain,
get_session_history=lambda session_id: get_history(session_id),
input_messages_key="input",
history_messages_key="history"
)
result = await wrapped_chain.ainvoke(
{"input": "我刚才问了什么?"},
config={"configurable": {"session_id": "user_123"}}
)