🌊 Streaming 流式输出
掌握 LangChain 1.0 的流式输出机制,实现实时反馈和进度更新, 大幅提升 AI Agent 的用户体验。
📚 什么是 Streaming?
Streaming(流式输出)允许 Agent 在执行过程中实时发送更新, 而不是等待整个任务完成后才返回结果。这显著改善了用户体验,特别是处理耗时任务时。
- 实时反馈:用户立即看到 Agent 的思考过程和进度
- 降低感知延迟:即使总时间相同,流式输出让用户感觉更快
- 早期错误发现:可以在执行中途发现问题并中断
- 更好的交互性:支持进度条、日志显示等 UI 元素
对比:阻塞式 vs 流式
| 特性 | 阻塞式调用 (invoke) | 流式调用 (stream) |
|---|---|---|
| 返回时机 | 任务完成后 | 逐步返回 |
| 用户体验 | 等待期间无反馈 | 实时看到进度 |
| 适用场景 | 简单、快速任务 | 复杂、耗时任务 |
| 实现难度 | 简单 | 稍复杂 |
🏗️ Streaming 架构
三种主要 Stream Mode
| 模式 | 用途 | 返回内容 | 典型场景 |
|---|---|---|---|
updates |
步骤进度 | 每个 Agent 步骤后的状态变化 | 显示 "正在思考..." "正在调用工具..." |
messages |
Token 流 | LLM 生成的每个 Token + 元数据 | 逐字显示回答,打字机效果 |
custom |
自定义信号 | 通过 stream_writer 发送的任意数据 | 进度条、日志、调试信息 |
📊 Updates 模式 - 步骤进度
stream_mode="updates" 在每个 Agent 步骤完成后发送状态更新,
让你了解 Agent 当前在做什么。
"""
Updates 模式示例
功能:显示 Agent 的执行步骤
"""
from langchain.agents import create_agent
from langchain.tools import tool
@tool
def get_weather(city: str) -> str:
"""获取天气信息"""
return f"{city} 今天晴朗,22°C"
agent = create_agent(
model="gpt-4o",
tools=[get_weather],
system_prompt="你是天气助手。"
)
# 使用 updates 模式流式调用
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "北京今天天气怎么样?"}]},
stream_mode="updates" # 关键参数
):
# chunk 是一个字典,key 是步骤名称
for step_name, step_data in chunk.items():
print(f"\n📍 步骤: {step_name}")
print(f" 数据: {step_data}")
# 预期输出:
# 📍 步骤: agent
# 数据: {'messages': [...]} # Agent 决定调用工具
#
# 📍 步骤: tools
# 数据: {'messages': [...]} # 工具执行结果
#
# 📍 步骤: agent
# 数据: {'messages': [...]} # Agent 生成最终答案
Updates 模式进阶:UI 集成
"""
Updates 模式 UI 集成示例
功能:实现进度条和状态显示
"""
from langchain.agents import create_agent
from langchain.tools import tool
import sys
@tool
def search_database(query: str) -> str:
"""搜索数据库"""
import time
time.sleep(1) # 模拟耗时操作
return f"找到与 '{query}' 相关的 5 条记录"
@tool
def analyze_data(data: str) -> str:
"""分析数据"""
import time
time.sleep(1.5)
return "分析完成:趋势向上"
agent = create_agent(
model="gpt-4o",
tools=[search_database, analyze_data],
system_prompt="你是数据分析助手。"
)
# 状态映射
STATUS_EMOJIS = {
"agent": "🤖 Agent 思考中...",
"tools": "🔧 执行工具中...",
}
print("开始任务...\n")
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "搜索销售数据并分析趋势"}]},
stream_mode="updates"
):
for step, data in chunk.items():
# 显示步骤状态
status = STATUS_EMOJIS.get(step, f"📍 {step}")
print(f"\r{status}", end="", flush=True)
# 如果是 tools 步骤,显示工具名称
if step == "tools":
messages = data.get("messages", [])
for msg in messages:
if hasattr(msg, "name"):
print(f"\n ✅ 执行了工具: {msg.name}")
print("\n\n✅ 任务完成!")
✍️ Messages 模式 - Token 流
stream_mode="messages" 返回 LLM 生成的每个 Token,
实现打字机效果,让用户实时看到 AI 的回答逐字显现。
"""
Messages 模式示例
功能:实现打字机效果
"""
from langchain.agents import create_agent
agent = create_agent(
model="gpt-4o",
tools=[],
system_prompt="你是一个友好的助手,用简洁的语言回答问题。"
)
print("🤖 助手: ", end="", flush=True)
# 使用 messages 模式流式输出
for token, metadata in agent.stream(
{"messages": [{"role": "user", "content": "介绍一下 LangChain"}]},
stream_mode="messages" # Token 流模式
):
# token 是字符串,metadata 包含节点信息
print(token, end="", flush=True)
print("\n")
# 预期输出:
# 🤖 助手: LangChain 是一个用于构建 AI 应用的框架...
# (逐字显示,像打字机一样)
Messages 模式进阶:元数据处理
"""
Messages 模式元数据示例
功能:区分不同节点的 Token 输出
"""
from langchain.agents import create_agent
from langchain.tools import tool
@tool
def get_fact() -> str:
"""获取有趣的事实"""
return "蜂鸟是唯一能向后飞的鸟类。"
agent = create_agent(
model="gpt-4o",
tools=[get_fact],
)
current_node = None
for token, metadata in agent.stream(
{"messages": [{"role": "user", "content": "给我一个有趣的事实并解释"}]},
stream_mode="messages"
):
# 获取当前节点名称
node = metadata.get("langgraph_node", "unknown")
# 节点切换时显示标签
if node != current_node:
if current_node is not None:
print() # 换行
print(f"\n[{node.upper()}] ", end="", flush=True)
current_node = node
# 显示 Token
print(token, end="", flush=True)
print("\n")
# 预期输出:
# [AGENT] 让我为你查找一个有趣的事实...
#
# [AGENT] 蜂鸟是唯一能向后飞的鸟类。这是因为它们独特的翅膀结构...
🎨 Custom 模式 - 自定义信号
stream_mode="custom" 允许你在工具或中间件中发送任意自定义信号,
如进度百分比、日志消息、调试信息等。
"""
Custom 模式示例
功能:在工具中发送自定义进度信号
"""
from langchain.agents import create_agent
from langchain.tools import tool
from langgraph.config import get_stream_writer
import time
@tool
def process_large_file(filename: str) -> str:
"""处理大文件(带进度反馈)"""
# 获取 stream writer
writer = get_stream_writer()
# 模拟处理过程
steps = [
"正在打开文件...",
"正在解析数据...",
"正在验证格式...",
"正在计算统计信息...",
"处理完成!"
]
for i, step in enumerate(steps):
# 发送自定义信号
writer(f"[{i+1}/{len(steps)}] {step}")
time.sleep(0.5) # 模拟耗时
return f"文件 '{filename}' 处理完成,共 1000 条记录"
agent = create_agent(
model="gpt-4o",
tools=[process_large_file],
system_prompt="你是文件处理助手。"
)
# 使用 custom 模式接收自定义信号
for event in agent.stream(
{"messages": [{"role": "user", "content": "处理 sales_data.csv 文件"}]},
stream_mode="custom"
):
print(f"📢 {event}")
# 预期输出:
# 📢 [1/5] 正在打开文件...
# 📢 [2/5] 正在解析数据...
# 📢 [3/5] 正在验证格式...
# 📢 [4/5] 正在计算统计信息...
# 📢 [5/5] 处理完成!
Custom 模式进阶:进度条集成
"""
Custom 模式进度条示例
功能:实现真实的进度条显示
"""
from langchain.agents import create_agent
from langchain.tools import tool
from langgraph.config import get_stream_writer
import time
import sys
@tool
def download_dataset(dataset_name: str) -> str:
"""下载数据集(带进度条)"""
writer = get_stream_writer()
total_size = 100 # 模拟总大小(MB)
downloaded = 0
while downloaded < total_size:
downloaded += 10
percentage = (downloaded / total_size) * 100
# 发送 JSON 格式的进度数据
writer({
"type": "progress",
"downloaded": downloaded,
"total": total_size,
"percentage": percentage
})
time.sleep(0.2)
writer({"type": "complete", "message": "下载完成!"})
return f"数据集 '{dataset_name}' 下载成功"
agent = create_agent(
model="gpt-4o",
tools=[download_dataset],
)
def draw_progress_bar(percentage):
"""绘制进度条"""
bar_length = 30
filled = int(bar_length * percentage / 100)
bar = "█" * filled + "░" * (bar_length - filled)
return f"[{bar}] {percentage:.0f}%"
# 流式接收并显示进度
for event in agent.stream(
{"messages": [{"role": "user", "content": "下载 ImageNet 数据集"}]},
stream_mode="custom"
):
if isinstance(event, dict):
if event.get("type") == "progress":
progress_bar = draw_progress_bar(event["percentage"])
size_info = f"{event['downloaded']}/{event['total']} MB"
print(f"\r{progress_bar} {size_info}", end="", flush=True)
elif event.get("type") == "complete":
print(f"\n✅ {event['message']}")
print("\n")
🔀 多模式组合流式
可以同时使用多个 stream_mode,通过传递列表来组合不同的流式输出。
"""
多模式组合示例
功能:同时接收步骤更新和自定义信号
"""
from langchain.agents import create_agent
from langchain.tools import tool
from langgraph.config import get_stream_writer
@tool
def analyze_sentiment(text: str) -> str:
"""情感分析(带进度)"""
writer = get_stream_writer()
writer("开始分析...")
writer("提取特征向量...")
writer("模型推理中...")
writer("生成报告...")
return f"情感分析结果:积极 (置信度 85%)"
agent = create_agent(
model="gpt-4o",
tools=[analyze_sentiment],
)
# 组合 updates 和 custom 模式
for stream_mode, chunk in agent.stream(
{"messages": [{"role": "user", "content": "分析这段评论的情感:产品很棒!"}]},
stream_mode=["updates", "custom"] # 列表形式
):
if stream_mode == "updates":
# 处理步骤更新
for step, data in chunk.items():
print(f"[步骤] {step}")
elif stream_mode == "custom":
# 处理自定义信号
print(f"[进度] {chunk}")
# 预期输出:
# [步骤] agent
# [步骤] tools
# [进度] 开始分析...
# [进度] 提取特征向量...
# [进度] 模型推理中...
# [进度] 生成报告...
# [步骤] agent
| 组合 | 适用场景 |
|---|---|
["updates", "custom"] |
需要步骤状态 + 工具进度反馈 |
["messages", "custom"] |
需要打字机效果 + 自定义日志 |
["updates", "messages"] |
需要完整执行过程可视化 |
["updates", "messages", "custom"] |
最全面的监控(调试用) |
🔧 流式工具调用
组合 messages 和 updates 模式,
可以同时访问部分工具调用 JSON 和完整的解析结果。
"""
流式工具调用示例
功能:实时显示工具调用的参数解析过程
"""
from langchain.agents import create_agent
from langchain.tools import tool
@tool
def create_user(
username: str,
email: str,
age: int,
preferences: dict
) -> str:
"""创建用户(复杂参数)"""
return f"用户创建成功:{username} ({email})"
agent = create_agent(
model="gpt-4o",
tools=[create_user],
)
print("实时显示工具调用参数解析...\n")
for stream_mode, chunk in agent.stream(
{"messages": [{"role": "user", "content": "创建用户:张三,邮箱 [email protected],25岁,偏好深色主题"}]},
stream_mode=["messages", "updates"]
):
if stream_mode == "messages":
# Token 流(包含部分 JSON)
token, metadata = chunk
if token:
print(token, end="", flush=True)
elif stream_mode == "updates":
# 获取完整的工具调用
for step, data in chunk.items():
if step == "agent":
messages = data.get("messages", [])
for msg in messages:
if hasattr(msg, "tool_calls") and msg.tool_calls:
print("\n\n✅ 工具调用完整解析:")
for tc in msg.tool_calls:
print(f" 工具: {tc['name']}")
print(f" 参数: {tc['args']}")
print("\n")
🚫 禁用流式输出
在某些场景下(如子 Agent、特定工具),你可能想要禁用流式输出。
"""
禁用流式输出示例
功能:为特定模型禁用 Token 流
"""
from langchain_openai import ChatOpenAI
from langchain.agents import create_agent
# 方式 1:在模型初始化时禁用
model_no_streaming = ChatOpenAI(
model="gpt-4o",
streaming=False # 禁用流式输出
)
agent = create_agent(
model=model_no_streaming,
tools=[],
)
# 即使使用 stream_mode="messages",也不会有 Token 流
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "你好"}]},
stream_mode="messages"
):
print(chunk) # 只会返回完整响应,不会逐 Token 返回
# 方式 2:在特定调用时禁用
model = ChatOpenAI(model="gpt-4o")
# 对于某些调用,传递 config 禁用流式
response = model.invoke(
"你好",
config={"metadata": {"streaming": False}}
)
- 子 Agent:内部 Agent 不需要向用户暴露细节
- 批量处理:处理大量请求时,流式输出增加开销
- API 调用:某些模型提供商不支持流式
- 测试环境:单元测试中简化输出验证
🎯 完整示例:智能客服流式响应
"""
智能客服流式响应完整示例
功能:演示实际客服场景中的流式输出应用
"""
from langchain.agents import create_agent
from langchain.tools import tool
from langgraph.config import get_stream_writer
import time
import sys
# ==================== 工具定义 ====================
@tool
def search_orders(user_id: str) -> str:
"""搜索用户订单(带进度)"""
writer = get_stream_writer()
writer("🔍 正在连接订单数据库...")
time.sleep(0.3)
writer("📊 正在查询订单记录...")
time.sleep(0.5)
writer("✅ 查询完成")
return f"用户 {user_id} 有 3 个订单:\n- 订单#001: iPhone 15 (已发货)\n- 订单#002: AirPods (配送中)\n- 订单#003: MacBook Pro (处理中)"
@tool
def check_inventory(product_id: str) -> str:
"""检查库存(带进度)"""
writer = get_stream_writer()
writer("📦 正在查询库存系统...")
time.sleep(0.4)
writer("🔢 正在计算可用库存...")
time.sleep(0.3)
return f"产品 {product_id} 库存充足:剩余 156 件"
# ==================== 创建 Agent ====================
customer_service_agent = create_agent(
model="gpt-4o",
tools=[search_orders, check_inventory],
system_prompt="""你是专业的客服助手。
职责:
1. 查询用户订单状态
2. 检查产品库存
3. 提供友好、专业的服务
工作流程:
- 使用 search_orders 查询订单
- 使用 check_inventory 检查库存
- 始终保持礼貌和耐心"""
)
# ==================== 流式响应处理 ====================
def streaming_customer_service(user_query: str):
"""流式处理客服对话"""
print("\n" + "="*60)
print(f"用户: {user_query}")
print("="*60 + "\n")
current_section = None
for stream_mode, chunk in customer_service_agent.stream(
{"messages": [{"role": "user", "content": user_query}]},
stream_mode=["updates", "messages", "custom"]
):
if stream_mode == "custom":
# 自定义进度信号
print(f" 💬 {chunk}")
elif stream_mode == "updates":
# 步骤更新
for step, data in chunk.items():
if step != current_section:
if current_section == "agent" and step == "tools":
print() # 换行
current_section = step
elif stream_mode == "messages":
# Token 流(打字机效果)
token, metadata = chunk
node = metadata.get("langgraph_node")
if node == "agent" and token:
if current_section != "agent_response":
print("\n🤖 客服: ", end="", flush=True)
current_section = "agent_response"
print(token, end="", flush=True)
print("\n" + "="*60 + "\n")
# ==================== 测试场景 ====================
if __name__ == "__main__":
# 场景 1:查询订单
streaming_customer_service("我想查看我的订单状态,我的用户ID是 USER_12345")
time.sleep(1)
# 场景 2:检查库存
streaming_customer_service("iPhone 15 Pro Max 还有货吗?产品ID是 PROD_67890")
time.sleep(1)
# 场景 3:组合查询
streaming_customer_service("查询我的订单,然后告诉我 MacBook Pro 的库存情况")
# 预期输出:
# ============================================================
# 用户: 我想查看我的订单状态,我的用户ID是 USER_12345
# ============================================================
#
# 💬 🔍 正在连接订单数据库...
# 💬 📊 正在查询订单记录...
# 💬 ✅查询完成
#
# 🤖 客服: 您好!我已为您查询到订单信息:
# - 订单#001: iPhone 15 (已发货)
# - 订单#002: AirPods (配送中)
# - 订单#003: MacBook Pro (处理中)
#
# 有什么其他可以帮您的吗?
# ============================================================
❓ 常见问题
Q1: 流式输出会增加延迟吗?
不会。流式输出不会增加总执行时间,反而降低了感知延迟。 用户在等待期间能看到进度,体验更好。
Q2: 所有模型都支持流式输出吗?
大多数主流 LLM 提供商支持流式输出,包括:
- ✅ OpenAI (GPT-4, GPT-3.5)
- ✅ Anthropic (Claude)
- ✅ Google (Gemini)
- ❌ 某些本地模型可能不支持
如果模型不支持,使用 stream_mode="messages" 会降级为批量返回。
Q3: 如何在 Web 应用中实现流式输出?
# FastAPI + Server-Sent Events (SSE) 示例
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
@app.post("/chat/stream")
async def chat_stream(message: str):
"""流式聊天端点"""
async def event_generator():
for chunk in agent.stream(
{"messages": [{"role": "user", "content": message}]},
stream_mode="messages"
):
token, metadata = chunk
# 发送 SSE 事件
yield f"data: {token}\n\n"
return StreamingResponse(
event_generator(),
media_type="text/event-stream"
)
Q4: 流式输出可以中断吗?
可以。在 Python 中,使用 break 退出循环即可中断流式输出:
for chunk in agent.stream(...):
if some_condition:
break # 中断流式输出
# 处理 chunk
Q5: 如何记录流式输出用于调试?
# 同时显示和记录
import json
log_file = open("stream_log.jsonl", "w")
for stream_mode, chunk in agent.stream(..., stream_mode=["updates", "custom"]):
# 记录到文件
log_file.write(json.dumps({
"mode": stream_mode,
"chunk": str(chunk)
}) + "\n")
log_file.flush()
# 同时显示
print(chunk)
log_file.close()
✨ 最佳实践
1. 选择合适的 Stream Mode
| 场景 | 推荐模式 | 理由 |
|---|---|---|
| 聊天应用 | messages |
打字机效果,用户体验好 |
| 复杂任务 | updates + custom |
显示步骤和进度 |
| 后台任务 | custom |
仅发送关键进度信号 |
| 调试 | 全部组合 | 完整的执行可视化 |
2. 优化用户体验
- 显示进度:对于耗时操作,始终提供进度反馈
- 明确状态:使用清晰的图标和文字说明当前步骤
- 允许中断:提供"停止"按钮让用户取消长时间任务
- 保存历史:流式输出结束后,保留完整响应供用户查看
3. 性能优化
- 批量发送:避免每个 Token 都触发 UI 更新(缓冲 50-100ms)
- 限制频率:自定义信号不要发送太频繁(如每 100ms 一次进度更新)
- 按需启用:不是所有场景都需要流式,简单任务用
invoke - 资源清理:确保流式连接正确关闭,避免内存泄漏
4. 错误处理
# 流式调用的错误处理
try:
for chunk in agent.stream(...):
process_chunk(chunk)
except Exception as e:
print(f"流式输出错误:{e}")
# 清理资源、通知用户
finally:
# 确保连接关闭
pass