📚 什么是自定义工作流?

自定义工作流(Custom Workflow)是 Multi-Agent 系统中的高级编排模式。 开发者使用 LangGraph 完全控制图结构,将确定性逻辑智能决策结合, 构建适应复杂业务场景的定制化流程。

💡 核心特性
特性 说明 优势
完全控制 完全掌控图结构和执行流程 灵活适应任何业务需求
混合逻辑 确定性步骤 + AI 决策 平衡可靠性和智能性
模式组合 集成 Subagents/Handoffs/Skills/Router 复用现有模式能力
多种执行模式 顺序、并行、条件、循环 支持复杂业务场景
状态管理 类型化状态字典 清晰的数据流转
graph TB User["用户请求"] --> CustomWF["自定义工作流"] subgraph CustomWF["自定义工作流架构"] Router["Router 节点
智能路由"] Validate["验证节点
确定性逻辑"] Agent1["Agent 节点
智能处理"] Parallel["并行节点"] Condition["条件节点
分支决策"] Subagent["Subagent 节点
复杂子流程"] Aggregate["汇总节点
结果整合"] end Router --> Validate Validate --> Condition Condition -->|"简单任务"| Agent1 Condition -->|"复杂任务"| Parallel Parallel --> P1["子任务 1"] Parallel --> P2["子任务 2"] Parallel --> P3["子任务 3"] P1 --> Aggregate P2 --> Aggregate P3 --> Aggregate Agent1 --> Subagent Aggregate --> Subagent Subagent --> Result["最终结果"] style CustomWF fill:#3b82f6,color:#fff style Router fill:#10b981,color:#fff style Validate fill:#f59e0b,color:#fff style Agent1 fill:#8b5cf6,color:#fff style Parallel fill:#ef4444,color:#fff style Condition fill:#06b6d4,color:#fff style Subagent fill:#a855f7,color:#fff style Aggregate fill:#ec4899,color:#fff

何时使用自定义工作流?

✅ 适合使用场景
  • 标准模式无法满足:业务逻辑超出 Subagents/Handoffs/Skills/Router 的组合
  • 混合确定性和智能:需要在固定流程中嵌入 AI 决策
  • 复杂路由逻辑:多条件分支、动态路径选择
  • 多阶段处理:数据预处理 → AI 分析 → 后处理 → 验证
  • 并行任务协调:同时执行多个独立任务后汇总
  • 迭代优化流程:循环改进直到满足质量标准

自定义工作流 vs 标准模式

对比维度 标准模式 自定义工作流
复杂度 简单、模式化 复杂、高度定制
灵活性 受模式限制 完全自由
开发成本 低(使用预定义模式) 高(需要设计和实现)
维护成本 低(标准化) 中(需要文档和注释)
适用场景 常见业务场景 特殊业务需求

🔧 工作流执行模式

自定义工作流支持四种基本执行模式,可以灵活组合构建复杂流程。

graph TB subgraph Sequential["模式 1:顺序执行"] S1["步骤 1"] --> S2["步骤 2"] S2 --> S3["步骤 3"] S3 --> S4["步骤 4"] end subgraph Parallel["模式 2:并行执行"] P_Start["开始"] --> P1["任务 1"] P_Start --> P2["任务 2"] P_Start --> P3["任务 3"] P1 --> P_End["汇总"] P2 --> P_End P3 --> P_End end subgraph Conditional["模式 3:条件分支"] C_Start["开始"] --> C_Decision{"条件判断"} C_Decision -->|"路径 A"| C_A["处理 A"] C_Decision -->|"路径 B"| C_B["处理 B"] C_Decision -->|"路径 C"| C_C["处理 C"] end subgraph Cyclic["模式 4:循环迭代"] L_Start["开始"] --> L_Process["处理"] L_Process --> L_Check{"满足条件?"} L_Check -->|"否"| L_Process L_Check -->|"是"| L_End["结束"] end style Sequential fill:#10b981,color:#fff style Parallel fill:#f59e0b,color:#fff style Conditional fill:#8b5cf6,color:#fff style Cyclic fill:#ef4444,color:#fff

模式 1:顺序执行

Python 🟢 基础
"""
模式 1:顺序执行 - RAG 流水线示例
"""
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field

# ========== 1. 定义状态 ==========

class RAGState(TypedDict):
    question: str
    rewritten_query: str
    documents: list[str]
    answer: str

# ========== 2. 节点定义 ==========

# 节点 1:查询改写(LLM)
class RewrittenQuery(BaseModel):
    query: str = Field(description="改写后的查询,优化用于检索")

def rewrite_node(state: RAGState):
    """使用 LLM 改写查询"""
    llm = ChatOpenAI(model="gpt-4o")
    llm_with_structure = llm.with_structured_output(RewrittenQuery)

    prompt = f"""将用户问题改写为更适合检索的查询:

用户问题:{state['question']}

要求:
- 提取核心关键词
- 转换为陈述句
- 去除口语化表达
"""

    rewritten = llm_with_structure.invoke(prompt)

    return {
        "rewritten_query": rewritten.query
    }

# 节点 2:文档检索(确定性)
def retrieve_node(state: RAGState):
    """从向量数据库检索文档"""
    # 实际会调用向量数据库
    # 这里模拟检索结果
    query = state["rewritten_query"]

    # 模拟向量检索
    retrieved_docs = [
        f"文档1:关于 {query} 的内容...",
        f"文档2:{query} 的详细说明...",
        f"文档3:{query} 的案例研究..."
    ]

    return {
        "documents": retrieved_docs
    }

# 节点 3:答案生成(LLM Agent)
def answer_node(state: RAGState):
    """基于检索文档生成答案"""
    llm = ChatOpenAI(model="claude-sonnet-4-5")

    context = "\n\n".join(state["documents"])
    prompt = f"""基于以下文档回答用户问题:

文档:
{context}

用户问题:{state['question']}

要求:
- 仅基于文档内容回答
- 引用具体文档
- 简洁清晰
"""

    response = llm.invoke(prompt)

    return {
        "answer": response.content
    }

# ========== 3. 构建顺序工作流 ==========

workflow = StateGraph(RAGState)

# 添加节点
workflow.add_node("rewrite", rewrite_node)
workflow.add_node("retrieve", retrieve_node)
workflow.add_node("answer", answer_node)

# 定义顺序执行
workflow.add_edge(START, "rewrite")
workflow.add_edge("rewrite", "retrieve")
workflow.add_edge("retrieve", "answer")
workflow.add_edge("answer", END)

# 编译
app = workflow.compile()

# 使用
result = app.invoke({
    "question": "LangChain 是什么?"
})

print(f"原始问题:{result['question']}")
print(f"改写查询:{result['rewritten_query']}")
print(f"检索文档数:{len(result['documents'])}")
print(f"答案:{result['answer']}")

# 执行流程:
# 用户问题 → 查询改写(LLM) → 文档检索(确定性) → 答案生成(LLM) → 返回

# 优势:
# - 流程清晰,易于理解
# - 混合 LLM 和确定性步骤
# - 每个节点职责单一

模式 2:并行执行

Python 🟡 中级
"""
模式 2:并行执行 - 多源数据聚合
"""
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI

# 定义状态
class MultiSourceState(TypedDict):
    query: str
    web_results: str
    database_results: str
    api_results: str
    aggregated_answer: str

# 并行节点 1:网络搜索
def web_search_node(state: MultiSourceState):
    """搜索网络信息"""
    # 实际会调用搜索 API
    return {
        "web_results": f"网络搜索结果:关于 {state['query']} 的最新信息..."
    }

# 并行节点 2:数据库查询
def database_query_node(state: MultiSourceState):
    """查询数据库"""
    # 实际会查询数据库
    return {
        "database_results": f"数据库查询:{state['query']} 的历史数据..."
    }

# 并行节点 3:API 调用
def api_call_node(state: MultiSourceState):
    """调用外部 API"""
    # 实际会调用 API
    return {
        "api_results": f"API 数据:{state['query']} 的实时数据..."
    }

# 汇总节点
def aggregate_node(state: MultiSourceState):
    """汇总所有来源的数据"""
    llm = ChatOpenAI(model="gpt-4o")

    prompt = f"""整合以下多源数据,生成综合答案:

用户查询:{state['query']}

网络数据:{state['web_results']}
数据库数据:{state['database_results']}
API 数据:{state['api_results']}

要求:综合分析,给出全面的答案。
"""

    response = llm.invoke(prompt)

    return {
        "aggregated_answer": response.content
    }

# 构建并行工作流
workflow = StateGraph(MultiSourceState)

# 添加节点
workflow.add_node("web_search", web_search_node)
workflow.add_node("database_query", database_query_node)
workflow.add_node("api_call", api_call_node)
workflow.add_node("aggregate", aggregate_node)

# 并行执行:START 到三个数据源
workflow.add_edge(START, "web_search")
workflow.add_edge(START, "database_query")
workflow.add_edge(START, "api_call")

# 所有数据源完成后汇总
workflow.add_edge("web_search", "aggregate")
workflow.add_edge("database_query", "aggregate")
workflow.add_edge("api_call", "aggregate")

workflow.add_edge("aggregate", END)

app = workflow.compile()

# 使用
result = app.invoke({
    "query": "今年 AI 领域的最新进展"
})

print(f"综合答案:{result['aggregated_answer']}")

# 执行流程:
#            ┌→ 网络搜索 ─┐
# 用户查询 ─┼→ 数据库查询 ─┼→ 汇总 → 返回
#            └→ API 调用 ─┘

# 优势:
# - 提高效率(并行执行)
# - 多源数据整合
# - 降低总延迟

模式 3:条件分支

Python 🟡 中级
"""
模式 3:条件分支 - 基于复杂度的动态路由
"""
from typing import TypedDict, Literal
from langgraph.graph import StateGraph, START, END
from pydantic import BaseModel, Field
from langchain_openai import ChatOpenAI

# 定义状态
class ConditionalState(TypedDict):
    query: str
    complexity: str  # "simple", "medium", "complex"
    answer: str

# 复杂度评估
class ComplexityAssessment(BaseModel):
    complexity: Literal["simple", "medium", "complex"] = Field(
        description="查询复杂度:simple(简单), medium(中等), complex(复杂)"
    )
    reason: str = Field(description="判断理由")

def assess_complexity(state: ConditionalState):
    """评估查询复杂度"""
    llm = ChatOpenAI(model="gpt-4o")
    llm_with_structure = llm.with_structured_output(ComplexityAssessment)

    prompt = f"""评估用户查询的复杂度:

查询:{state['query']}

分类标准:
- simple: 事实查询、简单定义
- medium: 需要分析、对比
- complex: 需要深度研究、多步推理

返回复杂度和理由。
"""

    assessment = llm_with_structure.invoke(prompt)

    return {
        "complexity": assessment.complexity
    }

# 简单查询处理
def simple_handler(state: ConditionalState):
    """处理简单查询"""
    llm = ChatOpenAI(model="gpt-4o-mini")  # 使用便宜模型
    response = llm.invoke(f"简洁回答:{state['query']}")
    return {"answer": response.content}

# 中等查询处理
def medium_handler(state: ConditionalState):
    """处理中等查询"""
    llm = ChatOpenAI(model="gpt-4o")
    response = llm.invoke(f"详细分析:{state['query']}")
    return {"answer": response.content}

# 复杂查询处理
def complex_handler(state: ConditionalState):
    """处理复杂查询"""
    llm = ChatOpenAI(model="claude-sonnet-4-5")  # 使用高级模型
    response = llm.invoke(f"深度研究和分析:{state['query']}")
    return {"answer": response.content}

# 路由函数
def route_by_complexity(state: ConditionalState) -> str:
    """根据复杂度路由"""
    complexity_map = {
        "simple": "simple_handler",
        "medium": "medium_handler",
        "complex": "complex_handler"
    }
    return complexity_map[state["complexity"]]

# 构建条件工作流
workflow = StateGraph(ConditionalState)

# 添加节点
workflow.add_node("assess", assess_complexity)
workflow.add_node("simple_handler", simple_handler)
workflow.add_node("medium_handler", medium_handler)
workflow.add_node("complex_handler", complex_handler)

# 流程
workflow.add_edge(START, "assess")

# 条件路由
workflow.add_conditional_edges(
    "assess",
    route_by_complexity,
    {
        "simple_handler": "simple_handler",
        "medium_handler": "medium_handler",
        "complex_handler": "complex_handler"
    }
)

# 所有路径结束
workflow.add_edge("simple_handler", END)
workflow.add_edge("medium_handler", END)
workflow.add_edge("complex_handler", END)

app = workflow.compile()

# 测试
test_queries = [
    "什么是 Python?",
    "对比 Python 和 Java 的优缺点",
    "设计一个高并发的微服务架构"
]

for query in test_queries:
    result = app.invoke({"query": query})
    print(f"查询:{query}")
    print(f"复杂度:{result['complexity']}")
    print(f"答案:{result['answer'][:100]}...\n")

# 执行流程:
#               ┌→ simple_handler (gpt-4o-mini)
# 查询 → 评估 ─┼→ medium_handler (gpt-4o)
#               └→ complex_handler (claude-sonnet-4-5)

# 优势:
# - 成本优化(简单查询用便宜模型)
# - 质量保证(复杂查询用高级模型)
# - 自动决策

模式 4:循环迭代

Python 🔴 高级
"""
模式 4:循环迭代 - 质量驱动的内容优化
"""
from typing import TypedDict, Literal
from langgraph.graph import StateGraph, START, END
from pydantic import BaseModel, Field
from langchain_openai import ChatOpenAI

# 定义状态
class IterativeState(TypedDict):
    topic: str
    draft: str
    quality_score: float
    iteration_count: int
    max_iterations: int
    feedback: str
    final_content: str

# 质量评估结构
class QualityAssessment(BaseModel):
    score: float = Field(description="质量评分 (0-1)", ge=0, le=1)
    feedback: str = Field(description="改进建议")
    pass_quality: bool = Field(description="是否达到质量标准")

# 内容生成节点
def generate_content(state: IterativeState):
    """生成或改进内容"""
    llm = ChatOpenAI(model="claude-sonnet-4-5")

    if state.get("draft"):
        # 改进现有内容
        prompt = f"""改进以下内容:

主题:{state['topic']}
当前版本:{state['draft']}
改进建议:{state['feedback']}

要求:根据反馈改进内容。
"""
    else:
        # 首次生成
        prompt = f"""创作关于以下主题的内容:

主题:{state['topic']}

要求:高质量、结构清晰、内容丰富。
"""

    response = llm.invoke(prompt)

    return {
        "draft": response.content,
        "iteration_count": state.get("iteration_count", 0) + 1
    }

# 质量检查节点
def quality_check(state: IterativeState):
    """检查内容质量"""
    llm = ChatOpenAI(model="gpt-4o")
    llm_with_structure = llm.with_structured_output(QualityAssessment)

    prompt = f"""评估内容质量:

主题:{state['topic']}
内容:{state['draft']}

评估标准:
- 内容相关性和准确性
- 结构清晰度
- 语言流畅性
- 信息完整性

质量阈值:0.8(低于需改进)
返回评分、是否达标、改进建议。
"""

    assessment = llm_with_structure.invoke(prompt)

    return {
        "quality_score": assessment.score,
        "feedback": assessment.feedback
    }

# 决策函数
def should_continue(state: IterativeState) -> Literal["improve", "finish"]:
    """决定是否继续优化"""
    # 达到最大迭代次数
    if state["iteration_count"] >= state["max_iterations"]:
        return "finish"

    # 质量达标
    if state["quality_score"] >= 0.8:
        return "finish"

    # 继续优化
    return "improve"

# 完成节点
def finalize(state: IterativeState):
    """最终化内容"""
    return {
        "final_content": state["draft"]
    }

# 构建循环工作流
workflow = StateGraph(IterativeState)

# 添加节点
workflow.add_node("generate", generate_content)
workflow.add_node("quality_check", quality_check)
workflow.add_node("finalize", finalize)

# 流程
workflow.add_edge(START, "generate")
workflow.add_edge("generate", "quality_check")

# 条件循环
workflow.add_conditional_edges(
    "quality_check",
    should_continue,
    {
        "improve": "generate",  # 循环回去改进
        "finish": "finalize"
    }
)

workflow.add_edge("finalize", END)

app = workflow.compile()

# 使用
result = app.invoke({
    "topic": "AI 在医疗领域的应用",
    "max_iterations": 3
})

print(f"主题:{result['topic']}")
print(f"迭代次数:{result['iteration_count']}")
print(f"最终质量评分:{result['quality_score']}")
print(f"最终内容:{result['final_content'][:200]}...")

# 执行流程:
# 生成 → 质量检查 → (不达标?) → 生成 → 质量检查 → (达标) → 完成

# 优势:
# - 自动质量控制
# - 迭代优化
# - 防止无限循环(最大次数限制)

🎯 完整实战示例

构建一个智能内容审核和发布系统,综合使用所有工作流模式和 Multi-Agent 协作模式, 实现从内容接收到发布的完整自动化流程。

Python 🔴 高级 - 生产级综合示例
"""
完整实战示例 - 智能内容审核和发布系统
综合使用:Router + Subagents + Handoffs + Skills + 四种工作流模式
"""
import uuid
from typing import TypedDict, Annotated, Literal
from enum import Enum
from pydantic import BaseModel, Field
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
from langchain.tools import tool

# ========== 1. 定义枚举和数据结构 ==========

class ContentType(str, Enum):
    ARTICLE = "article"
    VIDEO = "video"
    IMAGE = "image"

class ReviewStatus(str, Enum):
    PENDING = "pending"
    APPROVED = "approved"
    REJECTED = "rejected"
    NEEDS_REVISION = "needs_revision"

class ContentQuality(BaseModel):
    score: float = Field(ge=0, le=1)
    issues: list[str]
    suggestions: list[str]
    pass_review: bool

# ========== 2. 定义状态 ==========

class ContentWorkflowState(TypedDict):
    # 输入
    content_id: str
    content_type: ContentType
    raw_content: str
    submitter: str

    # Router 阶段
    route_to: str

    # 预处理阶段
    preprocessed_content: str

    # 审核阶段
    quality_assessment: ContentQuality
    review_status: ReviewStatus
    revision_count: int

    # 并行分析
    seo_score: float
    sentiment: str
    compliance_passed: bool

    # 最终输出
    final_content: str
    published: bool
    publish_url: str

# ========== 3. Skills 定义 ==========

@tool
def check_content_compliance(content: str) -> dict:
    """检查内容合规性"""
    # 实际会检查敏感词、版权等
    return {
        "passed": True,
        "issues": []
    }

@tool
def analyze_sentiment(content: str) -> str:
    """分析内容情绪"""
    # 实际会调用情感分析 API
    return "positive"

@tool
def calculate_seo_score(content: str) -> float:
    """计算 SEO 评分"""
    # 实际会分析关键词密度、可读性等
    return 0.85

@tool
def publish_to_cms(content: str, metadata: dict) -> dict:
    """发布到 CMS"""
    # 实际会调用 CMS API
    return {
        "success": True,
        "url": f"https://example.com/content/{uuid.uuid4().hex}"
    }

# ========== 4. 节点定义 ==========

# Router 节点:内容分类
def content_router(state: ContentWorkflowState):
    """根据内容类型路由到不同处理流程"""
    content_type = state["content_type"]

    route_map = {
        ContentType.ARTICLE: "article_pipeline",
        ContentType.VIDEO: "video_pipeline",
        ContentType.IMAGE: "image_pipeline"
    }

    return {
        "route_to": route_map[content_type]
    }

# 预处理节点
def preprocess_content(state: ContentWorkflowState):
    """预处理内容(确定性逻辑)"""
    raw = state["raw_content"]

    # 清理、格式化
    preprocessed = raw.strip()
    # 实际会做更多处理:HTML 清理、格式标准化等

    return {
        "preprocessed_content": preprocessed
    }

# 质量评估节点(Subagent)
def quality_assessment_agent(state: ContentWorkflowState):
    """使用 Agent 评估内容质量"""
    llm = ChatAnthropic(model="claude-sonnet-4-5")
    llm_with_structure = llm.with_structured_output(ContentQuality)

    prompt = f"""全面评估内容质量:

内容类型:{state['content_type'].value}
内容:{state['preprocessed_content'][:500]}...

评估维度:
- 内容相关性和准确性
- 语言质量和流畅性
- 结构清晰度
- 信息价值
- 是否存在明显问题

质量阈值:0.7(低于需修改)
返回评分、问题列表、改进建议、是否通过。
"""

    assessment = llm_with_structure.invoke(prompt)

    # 确定审核状态
    if assessment.pass_review:
        status = ReviewStatus.APPROVED
    else:
        status = ReviewStatus.NEEDS_REVISION

    return {
        "quality_assessment": assessment,
        "review_status": status
    }

# 内容改进节点(循环优化)
def content_improvement_agent(state: ContentWorkflowState):
    """根据反馈改进内容"""
    llm = ChatAnthropic(model="claude-sonnet-4-5")

    quality = state["quality_assessment"]
    issues = ", ".join(quality.issues)
    suggestions = ", ".join(quality.suggestions)

    prompt = f"""改进内容:

原内容:{state['preprocessed_content']}

问题:{issues}
建议:{suggestions}

要求:根据反馈改进内容,保持原意。
"""

    response = llm.invoke(prompt)

    return {
        "preprocessed_content": response.content,
        "revision_count": state.get("revision_count", 0) + 1
    }

# 并行分析节点 1:SEO 分析
def seo_analysis(state: ContentWorkflowState):
    """SEO 分析"""
    score = calculate_seo_score.invoke({"content": state["preprocessed_content"]})
    return {"seo_score": score}

# 并行分析节点 2:情感分析
def sentiment_analysis(state: ContentWorkflowState):
    """情感分析"""
    sentiment = analyze_sentiment.invoke({"content": state["preprocessed_content"]})
    return {"sentiment": sentiment}

# 并行分析节点 3:合规检查
def compliance_check(state: ContentWorkflowState):
    """合规性检查"""
    result = check_content_compliance.invoke({"content": state["preprocessed_content"]})
    return {"compliance_passed": result["passed"]}

# Handoff 决策:是否需要人工审核
def need_human_review(state: ContentWorkflowState) -> Literal["human_review", "auto_publish"]:
    """决定是否需要人工审核"""
    quality = state["quality_assessment"]

    # 低质量或不合规需要人工审核
    if quality.score < 0.6 or not state["compliance_passed"]:
        return "human_review"

    return "auto_publish"

# 人工审核节点(模拟)
def human_review_agent(state: ContentWorkflowState):
    """人工审核(实际会等待人工决策)"""
    # 模拟人工批准
    return {
        "review_status": ReviewStatus.APPROVED,
        "final_content": state["preprocessed_content"]
    }

# 发布节点
def publish_content(state: ContentWorkflowState):
    """发布内容"""
    result = publish_to_cms.invoke({
        "content": state.get("final_content", state["preprocessed_content"]),
        "metadata": {
            "content_type": state["content_type"].value,
            "submitter": state["submitter"],
            "seo_score": state["seo_score"]
        }
    })

    return {
        "published": result["success"],
        "publish_url": result["url"]
    }

# ========== 5. 条件路由函数 ==========

def route_by_type(state: ContentWorkflowState) -> str:
    """根据内容类型路由"""
    return state["route_to"]

def should_revise(state: ContentWorkflowState) -> Literal["revise", "parallel_analysis"]:
    """决定是否需要修改"""
    if state["review_status"] == ReviewStatus.NEEDS_REVISION:
        # 检查是否超过最大修改次数
        if state.get("revision_count", 0) >= 2:
            return "parallel_analysis"  # 强制进入下一阶段
        return "revise"
    return "parallel_analysis"

# ========== 6. 构建综合工作流 ==========

workflow = StateGraph(ContentWorkflowState)

# ===== 第一阶段:Router =====
workflow.add_node("router", content_router)

# ===== 第二阶段:预处理(确定性) =====
workflow.add_node("preprocess", preprocess_content)

# ===== 第三阶段:质量评估(Agent) =====
workflow.add_node("quality_assessment", quality_assessment_agent)

# ===== 第四阶段:内容改进(循环) =====
workflow.add_node("improve", content_improvement_agent)

# ===== 第五阶段:并行分析 =====
workflow.add_node("seo_analysis", seo_analysis)
workflow.add_node("sentiment_analysis", sentiment_analysis)
workflow.add_node("compliance_check", compliance_check)

# ===== 第六阶段:Handoff(人工审核) =====
workflow.add_node("human_review", human_review_agent)

# ===== 第七阶段:发布 =====
workflow.add_node("publish", publish_content)

# ===== 连接流程 =====

# START → router
workflow.add_edge(START, "router")

# router → preprocess(所有类型都经过预处理)
workflow.add_conditional_edges(
    "router",
    route_by_type,
    {
        "article_pipeline": "preprocess",
        "video_pipeline": "preprocess",
        "image_pipeline": "preprocess"
    }
)

# preprocess → quality_assessment
workflow.add_edge("preprocess", "quality_assessment")

# quality_assessment → 条件分支(修改或并行分析)
workflow.add_conditional_edges(
    "quality_assessment",
    should_revise,
    {
        "revise": "improve",
        "parallel_analysis": "seo_analysis"
    }
)

# improve → quality_assessment(循环)
workflow.add_edge("improve", "quality_assessment")

# 并行分析(三个节点同时执行)
workflow.add_edge("seo_analysis", "human_review")
workflow.add_edge("sentiment_analysis", "human_review")
workflow.add_edge("compliance_check", "human_review")

# human_review → 条件分支(人工审核或自动发布)
workflow.add_conditional_edges(
    "human_review",
    need_human_review,
    {
        "human_review": "publish",  # 已经人工审核,直接发布
        "auto_publish": "publish"
    }
)

# publish → END
workflow.add_edge("publish", END)

# 编译
app = workflow.compile()

# ========== 7. 测试系统 ==========

test_content = {
    "content_id": str(uuid.uuid4()),
    "content_type": ContentType.ARTICLE,
    "raw_content": """
    人工智能正在改变世界。随着技术的发展,AI 应用越来越广泛。
    从医疗到金融,从教育到娱乐,AI 无处不在。
    """,
    "submitter": "user_001"
}

result = app.invoke(test_content)

print("=" * 60)
print("内容审核和发布系统执行结果")
print("=" * 60)
print(f"内容 ID:{result['content_id']}")
print(f"内容类型:{result['content_type'].value}")
print(f"提交者:{result['submitter']}")
print(f"\n审核结果:")
print(f"  - 质量评分:{result['quality_assessment'].score}")
print(f"  - 审核状态:{result['review_status'].value}")
print(f"  - 修改次数:{result.get('revision_count', 0)}")
print(f"\n并行分析:")
print(f"  - SEO 评分:{result['seo_score']}")
print(f"  - 情感倾向:{result['sentiment']}")
print(f"  - 合规检查:{'通过' if result['compliance_passed'] else '未通过'}")
print(f"\n发布状态:")
print(f"  - 是否发布:{result['published']}")
print(f"  - 发布 URL:{result['publish_url']}")

# 工作流执行过程:
# 1. Router:根据内容类型路由
# 2. 预处理:清理和格式化(确定性)
# 3. 质量评估:Agent 智能评估
# 4. 条件循环:质量不达标时改进(最多 2 次)
# 5. 并行分析:SEO + 情感 + 合规(同时进行)
# 6. Handoff 决策:低质量转人工审核
# 7. 发布:发布到 CMS

# 优势:
# - 综合使用所有 Multi-Agent 模式
# - 混合确定性逻辑和 AI 决策
# - 自动化质量控制和优化
# - 灵活的人工介入机制
# - 生产级错误处理和追踪

✨ 自定义工作流最佳实践

1. 工作流设计原则

✅ 设计原则
  • 关注点分离:每个节点承担明确的单一职责
  • 确定性优先:能用确定性逻辑就不用 LLM(提高可靠性和速度)
  • 类型化状态:使用 TypedDict 定义清晰的状态结构
  • 模块化设计:节点可独立测试和复用
  • 渐进式复杂度:从简单顺序流程开始,逐步添加复杂性
  • 错误边界:为关键节点添加错误处理和降级

2. 节点类型选择

任务类型 推荐节点类型 示例
数据获取 确定性函数 数据库查询、API 调用
数据转换 确定性函数 格式转换、清洗
内容理解 LLM 节点 分类、摘要、情感分析
内容生成 LLM 节点 写作、改写、创作
复杂推理 Agent 节点 多步决策、工具调用
验证检查 确定性函数 格式校验、规则检查

3. 性能优化策略

  • 并行化:识别可并行的独立任务
  • 缓存:缓存昂贵的 LLM 调用结果
  • 早停:条件提前终止,跳过不必要的步骤
  • 批处理:将多个请求批量处理
  • 模型选择:根据任务复杂度选择模型(简单任务用 mini)
  • 流式输出:对长时间任务使用流式响应

4. 状态管理最佳实践

Python
# 好的状态设计

class WellDesignedState(TypedDict):
    # 输入字段(清晰标注)
    user_query: str
    user_id: str

    # 中间结果(按阶段组织)
    stage1_result: str
    stage2_analysis: dict

    # 元数据(追踪信息)
    processing_time_ms: float
    node_history: list[str]

    # 输出字段
    final_answer: str
    confidence: float

# 差的状态设计

class PoorlyDesignedState(TypedDict):
    # 字段命名不清晰
    data: str
    result: str
    temp: str

    # 缺少类型提示
    analysis  # 类型不明确

    # 职责混乱
    everything: dict  # 所有数据混在一起

5. 错误处理和监控

Python
# 节点错误处理

def robust_node(state):
    """带错误处理的节点"""
    try:
        # 主要逻辑
        result = expensive_operation(state)
        return {"result": result}
    except SpecificError as e:
        # 降级处理
        logger.warning(f"Operation failed, using fallback: {e}")
        return {"result": fallback_operation(state)}
    except Exception as e:
        # 记录错误并传递
        logger.error(f"Node failed: {e}")
        return {"error": str(e), "failed_node": "robust_node"}

# 工作流级别监控

def add_monitoring(workflow):
    """为工作流添加监控"""
    def log_node_entry(node_name):
        print(f"Entering node: {node_name}")

    def log_node_exit(node_name, duration_ms):
        print(f"Exiting node: {node_name} ({duration_ms}ms)")

    # 使用 LangSmith 追踪
    import os
    os.environ["LANGCHAIN_TRACING_V2"] = "true"
    os.environ["LANGCHAIN_PROJECT"] = "custom-workflow"

❓ 常见问题

Q1: 何时使用自定义工作流而非标准模式?

决策树

  1. 需求是否符合 Subagents(顺序流水线)?→ 是:使用 Subagents
  2. 是否需要运行时动态转移?→ 是:使用 Handoffs
  3. 是否只需初始分类路由?→ 是:使用 Router
  4. 是否只需共享工具能力?→ 是:使用 Skills
  5. 以上都不完全符合?→ 使用自定义工作流

Q2: 如何组合多个 Multi-Agent 模式?

组合策略

  • Router 作为入口:自定义工作流的第一个节点可以是 Router
  • Subagents 作为节点:将完整的 Subagent 流程作为单个节点
  • Skills 作为工具:所有代理节点使用共享的 Skills
  • Handoffs 作为决策:使用条件边实现 Handoff 逻辑

Q3: 如何避免工作流过于复杂?

复杂度管理

  • 分层设计:将复杂工作流拆分为多个子工作流
  • 限制深度:避免超过 3 层嵌套
  • 可视化:使用 Mermaid 图表记录工作流
  • 代码审查:定期审查工作流设计
  • 重构:当工作流超过 10 个节点时考虑重构

Q4: 如何测试自定义工作流?

测试策略

Python
# 单元测试:测试单个节点
def test_quality_assessment_node():
    state = {
        "preprocessed_content": "测试内容"
    }
    result = quality_assessment_agent(state)
    assert "quality_assessment" in result
    assert 0 <= result["quality_assessment"].score <= 1

# 集成测试:测试完整路径
def test_full_workflow():
    input_state = {
        "content_type": ContentType.ARTICLE,
        "raw_content": "测试文章内容"
    }
    result = app.invoke(input_state)
    assert result["published"] == True

# 路径测试:测试所有条件分支
def test_revision_path():
    # 测试触发修改的路径
    pass

def test_human_review_path():
    # 测试触发人工审核的路径
    pass

Q5: 如何调试工作流执行?

调试技巧

  • LangSmith 追踪:查看完整执行链和每个节点的输入输出
  • 打印状态:在关键节点打印状态变化
  • 日志记录:为每个节点添加结构化日志
  • 单步执行:独立测试每个节点
  • 可视化执行:生成工作流执行的时序图

Q6: 如何优化工作流性能?

性能优化清单

  • ✅ 识别可并行的节点并并行执行
  • ✅ 缓存昂贵的 LLM 调用
  • ✅ 对简单任务使用便宜模型(gpt-4o-mini)
  • ✅ 添加早停条件,跳过不必要的节点
  • ✅ 批量处理多个请求
  • ✅ 使用异步执行(async/await)
  • ✅ 监控每个节点的延迟,优化瓶颈

📖 参考资源