📚 什么是中间件(Middleware)?

中间件(Middleware)是 LangChain 1.0 的强大扩展机制, 允许你在 Agent 执行的关键节点插入自定义逻辑。通过中间件,你可以:

  • 追踪:记录日志、收集分析数据、调试 Agent 行为
  • 转换:修改提示词、调整工具选择、格式化输出
  • 控制:添加重试、降级、提前终止逻辑
  • 治理:应用速率限制、隐私保护、合规检查
💡 核心优势

中间件让你无需修改核心 Agent 代码即可实现复杂的控制逻辑。 通过钩子函数在 Agent 执行的不同阶段注入行为, 实现模块化、可复用的扩展。

🏗️ 中间件架构和执行顺序

graph TB A[Agent 开始] --> B[before_agent 钩子] B --> C{开始循环} C --> D[before_model 钩子] D --> E[wrap_model_call 包装] E --> F[模型调用] F --> G[after_model 钩子] G --> H{需要工具?} H -->|是| I[wrap_tool_call 包装] I --> J[执行工具] J --> K[返回 ToolMessage] K --> D H -->|否| L{继续循环?} L -->|是| D L -->|否| M[after_agent 钩子] M --> N[Agent 结束] style B fill:#6366f1,color:#fff style D fill:#3b82f6,color:#fff style E fill:#10b981,color:#fff style G fill:#f59e0b,color:#fff style I fill:#8b5cf6,color:#fff style M fill:#ec4899,color:#fff

执行顺序规则

当使用多个中间件时,理解执行顺序至关重要:

钩子类型 执行顺序 说明
before_agent 从前到后 按中间件列表顺序执行
before_model 从前到后 每次模型调用前按顺序
wrap_model_call 嵌套包装 第一个中间件最外层
after_model 从后到前(反向) 与 before_model 相反
after_agent 从后到前(反向) 与 before_agent 相反
wrap_tool_call 嵌套包装 第一个中间件最外层
✅ 关键记忆点
  • before_* 钩子:从前到后执行
  • after_* 钩子:从后到前执行(反向)
  • wrap_* 钩子:嵌套包装,先注册的在最外层

🪝 钩子函数类型

LangChain 提供两种风格的钩子函数:

节点风格钩子(Node-Style Hooks)

在特定执行点顺序运行:

  • before_agent:Agent 启动前执行一次
  • before_model:每次模型调用前执行
  • after_model:每次模型响应后执行
  • after_agent:Agent 完成后执行一次

包装风格钩子(Wrap-Style Hooks)

拦截并控制执行流程:

  • wrap_model_call:包装每次模型调用
  • wrap_tool_call:包装每次工具调用
💡 关键区别

Wrap 风格钩子让你决定是否执行、执行多少次:

  • 零次:短路跳过(缓存命中)
  • 一次:正常执行
  • 多次:重试逻辑

🔨 自定义中间件开发

装饰器风格(简单快速)

Python 🟢 基础
"""
装饰器风格中间件示例
功能:记录模型调用日志
"""
from langchain.agents.middleware import before_model, after_model
from langchain.agents import create_agent

@before_model
def log_before(state, runtime):
    """模型调用前记录日志"""
    print(f"📤 准备调用模型,当前消息数: {len(state['messages'])}")
    return None  # 不修改状态

@after_model
def log_after(state, runtime):
    """模型响应后记录日志"""
    last_msg = state["messages"][-1]
    print(f"📥 模型响应: {last_msg.content[:50]}...")
    return None

# 创建 Agent
agent = create_agent(
    model="gpt-4o",
    tools=[],
    middleware=[log_before, log_after]
)

# 测试
response = agent.invoke({
    "messages": [{"role": "user", "content": "你好"}]
})

类风格(强大灵活)

Python 🟡 中级
"""
类风格中间件示例
功能:消息限制和日志记录
"""
from langchain.agents import AgentMiddleware, create_agent
from langchain.agents.middleware import hook_config
from langchain.messages import AIMessage

class MessageLimitMiddleware(AgentMiddleware):
    """限制对话消息数量的中间件"""

    def __init__(self, max_messages: int = 50):
        super().__init__()
        self.max_messages = max_messages

    @hook_config(can_jump_to=["end"])
    def before_model(self, state, runtime):
        """检查消息数量限制"""
        if len(state["messages"]) >= self.max_messages:
            return {
                "messages": [AIMessage(
                    f"对话已达到最大消息数限制({self.max_messages}条)。"
                )],
                "jump_to": "end"  # 提前结束
            }
        return None

    def after_model(self, state, runtime):
        """记录模型响应"""
        msg_count = len(state["messages"])
        print(f"📊 当前消息数: {msg_count}/{self.max_messages}")
        return None

# 创建 Agent
agent = create_agent(
    model="gpt-4o",
    tools=[],
    middleware=[MessageLimitMiddleware(max_messages=10)]
)

Wrap 风格钩子:重试逻辑

Python 🔴 高级
"""
Wrap 风格中间件:模型调用重试
功能:失败时自动重试,带指数退避
"""
from langchain.agents.middleware import wrap_model_call
from langchain.agents import create_agent
import time

@wrap_model_call
def retry_model(request, handler):
    """模型调用重试中间件"""
    max_retries = 3
    base_delay = 1.0

    for attempt in range(max_retries):
        try:
            # 调用实际的模型
            return handler(request)
        except Exception as e:
            if attempt == max_retries - 1:
                # 最后一次尝试仍失败,抛出异常
                raise

            # 计算退避时间
            delay = base_delay * (2 ** attempt)
            print(f"⚠️ 模型调用失败(尝试 {attempt + 1}/{max_retries}): {e}")
            print(f"🔄 等待 {delay} 秒后重试...")
            time.sleep(delay)

# 工具重试示例
@wrap_tool_call
def retry_tool(request, handler):
    """工具调用重试中间件"""
    max_retries = 2

    for attempt in range(max_retries):
        try:
            result = handler(request)
            print(f"✅ 工具 '{request.tool_call['name']}' 执行成功")
            return result
        except Exception as e:
            if attempt == max_retries - 1:
                print(f"❌ 工具执行最终失败: {e}")
                raise
            print(f"🔄 工具重试 {attempt + 1}/{max_retries}")

agent = create_agent(
    model="gpt-4o",
    tools=[],
    middleware=[retry_model, retry_tool]
)

📦 内置中间件精选

LangChain 提供了 10+ 个生产就绪的内置中间件。 以下是最常用的几个:

1. SummarizationMiddleware - 对话摘要

自动总结长对话历史,防止超出 Token 限制。

Python 🟡 中级
"""
SummarizationMiddleware 示例
功能:对话超过 Token 限制时自动摘要
"""
from langchain.agents import create_agent
from langchain.agents.middleware import SummarizationMiddleware

agent = create_agent(
    model="gpt-4o",
    tools=[],
    middleware=[
        SummarizationMiddleware(
            model="gpt-4o-mini",          # 用于生成摘要的模型
            trigger=("tokens", 4000),      # 超过 4000 tokens 触发
            keep=("messages", 20),         # 保留最近 20 条消息
        ),
    ],
)

# 多条件触发
agent2 = create_agent(
    model="gpt-4o",
    tools=[],
    middleware=[
        SummarizationMiddleware(
            model="gpt-4o-mini",
            trigger=[
                ("tokens", 3000),          # 3000 tokens 或
                ("messages", 50),          # 50 条消息
            ],
            keep=("messages", 10),
        ),
    ],
)

2. PIIMiddleware - 隐私保护

检测和处理个人身份信息(PII)。

Python 🟡 中级
"""
PIIMiddleware 示例
功能:检测和屏蔽敏感信息
"""
from langchain.agents import create_agent
from langchain.agents.middleware import PIIMiddleware
import re

# 内置 PII 类型:email, credit_card, ip, mac_address, url
agent = create_agent(
    model="gpt-4o",
    tools=[],
    middleware=[
        # 屏蔽邮箱
        PIIMiddleware(
            "email",
            strategy="redact",           # 替换为 [REDACTED_EMAIL]
            apply_to_input=True          # 检查用户输入
        ),
        # 掩码信用卡
        PIIMiddleware(
            "credit_card",
            strategy="mask",             # 部分掩码: ****-****-****-1234
            apply_to_input=True
        ),
    ],
)

# 自定义 PII 检测:API Key
agent_api = create_agent(
    model="gpt-4o",
    tools=[],
    middleware=[
        PIIMiddleware(
            "api_key",
            detector=r"sk-[a-zA-Z0-9]{32}",  # 正则表达式
            strategy="block",                 # 检测到直接阻止
            apply_to_input=True
        ),
    ],
)

# 自定义检测函数:身份证号
def detect_id_card(content: str):
    """检测身份证号"""
    matches = []
    pattern = r"\d{17}[\dX]"
    for match in re.finditer(pattern, content):
        matches.append({
            "text": match.group(0),
            "start": match.start(),
            "end": match.end(),
        })
    return matches

agent_id = create_agent(
    model="gpt-4o",
    tools=[],
    middleware=[
        PIIMiddleware(
            "id_card",
            detector=detect_id_card,
            strategy="hash",              # 替换为哈希值
        ),
    ],
)

3. HumanInTheLoopMiddleware - 人工审核

暂停执行等待人工批准、编辑或拒绝。

Python 🔴 高级
"""
HumanInTheLoopMiddleware 示例
功能:敏感操作前需要人工批准
"""
from langchain.agents import create_agent
from langchain.agents.middleware import HumanInTheLoopMiddleware
from langgraph.checkpoint.memory import InMemorySaver
from langchain.tools import tool

@tool
def send_email(to: str, subject: str, body: str) -> str:
    """发送邮件(敏感操作)"""
    return f"邮件已发送到 {to}"

@tool
def read_email(folder: str = "inbox") -> str:
    """读取邮件(安全操作)"""
    return f"读取 {folder} 中的邮件"

# 创建 Agent(需要 checkpointer)
agent = create_agent(
    model="gpt-4o",
    tools=[send_email, read_email],
    checkpointer=InMemorySaver(),  # 必需
    middleware=[
        HumanInTheLoopMiddleware(
            interrupt_on={
                "send_email": {
                    "allowed_decisions": ["approve", "edit", "reject"],
                },
                "read_email": False,  # 不需要审核
            }
        ),
    ],
)

# 调用会在 send_email 前暂停,等待人工决策

4. ModelCallLimitMiddleware - 调用限制

Python 🟢 基础
"""
ModelCallLimitMiddleware 示例
功能:防止无限循环和过高成本
"""
from langchain.agents import create_agent
from langchain.agents.middleware import ModelCallLimitMiddleware

agent = create_agent(
    model="gpt-4o",
    tools=[],
    middleware=[
        ModelCallLimitMiddleware(
            thread_limit=20,    # 整个线程最多 20 次调用
            run_limit=5,        # 单次执行最多 5 次调用
            exit_behavior="end" # 达到限制时优雅结束
        ),
    ],
)

5. ToolCallLimitMiddleware - 工具调用限制

Python 🟡 中级
"""
ToolCallLimitMiddleware 示例
功能:限制工具调用次数
"""
from langchain.agents import create_agent
from langchain.agents.middleware import ToolCallLimitMiddleware

# 全局限制
global_limit = ToolCallLimitMiddleware(
    thread_limit=20,
    run_limit=10
)

# 特定工具限制
search_limit = ToolCallLimitMiddleware(
    tool_name="search",
    thread_limit=5,
    run_limit=3
)

agent = create_agent(
    model="gpt-4o",
    tools=[search_tool, calculator_tool],
    middleware=[global_limit, search_limit]
)

🚀 高级特性

自定义状态 Schema

Python 🔴 高级
"""
自定义状态 Schema 示例
功能:在状态中添加自定义字段进行跟踪
"""
from typing_extensions import NotRequired
from langchain.agents import AgentState, create_agent
from langchain.agents.middleware import before_model, after_model, hook_config

# 定义自定义状态
class CustomState(AgentState):
    model_call_count: NotRequired[int]
    total_tokens: NotRequired[int]
    user_id: NotRequired[str]

@before_model(state_schema=CustomState, can_jump_to=["end"])
def check_call_limit(state: CustomState, runtime):
    """检查调用次数限制"""
    count = state.get("model_call_count", 0)
    if count >= 10:
        return {"jump_to": "end"}
    return None

@after_model(state_schema=CustomState)
def increment_counter(state: CustomState, runtime):
    """增加计数器"""
    current_count = state.get("model_call_count", 0)
    return {"model_call_count": current_count + 1}

agent = create_agent(
    model="gpt-4o",
    tools=[],
    middleware=[check_call_limit, increment_counter],
    state_schema=CustomState
)

动态模型选择

Python 🔴 高级
"""
动态模型选择示例
功能:根据对话长度自动切换模型
"""
from langchain.agents.middleware import wrap_model_call
from langchain.chat_models import init_chat_model

# 初始化两个模型
simple_model = init_chat_model("gpt-4o-mini")
complex_model = init_chat_model("gpt-4o")

@wrap_model_call
def dynamic_model_selector(request, handler):
    """根据消息数量选择模型"""
    message_count = len(request.messages)

    if message_count > 20:
        # 长对话使用高级模型
        selected_model = complex_model
        print(f"🔵 使用高级模型(消息数: {message_count})")
    else:
        # 短对话使用轻量模型
        selected_model = simple_model
        print(f"🟢 使用轻量模型(消息数: {message_count})")

    # 覆盖请求中的模型
    return handler(request.override(model=selected_model))

agent = create_agent(
    model="gpt-4o-mini",  # 默认模型
    tools=[],
    middleware=[dynamic_model_selector]
)

Agent 跳转控制

Python 🔴 高级
"""
Agent 跳转控制示例
功能:检测到违规内容时提前结束
"""
from langchain.agents.middleware import after_model, hook_config
from langchain.messages import AIMessage

@after_model
@hook_config(can_jump_to=["end"])
def check_for_blocked_content(state, runtime):
    """检查响应中是否包含被阻止的内容"""
    last_message = state["messages"][-1]

    # 检查是否包含违规标记
    if "BLOCKED" in last_message.content or "拒绝回答" in last_message.content:
        return {
            "messages": [AIMessage("检测到违规内容,对话已终止。")],
            "jump_to": "end"  # 跳转到结束节点
        }

    return None

# 可用的跳转目标:
# - 'end': 跳到结束
# - 'tools': 跳到工具节点
# - 'model': 跳到模型节点

🎯 完整示例:生产级 Agent

Python 🔴 高级
"""
生产级 Agent 完整示例
功能:组合多个中间件实现完整的监控、限制和保护
"""
from langchain.agents import create_agent
from langchain.agents.middleware import (
    SummarizationMiddleware,
    PIIMiddleware,
    ModelCallLimitMiddleware,
    ToolCallLimitMiddleware,
    before_agent,
    after_agent,
    before_model,
    after_model,
)
from langchain.tools import tool
from datetime import datetime

# ==================== 自定义中间件 ====================

@before_agent
def log_session_start(state, runtime):
    """记录会话开始"""
    print(f"\n{'='*60}")
    print(f"🚀 会话开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"{'='*60}\n")
    return None

@after_agent
def log_session_end(state, runtime):
    """记录会话结束"""
    message_count = len(state["messages"])
    print(f"\n{'='*60}")
    print(f"✅ 会话结束时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"📊 总消息数: {message_count}")
    print(f"{'='*60}\n")
    return None

@before_model
def log_model_call(state, runtime):
    """记录每次模型调用"""
    user_msgs = [m for m in state["messages"] if m.__class__.__name__ == "HumanMessage"]
    print(f"🤖 模型调用 #{len(user_msgs)}")
    return None

@after_model
def log_model_response(state, runtime):
    """记录模型响应"""
    last_msg = state["messages"][-1]
    usage = getattr(last_msg, "usage_metadata", None)
    if usage:
        print(f"📈 Token 使用: {usage.get('total_tokens', 'N/A')}")
    return None

# ==================== 定义工具 ====================

@tool
def search_database(query: str) -> str:
    """搜索数据库"""
    return f"搜索结果:{query}"

@tool
def send_notification(message: str) -> str:
    """发送通知"""
    return f"通知已发送:{message}"

# ==================== 创建生产级 Agent ====================

production_agent = create_agent(
    model="gpt-4o",
    tools=[search_database, send_notification],
    middleware=[
        # 1. 会话生命周期日志
        log_session_start,
        log_session_end,

        # 2. 模型调用日志
        log_model_call,
        log_model_response,

        # 3. 调用限制(防止无限循环和过高成本)
        ModelCallLimitMiddleware(
            thread_limit=30,
            run_limit=10,
            exit_behavior="end"
        ),

        # 4. 工具调用限制
        ToolCallLimitMiddleware(
            thread_limit=20,
            run_limit=5
        ),

        # 5. 隐私保护
        PIIMiddleware("email", strategy="redact", apply_to_input=True),
        PIIMiddleware("credit_card", strategy="mask", apply_to_input=True),

        # 6. 对话摘要(长对话管理)
        SummarizationMiddleware(
            model="gpt-4o-mini",
            trigger=("tokens", 5000),
            keep=("messages", 15)
        ),
    ],
    system_prompt="""你是一个专业的客服助手。

职责:
1. 回答用户问题
2. 搜索数据库查找信息
3. 发送通知

注意事项:
- 保护用户隐私
- 简洁准确地回答
- 必要时使用工具"""
)

# ==================== 测试 ====================

if __name__ == "__main__":
    result = production_agent.invoke({
        "messages": [
            {"role": "user", "content": "我的邮箱是 [email protected],帮我查询订单状态"}
        ]
    })

    print("\n最终响应:")
    print(result["messages"][-1].content)

✨ 最佳实践

1. 中间件职责单一

每个中间件应该只处理一个关注点。

Python
# ❌ 不好:一个中间件做太多事
@before_model
def do_everything(state, runtime):
    log_message(state)
    check_limits(state)
    validate_input(state)
    track_metrics(state)
    # ...

# ✅ 好:每个中间件职责单一
@before_model
def log_message(state, runtime):
    """仅负责日志记录"""
    # ...

@before_model
def check_limits(state, runtime):
    """仅负责限制检查"""
    # ...

2. 合理安排中间件顺序

将关键中间件放在前面。

Python
agent = create_agent(
    model="gpt-4o",
    tools=[],
    middleware=[
        # 1. 最优先:安全和限制
        PIIMiddleware(...),
        ModelCallLimitMiddleware(...),

        # 2. 其次:重试和降级
        ToolRetryMiddleware(...),
        ModelFallbackMiddleware(...),

        # 3. 最后:日志和监控
        logging_middleware,
    ]
)

3. 优雅处理错误

Python
@before_model
def safe_middleware(state, runtime):
    """安全的中间件:不会让 Agent 崩溃"""
    try:
        # 执行可能失败的操作
        risky_operation(state)
    except Exception as e:
        # 记录错误但不中断执行
        print(f"中间件错误: {e}")
        # 可选:返回降级行为
    return None

4. 选择合适的钩子类型

  • 节点风格:日志记录、验证、状态更新
  • Wrap 风格:重试、缓存、降级、条件执行

5. 充分测试中间件

Python
def test_message_limit_middleware():
    """测试消息限制中间件"""
    middleware = MessageLimitMiddleware(max_messages=5)

    # 创建测试状态
    test_state = {"messages": [Mock() for _ in range(6)]}
    test_runtime = Mock()

    # 测试限制触发
    result = middleware.before_model(test_state, test_runtime)

    assert result is not None
    assert "jump_to" in result
    assert result["jump_to"] == "end"

❓ 常见问题

Q1: 中间件的性能开销大吗?

中间件本身的开销很小。性能影响主要来自中间件内部的逻辑。 建议:

  • 避免在中间件中执行耗时操作
  • 使用异步操作处理 I/O 密集任务
  • 在生产环境禁用调试日志

Q2: 中间件可以修改工具列表吗?

可以!使用 wrap_model_call 钩子:

Python
@wrap_model_call
def filter_tools(request, handler):
    # 根据条件筛选工具
    filtered_tools = [t for t in request.tools if should_include(t)]
    return handler(request.override(tools=filtered_tools))

Q3: 如何在中间件之间共享数据?

使用自定义状态 Schema 或 runtime.context:

Python
# 方式 1:自定义状态
class SharedState(AgentState):
    shared_data: NotRequired[dict]

@before_model(state_schema=SharedState)
def middleware_1(state, runtime):
    return {"shared_data": {"key": "value"}}

@after_model(state_schema=SharedState)
def middleware_2(state, runtime):
    data = state.get("shared_data", {})
    print(data["key"])  # 访问共享数据

Q4: 中间件可以阻止模型调用吗?

可以,使用 jump_to 跳转或在 wrap 钩子中不调用 handler:

Python
# 方式 1:使用 jump_to
@before_model
@hook_config(can_jump_to=["end"])
def block_model(state, runtime):
    if should_block(state):
        return {"jump_to": "end"}

# 方式 2:wrap 钩子短路
@wrap_model_call
def cache_check(request, handler):
    cached = get_from_cache(request)
    if cached:
        return cached  # 不调用 handler,直接返回缓存
    return handler(request)

Q5: 装饰器和类风格如何选择?

  • 装饰器:快速原型、单个钩子、简单逻辑
  • 类风格:多个钩子、复杂配置、需要复用

🔗 参考资源