📊 LangGraph 状态图与工作流
掌握 LangGraph 低级编排框架,构建复杂的有状态 Agent 工作流, 实现持久化执行、人工干预和长期运行的智能系统。
📚 什么是 LangGraph?
LangGraph 是由 LangChain Inc 开发的低级编排框架和运行时, 专门用于构建、管理和部署长期运行的有状态 Agent。 它被 Klarna、Replit 和 Elastic 等公司采用于生产环境。
LangGraph 专注于代理编排而非提示或架构抽象。 它提供了对 Agent 工作流的精细控制,适合需要复杂任务处理的应用。
- vs create_agent():create_agent() 是高级抽象,自动生成工作流;LangGraph 需要手动定义每个节点和边
- 灵活性:LangGraph 提供更大的灵活性,但需要更多代码
- 适用场景:复杂的多步骤工作流、需要精确控制执行流程的应用
五大核心优势
| 优势 | 说明 | 应用场景 |
|---|---|---|
| 持久化执行 | Agent 可在故障中恢复,从中断点继续 | 长时间运行的任务、需要容错的系统 |
| 人工干预 | 在任何点检查和修改 Agent 状态 | 需要审批的操作、调试复杂流程 |
| 综合记忆 | 支持短期工作记忆和跨会话长期记忆 | 多轮对话、用户偏好管理 |
| LangSmith 调试 | 可视化工具追踪执行路径和状态转换 | 开发调试、性能优化 |
| 生产就绪部署 | 可扩展基础设施支持有状态工作流 | 企业级应用、大规模部署 |
🏗️ LangGraph 核心架构
三大核心组件
| 组件 | 作用 | 类比 |
|---|---|---|
| State(状态) | 共享数据结构,表示应用的当前快照 | 数据库,存储所有信息 |
| Nodes(节点) | 编码 Agent 逻辑的函数,执行计算并返回更新 | 工人,完成具体任务 |
| Edges(边) | 确定下一个执行节点的函数 | 路标,指明下一步方向 |
"Nodes do the work, edges tell what to do next."
节点负责执行具体工作,边负责决策下一步。
🎯 StateGraph 基础使用
StateGraph 是 LangGraph 的主要图实现,
通过用户定义的 State 对象进行参数化。
最简单的示例
"""
StateGraph 最简示例
功能:创建一个简单的状态图
"""
from langgraph.graph import StateGraph, MessagesState, START, END
def mock_llm(state: MessagesState):
"""模拟 LLM 节点"""
return {"messages": [{"role": "ai", "content": "hello world"}]}
# 创建状态图
graph = StateGraph(MessagesState)
# 添加节点
graph.add_node("mock_llm", mock_llm)
# 添加边
graph.add_edge(START, "mock_llm") # 从 START 到 mock_llm
graph.add_edge("mock_llm", END) # 从 mock_llm 到 END
# 编译图
compiled_graph = graph.compile()
# 执行图
result = compiled_graph.invoke({
"messages": [{"role": "user", "content": "hi!"}]
})
print(result["messages"][-1]["content"]) # 输出:hello world
定义自定义状态
"""
自定义状态示例
功能:定义包含多个字段的状态
"""
from typing_extensions import TypedDict
from typing import Annotated
from operator import add
class CustomState(TypedDict):
# 简单字段(覆盖模式)
current_step: str
result: str
# 列表字段(追加模式)
messages: Annotated[list, add] # 使用 add reducer 追加元素
# 计数器(加法模式)
step_count: Annotated[int, lambda x, y: x + y]
# 使用自定义状态创建图
graph = StateGraph(CustomState)
- 默认行为:覆盖(后面的值替换前面的值)
- Reducer 函数:使用
Annotated[type, reducer]定义合并逻辑 - 常用 Reducer:
add- 列表追加lambda x, y: x + y- 数值相加- 自定义函数 - 任意合并逻辑
🔗 节点和边详解
添加节点
"""
节点定义示例
功能:创建多个节点处理不同任务
"""
from langgraph.graph import StateGraph, MessagesState
def input_processor(state: MessagesState):
"""输入处理节点"""
user_message = state["messages"][-1]["content"]
return {"messages": [{"role": "system", "content": f"处理输入: {user_message}"}]}
def llm_caller(state: MessagesState):
"""LLM 调用节点"""
# 实际应用中这里会调用真实的 LLM
return {"messages": [{"role": "ai", "content": "这是 AI 的回答"}]}
def output_formatter(state: MessagesState):
"""输出格式化节点"""
ai_message = state["messages"][-1]["content"]
formatted = f"【AI回复】{ai_message}"
return {"messages": [{"role": "ai", "content": formatted}]}
# 创建图并添加节点
graph = StateGraph(MessagesState)
graph.add_node("input_processor", input_processor)
graph.add_node("llm_caller", llm_caller)
graph.add_node("output_formatter", output_formatter)
普通边(Normal Edges)
"""
普通边示例
功能:定义固定的执行顺序
"""
from langgraph.graph import START, END
# 定义执行流程
graph.add_edge(START, "input_processor") # 开始 -> 输入处理
graph.add_edge("input_processor", "llm_caller") # 输入处理 -> LLM调用
graph.add_edge("llm_caller", "output_formatter") # LLM调用 -> 输出格式化
graph.add_edge("output_formatter", END) # 输出格式化 -> 结束
# 编译和执行
compiled_graph = graph.compile()
result = compiled_graph.invoke({
"messages": [{"role": "user", "content": "你好"}]
})
条件边(Conditional Edges)
"""
条件边示例
功能:根据状态动态选择下一个节点
"""
from langgraph.graph import StateGraph, MessagesState, START, END
from typing import Literal
class CustomState(MessagesState):
intent: str # 用户意图
def classify_intent(state: CustomState):
"""意图分类节点"""
user_message = state["messages"][-1]["content"]
# 简单的意图识别
if "天气" in user_message:
intent = "weather"
elif "订单" in user_message:
intent = "order"
else:
intent = "general"
return {"intent": intent}
def handle_weather(state: CustomState):
"""处理天气查询"""
return {"messages": [{"role": "ai", "content": "今天天气晴朗"}]}
def handle_order(state: CustomState):
"""处理订单查询"""
return {"messages": [{"role": "ai", "content": "您的订单正在配送中"}]}
def handle_general(state: CustomState):
"""处理一般问题"""
return {"messages": [{"role": "ai", "content": "我会帮您解答"}]}
# 路由函数:根据意图选择下一个节点
def route_by_intent(state: CustomState) -> Literal["weather", "order", "general"]:
"""路由函数"""
return state["intent"]
# 创建图
graph = StateGraph(CustomState)
# 添加节点
graph.add_node("classifier", classify_intent)
graph.add_node("weather", handle_weather)
graph.add_node("order", handle_order)
graph.add_node("general", handle_general)
# 添加边
graph.add_edge(START, "classifier")
# 添加条件边
graph.add_conditional_edges(
"classifier", # 源节点
route_by_intent, # 路由函数
{ # 映射关系
"weather": "weather",
"order": "order",
"general": "general"
}
)
# 所有处理节点都指向 END
graph.add_edge("weather", END)
graph.add_edge("order", END)
graph.add_edge("general", END)
# 编译和测试
compiled_graph = graph.compile()
# 测试天气查询
result1 = compiled_graph.invoke({
"messages": [{"role": "user", "content": "今天天气怎么样?"}]
})
print(result1["messages"][-1]["content"]) # 输出:今天天气晴朗
# 测试订单查询
result2 = compiled_graph.invoke({
"messages": [{"role": "user", "content": "我的订单在哪里?"}]
})
print(result2["messages"][-1]["content"]) # 输出:您的订单正在配送中
🎮 Command 控制流
Command 允许节点同时返回状态更新和路由决策,
提供更精细的控制流管理。
"""
Command 控制流示例
功能:节点返回更新和路由指令
"""
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.types import Command
from typing import Literal
class TaskState(MessagesState):
max_retries: int
retry_count: int
success: bool
def process_task(state: TaskState) -> Command[Literal["retry", "success", "failed"]]:
"""任务处理节点"""
import random
# 模拟任务执行(70% 成功率)
success = random.random() > 0.3
if success:
# 成功:更新状态并路由到 success 节点
return Command(
update={"success": True},
goto="success"
)
else:
# 失败:检查重试次数
retry_count = state.get("retry_count", 0) + 1
if retry_count < state.get("max_retries", 3):
# 可以重试:更新重试计数并路由回当前节点
return Command(
update={"retry_count": retry_count},
goto="retry"
)
else:
# 超过重试次数:标记为失败
return Command(
update={"success": False},
goto="failed"
)
def handle_retry(state: TaskState):
"""重试处理"""
return {"messages": [{"role": "system", "content": f"第 {state['retry_count']} 次重试..."}]}
def handle_success(state: TaskState):
"""成功处理"""
return {"messages": [{"role": "ai", "content": "任务成功完成!"}]}
def handle_failed(state: TaskState):
"""失败处理"""
return {"messages": [{"role": "ai", "content": "任务失败,已达最大重试次数"}]}
# 创建图
graph = StateGraph(TaskState)
graph.add_node("task", process_task)
graph.add_node("retry", handle_retry)
graph.add_node("success", handle_success)
graph.add_node("failed", handle_failed)
graph.add_edge(START, "task")
graph.add_edge("retry", "task") # 重试回到任务节点
graph.add_edge("success", END)
graph.add_edge("failed", END)
compiled_graph = graph.compile()
# 测试
result = compiled_graph.invoke({
"messages": [],
"max_retries": 3,
"retry_count": 0,
"success": False
})
print(f"最终结果: {result['messages'][-1]['content']}")
print(f"重试次数: {result.get('retry_count', 0)}")
📨 MessagesState 预构建状态
MessagesState 是 LangGraph 提供的预构建状态,
专门用于处理消息列表,自动使用 add_messages reducer。
"""
MessagesState 使用示例
功能:扩展 MessagesState 添加自定义字段
"""
from langgraph.graph import MessagesState, StateGraph, START, END
# 扩展 MessagesState
class ExtendedState(MessagesState):
documents: list[str] # 添加文档列表字段
user_id: str # 添加用户 ID 字段
def retrieve_docs(state: ExtendedState):
"""检索文档节点"""
query = state["messages"][-1]["content"]
# 模拟文档检索
docs = [f"文档1关于{query}", f"文档2关于{query}"]
return {"documents": docs}
def generate_answer(state: ExtendedState):
"""生成答案节点"""
docs = state.get("documents", [])
context = "\\n".join(docs)
answer = f"基于以下文档:\\n{context}\\n\\n我的回答是..."
return {"messages": [{"role": "ai", "content": answer}]}
# 创建图
graph = StateGraph(ExtendedState)
graph.add_node("retrieve", retrieve_docs)
graph.add_node("generate", generate_answer)
graph.add_edge(START, "retrieve")
graph.add_edge("retrieve", "generate")
graph.add_edge("generate", END)
compiled_graph = graph.compile()
# 测试
result = compiled_graph.invoke({
"messages": [{"role": "user", "content": "LangChain"}],
"user_id": "user_123"
})
print(result["messages"][-1]["content"])
🎯 完整示例:智能客服路由系统
"""
智能客服路由系统完整示例
功能:根据用户问题类型,路由到不同的处理节点
"""
from langgraph.graph import StateGraph, MessagesState, START, END
from typing import Literal
from typing_extensions import TypedDict
# ==================== 状态定义 ====================
class CustomerServiceState(MessagesState):
intent: str # 用户意图
sentiment: str # 情感分析结果
priority: int # 优先级
resolved: bool # 是否已解决
# ==================== 节点函数 ====================
def analyze_query(state: CustomerServiceState):
"""分析用户查询"""
user_message = state["messages"][-1]["content"]
# 简单的意图识别
if any(word in user_message for word in ["订单", "快递", "物流"]):
intent = "order"
priority = 2
elif any(word in user_message for word in ["退款", "退货", "投诉"]):
intent = "refund"
priority = 3 # 高优先级
elif any(word in user_message for word in ["产品", "使用", "教程"]):
intent = "product"
priority = 1
else:
intent = "general"
priority = 1
# 简单的情感分析
if any(word in user_message for word in ["生气", "不满", "失望"]):
sentiment = "negative"
priority = max(priority, 3) # 负面情绪提升优先级
else:
sentiment = "neutral"
return {
"intent": intent,
"sentiment": sentiment,
"priority": priority
}
def handle_order_query(state: CustomerServiceState):
"""处理订单查询"""
return {
"messages": [{
"role": "ai",
"content": "您的订单正在处理中。订单号:ORDER-12345,预计3天内送达。"
}],
"resolved": True
}
def handle_refund_request(state: CustomerServiceState):
"""处理退款请求"""
if state.get("priority", 1) >= 3:
# 高优先级,转人工
return {
"messages": [{
"role": "ai",
"content": "您的退款请求已收到。由于您的情况比较特殊,我已为您转接人工客服,请稍候..."
}],
"resolved": False # 需要人工介入
}
else:
return {
"messages": [{
"role": "ai",
"content": "退款已提交,预计3-5个工作日到账。退款单号:REFUND-67890"
}],
"resolved": True
}
def handle_product_inquiry(state: CustomerServiceState):
"""处理产品咨询"""
return {
"messages": [{
"role": "ai",
"content": "关于产品使用,您可以参考我们的在线教程:https://example.com/tutorial"
}],
"resolved": True
}
def handle_general_question(state: CustomerServiceState):
"""处理一般问题"""
return {
"messages": [{
"role": "ai",
"content": "感谢您的咨询。我会尽力帮助您。请问有什么具体问题吗?"
}],
"resolved": True
}
def escalate_to_human(state: CustomerServiceState):
"""升级到人工客服"""
return {
"messages": [{
"role": "system",
"content": f"【系统】优先级 {state['priority']} - 已转接人工客服"
}]
}
# ==================== 路由函数 ====================
def route_by_intent(
state: CustomerServiceState
) -> Literal["order", "refund", "product", "general"]:
"""根据意图路由"""
return state["intent"]
def check_resolution(
state: CustomerServiceState
) -> Literal["escalate", "complete"]:
"""检查是否已解决"""
if state.get("resolved", False):
return "complete"
else:
return "escalate"
# ==================== 构建状态图 ====================
graph = StateGraph(CustomerServiceState)
# 添加节点
graph.add_node("analyzer", analyze_query)
graph.add_node("order", handle_order_query)
graph.add_node("refund", handle_refund_request)
graph.add_node("product", handle_product_inquiry)
graph.add_node("general", handle_general_question)
graph.add_node("escalate", escalate_to_human)
# 添加边
graph.add_edge(START, "analyzer")
# 第一层路由:按意图分发
graph.add_conditional_edges(
"analyzer",
route_by_intent,
{
"order": "order",
"refund": "refund",
"product": "product",
"general": "general"
}
)
# 第二层路由:检查是否需要升级
for node in ["order", "refund", "product", "general"]:
graph.add_conditional_edges(
node,
check_resolution,
{
"complete": END,
"escalate": "escalate"
}
)
graph.add_edge("escalate", END)
# 编译图
customer_service_graph = graph.compile()
# ==================== 测试场景 ====================
def test_query(query: str):
"""测试查询"""
print(f"\n{'='*60}")
print(f"用户: {query}")
print('='*60)
result = customer_service_graph.invoke({
"messages": [{"role": "user", "content": query}]
})
print(f"\n意图: {result.get('intent', 'N/A')}")
print(f"情感: {result.get('sentiment', 'N/A')}")
print(f"优先级: {result.get('priority', 'N/A')}")
print(f"已解决: {result.get('resolved', 'N/A')}")
for msg in result["messages"][1:]: # 跳过用户消息
print(f"\n{msg['role'].upper()}: {msg['content']}")
print()
if __name__ == "__main__":
# 测试不同类型的查询
test_query("我的订单什么时候到?")
test_query("我要退款!这个产品太差了,我很生气!")
test_query("这个产品怎么使用?")
test_query("你们营业时间是什么?")
❓ 常见问题
Q1: LangGraph 和 create_agent() 什么时候用哪个?
| 场景 | 推荐方案 | 理由 |
|---|---|---|
| 简单 Agent | create_agent() | 自动生成工作流,代码简洁 |
| 复杂分支逻辑 | LangGraph | 精确控制执行路径 |
| 需要可视化 | LangGraph | 支持 LangSmith 图形化调试 |
| 快速原型 | create_agent() | 开发速度快 |
| 生产级复杂系统 | LangGraph | 更好的可维护性和可扩展性 |
Q2: 如何调试 LangGraph?
使用 LangSmith 进行可视化调试:
import os
# 配置 LangSmith
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-api-key"
# 运行图,LangSmith 会自动追踪
result = graph.invoke({...})
Q3: 图可以有循环吗?
可以。LangGraph 支持循环,但需要设置递归限制防止无限循环:
# 编译时设置递归限制
graph = builder.compile(
checkpointer=checkpointer,
recursion_limit=25 # 最多执行 25 步
)
Q4: 如何处理长时间运行的任务?
使用 Checkpointer 实现持久化:
from langgraph.checkpoint.memory import InMemorySaver
# 创建 checkpointer
checkpointer = InMemorySaver()
# 编译时传入
graph = builder.compile(checkpointer=checkpointer)
# 使用 thread_id 标识会话
config = {"configurable": {"thread_id": "thread_001"}}
# 即使中断,也可以从断点恢复
result = graph.invoke({...}, config=config)
Q5: 节点可以返回什么类型?
节点可以返回:
- 字典:状态更新
{"key": "value"} - Command:状态更新 + 路由决策
- None:不更新状态(仅执行副作用)
✨ 最佳实践
1. 状态设计原则
- 最小化:只存储必要的信息
- 类型安全:使用 TypedDict 或 Pydantic 模型
- 合适的 Reducer:为列表字段使用
addreducer - 避免嵌套:保持状态结构扁平化
2. 节点设计原则
- 单一职责:每个节点只做一件事
- 无副作用:除非必要,节点应该是纯函数
- 错误处理:在节点内捕获和处理异常
- 可测试:节点应该易于单元测试
3. 边和路由
- 清晰的路由逻辑:路由函数应该简单明了
- 使用 Literal 类型:为路由返回值添加类型提示
- 避免过深嵌套:超过 3 层条件时考虑重构
- 设置递归限制:防止无限循环
4. 生产环境配置
# 生产环境推荐配置
from langgraph.checkpoint.postgres import PostgresSaver
DB_URI = "postgresql://user:pass@localhost:5432/langchain"
with PostgresSaver.from_conn_string(DB_URI) as checkpointer:
checkpointer.setup()
graph = builder.compile(
checkpointer=checkpointer,
recursion_limit=50,
debug=False # 生产环境关闭调试
)