🔀 Multi-Agent 自定义工作流
设计和实现复杂的多代理业务流程,灵活组合 Subagents、Handoffs、Skills 和 Router, 构建适应各种业务场景的企业级工作流系统。
📚 什么是自定义工作流?
自定义工作流(Custom Workflow)是 Multi-Agent 系统中的高级编排模式。 开发者使用 LangGraph 完全控制图结构,将确定性逻辑与智能决策结合, 构建适应复杂业务场景的定制化流程。
| 特性 | 说明 | 优势 |
|---|---|---|
| 完全控制 | 完全掌控图结构和执行流程 | 灵活适应任何业务需求 |
| 混合逻辑 | 确定性步骤 + AI 决策 | 平衡可靠性和智能性 |
| 模式组合 | 集成 Subagents/Handoffs/Skills/Router | 复用现有模式能力 |
| 多种执行模式 | 顺序、并行、条件、循环 | 支持复杂业务场景 |
| 状态管理 | 类型化状态字典 | 清晰的数据流转 |
智能路由"] Validate["验证节点
确定性逻辑"] Agent1["Agent 节点
智能处理"] Parallel["并行节点"] Condition["条件节点
分支决策"] Subagent["Subagent 节点
复杂子流程"] Aggregate["汇总节点
结果整合"] end Router --> Validate Validate --> Condition Condition -->|"简单任务"| Agent1 Condition -->|"复杂任务"| Parallel Parallel --> P1["子任务 1"] Parallel --> P2["子任务 2"] Parallel --> P3["子任务 3"] P1 --> Aggregate P2 --> Aggregate P3 --> Aggregate Agent1 --> Subagent Aggregate --> Subagent Subagent --> Result["最终结果"] style CustomWF fill:#3b82f6,color:#fff style Router fill:#10b981,color:#fff style Validate fill:#f59e0b,color:#fff style Agent1 fill:#8b5cf6,color:#fff style Parallel fill:#ef4444,color:#fff style Condition fill:#06b6d4,color:#fff style Subagent fill:#a855f7,color:#fff style Aggregate fill:#ec4899,color:#fff
何时使用自定义工作流?
- 标准模式无法满足:业务逻辑超出 Subagents/Handoffs/Skills/Router 的组合
- 混合确定性和智能:需要在固定流程中嵌入 AI 决策
- 复杂路由逻辑:多条件分支、动态路径选择
- 多阶段处理:数据预处理 → AI 分析 → 后处理 → 验证
- 并行任务协调:同时执行多个独立任务后汇总
- 迭代优化流程:循环改进直到满足质量标准
自定义工作流 vs 标准模式
| 对比维度 | 标准模式 | 自定义工作流 |
|---|---|---|
| 复杂度 | 简单、模式化 | 复杂、高度定制 |
| 灵活性 | 受模式限制 | 完全自由 |
| 开发成本 | 低(使用预定义模式) | 高(需要设计和实现) |
| 维护成本 | 低(标准化) | 中(需要文档和注释) |
| 适用场景 | 常见业务场景 | 特殊业务需求 |
🔧 工作流执行模式
自定义工作流支持四种基本执行模式,可以灵活组合构建复杂流程。
模式 1:顺序执行
"""
模式 1:顺序执行 - RAG 流水线示例
"""
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field
# ========== 1. 定义状态 ==========
class RAGState(TypedDict):
question: str
rewritten_query: str
documents: list[str]
answer: str
# ========== 2. 节点定义 ==========
# 节点 1:查询改写(LLM)
class RewrittenQuery(BaseModel):
query: str = Field(description="改写后的查询,优化用于检索")
def rewrite_node(state: RAGState):
"""使用 LLM 改写查询"""
llm = ChatOpenAI(model="gpt-4o")
llm_with_structure = llm.with_structured_output(RewrittenQuery)
prompt = f"""将用户问题改写为更适合检索的查询:
用户问题:{state['question']}
要求:
- 提取核心关键词
- 转换为陈述句
- 去除口语化表达
"""
rewritten = llm_with_structure.invoke(prompt)
return {
"rewritten_query": rewritten.query
}
# 节点 2:文档检索(确定性)
def retrieve_node(state: RAGState):
"""从向量数据库检索文档"""
# 实际会调用向量数据库
# 这里模拟检索结果
query = state["rewritten_query"]
# 模拟向量检索
retrieved_docs = [
f"文档1:关于 {query} 的内容...",
f"文档2:{query} 的详细说明...",
f"文档3:{query} 的案例研究..."
]
return {
"documents": retrieved_docs
}
# 节点 3:答案生成(LLM Agent)
def answer_node(state: RAGState):
"""基于检索文档生成答案"""
llm = ChatOpenAI(model="claude-sonnet-4-5")
context = "\n\n".join(state["documents"])
prompt = f"""基于以下文档回答用户问题:
文档:
{context}
用户问题:{state['question']}
要求:
- 仅基于文档内容回答
- 引用具体文档
- 简洁清晰
"""
response = llm.invoke(prompt)
return {
"answer": response.content
}
# ========== 3. 构建顺序工作流 ==========
workflow = StateGraph(RAGState)
# 添加节点
workflow.add_node("rewrite", rewrite_node)
workflow.add_node("retrieve", retrieve_node)
workflow.add_node("answer", answer_node)
# 定义顺序执行
workflow.add_edge(START, "rewrite")
workflow.add_edge("rewrite", "retrieve")
workflow.add_edge("retrieve", "answer")
workflow.add_edge("answer", END)
# 编译
app = workflow.compile()
# 使用
result = app.invoke({
"question": "LangChain 是什么?"
})
print(f"原始问题:{result['question']}")
print(f"改写查询:{result['rewritten_query']}")
print(f"检索文档数:{len(result['documents'])}")
print(f"答案:{result['answer']}")
# 执行流程:
# 用户问题 → 查询改写(LLM) → 文档检索(确定性) → 答案生成(LLM) → 返回
# 优势:
# - 流程清晰,易于理解
# - 混合 LLM 和确定性步骤
# - 每个节点职责单一
模式 2:并行执行
"""
模式 2:并行执行 - 多源数据聚合
"""
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
# 定义状态
class MultiSourceState(TypedDict):
query: str
web_results: str
database_results: str
api_results: str
aggregated_answer: str
# 并行节点 1:网络搜索
def web_search_node(state: MultiSourceState):
"""搜索网络信息"""
# 实际会调用搜索 API
return {
"web_results": f"网络搜索结果:关于 {state['query']} 的最新信息..."
}
# 并行节点 2:数据库查询
def database_query_node(state: MultiSourceState):
"""查询数据库"""
# 实际会查询数据库
return {
"database_results": f"数据库查询:{state['query']} 的历史数据..."
}
# 并行节点 3:API 调用
def api_call_node(state: MultiSourceState):
"""调用外部 API"""
# 实际会调用 API
return {
"api_results": f"API 数据:{state['query']} 的实时数据..."
}
# 汇总节点
def aggregate_node(state: MultiSourceState):
"""汇总所有来源的数据"""
llm = ChatOpenAI(model="gpt-4o")
prompt = f"""整合以下多源数据,生成综合答案:
用户查询:{state['query']}
网络数据:{state['web_results']}
数据库数据:{state['database_results']}
API 数据:{state['api_results']}
要求:综合分析,给出全面的答案。
"""
response = llm.invoke(prompt)
return {
"aggregated_answer": response.content
}
# 构建并行工作流
workflow = StateGraph(MultiSourceState)
# 添加节点
workflow.add_node("web_search", web_search_node)
workflow.add_node("database_query", database_query_node)
workflow.add_node("api_call", api_call_node)
workflow.add_node("aggregate", aggregate_node)
# 并行执行:START 到三个数据源
workflow.add_edge(START, "web_search")
workflow.add_edge(START, "database_query")
workflow.add_edge(START, "api_call")
# 所有数据源完成后汇总
workflow.add_edge("web_search", "aggregate")
workflow.add_edge("database_query", "aggregate")
workflow.add_edge("api_call", "aggregate")
workflow.add_edge("aggregate", END)
app = workflow.compile()
# 使用
result = app.invoke({
"query": "今年 AI 领域的最新进展"
})
print(f"综合答案:{result['aggregated_answer']}")
# 执行流程:
# ┌→ 网络搜索 ─┐
# 用户查询 ─┼→ 数据库查询 ─┼→ 汇总 → 返回
# └→ API 调用 ─┘
# 优势:
# - 提高效率(并行执行)
# - 多源数据整合
# - 降低总延迟
模式 3:条件分支
"""
模式 3:条件分支 - 基于复杂度的动态路由
"""
from typing import TypedDict, Literal
from langgraph.graph import StateGraph, START, END
from pydantic import BaseModel, Field
from langchain_openai import ChatOpenAI
# 定义状态
class ConditionalState(TypedDict):
query: str
complexity: str # "simple", "medium", "complex"
answer: str
# 复杂度评估
class ComplexityAssessment(BaseModel):
complexity: Literal["simple", "medium", "complex"] = Field(
description="查询复杂度:simple(简单), medium(中等), complex(复杂)"
)
reason: str = Field(description="判断理由")
def assess_complexity(state: ConditionalState):
"""评估查询复杂度"""
llm = ChatOpenAI(model="gpt-4o")
llm_with_structure = llm.with_structured_output(ComplexityAssessment)
prompt = f"""评估用户查询的复杂度:
查询:{state['query']}
分类标准:
- simple: 事实查询、简单定义
- medium: 需要分析、对比
- complex: 需要深度研究、多步推理
返回复杂度和理由。
"""
assessment = llm_with_structure.invoke(prompt)
return {
"complexity": assessment.complexity
}
# 简单查询处理
def simple_handler(state: ConditionalState):
"""处理简单查询"""
llm = ChatOpenAI(model="gpt-4o-mini") # 使用便宜模型
response = llm.invoke(f"简洁回答:{state['query']}")
return {"answer": response.content}
# 中等查询处理
def medium_handler(state: ConditionalState):
"""处理中等查询"""
llm = ChatOpenAI(model="gpt-4o")
response = llm.invoke(f"详细分析:{state['query']}")
return {"answer": response.content}
# 复杂查询处理
def complex_handler(state: ConditionalState):
"""处理复杂查询"""
llm = ChatOpenAI(model="claude-sonnet-4-5") # 使用高级模型
response = llm.invoke(f"深度研究和分析:{state['query']}")
return {"answer": response.content}
# 路由函数
def route_by_complexity(state: ConditionalState) -> str:
"""根据复杂度路由"""
complexity_map = {
"simple": "simple_handler",
"medium": "medium_handler",
"complex": "complex_handler"
}
return complexity_map[state["complexity"]]
# 构建条件工作流
workflow = StateGraph(ConditionalState)
# 添加节点
workflow.add_node("assess", assess_complexity)
workflow.add_node("simple_handler", simple_handler)
workflow.add_node("medium_handler", medium_handler)
workflow.add_node("complex_handler", complex_handler)
# 流程
workflow.add_edge(START, "assess")
# 条件路由
workflow.add_conditional_edges(
"assess",
route_by_complexity,
{
"simple_handler": "simple_handler",
"medium_handler": "medium_handler",
"complex_handler": "complex_handler"
}
)
# 所有路径结束
workflow.add_edge("simple_handler", END)
workflow.add_edge("medium_handler", END)
workflow.add_edge("complex_handler", END)
app = workflow.compile()
# 测试
test_queries = [
"什么是 Python?",
"对比 Python 和 Java 的优缺点",
"设计一个高并发的微服务架构"
]
for query in test_queries:
result = app.invoke({"query": query})
print(f"查询:{query}")
print(f"复杂度:{result['complexity']}")
print(f"答案:{result['answer'][:100]}...\n")
# 执行流程:
# ┌→ simple_handler (gpt-4o-mini)
# 查询 → 评估 ─┼→ medium_handler (gpt-4o)
# └→ complex_handler (claude-sonnet-4-5)
# 优势:
# - 成本优化(简单查询用便宜模型)
# - 质量保证(复杂查询用高级模型)
# - 自动决策
模式 4:循环迭代
"""
模式 4:循环迭代 - 质量驱动的内容优化
"""
from typing import TypedDict, Literal
from langgraph.graph import StateGraph, START, END
from pydantic import BaseModel, Field
from langchain_openai import ChatOpenAI
# 定义状态
class IterativeState(TypedDict):
topic: str
draft: str
quality_score: float
iteration_count: int
max_iterations: int
feedback: str
final_content: str
# 质量评估结构
class QualityAssessment(BaseModel):
score: float = Field(description="质量评分 (0-1)", ge=0, le=1)
feedback: str = Field(description="改进建议")
pass_quality: bool = Field(description="是否达到质量标准")
# 内容生成节点
def generate_content(state: IterativeState):
"""生成或改进内容"""
llm = ChatOpenAI(model="claude-sonnet-4-5")
if state.get("draft"):
# 改进现有内容
prompt = f"""改进以下内容:
主题:{state['topic']}
当前版本:{state['draft']}
改进建议:{state['feedback']}
要求:根据反馈改进内容。
"""
else:
# 首次生成
prompt = f"""创作关于以下主题的内容:
主题:{state['topic']}
要求:高质量、结构清晰、内容丰富。
"""
response = llm.invoke(prompt)
return {
"draft": response.content,
"iteration_count": state.get("iteration_count", 0) + 1
}
# 质量检查节点
def quality_check(state: IterativeState):
"""检查内容质量"""
llm = ChatOpenAI(model="gpt-4o")
llm_with_structure = llm.with_structured_output(QualityAssessment)
prompt = f"""评估内容质量:
主题:{state['topic']}
内容:{state['draft']}
评估标准:
- 内容相关性和准确性
- 结构清晰度
- 语言流畅性
- 信息完整性
质量阈值:0.8(低于需改进)
返回评分、是否达标、改进建议。
"""
assessment = llm_with_structure.invoke(prompt)
return {
"quality_score": assessment.score,
"feedback": assessment.feedback
}
# 决策函数
def should_continue(state: IterativeState) -> Literal["improve", "finish"]:
"""决定是否继续优化"""
# 达到最大迭代次数
if state["iteration_count"] >= state["max_iterations"]:
return "finish"
# 质量达标
if state["quality_score"] >= 0.8:
return "finish"
# 继续优化
return "improve"
# 完成节点
def finalize(state: IterativeState):
"""最终化内容"""
return {
"final_content": state["draft"]
}
# 构建循环工作流
workflow = StateGraph(IterativeState)
# 添加节点
workflow.add_node("generate", generate_content)
workflow.add_node("quality_check", quality_check)
workflow.add_node("finalize", finalize)
# 流程
workflow.add_edge(START, "generate")
workflow.add_edge("generate", "quality_check")
# 条件循环
workflow.add_conditional_edges(
"quality_check",
should_continue,
{
"improve": "generate", # 循环回去改进
"finish": "finalize"
}
)
workflow.add_edge("finalize", END)
app = workflow.compile()
# 使用
result = app.invoke({
"topic": "AI 在医疗领域的应用",
"max_iterations": 3
})
print(f"主题:{result['topic']}")
print(f"迭代次数:{result['iteration_count']}")
print(f"最终质量评分:{result['quality_score']}")
print(f"最终内容:{result['final_content'][:200]}...")
# 执行流程:
# 生成 → 质量检查 → (不达标?) → 生成 → 质量检查 → (达标) → 完成
# 优势:
# - 自动质量控制
# - 迭代优化
# - 防止无限循环(最大次数限制)
🎯 完整实战示例
构建一个智能内容审核和发布系统,综合使用所有工作流模式和 Multi-Agent 协作模式, 实现从内容接收到发布的完整自动化流程。
"""
完整实战示例 - 智能内容审核和发布系统
综合使用:Router + Subagents + Handoffs + Skills + 四种工作流模式
"""
import uuid
from typing import TypedDict, Annotated, Literal
from enum import Enum
from pydantic import BaseModel, Field
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
from langchain.tools import tool
# ========== 1. 定义枚举和数据结构 ==========
class ContentType(str, Enum):
ARTICLE = "article"
VIDEO = "video"
IMAGE = "image"
class ReviewStatus(str, Enum):
PENDING = "pending"
APPROVED = "approved"
REJECTED = "rejected"
NEEDS_REVISION = "needs_revision"
class ContentQuality(BaseModel):
score: float = Field(ge=0, le=1)
issues: list[str]
suggestions: list[str]
pass_review: bool
# ========== 2. 定义状态 ==========
class ContentWorkflowState(TypedDict):
# 输入
content_id: str
content_type: ContentType
raw_content: str
submitter: str
# Router 阶段
route_to: str
# 预处理阶段
preprocessed_content: str
# 审核阶段
quality_assessment: ContentQuality
review_status: ReviewStatus
revision_count: int
# 并行分析
seo_score: float
sentiment: str
compliance_passed: bool
# 最终输出
final_content: str
published: bool
publish_url: str
# ========== 3. Skills 定义 ==========
@tool
def check_content_compliance(content: str) -> dict:
"""检查内容合规性"""
# 实际会检查敏感词、版权等
return {
"passed": True,
"issues": []
}
@tool
def analyze_sentiment(content: str) -> str:
"""分析内容情绪"""
# 实际会调用情感分析 API
return "positive"
@tool
def calculate_seo_score(content: str) -> float:
"""计算 SEO 评分"""
# 实际会分析关键词密度、可读性等
return 0.85
@tool
def publish_to_cms(content: str, metadata: dict) -> dict:
"""发布到 CMS"""
# 实际会调用 CMS API
return {
"success": True,
"url": f"https://example.com/content/{uuid.uuid4().hex}"
}
# ========== 4. 节点定义 ==========
# Router 节点:内容分类
def content_router(state: ContentWorkflowState):
"""根据内容类型路由到不同处理流程"""
content_type = state["content_type"]
route_map = {
ContentType.ARTICLE: "article_pipeline",
ContentType.VIDEO: "video_pipeline",
ContentType.IMAGE: "image_pipeline"
}
return {
"route_to": route_map[content_type]
}
# 预处理节点
def preprocess_content(state: ContentWorkflowState):
"""预处理内容(确定性逻辑)"""
raw = state["raw_content"]
# 清理、格式化
preprocessed = raw.strip()
# 实际会做更多处理:HTML 清理、格式标准化等
return {
"preprocessed_content": preprocessed
}
# 质量评估节点(Subagent)
def quality_assessment_agent(state: ContentWorkflowState):
"""使用 Agent 评估内容质量"""
llm = ChatAnthropic(model="claude-sonnet-4-5")
llm_with_structure = llm.with_structured_output(ContentQuality)
prompt = f"""全面评估内容质量:
内容类型:{state['content_type'].value}
内容:{state['preprocessed_content'][:500]}...
评估维度:
- 内容相关性和准确性
- 语言质量和流畅性
- 结构清晰度
- 信息价值
- 是否存在明显问题
质量阈值:0.7(低于需修改)
返回评分、问题列表、改进建议、是否通过。
"""
assessment = llm_with_structure.invoke(prompt)
# 确定审核状态
if assessment.pass_review:
status = ReviewStatus.APPROVED
else:
status = ReviewStatus.NEEDS_REVISION
return {
"quality_assessment": assessment,
"review_status": status
}
# 内容改进节点(循环优化)
def content_improvement_agent(state: ContentWorkflowState):
"""根据反馈改进内容"""
llm = ChatAnthropic(model="claude-sonnet-4-5")
quality = state["quality_assessment"]
issues = ", ".join(quality.issues)
suggestions = ", ".join(quality.suggestions)
prompt = f"""改进内容:
原内容:{state['preprocessed_content']}
问题:{issues}
建议:{suggestions}
要求:根据反馈改进内容,保持原意。
"""
response = llm.invoke(prompt)
return {
"preprocessed_content": response.content,
"revision_count": state.get("revision_count", 0) + 1
}
# 并行分析节点 1:SEO 分析
def seo_analysis(state: ContentWorkflowState):
"""SEO 分析"""
score = calculate_seo_score.invoke({"content": state["preprocessed_content"]})
return {"seo_score": score}
# 并行分析节点 2:情感分析
def sentiment_analysis(state: ContentWorkflowState):
"""情感分析"""
sentiment = analyze_sentiment.invoke({"content": state["preprocessed_content"]})
return {"sentiment": sentiment}
# 并行分析节点 3:合规检查
def compliance_check(state: ContentWorkflowState):
"""合规性检查"""
result = check_content_compliance.invoke({"content": state["preprocessed_content"]})
return {"compliance_passed": result["passed"]}
# Handoff 决策:是否需要人工审核
def need_human_review(state: ContentWorkflowState) -> Literal["human_review", "auto_publish"]:
"""决定是否需要人工审核"""
quality = state["quality_assessment"]
# 低质量或不合规需要人工审核
if quality.score < 0.6 or not state["compliance_passed"]:
return "human_review"
return "auto_publish"
# 人工审核节点(模拟)
def human_review_agent(state: ContentWorkflowState):
"""人工审核(实际会等待人工决策)"""
# 模拟人工批准
return {
"review_status": ReviewStatus.APPROVED,
"final_content": state["preprocessed_content"]
}
# 发布节点
def publish_content(state: ContentWorkflowState):
"""发布内容"""
result = publish_to_cms.invoke({
"content": state.get("final_content", state["preprocessed_content"]),
"metadata": {
"content_type": state["content_type"].value,
"submitter": state["submitter"],
"seo_score": state["seo_score"]
}
})
return {
"published": result["success"],
"publish_url": result["url"]
}
# ========== 5. 条件路由函数 ==========
def route_by_type(state: ContentWorkflowState) -> str:
"""根据内容类型路由"""
return state["route_to"]
def should_revise(state: ContentWorkflowState) -> Literal["revise", "parallel_analysis"]:
"""决定是否需要修改"""
if state["review_status"] == ReviewStatus.NEEDS_REVISION:
# 检查是否超过最大修改次数
if state.get("revision_count", 0) >= 2:
return "parallel_analysis" # 强制进入下一阶段
return "revise"
return "parallel_analysis"
# ========== 6. 构建综合工作流 ==========
workflow = StateGraph(ContentWorkflowState)
# ===== 第一阶段:Router =====
workflow.add_node("router", content_router)
# ===== 第二阶段:预处理(确定性) =====
workflow.add_node("preprocess", preprocess_content)
# ===== 第三阶段:质量评估(Agent) =====
workflow.add_node("quality_assessment", quality_assessment_agent)
# ===== 第四阶段:内容改进(循环) =====
workflow.add_node("improve", content_improvement_agent)
# ===== 第五阶段:并行分析 =====
workflow.add_node("seo_analysis", seo_analysis)
workflow.add_node("sentiment_analysis", sentiment_analysis)
workflow.add_node("compliance_check", compliance_check)
# ===== 第六阶段:Handoff(人工审核) =====
workflow.add_node("human_review", human_review_agent)
# ===== 第七阶段:发布 =====
workflow.add_node("publish", publish_content)
# ===== 连接流程 =====
# START → router
workflow.add_edge(START, "router")
# router → preprocess(所有类型都经过预处理)
workflow.add_conditional_edges(
"router",
route_by_type,
{
"article_pipeline": "preprocess",
"video_pipeline": "preprocess",
"image_pipeline": "preprocess"
}
)
# preprocess → quality_assessment
workflow.add_edge("preprocess", "quality_assessment")
# quality_assessment → 条件分支(修改或并行分析)
workflow.add_conditional_edges(
"quality_assessment",
should_revise,
{
"revise": "improve",
"parallel_analysis": "seo_analysis"
}
)
# improve → quality_assessment(循环)
workflow.add_edge("improve", "quality_assessment")
# 并行分析(三个节点同时执行)
workflow.add_edge("seo_analysis", "human_review")
workflow.add_edge("sentiment_analysis", "human_review")
workflow.add_edge("compliance_check", "human_review")
# human_review → 条件分支(人工审核或自动发布)
workflow.add_conditional_edges(
"human_review",
need_human_review,
{
"human_review": "publish", # 已经人工审核,直接发布
"auto_publish": "publish"
}
)
# publish → END
workflow.add_edge("publish", END)
# 编译
app = workflow.compile()
# ========== 7. 测试系统 ==========
test_content = {
"content_id": str(uuid.uuid4()),
"content_type": ContentType.ARTICLE,
"raw_content": """
人工智能正在改变世界。随着技术的发展,AI 应用越来越广泛。
从医疗到金融,从教育到娱乐,AI 无处不在。
""",
"submitter": "user_001"
}
result = app.invoke(test_content)
print("=" * 60)
print("内容审核和发布系统执行结果")
print("=" * 60)
print(f"内容 ID:{result['content_id']}")
print(f"内容类型:{result['content_type'].value}")
print(f"提交者:{result['submitter']}")
print(f"\n审核结果:")
print(f" - 质量评分:{result['quality_assessment'].score}")
print(f" - 审核状态:{result['review_status'].value}")
print(f" - 修改次数:{result.get('revision_count', 0)}")
print(f"\n并行分析:")
print(f" - SEO 评分:{result['seo_score']}")
print(f" - 情感倾向:{result['sentiment']}")
print(f" - 合规检查:{'通过' if result['compliance_passed'] else '未通过'}")
print(f"\n发布状态:")
print(f" - 是否发布:{result['published']}")
print(f" - 发布 URL:{result['publish_url']}")
# 工作流执行过程:
# 1. Router:根据内容类型路由
# 2. 预处理:清理和格式化(确定性)
# 3. 质量评估:Agent 智能评估
# 4. 条件循环:质量不达标时改进(最多 2 次)
# 5. 并行分析:SEO + 情感 + 合规(同时进行)
# 6. Handoff 决策:低质量转人工审核
# 7. 发布:发布到 CMS
# 优势:
# - 综合使用所有 Multi-Agent 模式
# - 混合确定性逻辑和 AI 决策
# - 自动化质量控制和优化
# - 灵活的人工介入机制
# - 生产级错误处理和追踪
✨ 自定义工作流最佳实践
1. 工作流设计原则
- 关注点分离:每个节点承担明确的单一职责
- 确定性优先:能用确定性逻辑就不用 LLM(提高可靠性和速度)
- 类型化状态:使用 TypedDict 定义清晰的状态结构
- 模块化设计:节点可独立测试和复用
- 渐进式复杂度:从简单顺序流程开始,逐步添加复杂性
- 错误边界:为关键节点添加错误处理和降级
2. 节点类型选择
| 任务类型 | 推荐节点类型 | 示例 |
|---|---|---|
| 数据获取 | 确定性函数 | 数据库查询、API 调用 |
| 数据转换 | 确定性函数 | 格式转换、清洗 |
| 内容理解 | LLM 节点 | 分类、摘要、情感分析 |
| 内容生成 | LLM 节点 | 写作、改写、创作 |
| 复杂推理 | Agent 节点 | 多步决策、工具调用 |
| 验证检查 | 确定性函数 | 格式校验、规则检查 |
3. 性能优化策略
- 并行化:识别可并行的独立任务
- 缓存:缓存昂贵的 LLM 调用结果
- 早停:条件提前终止,跳过不必要的步骤
- 批处理:将多个请求批量处理
- 模型选择:根据任务复杂度选择模型(简单任务用 mini)
- 流式输出:对长时间任务使用流式响应
4. 状态管理最佳实践
# 好的状态设计
class WellDesignedState(TypedDict):
# 输入字段(清晰标注)
user_query: str
user_id: str
# 中间结果(按阶段组织)
stage1_result: str
stage2_analysis: dict
# 元数据(追踪信息)
processing_time_ms: float
node_history: list[str]
# 输出字段
final_answer: str
confidence: float
# 差的状态设计
class PoorlyDesignedState(TypedDict):
# 字段命名不清晰
data: str
result: str
temp: str
# 缺少类型提示
analysis # 类型不明确
# 职责混乱
everything: dict # 所有数据混在一起
5. 错误处理和监控
# 节点错误处理
def robust_node(state):
"""带错误处理的节点"""
try:
# 主要逻辑
result = expensive_operation(state)
return {"result": result}
except SpecificError as e:
# 降级处理
logger.warning(f"Operation failed, using fallback: {e}")
return {"result": fallback_operation(state)}
except Exception as e:
# 记录错误并传递
logger.error(f"Node failed: {e}")
return {"error": str(e), "failed_node": "robust_node"}
# 工作流级别监控
def add_monitoring(workflow):
"""为工作流添加监控"""
def log_node_entry(node_name):
print(f"Entering node: {node_name}")
def log_node_exit(node_name, duration_ms):
print(f"Exiting node: {node_name} ({duration_ms}ms)")
# 使用 LangSmith 追踪
import os
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "custom-workflow"
❓ 常见问题
Q1: 何时使用自定义工作流而非标准模式?
决策树:
- 需求是否符合 Subagents(顺序流水线)?→ 是:使用 Subagents
- 是否需要运行时动态转移?→ 是:使用 Handoffs
- 是否只需初始分类路由?→ 是:使用 Router
- 是否只需共享工具能力?→ 是:使用 Skills
- 以上都不完全符合?→ 使用自定义工作流
Q2: 如何组合多个 Multi-Agent 模式?
组合策略:
- Router 作为入口:自定义工作流的第一个节点可以是 Router
- Subagents 作为节点:将完整的 Subagent 流程作为单个节点
- Skills 作为工具:所有代理节点使用共享的 Skills
- Handoffs 作为决策:使用条件边实现 Handoff 逻辑
Q3: 如何避免工作流过于复杂?
复杂度管理:
- 分层设计:将复杂工作流拆分为多个子工作流
- 限制深度:避免超过 3 层嵌套
- 可视化:使用 Mermaid 图表记录工作流
- 代码审查:定期审查工作流设计
- 重构:当工作流超过 10 个节点时考虑重构
Q4: 如何测试自定义工作流?
测试策略:
# 单元测试:测试单个节点
def test_quality_assessment_node():
state = {
"preprocessed_content": "测试内容"
}
result = quality_assessment_agent(state)
assert "quality_assessment" in result
assert 0 <= result["quality_assessment"].score <= 1
# 集成测试:测试完整路径
def test_full_workflow():
input_state = {
"content_type": ContentType.ARTICLE,
"raw_content": "测试文章内容"
}
result = app.invoke(input_state)
assert result["published"] == True
# 路径测试:测试所有条件分支
def test_revision_path():
# 测试触发修改的路径
pass
def test_human_review_path():
# 测试触发人工审核的路径
pass
Q5: 如何调试工作流执行?
调试技巧:
- LangSmith 追踪:查看完整执行链和每个节点的输入输出
- 打印状态:在关键节点打印状态变化
- 日志记录:为每个节点添加结构化日志
- 单步执行:独立测试每个节点
- 可视化执行:生成工作流执行的时序图
Q6: 如何优化工作流性能?
性能优化清单:
- ✅ 识别可并行的节点并并行执行
- ✅ 缓存昂贵的 LLM 调用
- ✅ 对简单任务使用便宜模型(gpt-4o-mini)
- ✅ 添加早停条件,跳过不必要的节点
- ✅ 批量处理多个请求
- ✅ 使用异步执行(async/await)
- ✅ 监控每个节点的延迟,优化瓶颈