📚 什么是 Multi-Agent Subagents?

Multi-Agent Subagents(多代理子代理)是一种架构模式, 通过创建多个独立的专业化代理,每个代理专注于特定领域或任务, 通过协作机制共同完成复杂的业务流程。

💡 核心特征
特性 DeepAgents Subagents Multi-Agent Subagents
主要目的 上下文隔离(单 Agent 内) 专业化分工(多 Agent 协作)
代理独立性 从属于主 Agent 每个代理可完全独立
调用方式 主 Agent 通过 task 工具调用 LangGraph 节点和边调用
状态管理 共享主 Agent 状态 独立状态或共享 Graph 状态
工作流控制 主 Agent 决定 Graph 流程编排
典型场景 研究助手、数据处理 客服系统、工作流引擎
graph TB subgraph DeepAgents["DeepAgents 模式"] Main["主 Agent"] -->|"task 工具"| Sub1["子代理 1
上下文隔离"] Main -->|"task 工具"| Sub2["子代理 2
上下文隔离"] Sub1 -->|"摘要返回"| Main Sub2 -->|"摘要返回"| Main end subgraph MultiAgent["Multi-Agent 模式"] Start["开始"] --> Agent1["Agent 1
研究专员"] Agent1 --> Agent2["Agent 2
分析专员"] Agent2 --> Agent3["Agent 3
报告专员"] Agent3 --> End["结束"] end style Main fill:#3b82f6,color:#fff style Sub1 fill:#10b981,color:#fff style Sub2 fill:#10b981,color:#fff style Agent1 fill:#f59e0b,color:#fff style Agent2 fill:#8b5cf6,color:#fff style Agent3 fill:#ef4444,color:#fff

何时使用 Multi-Agent Subagents?

✅ 适合使用场景
  • 明确的角色分工:不同代理负责不同领域(客服、技术、销售)
  • 顺序工作流:任务需要按特定顺序执行(研究 → 分析 → 报告)
  • 并行处理:多个任务可以同时进行
  • 独立测试:每个代理可以独立开发和测试
  • 可扩展性:容易添加新的代理节点
❌ 不适合使用场景
  • 简单单步任务:不需要多个代理的开销
  • 高度耦合的任务:所有步骤必须共享详细上下文
  • 实时响应要求:多代理调用会增加延迟

🔧 在 LangGraph 中创建 Subagents

Multi-Agent Subagents 基于 LangGraph 的状态图实现, 每个子代理是图中的一个节点。

基础模式:顺序执行

Python 🟢 基础
"""
顺序执行模式 - 多个子代理按顺序协作
"""
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
from langchain.tools import tool

# 定义共享状态
class AgentState(TypedDict):
    messages: Annotated[list, add_messages]
    research_data: str
    analysis_result: str
    final_report: str

# 工具定义
@tool
def search_internet(query: str) -> str:
    """搜索网络信息"""
    # 实际会调用搜索 API
    return f"关于 {query} 的搜索结果..."

@tool
def analyze_data(data: str) -> str:
    """分析数据"""
    # 实际会执行数据分析
    return f"数据分析结果:{data[:50]}..."

# 子代理 1: 研究专员
def research_agent(state: AgentState):
    """研究专员:收集信息"""
    llm = ChatOpenAI(model="gpt-4o")
    llm_with_tools = llm.bind_tools([search_internet])

    messages = state["messages"]
    response = llm_with_tools.invoke(messages)

    return {
        "messages": [response],
        "research_data": "已收集的研究数据..."
    }

# 子代理 2: 分析专员
def analysis_agent(state: AgentState):
    """分析专员:分析数据"""
    llm = ChatOpenAI(model="claude-sonnet-4-5")
    llm_with_tools = llm.bind_tools([analyze_data])

    # 基于研究数据进行分析
    prompt = f"分析以下研究数据:{state['research_data']}"
    response = llm.invoke(prompt)

    return {
        "messages": [response],
        "analysis_result": "分析结果:关键发现..."
    }

# 子代理 3: 报告专员
def report_agent(state: AgentState):
    """报告专员:生成报告"""
    llm = ChatOpenAI(model="gpt-4o")

    # 基于分析结果生成报告
    prompt = f"基于以下分析生成报告:{state['analysis_result']}"
    response = llm.invoke(prompt)

    return {
        "messages": [response],
        "final_report": "最终报告内容..."
    }

# 构建工作流
workflow = StateGraph(AgentState)

# 添加子代理节点
workflow.add_node("researcher", research_agent)
workflow.add_node("analyst", analysis_agent)
workflow.add_node("reporter", report_agent)

# 定义执行顺序(顺序工作流)
workflow.add_edge(START, "researcher")
workflow.add_edge("researcher", "analyst")
workflow.add_edge("analyst", "reporter")
workflow.add_edge("reporter", END)

# 编译
app = workflow.compile()

# 使用
from langchain_core.messages import HumanMessage

result = app.invoke({
    "messages": [HumanMessage(content="研究 AI 的最新进展")]
})

print("最终报告:", result["final_report"])

# 执行流程:
# 用户请求 → 研究专员 → 分析专员 → 报告专员 → 返回结果
# 每个代理专注于自己的任务,数据通过状态传递

并行执行模式

Python 🟡 中级
"""
并行执行模式 - 多个子代理同时工作
"""
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI

# 定义状态
class ParallelState(TypedDict):
    messages: Annotated[list, add_messages]
    web_results: str
    database_results: str
    api_results: str
    combined_results: str

# 并行子代理 1: 网络搜索
def web_search_agent(state: ParallelState):
    """网络搜索专员"""
    llm = ChatOpenAI(model="gpt-4o")
    # 搜索网络
    return {
        "web_results": "网络搜索结果..."
    }

# 并行子代理 2: 数据库查询
def database_agent(state: ParallelState):
    """数据库查询专员"""
    llm = ChatOpenAI(model="gpt-4o")
    # 查询数据库
    return {
        "database_results": "数据库查询结果..."
    }

# 并行子代理 3: API 调用
def api_agent(state: ParallelState):
    """API 调用专员"""
    llm = ChatOpenAI(model="gpt-4o")
    # 调用外部 API
    return {
        "api_results": "API 调用结果..."
    }

# 汇总代理
def aggregator_agent(state: ParallelState):
    """汇总专员:整合所有并行结果"""
    llm = ChatOpenAI(model="claude-sonnet-4-5")

    # 整合所有来源的数据
    combined = f"""
    网络: {state['web_results']}
    数据库: {state['database_results']}
    API: {state['api_results']}
    """

    response = llm.invoke(f"整合以下信息:{combined}")

    return {
        "combined_results": response.content,
        "messages": [response]
    }

# 构建并行工作流
workflow = StateGraph(ParallelState)

# 添加并行节点
workflow.add_node("web_search", web_search_agent)
workflow.add_node("database", database_agent)
workflow.add_node("api", api_agent)
workflow.add_node("aggregator", aggregator_agent)

# 并行执行:从 START 分发到三个代理
workflow.add_edge(START, "web_search")
workflow.add_edge(START, "database")
workflow.add_edge(START, "api")

# 所有并行任务完成后汇总
workflow.add_edge("web_search", "aggregator")
workflow.add_edge("database", "aggregator")
workflow.add_edge("api", "aggregator")

workflow.add_edge("aggregator", END)

app = workflow.compile()

# 使用
result = app.invoke({
    "messages": [HumanMessage(content="收集用户数据")]
})

print("汇总结果:", result["combined_results"])

# 执行流程:
#           ┌─→ 网络搜索 ─┐
# 用户请求 ─┼─→ 数据库查询 ─┼→ 汇总 → 返回
#           └─→ API 调用 ─┘
# 三个代理并行执行,提高效率

条件路由模式

Python 🔴 高级
"""
条件路由模式 - 根据状态动态选择子代理
"""
from typing import TypedDict, Literal
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage

# 定义状态
class ConditionalState(TypedDict):
    messages: Annotated[list, add_messages]
    user_type: str  # "new" or "existing"
    issue_type: str  # "technical", "billing", "general"
    resolution: str

# 分类代理
def classifier_agent(state: ConditionalState):
    """分类代理:判断用户类型和问题类型"""
    llm = ChatOpenAI(model="gpt-4o")

    prompt = f"""分析用户请求并分类:
    用户消息: {state['messages'][-1].content}

    返回 JSON 格式:
    {{"user_type": "new" 或 "existing", "issue_type": "technical" 或 "billing" 或 "general"}}
    """

    response = llm.invoke(prompt)

    # 解析分类结果
    import json
    classification = json.loads(response.content)

    return {
        "user_type": classification["user_type"],
        "issue_type": classification["issue_type"]
    }

# 新用户代理
def new_user_agent(state: ConditionalState):
    """新用户专员:引导和注册"""
    llm = ChatOpenAI(model="gpt-4o")
    response = llm.invoke("欢迎新用户,引导注册流程")
    return {
        "resolution": "新用户已引导",
        "messages": [response]
    }

# 技术支持代理
def technical_agent(state: ConditionalState):
    """技术支持专员"""
    llm = ChatOpenAI(model="claude-sonnet-4-5")
    response = llm.invoke("解决技术问题")
    return {
        "resolution": "技术问题已解决",
        "messages": [response]
    }

# 账单代理
def billing_agent(state: ConditionalState):
    """账单专员"""
    llm = ChatOpenAI(model="gpt-4o")
    response = llm.invoke("处理账单问题")
    return {
        "resolution": "账单问题已处理",
        "messages": [response]
    }

# 通用代理
def general_agent(state: ConditionalState):
    """通用客服专员"""
    llm = ChatOpenAI(model="gpt-4o")
    response = llm.invoke("处理一般问题")
    return {
        "resolution": "一般问题已处理",
        "messages": [response]
    }

# 路由决策函数
def route_by_user_type(state: ConditionalState) -> Literal["new_user", "route_by_issue"]:
    """根据用户类型路由"""
    if state["user_type"] == "new":
        return "new_user"
    else:
        return "route_by_issue"

def route_by_issue_type(state: ConditionalState) -> Literal["technical", "billing", "general"]:
    """根据问题类型路由"""
    return state["issue_type"]

# 构建条件路由工作流
workflow = StateGraph(ConditionalState)

# 添加所有节点
workflow.add_node("classifier", classifier_agent)
workflow.add_node("new_user", new_user_agent)
workflow.add_node("technical", technical_agent)
workflow.add_node("billing", billing_agent)
workflow.add_node("general", general_agent)

# 添加条件路由
workflow.add_edge(START, "classifier")

# 第一层条件:用户类型
workflow.add_conditional_edges(
    "classifier",
    route_by_user_type,
    {
        "new_user": "new_user",
        "route_by_issue": "classifier"  # 继续到第二层路由
    }
)

# 第二层条件:问题类型(仅现有用户)
workflow.add_conditional_edges(
    "classifier",
    route_by_issue_type,
    {
        "technical": "technical",
        "billing": "billing",
        "general": "general"
    }
)

# 所有路径结束
workflow.add_edge("new_user", END)
workflow.add_edge("technical", END)
workflow.add_edge("billing", END)
workflow.add_edge("general", END)

app = workflow.compile()

# 测试不同场景
test_cases = [
    "我是新用户,如何注册?",           # → new_user
    "我的软件崩溃了",                   # → technical
    "账单有错误",                       # → billing
    "你们的营业时间?",                 # → general
]

for test in test_cases:
    result = app.invoke({"messages": [HumanMessage(content=test)]})
    print(f"请求: {test}")
    print(f"解决: {result['resolution']}\n")

# 执行流程:
#                    ┌→ 新用户专员
# 用户请求 → 分类器 ─┼→ 技术专员
#                    ├→ 账单专员
#                    └→ 通用专员
# 根据分类结果动态路由到不同代理

🗄️ 状态管理和共享

Multi-Agent 系统中的状态管理是协作的关键, 需要在共享状态代理独立性之间找到平衡。

graph TB subgraph Shared["共享状态模式"] S1["Agent 1"] -->|"写入 state.data1"| State["共享 State"] S2["Agent 2"] -->|"读取 state.data1
写入 state.data2"| State S3["Agent 3"] -->|"读取 state.data1, data2
生成最终结果"| State end subgraph Isolated["隔离状态模式"] I1["Agent 1
独立状态"] -->|"仅传递结果"| I2["Agent 2
独立状态"] I2 -->|"仅传递结果"| I3["Agent 3
独立状态"] end style State fill:#3b82f6,color:#fff style S1 fill:#10b981,color:#fff style S2 fill:#f59e0b,color:#fff style S3 fill:#ef4444,color:#fff style I1 fill:#10b981,color:#fff style I2 fill:#f59e0b,color:#fff style I3 fill:#ef4444,color:#fff

状态共享策略

策略 优点 缺点 适用场景
全局共享状态 信息完全透明,易于访问 耦合度高,状态可能混乱 简单工作流,代理需要协同
最小共享状态 低耦合,清晰边界 需要显式传递数据 复杂系统,代理独立性强
分层状态 灵活性高,按需共享 设计复杂 大型系统,多层级代理
事件驱动 松耦合,可扩展 调试困难 异步系统,事件响应

最小共享状态示例

Python 🟡 中级
"""
最小共享状态 - 仅共享必要信息
"""
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END

# 定义最小共享状态
class MinimalState(TypedDict):
    # 必须共享的信息
    messages: Annotated[list, add_messages]
    user_id: str

    # 每个阶段的输出(可选)
    stage1_output: str | None
    stage2_output: str | None
    final_output: str | None

# Agent 1: 仅写入 stage1_output
def agent1(state: MinimalState):
    """第一阶段处理"""
    # 只能读取 messages 和 user_id
    # 不依赖其他代理的输出
    return {
        "stage1_output": "阶段 1 结果..."
    }

# Agent 2: 读取 stage1_output,写入 stage2_output
def agent2(state: MinimalState):
    """第二阶段处理"""
    # 只依赖 stage1_output
    input_data = state["stage1_output"]
    return {
        "stage2_output": f"基于 {input_data} 的阶段 2 结果..."
    }

# Agent 3: 读取 stage2_output,写入 final_output
def agent3(state: MinimalState):
    """第三阶段处理"""
    # 只依赖 stage2_output
    input_data = state["stage2_output"]
    return {
        "final_output": f"基于 {input_data} 的最终结果"
    }

# 构建工作流
workflow = StateGraph(MinimalState)
workflow.add_node("stage1", agent1)
workflow.add_node("stage2", agent2)
workflow.add_node("stage3", agent3)

workflow.add_edge(START, "stage1")
workflow.add_edge("stage1", "stage2")
workflow.add_edge("stage2", "stage3")
workflow.add_edge("stage3", END)

app = workflow.compile()

# 优势:
# - 清晰的数据流向
# - 每个代理只访问必要的数据
# - 易于测试和调试
# - 降低耦合度

🎯 完整实战示例

构建一个内容创作流水线,包含研究、写作、编辑、SEO 优化四个专业化子代理。

Python 🔴 高级 - 生产级示例
"""
完整实战示例 - 内容创作流水线
"""
import os
from typing import TypedDict, Annotated, Literal
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
from langchain.tools import tool

# ========== 1. 定义工具 ==========

@tool
def search_web(query: str, max_results: int = 5) -> str:
    """搜索网络信息"""
    # 实际会调用 Tavily 或其他搜索 API
    return f"关于 '{query}' 的搜索结果:[结果 1], [结果 2], [结果 3]"

@tool
def check_grammar(text: str) -> dict:
    """语法检查"""
    # 实际会调用语法检查 API
    return {"errors": 2, "suggestions": ["建议 1", "建议 2"]}

@tool
def analyze_seo(text: str) -> dict:
    """SEO 分析"""
    # 实际会分析关键词密度、可读性等
    return {
        "score": 85,
        "keywords": ["AI", "机器学习"],
        "suggestions": ["增加内链", "优化标题"]
    }

# ========== 2. 定义状态 ==========

class ContentState(TypedDict):
    messages: Annotated[list, add_messages]
    topic: str
    target_audience: str

    # 各阶段输出
    research_notes: str
    draft_content: str
    edited_content: str
    seo_optimized_content: str

    # 质量控制
    grammar_check: dict | None
    seo_score: dict | None
    needs_revision: bool

# ========== 3. 定义子代理 ==========

def research_agent(state: ContentState):
    """研究代理:收集背景资料"""
    llm = ChatOpenAI(model="gpt-4o")
    llm_with_tools = llm.bind_tools([search_web])

    prompt = f"""
    研究主题:{state['topic']}
    目标受众:{state['target_audience']}

    任务:
    1. 搜索相关信息
    2. 识别关键要点
    3. 整理成结构化笔记

    保持笔记简洁(< 500 字)
    """

    response = llm_with_tools.invoke(prompt)

    return {
        "research_notes": "研究笔记:核心观点 1、2、3...",
        "messages": [response]
    }

def writer_agent(state: ContentState):
    """写作代理:创作内容"""
    llm = ChatAnthropic(model="claude-sonnet-4-5")

    prompt = f"""
    基于以下研究笔记创作文章:
    {state['research_notes']}

    主题:{state['topic']}
    受众:{state['target_audience']}

    要求:
    1. 结构清晰(引言-正文-结论)
    2. 语言流畅
    3. 引用研究发现
    4. 字数 800-1000 字
    """

    response = llm.invoke(prompt)

    return {
        "draft_content": response.content,
        "messages": [response]
    }

def editor_agent(state: ContentState):
    """编辑代理:审核和优化"""
    llm = ChatAnthropic(model="claude-sonnet-4-5")
    llm_with_tools = llm.bind_tools([check_grammar])

    prompt = f"""
    审核以下草稿:
    {state['draft_content']}

    任务:
    1. 检查语法和拼写
    2. 优化语言表达
    3. 确保逻辑连贯
    4. 改进可读性

    返回修订后的内容
    """

    response = llm_with_tools.invoke(prompt)

    # 模拟语法检查
    grammar_result = {"errors": 0, "suggestions": []}

    return {
        "edited_content": response.content,
        "grammar_check": grammar_result,
        "messages": [response]
    }

def seo_agent(state: ContentState):
    """SEO 代理:搜索引擎优化"""
    llm = ChatOpenAI(model="gpt-4o")
    llm_with_tools = llm.bind_tools([analyze_seo])

    prompt = f"""
    优化 SEO:
    {state['edited_content']}

    主题:{state['topic']}

    任务:
    1. 优化关键词密度
    2. 改进标题和子标题
    3. 添加元描述
    4. 确保可读性

    返回 SEO 优化后的内容
    """

    response = llm_with_tools.invoke(prompt)

    # 模拟 SEO 分析
    seo_result = {
        "score": 88,
        "keywords": ["AI", "自动化"],
        "suggestions": []
    }

    return {
        "seo_optimized_content": response.content,
        "seo_score": seo_result,
        "needs_revision": seo_result["score"] < 80,
        "messages": [response]
    }

def quality_check_agent(state: ContentState):
    """质量检查代理:最终审核"""
    llm = ChatOpenAI(model="gpt-4o")

    prompt = f"""
    最终质量检查:

    SEO 评分:{state['seo_score']['score']}
    语法错误:{state['grammar_check']['errors']}

    内容:
    {state['seo_optimized_content'][:200]}...

    判断是否需要修订(是/否)
    """

    response = llm.invoke(prompt)

    # 根据评分决定是否需要修订
    needs_revision = state['seo_score']['score'] < 80 or state['grammar_check']['errors'] > 2

    return {
        "needs_revision": needs_revision,
        "messages": [response]
    }

# ========== 4. 构建工作流 ==========

workflow = StateGraph(ContentState)

# 添加所有节点
workflow.add_node("researcher", research_agent)
workflow.add_node("writer", writer_agent)
workflow.add_node("editor", editor_agent)
workflow.add_node("seo", seo_agent)
workflow.add_node("quality_check", quality_check_agent)

# 定义流程
workflow.add_edge(START, "researcher")
workflow.add_edge("researcher", "writer")
workflow.add_edge("writer", "editor")
workflow.add_edge("editor", "seo")
workflow.add_edge("seo", "quality_check")

# 条件路由:如果需要修订,返回编辑阶段
def should_revise(state: ContentState) -> Literal["revise", "finish"]:
    """决策:是否需要修订"""
    return "revise" if state.get("needs_revision", False) else "finish"

workflow.add_conditional_edges(
    "quality_check",
    should_revise,
    {
        "revise": "editor",  # 返回编辑阶段
        "finish": END
    }
)

# 编译工作流
app = workflow.compile()

# ========== 5. 使用流水线 ==========

from langchain_core.messages import HumanMessage

result = app.invoke({
    "messages": [HumanMessage(content="创建一篇文章")],
    "topic": "AI 在内容创作中的应用",
    "target_audience": "营销人员"
})

print("最终内容:")
print(result["seo_optimized_content"])
print("\nSEO 评分:", result["seo_score"]["score"])
print("语法错误:", result["grammar_check"]["errors"])

# 执行流程:
# 研究 → 写作 → 编辑 → SEO → 质量检查
#                   ↑              ↓
#                   └── 需要修订 ──┘

# 优势:
# - 每个代理专注于自己的领域
# - 可以使用最适合的模型(写作用 Claude,SEO 用 GPT)
# - 支持质量反馈循环
# - 易于扩展(添加新的优化步骤)
# - 每个阶段可独立测试

✨ Multi-Agent Subagents 最佳实践

1. 代理职责设计

✅ 好的设计
  • 单一职责:每个代理专注一个明确的任务
  • 清晰边界:代理之间的职责不重叠
  • 可测试:每个代理可以独立测试
  • 可重用:代理可以在不同工作流中重用

2. 状态设计原则

  • 最小化共享:只共享必要的状态字段
  • 明确数据流:清楚哪个代理写入哪个字段
  • 类型安全:使用 TypedDict 定义状态结构
  • 可选字段:使用 | None 标记可选字段

3. 工作流设计

模式 何时使用 示例场景
顺序工作流 任务有明确的先后顺序 研究 → 写作 → 编辑
并行工作流 任务可以同时进行 多源数据收集
条件工作流 根据结果动态路由 客服分类路由
循环工作流 需要迭代优化 质量检查 → 修订

4. 错误处理

Python
# 为每个代理添加错误处理
def robust_agent(state: AgentState):
    """带错误处理的代理"""
    try:
        # 主要逻辑
        result = process_task(state)
        return result
    except SpecificError as e:
        # 特定错误的处理
        logger.error(f"代理失败: {e}")
        return {
            "error": str(e),
            "fallback_result": "降级处理结果"
        }
    except Exception as e:
        # 通用错误处理
        logger.error(f"未预期错误: {e}")
        return {
            "error": str(e),
            "status": "failed"
        }

5. 性能优化

  • 并行化:识别可并行的任务
  • 缓存:缓存代理的输出避免重复计算
  • 流式输出:对于长时间任务使用流式响应
  • 超时控制:为每个代理设置合理的超时
  • 资源限制:限制并发代理数量

❓ 常见问题

Q1: Multi-Agent Subagents 和 DeepAgents Subagents 可以混用吗?

可以!实际上这是推荐的做法:

  • 使用 Multi-Agent 构建顶层工作流(代理之间的协作)
  • 每个 Multi-Agent 节点可以是一个 DeepAgent(增强单个代理能力)
  • 这样既获得了专业化分工的优势,又获得了强大的单代理能力

Q2: 如何处理代理之间的依赖关系?

使用明确的边和状态字段

  • 在状态中定义依赖字段(如 stage1_output
  • 使用 add_edge 定义执行顺序
  • 后续代理只读取前置代理的输出字段
  • 避免隐式依赖,保持数据流清晰

Q3: 并行代理如何同步?

LangGraph 自动同步

  • 所有并行代理执行完毕后,才会进入下一个节点
  • 可以添加一个 "汇总节点" 整合并行结果
  • 状态更新会自动合并

Q4: 如何调试复杂的 Multi-Agent 工作流?

调试策略

  • LangSmith 追踪:可视化整个执行流程
  • 逐节点测试:单独测试每个代理
  • 打印状态:在关键节点打印状态
  • 可视化工作流:使用 Mermaid 图表

Q5: 循环工作流如何避免无限循环?

设置循环限制

Python
# 在状态中添加循环计数器
class LoopState(TypedDict):
    iteration_count: int
    max_iterations: int
    # ... 其他字段

def should_continue(state: LoopState) -> Literal["continue", "stop"]:
    """决策:是否继续循环"""
    if state["iteration_count"] >= state["max_iterations"]:
        return "stop"  # 达到最大次数,停止
    if state.get("quality_ok", False):
        return "stop"  # 质量合格,停止
    return "continue"  # 继续循环

# 在循环节点中递增计数器
def revision_agent(state: LoopState):
    return {
        "iteration_count": state["iteration_count"] + 1,
        # ... 其他处理
    }

Q6: 如何选择每个代理使用的模型?

根据任务特点选择

任务类型 推荐模型 原因
数据收集 GPT-4o 快速、成本低
复杂分析 Claude Sonnet 4.5 推理能力强
创意写作 Claude Sonnet 4.5 语言质量高
代码生成 GPT-4o 代码能力优秀
简单分类 GPT-4o-mini 成本最低

📖 参考资源