📚 什么是 Human-in-the-Loop?

Human-in-the-Loop (HITL) 是一种让 AI Agent 在执行关键操作前 暂停并等待人工决策的机制。这确保了敏感操作(如删除数据、发送邮件、执行支付) 必须经过人工审核才能执行。

💡 核心价值
  • 安全性:防止 AI 执行不可逆的危险操作
  • 可控性:人类保留最终决策权
  • 合规性:满足某些业务需要人工审批的要求
  • 灵活性:可以修改 AI 提议的参数再执行

典型应用场景

场景 需要审核的操作 风险
数据管理 删除文件、清空数据库 数据丢失不可恢复
财务系统 发起支付、转账 资金损失
通信系统 发送邮件、短信、公告 错误信息传播
基础设施 重启服务器、修改配置 服务中断
代码仓库 合并代码、发布版本 生产环境故障

🏗️ Human-in-the-Loop 架构

graph TB A[用户请求] --> B[Agent 推理] B --> C[生成工具调用] C --> D{需要人工审核?} D -->|否| E[直接执行工具] E --> F[返回结果] D -->|是| G[暂停执行] G --> H[创建 interrupt] H --> I[等待人工决策] I --> J{决策类型?} J -->|✅ Approve| K[执行原始工具调用] J -->|✏️ Edit| L[修改参数后执行] J -->|❌ Reject| M[生成拒绝消息] K --> N[继续 Agent 执行] L --> N M --> N N --> F style D fill:#f59e0b,color:#fff style G fill:#ef4444,color:#fff style I fill:#3b82f6,color:#fff style K fill:#10b981,color:#fff style L fill:#8b5cf6,color:#fff style M fill:#ef4444,color:#fff

三种决策类型

决策类型 符号 说明 使用场景
Approve 批准执行,不修改参数 操作正确,直接执行
Edit ✏️ 修改参数后执行 操作方向正确,但参数需调整
Reject 拒绝执行,并提供反馈 操作不安全或不合理

⚙️ HumanInTheLoopMiddleware 配置

使用 HumanInTheLoopMiddleware 为 Agent 添加人工审核能力。

⚠️ 必需条件

必须配置 Checkpointer!Human-in-the-Loop 依赖持久化状态, 才能在暂停后恢复执行。生产环境推荐使用 PostgresSaver

Python 🟢 基础
"""
HumanInTheLoopMiddleware 基础配置
功能:为敏感工具添加人工审核
"""
from langchain.agents import create_agent
from langchain.agents.middleware import HumanInTheLoopMiddleware
from langgraph.checkpoint.memory import InMemorySaver
from langchain.tools import tool

@tool
def write_file(filename: str, content: str) -> str:
    """写入文件(敏感操作)"""
    with open(filename, "w") as f:
        f.write(content)
    return f"文件 {filename} 写入成功"

@tool
def read_file(filename: str) -> str:
    """读取文件(安全操作)"""
    with open(filename, "r") as f:
        return f.read()

@tool
def delete_file(filename: str) -> str:
    """删除文件(危险操作)"""
    import os
    os.remove(filename)
    return f"文件 {filename} 已删除"

# 创建带 HITL 的 Agent
agent = create_agent(
    model="gpt-4o",
    tools=[write_file, read_file, delete_file],
    middleware=[
        HumanInTheLoopMiddleware(
            interrupt_on={
                "write_file": True,      # 写文件需要审核
                "delete_file": True,     # 删除文件需要审核
                "read_file": False,      # 读文件无需审核
            },
            description_prefix="工具执行等待审批"
        )
    ],
    checkpointer=InMemorySaver(),  # 必需!
)

# 配置 thread_id(用于持久化)
config = {"configurable": {"thread_id": "user_001"}}

配置参数详解

参数 类型 说明
interrupt_on dict 指定哪些工具需要审核(True/False/配置对象)
allowed_decisions list 允许的决策类型(默认全部允许)
description str/callable 审批请求的描述文本
description_prefix str 描述文本的默认前缀

🔄 完整执行工作流

步骤 1:运行直到中断

Python 🟡 中级
"""
步骤 1:运行 Agent 直到遇到需要审核的操作
"""
from langgraph.types import Command

config = {"configurable": {"thread_id": "user_001"}}

# 调用 Agent
result = agent.invoke(
    {
        "messages": [
            {"role": "user", "content": "删除 old_data.txt 文件"}
        ]
    },
    config=config
)

# 检查是否有中断
if "__interrupt__" in result:
    print("⏸️ Agent 已暂停,等待人工决策")
    interrupt_info = result["__interrupt__"]

    # 查看中断详情
    action_requests = interrupt_info[0]["value"]["action_requests"]
    for req in action_requests:
        print(f"\n工具: {req['name']}")
        print(f"参数: {req['args']}")
        print(f"描述: {req.get('description', '无')}")
else:
    # 无需审核,直接执行完成
    print("✅ 执行完成(无需审核)")
    print(result["messages"][-1].content)

步骤 2:做出决策并恢复

决策 1:批准执行

Python 🟡 中级
"""
批准执行:同意 Agent 的提议
"""
from langgraph.types import Command

# 恢复执行并批准
result = agent.invoke(
    Command(
        resume={
            "decisions": [
                {"type": "approve"}  # ✅ 批准执行
            ]
        }
    ),
    config=config  # 使用相同的 thread_id
)

print("✅ 操作已执行")
print(result["messages"][-1].content)

决策 2:修改参数后执行

Python 🔴 高级
"""
编辑执行:修改参数后再执行
"""
from langgraph.types import Command

# 假设 Agent 想删除 important_file.txt
# 我们修改为删除 backup_file.txt

result = agent.invoke(
    Command(
        resume={
            "decisions": [
                {
                    "type": "edit",  # ✏️ 编辑模式
                    "edited_action": {
                        "name": "delete_file",  # 工具名称(可选修改)
                        "args": {
                            "filename": "backup_file.txt"  # 修改参数
                        }
                    }
                }
            ]
        }
    ),
    config=config
)

print("✏️ 操作已修改并执行")
print(result["messages"][-1].content)
⚠️ Edit 决策注意事项

修改参数时应保守。大幅修改可能导致模型重新评估策略, 可能多次执行工具或产生意外行为。建议:

  • 只修改必要的参数值
  • 不要更改工具的核心逻辑
  • 修改后的参数应符合原始意图

决策 3:拒绝并提供反馈

Python 🟡 中级
"""
拒绝执行:不同意操作并提供反馈
"""
from langgraph.types import Command

result = agent.invoke(
    Command(
        resume={
            "decisions": [
                {
                    "type": "reject",  # ❌ 拒绝
                    "message": "不能删除该文件,因为它包含重要数据。请先备份后再操作。"
                }
            ]
        }
    ),
    config=config
)

# 拒绝消息会被加入对话历史
# Agent 会根据反馈重新思考
print("❌ 操作已拒绝")
print(result["messages"][-1].content)

🔢 处理多个工具审批

当 Agent 同时调用多个需要审核的工具时,需要为每个工具提供决策。

Python 🔴 高级
"""
多工具审批示例
功能:同时处理多个工具调用的审批
"""
from langchain.agents import create_agent
from langchain.agents.middleware import HumanInTheLoopMiddleware
from langgraph.checkpoint.memory import InMemorySaver
from langchain.tools import tool
from langgraph.types import Command

@tool
def send_email(to: str, subject: str, body: str) -> str:
    """发送邮件"""
    return f"邮件已发送到 {to}"

@tool
def delete_records(table: str, condition: str) -> str:
    """删除数据库记录"""
    return f"已从 {table} 删除符合 {condition} 的记录"

@tool
def create_backup(source: str) -> str:
    """创建备份"""
    return f"已备份 {source}"

agent = create_agent(
    model="gpt-4o",
    tools=[send_email, delete_records, create_backup],
    middleware=[
        HumanInTheLoopMiddleware(
            interrupt_on={
                "send_email": True,
                "delete_records": True,
                "create_backup": True,
            }
        )
    ],
    checkpointer=InMemorySaver(),
)

config = {"configurable": {"thread_id": "multi_approval"}}

# Agent 可能同时调用多个工具
result = agent.invoke(
    {
        "messages": [
            {"role": "user", "content": "备份数据库,删除旧记录,并发邮件通知管理员"}
        ]
    },
    config=config
)

if "__interrupt__" in result:
    action_requests = result["__interrupt__"][0]["value"]["action_requests"]

    print(f"需要审批 {len(action_requests)} 个操作:\n")
    for i, req in enumerate(action_requests):
        print(f"{i+1}. {req['name']}: {req['args']}")

    # 为每个工具提供决策(顺序必须匹配)
    result = agent.invoke(
        Command(
            resume={
                "decisions": [
                    {"type": "approve"},  # 批准第1个工具
                    {
                        "type": "edit",    # 修改第2个工具
                        "edited_action": {
                            "name": "delete_records",
                            "args": {
                                "table": "logs",
                                "condition": "created_at < '2024-01-01'"  # 修改条件
                            }
                        }
                    },
                    {
                        "type": "reject",  # 拒绝第3个工具
                        "message": "请先确认备份成功后再发送邮件"
                    }
                ]
            }
        ),
        config=config
    )

print("\n决策已提交,Agent 继续执行")
✅ 多工具决策顺序

决策列表的顺序必须与 action_requests 的顺序一致。 第 1 个决策对应第 1 个工具调用,第 2 个决策对应第 2 个工具调用,以此类推。

🌊 流式输出与 Human-in-the-Loop

结合流式输出,可以实时监控 Agent 执行过程,并在中断时获得通知。

Python 🔴 高级
"""
流式输出 + HITL 示例
功能:实时显示执行过程,遇到中断时暂停
"""
from langchain.agents import create_agent
from langchain.agents.middleware import HumanInTheLoopMiddleware
from langgraph.checkpoint.memory import InMemorySaver
from langchain.tools import tool
from langgraph.types import Command

@tool
def execute_sql(query: str) -> str:
    """执行 SQL 查询"""
    return f"SQL 执行成功:{query}"

agent = create_agent(
    model="gpt-4o",
    tools=[execute_sql],
    middleware=[
        HumanInTheLoopMiddleware(
            interrupt_on={"execute_sql": True}
        )
    ],
    checkpointer=InMemorySaver(),
)

config = {"configurable": {"thread_id": "streaming_hitl"}}

print("开始执行...\n")

# 使用流式模式
interrupted = False

for mode, chunk in agent.stream(
    {
        "messages": [
            {"role": "user", "content": "删除 users 表中所有未激活的用户"}
        ]
    },
    config=config,
    stream_mode=["updates", "messages"]
):
    if mode == "messages":
        # 显示 Token 流
        token, metadata = chunk
        if token:
            print(token, end="", flush=True)

    elif mode == "updates":
        # 检查是否有中断
        for step, data in chunk.items():
            if "__interrupt__" in data:
                interrupted = True
                print("\n\n⏸️ 检测到需要审批的操作!")

                action_requests = data["__interrupt__"][0]["value"]["action_requests"]
                for req in action_requests:
                    print(f"\n工具: {req['name']}")
                    print(f"SQL: {req['args']['query']}")

                # 模拟人工决策
                print("\n等待人工审批...")
                break

if interrupted:
    # 继续执行(这里简化为自动批准)
    print("✅ 批准执行\n")

    for mode, chunk in agent.stream(
        Command(resume={"decisions": [{"type": "approve"}]}),
        config=config,
        stream_mode=["messages"]
    ):
        token, metadata = chunk
        if token:
            print(token, end="", flush=True)

print("\n\n执行完成")

⚙️ 高级配置选项

限制允许的决策类型

Python 🟡 中级
"""
限制决策类型示例
功能:某些工具只允许批准或拒绝,不允许编辑
"""
from langchain.agents.middleware import HumanInTheLoopMiddleware

middleware = HumanInTheLoopMiddleware(
    interrupt_on={
        # 写文件:允许所有决策类型
        "write_file": True,

        # 删除文件:只允许批准或拒绝,不允许编辑
        "delete_file": {
            "allowed_decisions": ["approve", "reject"]
        },

        # 发送邮件:只允许批准(必须执行或手动取消)
        "send_email": {
            "allowed_decisions": ["approve"]
        },
    }
)

自定义描述文本

Python 🔴 高级
"""
自定义描述文本示例
功能:为审批请求提供更详细的说明
"""
from langchain.agents.middleware import HumanInTheLoopMiddleware

def custom_description(action_request):
    """动态生成描述文本"""
    tool_name = action_request["name"]
    args = action_request["args"]

    if tool_name == "delete_file":
        filename = args.get("filename", "未知文件")
        return f"⚠️ 危险操作:即将删除文件 '{filename}',此操作不可撤销!"

    elif tool_name == "send_email":
        to = args.get("to", "未知收件人")
        subject = args.get("subject", "无主题")
        return f"📧 邮件审批:发送给 {to},主题:{subject}"

    else:
        return f"操作审批:{tool_name}"

middleware = HumanInTheLoopMiddleware(
    interrupt_on={
        "delete_file": True,
        "send_email": True,
    },
    description=custom_description  # 使用自定义函数
)

🎯 完整示例:敏感操作审批系统

Python 🔴 高级 - 完整项目
"""
敏感操作审批系统完整示例
功能:模拟真实的审批工作流
"""
from langchain.agents import create_agent
from langchain.agents.middleware import HumanInTheLoopMiddleware
from langgraph.checkpoint.memory import InMemorySaver
from langchain.tools import tool
from langgraph.types import Command
import json

# ==================== 工具定义 ====================

@tool
def query_database(sql: str) -> str:
    """查询数据库(安全操作)"""
    return f"查询结果:[{'user1', 'user2', 'user3'}]"

@tool
def update_database(sql: str) -> str:
    """更新数据库(需审批)"""
    return f"数据库更新成功:{sql}"

@tool
def delete_from_database(sql: str) -> str:
    """删除数据库记录(危险操作)"""
    return f"删除成功:{sql}"

@tool
def send_notification(recipients: list[str], message: str) -> str:
    """发送通知(需审批)"""
    return f"通知已发送给 {len(recipients)} 人"

# ==================== 自定义描述 ====================

def approval_description(action_request):
    """生成审批描述"""
    tool_name = action_request["name"]
    args = action_request["args"]

    descriptions = {
        "update_database": f"🔸 数据库更新\nSQL: {args.get('sql', 'N/A')}",
        "delete_from_database": f"⚠️ 数据库删除(危险)\nSQL: {args.get('sql', 'N/A')}",
        "send_notification": f"📧 批量通知\n收件人: {len(args.get('recipients', []))} 人\n内容: {args.get('message', 'N/A')[:50]}...",
    }

    return descriptions.get(tool_name, f"操作: {tool_name}")

# ==================== 创建 Agent ====================

approval_agent = create_agent(
    model="gpt-4o",
    tools=[
        query_database,
        update_database,
        delete_from_database,
        send_notification
    ],
    middleware=[
        HumanInTheLoopMiddleware(
            interrupt_on={
                "query_database": False,           # 查询不需要审批
                "update_database": True,           # 更新需要审批
                "delete_from_database": {          # 删除只允许批准/拒绝
                    "allowed_decisions": ["approve", "reject"]
                },
                "send_notification": True,         # 发送通知需要审批
            },
            description=approval_description
        )
    ],
    checkpointer=InMemorySaver(),
    system_prompt="""你是数据库管理助手。

职责:
1. 查询数据库
2. 更新或删除记录(需要人工审批)
3. 发送通知

注意:
- 删除操作特别危险,需谨慎
- 发送通知前确认收件人和内容"""
)

# ==================== 审批工作流 ====================

def execute_with_approval(user_request: str, thread_id: str):
    """执行带审批的请求"""
    config = {"configurable": {"thread_id": thread_id}}

    print("\n" + "="*60)
    print(f"用户请求: {user_request}")
    print("="*60 + "\n")

    # 第一次调用:运行直到中断
    result = approval_agent.invoke(
        {"messages": [{"role": "user", "content": user_request}]},
        config=config
    )

    # 检查是否需要审批
    if "__interrupt__" not in result:
        print("✅ 执行完成(无需审批)")
        print(result["messages"][-1].content)
        return

    # 显示审批请求
    print("⏸️ 需要人工审批\n")
    action_requests = result["__interrupt__"][0]["value"]["action_requests"]

    decisions = []
    for i, req in enumerate(action_requests):
        print(f"审批请求 #{i+1}:")
        print(f"  工具: {req['name']}")
        print(f"  参数: {json.dumps(req['args'], ensure_ascii=False, indent=4)}")
        print(f"  说明: {req.get('description', '无')}")

        # 模拟人工决策(实际应用中这里是交互式输入)
        if req['name'] == "delete_from_database":
            # 删除操作:拒绝
            decision = {
                "type": "reject",
                "message": "删除操作过于危险,请先备份数据"
            }
            print("  决策: ❌ 拒绝\n")
        elif req['name'] == "send_notification":
            # 发送通知:批准
            decision = {"type": "approve"}
            print("  决策: ✅ 批准\n")
        else:
            # 其他操作:批准
            decision = {"type": "approve"}
            print("  决策: ✅ 批准\n")

        decisions.append(decision)

    # 提交决策并继续执行
    print("提交审批决策...\n")
    result = approval_agent.invoke(
        Command(resume={"decisions": decisions}),
        config=config
    )

    print("✅ 执行完成")
    print(result["messages"][-1].content)

# ==================== 测试场景 ====================

if __name__ == "__main__":
    # 场景 1:查询操作(无需审批)
    execute_with_approval(
        "查询所有活跃用户",
        thread_id="scenario_1"
    )

    # 场景 2:更新操作(需要审批)
    execute_with_approval(
        "更新所有未激活用户的状态为已激活",
        thread_id="scenario_2"
    )

    # 场景 3:删除操作(会被拒绝)
    execute_with_approval(
        "删除所有测试数据,并发送通知给管理员",
        thread_id="scenario_3"
    )

❓ 常见问题

Q1: 忘记配置 Checkpointer 会怎样?

会抛出错误。Human-in-the-Loop 依赖持久化状态,必须配置 Checkpointer。

Python
# ❌ 错误:缺少 checkpointer
agent = create_agent(
    model="gpt-4o",
    tools=[...],
    middleware=[HumanInTheLoopMiddleware(...)]
    # 缺少 checkpointer!
)

# ✅ 正确
agent = create_agent(
    model="gpt-4o",
    tools=[...],
    middleware=[HumanInTheLoopMiddleware(...)],
    checkpointer=InMemorySaver()  # 必需
)

Q2: thread_id 必须一致吗?

是的。暂停和恢复必须使用相同的 thread_id, 否则无法恢复到正确的状态。

Q3: 可以超时自动批准/拒绝吗?

LangChain 本身不提供超时机制,但可以自己实现:

Python
import time
from threading import Thread

def wait_for_approval_with_timeout(agent, config, timeout=300):
    """等待审批,超时自动拒绝"""
    decision = {"type": None}

    def wait_input():
        # 实际应用中这里是获取用户输入
        user_input = input("批准(y)/拒绝(n): ")
        decision["type"] = "approve" if user_input == "y" else "reject"

    thread = Thread(target=wait_input)
    thread.start()
    thread.join(timeout=timeout)

    if decision["type"] is None:
        print("⏰ 审批超时,自动拒绝")
        decision["type"] = "reject"
        decision["message"] = "审批超时"

    return decision

Q4: 如何记录审批历史?

Python
# 记录审批日志
import logging
from datetime import datetime

logger = logging.getLogger("approval_system")

def log_approval_decision(action_request, decision, user_id):
    """记录审批决策"""
    log_entry = {
        "timestamp": datetime.now().isoformat(),
        "user_id": user_id,
        "tool": action_request["name"],
        "args": action_request["args"],
        "decision": decision["type"],
        "message": decision.get("message", ""),
    }

    logger.info(f"Approval: {log_entry}")

    # 存储到数据库
    # db.save_approval_log(log_entry)

Q5: 可以有多级审批吗?

可以通过自定义中间件实现多级审批。例如:

  • 第一级:操作者审批
  • 第二级:管理员审批
  • 第三级:高级管理员审批

需要在状态中记录当前审批级别,并根据级别触发不同的审批流程。

✨ 最佳实践

1. 明确审批范围

  • 只对真正危险的操作要求审批:过度审批会降低效率
  • 区分操作风险等级:高风险操作只允许 approve/reject
  • 文档化审批策略:让团队清楚哪些操作需要审批

2. 提供清晰的上下文

  • 使用 description 参数提供详细说明
  • 显示操作的影响范围(如受影响的记录数)
  • 包含必要的警告信息

3. 审批决策指导

情况 建议决策
操作完全正确 Approve
方向对但参数需微调 Edit(保守修改)
操作不安全或不合理 Reject + 详细反馈
需要更多信息才能决策 Reject + 要求补充信息

4. 生产环境配置

Python
# 生产环境推荐配置
from langgraph.checkpoint.postgres import PostgresSaver

DB_URI = "postgresql://user:pass@localhost:5432/langchain"

with PostgresSaver.from_conn_string(DB_URI) as checkpointer:
    checkpointer.setup()

    agent = create_agent(
        model="gpt-4o",
        tools=[...],
        middleware=[
            HumanInTheLoopMiddleware(
                interrupt_on={...},
                description=custom_description_function,
            ),
        ],
        checkpointer=checkpointer,  # 生产级持久化
    )

📖 参考资源