👤 Human-in-the-Loop(人工介入)
掌握 LangChain 1.0 的 Human-in-the-Loop 机制,实现敏感操作的人工审核, 构建安全可控的 AI Agent 系统。
📚 什么是 Human-in-the-Loop?
Human-in-the-Loop (HITL) 是一种让 AI Agent 在执行关键操作前 暂停并等待人工决策的机制。这确保了敏感操作(如删除数据、发送邮件、执行支付) 必须经过人工审核才能执行。
- 安全性:防止 AI 执行不可逆的危险操作
- 可控性:人类保留最终决策权
- 合规性:满足某些业务需要人工审批的要求
- 灵活性:可以修改 AI 提议的参数再执行
典型应用场景
| 场景 | 需要审核的操作 | 风险 |
|---|---|---|
| 数据管理 | 删除文件、清空数据库 | 数据丢失不可恢复 |
| 财务系统 | 发起支付、转账 | 资金损失 |
| 通信系统 | 发送邮件、短信、公告 | 错误信息传播 |
| 基础设施 | 重启服务器、修改配置 | 服务中断 |
| 代码仓库 | 合并代码、发布版本 | 生产环境故障 |
🏗️ Human-in-the-Loop 架构
三种决策类型
| 决策类型 | 符号 | 说明 | 使用场景 |
|---|---|---|---|
| Approve | ✅ | 批准执行,不修改参数 | 操作正确,直接执行 |
| Edit | ✏️ | 修改参数后执行 | 操作方向正确,但参数需调整 |
| Reject | ❌ | 拒绝执行,并提供反馈 | 操作不安全或不合理 |
⚙️ HumanInTheLoopMiddleware 配置
使用 HumanInTheLoopMiddleware 为 Agent 添加人工审核能力。
必须配置 Checkpointer!Human-in-the-Loop 依赖持久化状态,
才能在暂停后恢复执行。生产环境推荐使用 PostgresSaver。
"""
HumanInTheLoopMiddleware 基础配置
功能:为敏感工具添加人工审核
"""
from langchain.agents import create_agent
from langchain.agents.middleware import HumanInTheLoopMiddleware
from langgraph.checkpoint.memory import InMemorySaver
from langchain.tools import tool
@tool
def write_file(filename: str, content: str) -> str:
"""写入文件(敏感操作)"""
with open(filename, "w") as f:
f.write(content)
return f"文件 {filename} 写入成功"
@tool
def read_file(filename: str) -> str:
"""读取文件(安全操作)"""
with open(filename, "r") as f:
return f.read()
@tool
def delete_file(filename: str) -> str:
"""删除文件(危险操作)"""
import os
os.remove(filename)
return f"文件 {filename} 已删除"
# 创建带 HITL 的 Agent
agent = create_agent(
model="gpt-4o",
tools=[write_file, read_file, delete_file],
middleware=[
HumanInTheLoopMiddleware(
interrupt_on={
"write_file": True, # 写文件需要审核
"delete_file": True, # 删除文件需要审核
"read_file": False, # 读文件无需审核
},
description_prefix="工具执行等待审批"
)
],
checkpointer=InMemorySaver(), # 必需!
)
# 配置 thread_id(用于持久化)
config = {"configurable": {"thread_id": "user_001"}}
配置参数详解
| 参数 | 类型 | 说明 |
|---|---|---|
interrupt_on |
dict | 指定哪些工具需要审核(True/False/配置对象) |
allowed_decisions |
list | 允许的决策类型(默认全部允许) |
description |
str/callable | 审批请求的描述文本 |
description_prefix |
str | 描述文本的默认前缀 |
🔄 完整执行工作流
步骤 1:运行直到中断
"""
步骤 1:运行 Agent 直到遇到需要审核的操作
"""
from langgraph.types import Command
config = {"configurable": {"thread_id": "user_001"}}
# 调用 Agent
result = agent.invoke(
{
"messages": [
{"role": "user", "content": "删除 old_data.txt 文件"}
]
},
config=config
)
# 检查是否有中断
if "__interrupt__" in result:
print("⏸️ Agent 已暂停,等待人工决策")
interrupt_info = result["__interrupt__"]
# 查看中断详情
action_requests = interrupt_info[0]["value"]["action_requests"]
for req in action_requests:
print(f"\n工具: {req['name']}")
print(f"参数: {req['args']}")
print(f"描述: {req.get('description', '无')}")
else:
# 无需审核,直接执行完成
print("✅ 执行完成(无需审核)")
print(result["messages"][-1].content)
步骤 2:做出决策并恢复
决策 1:批准执行
"""
批准执行:同意 Agent 的提议
"""
from langgraph.types import Command
# 恢复执行并批准
result = agent.invoke(
Command(
resume={
"decisions": [
{"type": "approve"} # ✅ 批准执行
]
}
),
config=config # 使用相同的 thread_id
)
print("✅ 操作已执行")
print(result["messages"][-1].content)
决策 2:修改参数后执行
"""
编辑执行:修改参数后再执行
"""
from langgraph.types import Command
# 假设 Agent 想删除 important_file.txt
# 我们修改为删除 backup_file.txt
result = agent.invoke(
Command(
resume={
"decisions": [
{
"type": "edit", # ✏️ 编辑模式
"edited_action": {
"name": "delete_file", # 工具名称(可选修改)
"args": {
"filename": "backup_file.txt" # 修改参数
}
}
}
]
}
),
config=config
)
print("✏️ 操作已修改并执行")
print(result["messages"][-1].content)
修改参数时应保守。大幅修改可能导致模型重新评估策略, 可能多次执行工具或产生意外行为。建议:
- 只修改必要的参数值
- 不要更改工具的核心逻辑
- 修改后的参数应符合原始意图
决策 3:拒绝并提供反馈
"""
拒绝执行:不同意操作并提供反馈
"""
from langgraph.types import Command
result = agent.invoke(
Command(
resume={
"decisions": [
{
"type": "reject", # ❌ 拒绝
"message": "不能删除该文件,因为它包含重要数据。请先备份后再操作。"
}
]
}
),
config=config
)
# 拒绝消息会被加入对话历史
# Agent 会根据反馈重新思考
print("❌ 操作已拒绝")
print(result["messages"][-1].content)
🔢 处理多个工具审批
当 Agent 同时调用多个需要审核的工具时,需要为每个工具提供决策。
"""
多工具审批示例
功能:同时处理多个工具调用的审批
"""
from langchain.agents import create_agent
from langchain.agents.middleware import HumanInTheLoopMiddleware
from langgraph.checkpoint.memory import InMemorySaver
from langchain.tools import tool
from langgraph.types import Command
@tool
def send_email(to: str, subject: str, body: str) -> str:
"""发送邮件"""
return f"邮件已发送到 {to}"
@tool
def delete_records(table: str, condition: str) -> str:
"""删除数据库记录"""
return f"已从 {table} 删除符合 {condition} 的记录"
@tool
def create_backup(source: str) -> str:
"""创建备份"""
return f"已备份 {source}"
agent = create_agent(
model="gpt-4o",
tools=[send_email, delete_records, create_backup],
middleware=[
HumanInTheLoopMiddleware(
interrupt_on={
"send_email": True,
"delete_records": True,
"create_backup": True,
}
)
],
checkpointer=InMemorySaver(),
)
config = {"configurable": {"thread_id": "multi_approval"}}
# Agent 可能同时调用多个工具
result = agent.invoke(
{
"messages": [
{"role": "user", "content": "备份数据库,删除旧记录,并发邮件通知管理员"}
]
},
config=config
)
if "__interrupt__" in result:
action_requests = result["__interrupt__"][0]["value"]["action_requests"]
print(f"需要审批 {len(action_requests)} 个操作:\n")
for i, req in enumerate(action_requests):
print(f"{i+1}. {req['name']}: {req['args']}")
# 为每个工具提供决策(顺序必须匹配)
result = agent.invoke(
Command(
resume={
"decisions": [
{"type": "approve"}, # 批准第1个工具
{
"type": "edit", # 修改第2个工具
"edited_action": {
"name": "delete_records",
"args": {
"table": "logs",
"condition": "created_at < '2024-01-01'" # 修改条件
}
}
},
{
"type": "reject", # 拒绝第3个工具
"message": "请先确认备份成功后再发送邮件"
}
]
}
),
config=config
)
print("\n决策已提交,Agent 继续执行")
决策列表的顺序必须与 action_requests 的顺序一致。
第 1 个决策对应第 1 个工具调用,第 2 个决策对应第 2 个工具调用,以此类推。
🌊 流式输出与 Human-in-the-Loop
结合流式输出,可以实时监控 Agent 执行过程,并在中断时获得通知。
"""
流式输出 + HITL 示例
功能:实时显示执行过程,遇到中断时暂停
"""
from langchain.agents import create_agent
from langchain.agents.middleware import HumanInTheLoopMiddleware
from langgraph.checkpoint.memory import InMemorySaver
from langchain.tools import tool
from langgraph.types import Command
@tool
def execute_sql(query: str) -> str:
"""执行 SQL 查询"""
return f"SQL 执行成功:{query}"
agent = create_agent(
model="gpt-4o",
tools=[execute_sql],
middleware=[
HumanInTheLoopMiddleware(
interrupt_on={"execute_sql": True}
)
],
checkpointer=InMemorySaver(),
)
config = {"configurable": {"thread_id": "streaming_hitl"}}
print("开始执行...\n")
# 使用流式模式
interrupted = False
for mode, chunk in agent.stream(
{
"messages": [
{"role": "user", "content": "删除 users 表中所有未激活的用户"}
]
},
config=config,
stream_mode=["updates", "messages"]
):
if mode == "messages":
# 显示 Token 流
token, metadata = chunk
if token:
print(token, end="", flush=True)
elif mode == "updates":
# 检查是否有中断
for step, data in chunk.items():
if "__interrupt__" in data:
interrupted = True
print("\n\n⏸️ 检测到需要审批的操作!")
action_requests = data["__interrupt__"][0]["value"]["action_requests"]
for req in action_requests:
print(f"\n工具: {req['name']}")
print(f"SQL: {req['args']['query']}")
# 模拟人工决策
print("\n等待人工审批...")
break
if interrupted:
# 继续执行(这里简化为自动批准)
print("✅ 批准执行\n")
for mode, chunk in agent.stream(
Command(resume={"decisions": [{"type": "approve"}]}),
config=config,
stream_mode=["messages"]
):
token, metadata = chunk
if token:
print(token, end="", flush=True)
print("\n\n执行完成")
⚙️ 高级配置选项
限制允许的决策类型
"""
限制决策类型示例
功能:某些工具只允许批准或拒绝,不允许编辑
"""
from langchain.agents.middleware import HumanInTheLoopMiddleware
middleware = HumanInTheLoopMiddleware(
interrupt_on={
# 写文件:允许所有决策类型
"write_file": True,
# 删除文件:只允许批准或拒绝,不允许编辑
"delete_file": {
"allowed_decisions": ["approve", "reject"]
},
# 发送邮件:只允许批准(必须执行或手动取消)
"send_email": {
"allowed_decisions": ["approve"]
},
}
)
自定义描述文本
"""
自定义描述文本示例
功能:为审批请求提供更详细的说明
"""
from langchain.agents.middleware import HumanInTheLoopMiddleware
def custom_description(action_request):
"""动态生成描述文本"""
tool_name = action_request["name"]
args = action_request["args"]
if tool_name == "delete_file":
filename = args.get("filename", "未知文件")
return f"⚠️ 危险操作:即将删除文件 '{filename}',此操作不可撤销!"
elif tool_name == "send_email":
to = args.get("to", "未知收件人")
subject = args.get("subject", "无主题")
return f"📧 邮件审批:发送给 {to},主题:{subject}"
else:
return f"操作审批:{tool_name}"
middleware = HumanInTheLoopMiddleware(
interrupt_on={
"delete_file": True,
"send_email": True,
},
description=custom_description # 使用自定义函数
)
🎯 完整示例:敏感操作审批系统
"""
敏感操作审批系统完整示例
功能:模拟真实的审批工作流
"""
from langchain.agents import create_agent
from langchain.agents.middleware import HumanInTheLoopMiddleware
from langgraph.checkpoint.memory import InMemorySaver
from langchain.tools import tool
from langgraph.types import Command
import json
# ==================== 工具定义 ====================
@tool
def query_database(sql: str) -> str:
"""查询数据库(安全操作)"""
return f"查询结果:[{'user1', 'user2', 'user3'}]"
@tool
def update_database(sql: str) -> str:
"""更新数据库(需审批)"""
return f"数据库更新成功:{sql}"
@tool
def delete_from_database(sql: str) -> str:
"""删除数据库记录(危险操作)"""
return f"删除成功:{sql}"
@tool
def send_notification(recipients: list[str], message: str) -> str:
"""发送通知(需审批)"""
return f"通知已发送给 {len(recipients)} 人"
# ==================== 自定义描述 ====================
def approval_description(action_request):
"""生成审批描述"""
tool_name = action_request["name"]
args = action_request["args"]
descriptions = {
"update_database": f"🔸 数据库更新\nSQL: {args.get('sql', 'N/A')}",
"delete_from_database": f"⚠️ 数据库删除(危险)\nSQL: {args.get('sql', 'N/A')}",
"send_notification": f"📧 批量通知\n收件人: {len(args.get('recipients', []))} 人\n内容: {args.get('message', 'N/A')[:50]}...",
}
return descriptions.get(tool_name, f"操作: {tool_name}")
# ==================== 创建 Agent ====================
approval_agent = create_agent(
model="gpt-4o",
tools=[
query_database,
update_database,
delete_from_database,
send_notification
],
middleware=[
HumanInTheLoopMiddleware(
interrupt_on={
"query_database": False, # 查询不需要审批
"update_database": True, # 更新需要审批
"delete_from_database": { # 删除只允许批准/拒绝
"allowed_decisions": ["approve", "reject"]
},
"send_notification": True, # 发送通知需要审批
},
description=approval_description
)
],
checkpointer=InMemorySaver(),
system_prompt="""你是数据库管理助手。
职责:
1. 查询数据库
2. 更新或删除记录(需要人工审批)
3. 发送通知
注意:
- 删除操作特别危险,需谨慎
- 发送通知前确认收件人和内容"""
)
# ==================== 审批工作流 ====================
def execute_with_approval(user_request: str, thread_id: str):
"""执行带审批的请求"""
config = {"configurable": {"thread_id": thread_id}}
print("\n" + "="*60)
print(f"用户请求: {user_request}")
print("="*60 + "\n")
# 第一次调用:运行直到中断
result = approval_agent.invoke(
{"messages": [{"role": "user", "content": user_request}]},
config=config
)
# 检查是否需要审批
if "__interrupt__" not in result:
print("✅ 执行完成(无需审批)")
print(result["messages"][-1].content)
return
# 显示审批请求
print("⏸️ 需要人工审批\n")
action_requests = result["__interrupt__"][0]["value"]["action_requests"]
decisions = []
for i, req in enumerate(action_requests):
print(f"审批请求 #{i+1}:")
print(f" 工具: {req['name']}")
print(f" 参数: {json.dumps(req['args'], ensure_ascii=False, indent=4)}")
print(f" 说明: {req.get('description', '无')}")
# 模拟人工决策(实际应用中这里是交互式输入)
if req['name'] == "delete_from_database":
# 删除操作:拒绝
decision = {
"type": "reject",
"message": "删除操作过于危险,请先备份数据"
}
print(" 决策: ❌ 拒绝\n")
elif req['name'] == "send_notification":
# 发送通知:批准
decision = {"type": "approve"}
print(" 决策: ✅ 批准\n")
else:
# 其他操作:批准
decision = {"type": "approve"}
print(" 决策: ✅ 批准\n")
decisions.append(decision)
# 提交决策并继续执行
print("提交审批决策...\n")
result = approval_agent.invoke(
Command(resume={"decisions": decisions}),
config=config
)
print("✅ 执行完成")
print(result["messages"][-1].content)
# ==================== 测试场景 ====================
if __name__ == "__main__":
# 场景 1:查询操作(无需审批)
execute_with_approval(
"查询所有活跃用户",
thread_id="scenario_1"
)
# 场景 2:更新操作(需要审批)
execute_with_approval(
"更新所有未激活用户的状态为已激活",
thread_id="scenario_2"
)
# 场景 3:删除操作(会被拒绝)
execute_with_approval(
"删除所有测试数据,并发送通知给管理员",
thread_id="scenario_3"
)
❓ 常见问题
Q1: 忘记配置 Checkpointer 会怎样?
会抛出错误。Human-in-the-Loop 依赖持久化状态,必须配置 Checkpointer。
# ❌ 错误:缺少 checkpointer
agent = create_agent(
model="gpt-4o",
tools=[...],
middleware=[HumanInTheLoopMiddleware(...)]
# 缺少 checkpointer!
)
# ✅ 正确
agent = create_agent(
model="gpt-4o",
tools=[...],
middleware=[HumanInTheLoopMiddleware(...)],
checkpointer=InMemorySaver() # 必需
)
Q2: thread_id 必须一致吗?
是的。暂停和恢复必须使用相同的 thread_id,
否则无法恢复到正确的状态。
Q3: 可以超时自动批准/拒绝吗?
LangChain 本身不提供超时机制,但可以自己实现:
import time
from threading import Thread
def wait_for_approval_with_timeout(agent, config, timeout=300):
"""等待审批,超时自动拒绝"""
decision = {"type": None}
def wait_input():
# 实际应用中这里是获取用户输入
user_input = input("批准(y)/拒绝(n): ")
decision["type"] = "approve" if user_input == "y" else "reject"
thread = Thread(target=wait_input)
thread.start()
thread.join(timeout=timeout)
if decision["type"] is None:
print("⏰ 审批超时,自动拒绝")
decision["type"] = "reject"
decision["message"] = "审批超时"
return decision
Q4: 如何记录审批历史?
# 记录审批日志
import logging
from datetime import datetime
logger = logging.getLogger("approval_system")
def log_approval_decision(action_request, decision, user_id):
"""记录审批决策"""
log_entry = {
"timestamp": datetime.now().isoformat(),
"user_id": user_id,
"tool": action_request["name"],
"args": action_request["args"],
"decision": decision["type"],
"message": decision.get("message", ""),
}
logger.info(f"Approval: {log_entry}")
# 存储到数据库
# db.save_approval_log(log_entry)
Q5: 可以有多级审批吗?
可以通过自定义中间件实现多级审批。例如:
- 第一级:操作者审批
- 第二级:管理员审批
- 第三级:高级管理员审批
需要在状态中记录当前审批级别,并根据级别触发不同的审批流程。
✨ 最佳实践
1. 明确审批范围
- 只对真正危险的操作要求审批:过度审批会降低效率
- 区分操作风险等级:高风险操作只允许 approve/reject
- 文档化审批策略:让团队清楚哪些操作需要审批
2. 提供清晰的上下文
- 使用
description参数提供详细说明 - 显示操作的影响范围(如受影响的记录数)
- 包含必要的警告信息
3. 审批决策指导
| 情况 | 建议决策 |
|---|---|
| 操作完全正确 | Approve |
| 方向对但参数需微调 | Edit(保守修改) |
| 操作不安全或不合理 | Reject + 详细反馈 |
| 需要更多信息才能决策 | Reject + 要求补充信息 |
4. 生产环境配置
# 生产环境推荐配置
from langgraph.checkpoint.postgres import PostgresSaver
DB_URI = "postgresql://user:pass@localhost:5432/langchain"
with PostgresSaver.from_conn_string(DB_URI) as checkpointer:
checkpointer.setup()
agent = create_agent(
model="gpt-4o",
tools=[...],
middleware=[
HumanInTheLoopMiddleware(
interrupt_on={...},
description=custom_description_function,
),
],
checkpointer=checkpointer, # 生产级持久化
)