⚙️ 中间件系统详解
掌握 LangChain 1.0 的中间件钩子函数,在 Agent 执行的每个步骤 实现监控、转换和控制逻辑,无需修改核心代码。
📚 什么是中间件(Middleware)?
中间件(Middleware)是 LangChain 1.0 的强大扩展机制, 允许你在 Agent 执行的关键节点插入自定义逻辑。通过中间件,你可以:
- 追踪:记录日志、收集分析数据、调试 Agent 行为
- 转换:修改提示词、调整工具选择、格式化输出
- 控制:添加重试、降级、提前终止逻辑
- 治理:应用速率限制、隐私保护、合规检查
中间件让你无需修改核心 Agent 代码即可实现复杂的控制逻辑。 通过钩子函数在 Agent 执行的不同阶段注入行为, 实现模块化、可复用的扩展。
🏗️ 中间件架构和执行顺序
执行顺序规则
当使用多个中间件时,理解执行顺序至关重要:
| 钩子类型 | 执行顺序 | 说明 |
|---|---|---|
before_agent |
从前到后 | 按中间件列表顺序执行 |
before_model |
从前到后 | 每次模型调用前按顺序 |
wrap_model_call |
嵌套包装 | 第一个中间件最外层 |
after_model |
从后到前(反向) | 与 before_model 相反 |
after_agent |
从后到前(反向) | 与 before_agent 相反 |
wrap_tool_call |
嵌套包装 | 第一个中间件最外层 |
- before_* 钩子:从前到后执行
- after_* 钩子:从后到前执行(反向)
- wrap_* 钩子:嵌套包装,先注册的在最外层
🪝 钩子函数类型
LangChain 提供两种风格的钩子函数:
节点风格钩子(Node-Style Hooks)
在特定执行点顺序运行:
before_agent:Agent 启动前执行一次before_model:每次模型调用前执行after_model:每次模型响应后执行after_agent:Agent 完成后执行一次
包装风格钩子(Wrap-Style Hooks)
拦截并控制执行流程:
wrap_model_call:包装每次模型调用wrap_tool_call:包装每次工具调用
Wrap 风格钩子让你决定是否执行、执行多少次:
- 零次:短路跳过(缓存命中)
- 一次:正常执行
- 多次:重试逻辑
🔨 自定义中间件开发
装饰器风格(简单快速)
"""
装饰器风格中间件示例
功能:记录模型调用日志
"""
from langchain.agents.middleware import before_model, after_model
from langchain.agents import create_agent
@before_model
def log_before(state, runtime):
"""模型调用前记录日志"""
print(f"📤 准备调用模型,当前消息数: {len(state['messages'])}")
return None # 不修改状态
@after_model
def log_after(state, runtime):
"""模型响应后记录日志"""
last_msg = state["messages"][-1]
print(f"📥 模型响应: {last_msg.content[:50]}...")
return None
# 创建 Agent
agent = create_agent(
model="gpt-4o",
tools=[],
middleware=[log_before, log_after]
)
# 测试
response = agent.invoke({
"messages": [{"role": "user", "content": "你好"}]
})
类风格(强大灵活)
"""
类风格中间件示例
功能:消息限制和日志记录
"""
from langchain.agents import AgentMiddleware, create_agent
from langchain.agents.middleware import hook_config
from langchain.messages import AIMessage
class MessageLimitMiddleware(AgentMiddleware):
"""限制对话消息数量的中间件"""
def __init__(self, max_messages: int = 50):
super().__init__()
self.max_messages = max_messages
@hook_config(can_jump_to=["end"])
def before_model(self, state, runtime):
"""检查消息数量限制"""
if len(state["messages"]) >= self.max_messages:
return {
"messages": [AIMessage(
f"对话已达到最大消息数限制({self.max_messages}条)。"
)],
"jump_to": "end" # 提前结束
}
return None
def after_model(self, state, runtime):
"""记录模型响应"""
msg_count = len(state["messages"])
print(f"📊 当前消息数: {msg_count}/{self.max_messages}")
return None
# 创建 Agent
agent = create_agent(
model="gpt-4o",
tools=[],
middleware=[MessageLimitMiddleware(max_messages=10)]
)
Wrap 风格钩子:重试逻辑
"""
Wrap 风格中间件:模型调用重试
功能:失败时自动重试,带指数退避
"""
from langchain.agents.middleware import wrap_model_call
from langchain.agents import create_agent
import time
@wrap_model_call
def retry_model(request, handler):
"""模型调用重试中间件"""
max_retries = 3
base_delay = 1.0
for attempt in range(max_retries):
try:
# 调用实际的模型
return handler(request)
except Exception as e:
if attempt == max_retries - 1:
# 最后一次尝试仍失败,抛出异常
raise
# 计算退避时间
delay = base_delay * (2 ** attempt)
print(f"⚠️ 模型调用失败(尝试 {attempt + 1}/{max_retries}): {e}")
print(f"🔄 等待 {delay} 秒后重试...")
time.sleep(delay)
# 工具重试示例
@wrap_tool_call
def retry_tool(request, handler):
"""工具调用重试中间件"""
max_retries = 2
for attempt in range(max_retries):
try:
result = handler(request)
print(f"✅ 工具 '{request.tool_call['name']}' 执行成功")
return result
except Exception as e:
if attempt == max_retries - 1:
print(f"❌ 工具执行最终失败: {e}")
raise
print(f"🔄 工具重试 {attempt + 1}/{max_retries}")
agent = create_agent(
model="gpt-4o",
tools=[],
middleware=[retry_model, retry_tool]
)
📦 内置中间件精选
LangChain 提供了 10+ 个生产就绪的内置中间件。 以下是最常用的几个:
1. SummarizationMiddleware - 对话摘要
自动总结长对话历史,防止超出 Token 限制。
"""
SummarizationMiddleware 示例
功能:对话超过 Token 限制时自动摘要
"""
from langchain.agents import create_agent
from langchain.agents.middleware import SummarizationMiddleware
agent = create_agent(
model="gpt-4o",
tools=[],
middleware=[
SummarizationMiddleware(
model="gpt-4o-mini", # 用于生成摘要的模型
trigger=("tokens", 4000), # 超过 4000 tokens 触发
keep=("messages", 20), # 保留最近 20 条消息
),
],
)
# 多条件触发
agent2 = create_agent(
model="gpt-4o",
tools=[],
middleware=[
SummarizationMiddleware(
model="gpt-4o-mini",
trigger=[
("tokens", 3000), # 3000 tokens 或
("messages", 50), # 50 条消息
],
keep=("messages", 10),
),
],
)
2. PIIMiddleware - 隐私保护
检测和处理个人身份信息(PII)。
"""
PIIMiddleware 示例
功能:检测和屏蔽敏感信息
"""
from langchain.agents import create_agent
from langchain.agents.middleware import PIIMiddleware
import re
# 内置 PII 类型:email, credit_card, ip, mac_address, url
agent = create_agent(
model="gpt-4o",
tools=[],
middleware=[
# 屏蔽邮箱
PIIMiddleware(
"email",
strategy="redact", # 替换为 [REDACTED_EMAIL]
apply_to_input=True # 检查用户输入
),
# 掩码信用卡
PIIMiddleware(
"credit_card",
strategy="mask", # 部分掩码: ****-****-****-1234
apply_to_input=True
),
],
)
# 自定义 PII 检测:API Key
agent_api = create_agent(
model="gpt-4o",
tools=[],
middleware=[
PIIMiddleware(
"api_key",
detector=r"sk-[a-zA-Z0-9]{32}", # 正则表达式
strategy="block", # 检测到直接阻止
apply_to_input=True
),
],
)
# 自定义检测函数:身份证号
def detect_id_card(content: str):
"""检测身份证号"""
matches = []
pattern = r"\d{17}[\dX]"
for match in re.finditer(pattern, content):
matches.append({
"text": match.group(0),
"start": match.start(),
"end": match.end(),
})
return matches
agent_id = create_agent(
model="gpt-4o",
tools=[],
middleware=[
PIIMiddleware(
"id_card",
detector=detect_id_card,
strategy="hash", # 替换为哈希值
),
],
)
3. HumanInTheLoopMiddleware - 人工审核
暂停执行等待人工批准、编辑或拒绝。
"""
HumanInTheLoopMiddleware 示例
功能:敏感操作前需要人工批准
"""
from langchain.agents import create_agent
from langchain.agents.middleware import HumanInTheLoopMiddleware
from langgraph.checkpoint.memory import InMemorySaver
from langchain.tools import tool
@tool
def send_email(to: str, subject: str, body: str) -> str:
"""发送邮件(敏感操作)"""
return f"邮件已发送到 {to}"
@tool
def read_email(folder: str = "inbox") -> str:
"""读取邮件(安全操作)"""
return f"读取 {folder} 中的邮件"
# 创建 Agent(需要 checkpointer)
agent = create_agent(
model="gpt-4o",
tools=[send_email, read_email],
checkpointer=InMemorySaver(), # 必需
middleware=[
HumanInTheLoopMiddleware(
interrupt_on={
"send_email": {
"allowed_decisions": ["approve", "edit", "reject"],
},
"read_email": False, # 不需要审核
}
),
],
)
# 调用会在 send_email 前暂停,等待人工决策
4. ModelCallLimitMiddleware - 调用限制
"""
ModelCallLimitMiddleware 示例
功能:防止无限循环和过高成本
"""
from langchain.agents import create_agent
from langchain.agents.middleware import ModelCallLimitMiddleware
agent = create_agent(
model="gpt-4o",
tools=[],
middleware=[
ModelCallLimitMiddleware(
thread_limit=20, # 整个线程最多 20 次调用
run_limit=5, # 单次执行最多 5 次调用
exit_behavior="end" # 达到限制时优雅结束
),
],
)
5. ToolCallLimitMiddleware - 工具调用限制
"""
ToolCallLimitMiddleware 示例
功能:限制工具调用次数
"""
from langchain.agents import create_agent
from langchain.agents.middleware import ToolCallLimitMiddleware
# 全局限制
global_limit = ToolCallLimitMiddleware(
thread_limit=20,
run_limit=10
)
# 特定工具限制
search_limit = ToolCallLimitMiddleware(
tool_name="search",
thread_limit=5,
run_limit=3
)
agent = create_agent(
model="gpt-4o",
tools=[search_tool, calculator_tool],
middleware=[global_limit, search_limit]
)
🚀 高级特性
自定义状态 Schema
"""
自定义状态 Schema 示例
功能:在状态中添加自定义字段进行跟踪
"""
from typing_extensions import NotRequired
from langchain.agents import AgentState, create_agent
from langchain.agents.middleware import before_model, after_model, hook_config
# 定义自定义状态
class CustomState(AgentState):
model_call_count: NotRequired[int]
total_tokens: NotRequired[int]
user_id: NotRequired[str]
@before_model(state_schema=CustomState, can_jump_to=["end"])
def check_call_limit(state: CustomState, runtime):
"""检查调用次数限制"""
count = state.get("model_call_count", 0)
if count >= 10:
return {"jump_to": "end"}
return None
@after_model(state_schema=CustomState)
def increment_counter(state: CustomState, runtime):
"""增加计数器"""
current_count = state.get("model_call_count", 0)
return {"model_call_count": current_count + 1}
agent = create_agent(
model="gpt-4o",
tools=[],
middleware=[check_call_limit, increment_counter],
state_schema=CustomState
)
动态模型选择
"""
动态模型选择示例
功能:根据对话长度自动切换模型
"""
from langchain.agents.middleware import wrap_model_call
from langchain.chat_models import init_chat_model
# 初始化两个模型
simple_model = init_chat_model("gpt-4o-mini")
complex_model = init_chat_model("gpt-4o")
@wrap_model_call
def dynamic_model_selector(request, handler):
"""根据消息数量选择模型"""
message_count = len(request.messages)
if message_count > 20:
# 长对话使用高级模型
selected_model = complex_model
print(f"🔵 使用高级模型(消息数: {message_count})")
else:
# 短对话使用轻量模型
selected_model = simple_model
print(f"🟢 使用轻量模型(消息数: {message_count})")
# 覆盖请求中的模型
return handler(request.override(model=selected_model))
agent = create_agent(
model="gpt-4o-mini", # 默认模型
tools=[],
middleware=[dynamic_model_selector]
)
Agent 跳转控制
"""
Agent 跳转控制示例
功能:检测到违规内容时提前结束
"""
from langchain.agents.middleware import after_model, hook_config
from langchain.messages import AIMessage
@after_model
@hook_config(can_jump_to=["end"])
def check_for_blocked_content(state, runtime):
"""检查响应中是否包含被阻止的内容"""
last_message = state["messages"][-1]
# 检查是否包含违规标记
if "BLOCKED" in last_message.content or "拒绝回答" in last_message.content:
return {
"messages": [AIMessage("检测到违规内容,对话已终止。")],
"jump_to": "end" # 跳转到结束节点
}
return None
# 可用的跳转目标:
# - 'end': 跳到结束
# - 'tools': 跳到工具节点
# - 'model': 跳到模型节点
🎯 完整示例:生产级 Agent
"""
生产级 Agent 完整示例
功能:组合多个中间件实现完整的监控、限制和保护
"""
from langchain.agents import create_agent
from langchain.agents.middleware import (
SummarizationMiddleware,
PIIMiddleware,
ModelCallLimitMiddleware,
ToolCallLimitMiddleware,
before_agent,
after_agent,
before_model,
after_model,
)
from langchain.tools import tool
from datetime import datetime
# ==================== 自定义中间件 ====================
@before_agent
def log_session_start(state, runtime):
"""记录会话开始"""
print(f"\n{'='*60}")
print(f"🚀 会话开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"{'='*60}\n")
return None
@after_agent
def log_session_end(state, runtime):
"""记录会话结束"""
message_count = len(state["messages"])
print(f"\n{'='*60}")
print(f"✅ 会话结束时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"📊 总消息数: {message_count}")
print(f"{'='*60}\n")
return None
@before_model
def log_model_call(state, runtime):
"""记录每次模型调用"""
user_msgs = [m for m in state["messages"] if m.__class__.__name__ == "HumanMessage"]
print(f"🤖 模型调用 #{len(user_msgs)}")
return None
@after_model
def log_model_response(state, runtime):
"""记录模型响应"""
last_msg = state["messages"][-1]
usage = getattr(last_msg, "usage_metadata", None)
if usage:
print(f"📈 Token 使用: {usage.get('total_tokens', 'N/A')}")
return None
# ==================== 定义工具 ====================
@tool
def search_database(query: str) -> str:
"""搜索数据库"""
return f"搜索结果:{query}"
@tool
def send_notification(message: str) -> str:
"""发送通知"""
return f"通知已发送:{message}"
# ==================== 创建生产级 Agent ====================
production_agent = create_agent(
model="gpt-4o",
tools=[search_database, send_notification],
middleware=[
# 1. 会话生命周期日志
log_session_start,
log_session_end,
# 2. 模型调用日志
log_model_call,
log_model_response,
# 3. 调用限制(防止无限循环和过高成本)
ModelCallLimitMiddleware(
thread_limit=30,
run_limit=10,
exit_behavior="end"
),
# 4. 工具调用限制
ToolCallLimitMiddleware(
thread_limit=20,
run_limit=5
),
# 5. 隐私保护
PIIMiddleware("email", strategy="redact", apply_to_input=True),
PIIMiddleware("credit_card", strategy="mask", apply_to_input=True),
# 6. 对话摘要(长对话管理)
SummarizationMiddleware(
model="gpt-4o-mini",
trigger=("tokens", 5000),
keep=("messages", 15)
),
],
system_prompt="""你是一个专业的客服助手。
职责:
1. 回答用户问题
2. 搜索数据库查找信息
3. 发送通知
注意事项:
- 保护用户隐私
- 简洁准确地回答
- 必要时使用工具"""
)
# ==================== 测试 ====================
if __name__ == "__main__":
result = production_agent.invoke({
"messages": [
{"role": "user", "content": "我的邮箱是 [email protected],帮我查询订单状态"}
]
})
print("\n最终响应:")
print(result["messages"][-1].content)
✨ 最佳实践
1. 中间件职责单一
每个中间件应该只处理一个关注点。
# ❌ 不好:一个中间件做太多事
@before_model
def do_everything(state, runtime):
log_message(state)
check_limits(state)
validate_input(state)
track_metrics(state)
# ...
# ✅ 好:每个中间件职责单一
@before_model
def log_message(state, runtime):
"""仅负责日志记录"""
# ...
@before_model
def check_limits(state, runtime):
"""仅负责限制检查"""
# ...
2. 合理安排中间件顺序
将关键中间件放在前面。
agent = create_agent(
model="gpt-4o",
tools=[],
middleware=[
# 1. 最优先:安全和限制
PIIMiddleware(...),
ModelCallLimitMiddleware(...),
# 2. 其次:重试和降级
ToolRetryMiddleware(...),
ModelFallbackMiddleware(...),
# 3. 最后:日志和监控
logging_middleware,
]
)
3. 优雅处理错误
@before_model
def safe_middleware(state, runtime):
"""安全的中间件:不会让 Agent 崩溃"""
try:
# 执行可能失败的操作
risky_operation(state)
except Exception as e:
# 记录错误但不中断执行
print(f"中间件错误: {e}")
# 可选:返回降级行为
return None
4. 选择合适的钩子类型
- 节点风格:日志记录、验证、状态更新
- Wrap 风格:重试、缓存、降级、条件执行
5. 充分测试中间件
def test_message_limit_middleware():
"""测试消息限制中间件"""
middleware = MessageLimitMiddleware(max_messages=5)
# 创建测试状态
test_state = {"messages": [Mock() for _ in range(6)]}
test_runtime = Mock()
# 测试限制触发
result = middleware.before_model(test_state, test_runtime)
assert result is not None
assert "jump_to" in result
assert result["jump_to"] == "end"
❓ 常见问题
Q1: 中间件的性能开销大吗?
中间件本身的开销很小。性能影响主要来自中间件内部的逻辑。 建议:
- 避免在中间件中执行耗时操作
- 使用异步操作处理 I/O 密集任务
- 在生产环境禁用调试日志
Q2: 中间件可以修改工具列表吗?
可以!使用 wrap_model_call 钩子:
@wrap_model_call
def filter_tools(request, handler):
# 根据条件筛选工具
filtered_tools = [t for t in request.tools if should_include(t)]
return handler(request.override(tools=filtered_tools))
Q3: 如何在中间件之间共享数据?
使用自定义状态 Schema 或 runtime.context:
# 方式 1:自定义状态
class SharedState(AgentState):
shared_data: NotRequired[dict]
@before_model(state_schema=SharedState)
def middleware_1(state, runtime):
return {"shared_data": {"key": "value"}}
@after_model(state_schema=SharedState)
def middleware_2(state, runtime):
data = state.get("shared_data", {})
print(data["key"]) # 访问共享数据
Q4: 中间件可以阻止模型调用吗?
可以,使用 jump_to 跳转或在 wrap 钩子中不调用 handler:
# 方式 1:使用 jump_to
@before_model
@hook_config(can_jump_to=["end"])
def block_model(state, runtime):
if should_block(state):
return {"jump_to": "end"}
# 方式 2:wrap 钩子短路
@wrap_model_call
def cache_check(request, handler):
cached = get_from_cache(request)
if cached:
return cached # 不调用 handler,直接返回缓存
return handler(request)
Q5: 装饰器和类风格如何选择?
- 装饰器:快速原型、单个钩子、简单逻辑
- 类风格:多个钩子、复杂配置、需要复用