🤝 DeepAgents 人工介入
通过 Human-in-the-Loop(HITL)机制,在执行敏感工具操作前进行人工审批, 确保安全性、合规性和成本控制。
📚 什么是 Human-in-the-Loop?
Human-in-the-Loop(HITL,人工介入)是一种安全机制, 允许在 Agent 执行敏感工具操作前暂停执行, 等待人工审批、修改或拒绝。这对于以下场景至关重要:
- 危险操作:删除文件、修改数据库、发送邮件等不可逆操作
- 昂贵操作:调用付费 API、发起大量请求、触发外部服务
- 合规要求:金融交易、法律文档审批、医疗决策等需要人工确认
- 质量控制:确保输出符合预期标准
| 特性 | 说明 | 价值 |
|---|---|---|
| 执行前中断 | 工具执行前暂停,不是执行后 | 完全阻止不当操作 |
| 三种决策 | 批准、编辑、拒绝 | 灵活的审批控制 |
| 批量处理 | 多个工具调用一次中断 | 提高审批效率 |
| 状态持久化 | 基于 Checkpointer | 跨会话恢复 |
| 分层配置 | 主 Agent 和子 Agent 独立配置 | 精细化控制 |
执行"] Agent -->|工具调用| Check{需要
审批?} Check -->|否| Execute1["直接执行
工具"] Check -->|是| Interrupt["⏸️ 中断
暂停执行"] Interrupt --> Review["人工审查"] Review --> Decision{决策} Decision -->|批准| Execute2["✅ 执行
原始参数"] Decision -->|编辑| Execute3["✏️ 执行
修改后参数"] Decision -->|拒绝| Skip["❌ 跳过
不执行"] Execute1 --> Result["继续执行"] Execute2 --> Result Execute3 --> Result Skip --> Result Result --> Final["返回结果"] style Check fill:#f59e0b,color:#fff style Interrupt fill:#ef4444,color:#fff style Review fill:#3b82f6,color:#fff style Execute2 fill:#10b981,color:#fff style Execute3 fill:#8b5cf6,color:#fff style Skip fill:#6b7280,color:#fff
⚙️ interrupt_on 参数配置
通过 interrupt_on 参数指定哪些工具需要人工审批。
支持多种配置方式:
基础配置方式
"""
interrupt_on 基础配置
"""
from deepagents import create_deep_agent
# 方式 1:布尔值(推荐用于简单场景)
agent = create_deep_agent(
model="gpt-4o",
tools=[delete_file, read_file, send_email],
interrupt_on={
"delete_file": True, # 启用中断
"read_file": False, # 禁用中断
"send_email": True, # 启用中断
}
)
# 方式 2:字典形式(高级控制)
agent = create_deep_agent(
model="gpt-4o",
tools=[delete_file, send_email, modify_database],
interrupt_on={
# 允许所有三种决策
"delete_file": {
"allowed_decisions": ["approve", "edit", "reject"]
},
# 仅允许批准或拒绝,不允许编辑
"send_email": {
"allowed_decisions": ["approve", "reject"]
},
# 仅允许批准(强制执行,但需要确认)
"modify_database": {
"allowed_decisions": ["approve"]
},
}
)
配置字段说明
| 配置方式 | 说明 | 适用场景 |
|---|---|---|
True |
启用中断,允许所有决策类型 | 通用场景,最大灵活性 |
False |
禁用中断,工具直接执行 | 安全操作,无需审批 |
{"allowed_decisions": [...]} |
精确控制允许的决策类型 | 不同风险等级的操作 |
三种决策类型
| 决策类型 | 行为 | 使用场景 |
|---|---|---|
approve |
使用原始参数执行工具 | 参数正确,批准执行 |
edit |
修改工具参数后执行 | 参数需要调整(如邮件地址错误) |
reject |
完全跳过工具执行 | 操作不安全或不必要 |
🔄 完整 HITL 工作流
实现 Human-in-the-Loop 需要以下步骤:
delete_file() Note over Agent,Tool: ⏸️ 检测到需要审批 Agent->>Agent: 保存状态
(Checkpointer) Agent-->>User: 返回中断信号
__interrupt__ User->>Review: 展示待审批操作 Review->>Review: 审查参数
评估风险 Review->>User: 做出决策
(approve/edit/reject) User->>Agent: 恢复执行
Command(resume={...}) alt 批准 Agent->>Tool: ✅ 执行原始操作 Tool-->>Agent: 返回结果 else 编辑 Agent->>Tool: ✏️ 执行修改后操作 Tool-->>Agent: 返回结果 else 拒绝 Agent->>Agent: ❌ 跳过执行 end Agent-->>User: 返回最终结果
"""
完整的 Human-in-the-Loop 工作流
"""
import os
import uuid
from langchain.tools import tool
from deepagents import create_deep_agent
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import Command
# 步骤 1: 定义需要审批的工具
@tool
def delete_file(path: str) -> str:
"""删除文件系统中的文件"""
# 实际会执行删除操作
import os
os.remove(path)
return f"已删除文件:{path}"
@tool
def send_email(to: str, subject: str, body: str) -> str:
"""发送电子邮件"""
# 实际会调用邮件服务
return f"已发送邮件到 {to}"
# 步骤 2: 创建 Checkpointer(必需!)
checkpointer = MemorySaver()
# 步骤 3: 配置 Agent 的 interrupt_on
agent = create_deep_agent(
model="claude-sonnet-4-5-20250929",
tools=[delete_file, send_email],
interrupt_on={
"delete_file": True, # 需要审批
"send_email": True, # 需要审批
},
checkpointer=checkpointer, # 必需:状态持久化
system_prompt="你是文件管理助手,可以删除文件和发送通知邮件。"
)
# 步骤 4: 配置线程 ID(用于状态管理)
config = {"configurable": {"thread_id": str(uuid.uuid4())}}
# 步骤 5: 调用 Agent
print("=== 步骤 1: 调用 Agent ===")
result = agent.invoke({
"messages": [{
"role": "user",
"content": "删除临时文件 /tmp/cache.txt 并发邮件通知管理员"
}]
}, config=config)
# 步骤 6: 检查是否发生中断
if result.get("__interrupt__"):
print("\n=== 步骤 2: 检测到中断 ===")
# 提取中断信息
interrupts = result["__interrupt__"][0].value
action_requests = interrupts["action_requests"]
review_configs = interrupts["review_configs"]
print(f"需要审批的操作数量: {len(action_requests)}")
# 步骤 7: 展示待审批操作
print("\n=== 步骤 3: 待审批操作 ===")
for i, action in enumerate(action_requests, 1):
print(f"\n操作 {i}:")
print(f" 工具名称: {action['name']}")
print(f" 工具参数: {action['args']}")
# 步骤 8: 人工决策(这里模拟用户输入)
print("\n=== 步骤 4: 人工决策 ===")
decisions = []
for action in action_requests:
if action['name'] == 'delete_file':
# 批准删除操作
decisions.append({"type": "approve"})
print(f"✅ 批准执行: {action['name']}")
elif action['name'] == 'send_email':
# 编辑邮件地址
decisions.append({
"type": "edit",
"edited_action": {
"name": "send_email",
"args": {
"to": "[email protected]", # 修改收件人
"subject": action['args']['subject'],
"body": action['args']['body']
}
}
})
print(f"✏️ 编辑后执行: {action['name']}")
# 步骤 9: 恢复执行(使用相同的 config!)
print("\n=== 步骤 5: 恢复执行 ===")
result = agent.invoke(
Command(resume={"decisions": decisions}),
config=config # ⚠️ 必须使用相同的 thread_id
)
print("\n=== 步骤 6: 执行完成 ===")
print(f"最终结果:\n{result['messages'][-1].content}")
else:
print("无需审批,直接执行完成")
print(f"结果:\n{result['messages'][-1].content}")
# 输出示例:
# === 步骤 1: 调用 Agent ===
# === 步骤 2: 检测到中断 ===
# 需要审批的操作数量: 2
#
# === 步骤 3: 待审批操作 ===
# 操作 1:
# 工具名称: delete_file
# 工具参数: {'path': '/tmp/cache.txt'}
#
# 操作 2:
# 工具名称: send_email
# 工具参数: {'to': '[email protected]', 'subject': '...', 'body': '...'}
#
# === 步骤 4: 人工决策 ===
# ✅ 批准执行: delete_file
# ✏️ 编辑后执行: send_email
#
# === 步骤 5: 恢复执行 ===
# === 步骤 6: 执行完成 ===
# 最终结果:
# 已成功删除文件并发送通知邮件。
🎯 三种决策类型详解
决策 1: Approve(批准)
使用原始参数执行工具,不做任何修改:
"""
决策类型 1: Approve
"""
# 批准执行,使用原始参数
decisions = [{"type": "approve"}]
result = agent.invoke(
Command(resume={"decisions": decisions}),
config=config
)
# Agent 会使用原始参数执行工具:
# delete_file(path="/tmp/cache.txt")
决策 2: Edit(编辑)
修改工具参数后执行,适用于参数需要调整的场景:
"""
决策类型 2: Edit
"""
# 编辑参数后执行
decisions = [{
"type": "edit",
"edited_action": {
"name": "send_email", # 必须指定工具名称
"args": { # 完整的新参数
"to": "[email protected]", # 修改后的收件人
"subject": "System Alert",
"body": "File deleted successfully."
}
}
}]
result = agent.invoke(
Command(resume={"decisions": decisions}),
config=config
)
# Agent 会使用修改后的参数执行工具:
# send_email(to="[email protected]", ...)
决策 3: Reject(拒绝)
完全跳过工具执行,工具不会被调用:
"""
决策类型 3: Reject
"""
# 拒绝执行,跳过工具调用
decisions = [{"type": "reject"}]
result = agent.invoke(
Command(resume={"decisions": decisions}),
config=config
)
# Agent 会跳过工具执行,不调用 delete_file()
# Agent 会知道操作被拒绝,可以继续执行其他任务
📦 多工具中断处理
当 Agent 需要调用多个需要审批的工具时, 它们会批量打包在一次中断中。 决策列表的顺序必须与 action_requests 的顺序一致。
"""
多工具中断处理
"""
import uuid
from langchain.tools import tool
from deepagents import create_deep_agent
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import Command
@tool
def delete_file(path: str) -> str:
"""删除文件"""
return f"已删除:{path}"
@tool
def send_email(to: str, subject: str) -> str:
"""发送邮件"""
return f"已发送邮件到:{to}"
@tool
def backup_database() -> str:
"""备份数据库"""
return "数据库已备份"
# 配置 Agent
checkpointer = MemorySaver()
agent = create_deep_agent(
model="gpt-4o",
tools=[delete_file, send_email, backup_database],
interrupt_on={
"delete_file": True,
"send_email": True,
"backup_database": True,
},
checkpointer=checkpointer
)
config = {"configurable": {"thread_id": str(uuid.uuid4())}}
# 调用 Agent(需要多个工具)
result = agent.invoke({
"messages": [{
"role": "user",
"content": "删除旧日志文件、备份数据库、然后发邮件通知"
}]
}, config=config)
# 检查中断
if result.get("__interrupt__"):
interrupts = result["__interrupt__"][0].value
action_requests = interrupts["action_requests"]
print(f"需要审批 {len(action_requests)} 个操作\n")
# 展示所有待审批操作
for i, action in enumerate(action_requests, 1):
print(f"操作 {i}: {action['name']}")
print(f" 参数: {action['args']}\n")
# ⚠️ 关键:决策顺序必须与 action_requests 顺序一致
decisions = [
{"type": "approve"}, # 第 1 个工具:delete_file
{"type": "reject"}, # 第 2 个工具:backup_database(拒绝)
{"type": "approve"}, # 第 3 个工具:send_email
]
# 恢复执行
result = agent.invoke(
Command(resume={"decisions": decisions}),
config=config
)
print("执行结果:")
print(result["messages"][-1].content)
# 输出示例:
# 需要审批 3 个操作
#
# 操作 1: delete_file
# 参数: {'path': '/var/log/old.log'}
#
# 操作 2: backup_database
# 参数: {}
#
# 操作 3: send_email
# 参数: {'to': '[email protected]', 'subject': 'Maintenance Complete'}
#
# 执行结果:
# 已删除旧日志文件,跳过了数据库备份,并发送了通知邮件。
- 顺序必须一致:decisions 列表的顺序必须与 action_requests 顺序一致
- 数量必须匹配:decisions 的长度必须等于 action_requests 的长度
- 不能遗漏:每个操作都必须有对应的决策
🎯 子代理中断配置
每个子代理可以覆盖主 Agent 的 interrupt_on 设置, 实现精细化的审批控制:
"""
子代理中断配置 - 分层控制
"""
from langchain.tools import tool
from deepagents import create_deep_agent
from langgraph.checkpoint.memory import MemorySaver
@tool
def delete_file(path: str) -> str:
"""删除文件"""
return f"已删除:{path}"
@tool
def read_file(path: str) -> str:
"""读取文件"""
with open(path, 'r') as f:
return f.read()
# 创建 Agent
checkpointer = MemorySaver()
agent = create_deep_agent(
model="gpt-4o",
tools=[delete_file, read_file],
# 主 Agent 的中断配置
interrupt_on={
"delete_file": True, # 主 Agent 中需要审批
"read_file": False, # 主 Agent 中无需审批
},
# 子代理配置
subagents=[{
"name": "file-manager",
"description": "管理文件的专门代理",
"tools": [delete_file, read_file],
"system_prompt": "你是文件管理专家。",
# 子代理的中断配置(覆盖主 Agent)
"interrupt_on": {
"delete_file": True, # 子代理中也需要审批
"read_file": True, # ⚠️ 子代理中需要审批(与主 Agent 不同)
}
}],
checkpointer=checkpointer
)
# 使用场景:
# 1. 主 Agent 直接调用 delete_file → 需要审批
# 2. 主 Agent 直接调用 read_file → 无需审批
# 3. 主 Agent 委派给 file-manager 子代理
# → 子代理调用 delete_file → 需要审批
# → 子代理调用 read_file → 需要审批(不同于主 Agent)
# 为什么子代理配置不同?
# - 子代理可能在更敏感的上下文中操作
# - 子代理可能处理更关键的数据
# - 不同的子代理有不同的审批策略
🛡️ 风险等级配置策略
根据操作的敏感程度和风险等级, 配置不同的审批策略:
"""
风险等级配置策略
"""
from langchain.tools import tool
from deepagents import create_deep_agent
from langgraph.checkpoint.memory import MemorySaver
# 定义不同风险等级的工具
@tool
def delete_production_data(table: str) -> str:
"""删除生产环境数据(高风险)"""
return f"已删除生产表:{table}"
@tool
def send_marketing_email(recipients: list, content: str) -> str:
"""发送营销邮件(中等风险)"""
return f"已发送邮件给 {len(recipients)} 个收件人"
@tool
def read_log_file(path: str) -> str:
"""读取日志文件(低风险)"""
return f"日志内容:..."
@tool
def update_cache(key: str, value: str) -> str:
"""更新缓存(无风险)"""
return f"缓存已更新:{key}"
# 风险等级配置
checkpointer = MemorySaver()
agent = create_deep_agent(
model="claude-sonnet-4-5-20250929",
tools=[
delete_production_data,
send_marketing_email,
read_log_file,
update_cache
],
interrupt_on={
# 🔴 高风险:完全控制(批准、编辑、拒绝)
"delete_production_data": {
"allowed_decisions": ["approve", "edit", "reject"]
},
# 🟡 中等风险:仅批准/拒绝,不允许编辑
"send_marketing_email": {
"allowed_decisions": ["approve", "reject"]
},
# 🟢 低风险:仅需确认(只能批准)
"read_log_file": {
"allowed_decisions": ["approve"]
},
# ⚪ 无风险:无需审批
"update_cache": False,
},
checkpointer=checkpointer,
system_prompt="""你是系统管理助手。执行操作时需要遵循严格的审批策略:
- 删除生产数据:高风险,必须人工审批
- 发送营销邮件:中等风险,需要确认
- 读取日志文件:低风险,仅需确认
- 更新缓存:无风险,自动执行
"""
)
# 配置表格
print("风险等级配置策略:\n")
print("| 工具 | 风险等级 | 允许的决策 | 说明")
print("|------------------------|---------|------------------------|--------")
print("| delete_production_data | 🔴 高 | approve, edit, reject | 完全控制")
print("| send_marketing_email | 🟡 中 | approve, reject | 仅批准/拒绝")
print("| read_log_file | 🟢 低 | approve | 仅需确认")
print("| update_cache | ⚪ 无 | - | 自动执行")
风险等级分类指南
| 风险等级 | 操作类型 | 推荐配置 | 示例 |
|---|---|---|---|
| 🔴 高风险 | 不可逆的破坏性操作 | approve, edit, reject | 删除数据、转账、发布代码 |
| 🟡 中等风险 | 可逆但需谨慎的操作 | approve, reject | 发送邮件、修改配置 |
| 🟢 低风险 | 需要确认但安全的操作 | approve | 读取敏感数据、查看日志 |
| ⚪ 无风险 | 完全安全的操作 | False(无需审批) | 更新缓存、记录日志 |
✨ HITL 最佳实践
1. 必须使用 Checkpointer
Human-in-the-Loop 功能必须配合 Checkpointer 使用, 用于在中断和恢复之间持久化 Agent 状态。
# ❌ 错误:没有 Checkpointer
agent = create_deep_agent(
interrupt_on={"delete_file": True}
# 缺少 checkpointer 参数
)
# 会导致运行时错误!
# ✅ 正确:包含 Checkpointer
from langgraph.checkpoint.memory import MemorySaver
checkpointer = MemorySaver()
agent = create_deep_agent(
interrupt_on={"delete_file": True},
checkpointer=checkpointer # 必需!
)
2. 线程 ID 一致性
恢复执行时必须使用相同的 thread_id, 否则无法恢复正确的状态。
# ❌ 错误:使用不同的 thread_id
config1 = {"configurable": {"thread_id": "thread-1"}}
result = agent.invoke({...}, config=config1)
# 中断后...
config2 = {"configurable": {"thread_id": "thread-2"}} # 不同的 ID!
result = agent.invoke(Command(resume={...}), config=config2) # 会失败
# ✅ 正确:使用相同的 thread_id
config = {"configurable": {"thread_id": "thread-1"}}
result = agent.invoke({...}, config=config)
# 中断后...
result = agent.invoke(Command(resume={...}), config=config) # 相同的 config
3. 决策顺序与数量
- 顺序一致:decisions 列表顺序必须与 action_requests 一致
- 数量匹配:每个操作都必须有对应的决策
- 类型验证:确保决策类型在 allowed_decisions 范围内
4. 编辑参数的完整性
使用 edit 决策时,必须提供完整的工具参数:
# ❌ 错误:参数不完整
decisions = [{
"type": "edit",
"edited_action": {
"name": "send_email",
"args": {"to": "[email protected]"} # 缺少 subject 和 body
}
}]
# ✅ 正确:完整参数
decisions = [{
"type": "edit",
"edited_action": {
"name": "send_email",
"args": {
"to": "[email protected]",
"subject": "Updated Subject",
"body": "Updated content"
}
}
}]
5. 审批 UI 集成
在生产环境中,应该构建友好的审批界面:
- 清晰展示待审批操作的工具名称、参数、风险等级
- 提供批准、编辑、拒绝按钮
- 编辑模式允许用户修改参数
- 记录审批历史和决策原因
- 支持批量审批多个操作
❓ 常见问题
Q1: 为什么必须使用 Checkpointer?
Checkpointer 用于持久化 Agent 状态。 中断发生时,Agent 的执行状态(包括消息历史、工具调用请求等) 需要保存下来,以便恢复时能继续执行。没有 Checkpointer, 状态会丢失,无法恢复。
Q2: 可以跨会话恢复中断的 Agent 吗?
可以,如果使用持久化的 Checkpointer (如 PostgresSaver、RedisSaver),状态会保存到数据库中。 只要使用相同的 thread_id,即使在不同的进程或服务器实例中, 也可以恢复执行。
# 使用持久化 Checkpointer
from langgraph.checkpoint.postgres import PostgresSaver
checkpointer = PostgresSaver(connection_string="postgresql://...")
agent = create_deep_agent(
interrupt_on={"delete_file": True},
checkpointer=checkpointer
)
# 进程 1:触发中断
config = {"configurable": {"thread_id": "user-123-task-456"}}
result = agent.invoke({...}, config=config)
# ... 进程退出 ...
# 进程 2:恢复执行(可能在不同服务器)
config = {"configurable": {"thread_id": "user-123-task-456"}} # 相同 ID
result = agent.invoke(Command(resume={...}), config=config) # 成功恢复
Q3: 如何在中断后取消整个任务?
拒绝所有工具调用即可:
# 拒绝所有操作
if result.get("__interrupt__"):
interrupts = result["__interrupt__"][0].value
action_requests = interrupts["action_requests"]
# 全部拒绝
decisions = [{"type": "reject"} for _ in action_requests]
result = agent.invoke(
Command(resume={"decisions": decisions}),
config=config
)
# Agent 会知道所有操作被拒绝,可以生成相应的消息
Q4: 可以为不同用户配置不同的审批策略吗?
可以,通过动态配置 interrupt_on:
def create_user_agent(user_role: str):
"""根据用户角色创建 Agent"""
if user_role == "admin":
# 管理员:无需审批
interrupt_on = {}
elif user_role == "operator":
# 操作员:仅高风险操作需审批
interrupt_on = {"delete_file": True}
else:
# 普通用户:所有敏感操作需审批
interrupt_on = {
"delete_file": True,
"send_email": True,
"modify_database": True
}
return create_deep_agent(
interrupt_on=interrupt_on,
checkpointer=checkpointer
)
# 使用
admin_agent = create_user_agent("admin")
user_agent = create_user_agent("user")
Q5: 中断会影响 Agent 性能吗?
中断本身不影响性能。但要注意:
- Checkpointer 开销:持久化 Checkpointer(如 PostgresSaver)有 I/O 开销
- 人工响应时间:等待审批期间,Agent 处于暂停状态
- 建议:合理配置哪些工具需要审批,避免过度中断
Q6: 如何实现超时自动拒绝?
可以在应用层实现超时逻辑:
import time
# 触发中断
result = agent.invoke({...}, config=config)
if result.get("__interrupt__"):
# 记录中断时间
interrupt_time = time.time()
timeout_seconds = 300 # 5 分钟超时
# 等待用户决策(应该在异步环境中实现)
# ... 等待用户输入 ...
# 检查是否超时
if time.time() - interrupt_time > timeout_seconds:
# 超时,自动拒绝
decisions = [{"type": "reject"} for _ in action_requests]
else:
# 使用用户决策
decisions = user_decisions
result = agent.invoke(
Command(resume={"decisions": decisions}),
config=config
)