📚 什么是 Streaming?

Streaming(流式输出)允许 Agent 在执行过程中实时发送更新, 而不是等待整个任务完成后才返回结果。这显著改善了用户体验,特别是处理耗时任务时。

💡 核心优势
  • 实时反馈:用户立即看到 Agent 的思考过程和进度
  • 降低感知延迟:即使总时间相同,流式输出让用户感觉更快
  • 早期错误发现:可以在执行中途发现问题并中断
  • 更好的交互性:支持进度条、日志显示等 UI 元素

对比:阻塞式 vs 流式

特性 阻塞式调用 (invoke) 流式调用 (stream)
返回时机 任务完成后 逐步返回
用户体验 等待期间无反馈 实时看到进度
适用场景 简单、快速任务 复杂、耗时任务
实现难度 简单 稍复杂

🏗️ Streaming 架构

graph TB A[用户请求] --> B[agent.stream 调用] B --> C{选择 stream_mode} C -->|updates| D[步骤更新流] C -->|messages| E[Token 流] C -->|custom| F[自定义信号流] C -->|多模式| G[组合流] D --> H[每步完成后发送状态] E --> I[LLM 逐 Token 发送] F --> J[工具自定义消息] G --> K[同时发送多种事件] H --> L[客户端接收] I --> L J --> L K --> L L --> M[实时显示] style D fill:#3b82f6,color:#fff style E fill:#10b981,color:#fff style F fill:#f59e0b,color:#fff style G fill:#8b5cf6,color:#fff

三种主要 Stream Mode

模式 用途 返回内容 典型场景
updates 步骤进度 每个 Agent 步骤后的状态变化 显示 "正在思考..." "正在调用工具..."
messages Token 流 LLM 生成的每个 Token + 元数据 逐字显示回答,打字机效果
custom 自定义信号 通过 stream_writer 发送的任意数据 进度条、日志、调试信息

📊 Updates 模式 - 步骤进度

stream_mode="updates" 在每个 Agent 步骤完成后发送状态更新, 让你了解 Agent 当前在做什么。

Python 🟢 基础
"""
Updates 模式示例
功能:显示 Agent 的执行步骤
"""
from langchain.agents import create_agent
from langchain.tools import tool

@tool
def get_weather(city: str) -> str:
    """获取天气信息"""
    return f"{city} 今天晴朗,22°C"

agent = create_agent(
    model="gpt-4o",
    tools=[get_weather],
    system_prompt="你是天气助手。"
)

# 使用 updates 模式流式调用
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "北京今天天气怎么样?"}]},
    stream_mode="updates"  # 关键参数
):
    # chunk 是一个字典,key 是步骤名称
    for step_name, step_data in chunk.items():
        print(f"\n📍 步骤: {step_name}")
        print(f"   数据: {step_data}")

# 预期输出:
# 📍 步骤: agent
#    数据: {'messages': [...]}  # Agent 决定调用工具
#
# 📍 步骤: tools
#    数据: {'messages': [...]}  # 工具执行结果
#
# 📍 步骤: agent
#    数据: {'messages': [...]}  # Agent 生成最终答案

Updates 模式进阶:UI 集成

Python 🟡 中级
"""
Updates 模式 UI 集成示例
功能:实现进度条和状态显示
"""
from langchain.agents import create_agent
from langchain.tools import tool
import sys

@tool
def search_database(query: str) -> str:
    """搜索数据库"""
    import time
    time.sleep(1)  # 模拟耗时操作
    return f"找到与 '{query}' 相关的 5 条记录"

@tool
def analyze_data(data: str) -> str:
    """分析数据"""
    import time
    time.sleep(1.5)
    return "分析完成:趋势向上"

agent = create_agent(
    model="gpt-4o",
    tools=[search_database, analyze_data],
    system_prompt="你是数据分析助手。"
)

# 状态映射
STATUS_EMOJIS = {
    "agent": "🤖 Agent 思考中...",
    "tools": "🔧 执行工具中...",
}

print("开始任务...\n")

for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "搜索销售数据并分析趋势"}]},
    stream_mode="updates"
):
    for step, data in chunk.items():
        # 显示步骤状态
        status = STATUS_EMOJIS.get(step, f"📍 {step}")
        print(f"\r{status}", end="", flush=True)

        # 如果是 tools 步骤,显示工具名称
        if step == "tools":
            messages = data.get("messages", [])
            for msg in messages:
                if hasattr(msg, "name"):
                    print(f"\n   ✅ 执行了工具: {msg.name}")

print("\n\n✅ 任务完成!")

✍️ Messages 模式 - Token 流

stream_mode="messages" 返回 LLM 生成的每个 Token, 实现打字机效果,让用户实时看到 AI 的回答逐字显现。

Python 🟢 基础
"""
Messages 模式示例
功能:实现打字机效果
"""
from langchain.agents import create_agent

agent = create_agent(
    model="gpt-4o",
    tools=[],
    system_prompt="你是一个友好的助手,用简洁的语言回答问题。"
)

print("🤖 助手: ", end="", flush=True)

# 使用 messages 模式流式输出
for token, metadata in agent.stream(
    {"messages": [{"role": "user", "content": "介绍一下 LangChain"}]},
    stream_mode="messages"  # Token 流模式
):
    # token 是字符串,metadata 包含节点信息
    print(token, end="", flush=True)

print("\n")

# 预期输出:
# 🤖 助手: LangChain 是一个用于构建 AI 应用的框架...
#          (逐字显示,像打字机一样)

Messages 模式进阶:元数据处理

Python 🟡 中级
"""
Messages 模式元数据示例
功能:区分不同节点的 Token 输出
"""
from langchain.agents import create_agent
from langchain.tools import tool

@tool
def get_fact() -> str:
    """获取有趣的事实"""
    return "蜂鸟是唯一能向后飞的鸟类。"

agent = create_agent(
    model="gpt-4o",
    tools=[get_fact],
)

current_node = None

for token, metadata in agent.stream(
    {"messages": [{"role": "user", "content": "给我一个有趣的事实并解释"}]},
    stream_mode="messages"
):
    # 获取当前节点名称
    node = metadata.get("langgraph_node", "unknown")

    # 节点切换时显示标签
    if node != current_node:
        if current_node is not None:
            print()  # 换行
        print(f"\n[{node.upper()}] ", end="", flush=True)
        current_node = node

    # 显示 Token
    print(token, end="", flush=True)

print("\n")

# 预期输出:
# [AGENT] 让我为你查找一个有趣的事实...
#
# [AGENT] 蜂鸟是唯一能向后飞的鸟类。这是因为它们独特的翅膀结构...

🎨 Custom 模式 - 自定义信号

stream_mode="custom" 允许你在工具或中间件中发送任意自定义信号, 如进度百分比、日志消息、调试信息等。

Python 🟡 中级
"""
Custom 模式示例
功能:在工具中发送自定义进度信号
"""
from langchain.agents import create_agent
from langchain.tools import tool
from langgraph.config import get_stream_writer
import time

@tool
def process_large_file(filename: str) -> str:
    """处理大文件(带进度反馈)"""
    # 获取 stream writer
    writer = get_stream_writer()

    # 模拟处理过程
    steps = [
        "正在打开文件...",
        "正在解析数据...",
        "正在验证格式...",
        "正在计算统计信息...",
        "处理完成!"
    ]

    for i, step in enumerate(steps):
        # 发送自定义信号
        writer(f"[{i+1}/{len(steps)}] {step}")
        time.sleep(0.5)  # 模拟耗时

    return f"文件 '{filename}' 处理完成,共 1000 条记录"

agent = create_agent(
    model="gpt-4o",
    tools=[process_large_file],
    system_prompt="你是文件处理助手。"
)

# 使用 custom 模式接收自定义信号
for event in agent.stream(
    {"messages": [{"role": "user", "content": "处理 sales_data.csv 文件"}]},
    stream_mode="custom"
):
    print(f"📢 {event}")

# 预期输出:
# 📢 [1/5] 正在打开文件...
# 📢 [2/5] 正在解析数据...
# 📢 [3/5] 正在验证格式...
# 📢 [4/5] 正在计算统计信息...
# 📢 [5/5] 处理完成!

Custom 模式进阶:进度条集成

Python 🔴 高级
"""
Custom 模式进度条示例
功能:实现真实的进度条显示
"""
from langchain.agents import create_agent
from langchain.tools import tool
from langgraph.config import get_stream_writer
import time
import sys

@tool
def download_dataset(dataset_name: str) -> str:
    """下载数据集(带进度条)"""
    writer = get_stream_writer()

    total_size = 100  # 模拟总大小(MB)
    downloaded = 0

    while downloaded < total_size:
        downloaded += 10
        percentage = (downloaded / total_size) * 100

        # 发送 JSON 格式的进度数据
        writer({
            "type": "progress",
            "downloaded": downloaded,
            "total": total_size,
            "percentage": percentage
        })

        time.sleep(0.2)

    writer({"type": "complete", "message": "下载完成!"})
    return f"数据集 '{dataset_name}' 下载成功"

agent = create_agent(
    model="gpt-4o",
    tools=[download_dataset],
)

def draw_progress_bar(percentage):
    """绘制进度条"""
    bar_length = 30
    filled = int(bar_length * percentage / 100)
    bar = "█" * filled + "░" * (bar_length - filled)
    return f"[{bar}] {percentage:.0f}%"

# 流式接收并显示进度
for event in agent.stream(
    {"messages": [{"role": "user", "content": "下载 ImageNet 数据集"}]},
    stream_mode="custom"
):
    if isinstance(event, dict):
        if event.get("type") == "progress":
            progress_bar = draw_progress_bar(event["percentage"])
            size_info = f"{event['downloaded']}/{event['total']} MB"
            print(f"\r{progress_bar} {size_info}", end="", flush=True)
        elif event.get("type") == "complete":
            print(f"\n✅ {event['message']}")

print("\n")

🔀 多模式组合流式

可以同时使用多个 stream_mode,通过传递列表来组合不同的流式输出。

Python 🔴 高级
"""
多模式组合示例
功能:同时接收步骤更新和自定义信号
"""
from langchain.agents import create_agent
from langchain.tools import tool
from langgraph.config import get_stream_writer

@tool
def analyze_sentiment(text: str) -> str:
    """情感分析(带进度)"""
    writer = get_stream_writer()

    writer("开始分析...")
    writer("提取特征向量...")
    writer("模型推理中...")
    writer("生成报告...")

    return f"情感分析结果:积极 (置信度 85%)"

agent = create_agent(
    model="gpt-4o",
    tools=[analyze_sentiment],
)

# 组合 updates 和 custom 模式
for stream_mode, chunk in agent.stream(
    {"messages": [{"role": "user", "content": "分析这段评论的情感:产品很棒!"}]},
    stream_mode=["updates", "custom"]  # 列表形式
):
    if stream_mode == "updates":
        # 处理步骤更新
        for step, data in chunk.items():
            print(f"[步骤] {step}")

    elif stream_mode == "custom":
        # 处理自定义信号
        print(f"[进度] {chunk}")

# 预期输出:
# [步骤] agent
# [步骤] tools
# [进度] 开始分析...
# [进度] 提取特征向量...
# [进度] 模型推理中...
# [进度] 生成报告...
# [步骤] agent
✅ 多模式组合建议
组合 适用场景
["updates", "custom"] 需要步骤状态 + 工具进度反馈
["messages", "custom"] 需要打字机效果 + 自定义日志
["updates", "messages"] 需要完整执行过程可视化
["updates", "messages", "custom"] 最全面的监控(调试用)

🔧 流式工具调用

组合 messagesupdates 模式, 可以同时访问部分工具调用 JSON 和完整的解析结果。

Python 🔴 高级
"""
流式工具调用示例
功能:实时显示工具调用的参数解析过程
"""
from langchain.agents import create_agent
from langchain.tools import tool

@tool
def create_user(
    username: str,
    email: str,
    age: int,
    preferences: dict
) -> str:
    """创建用户(复杂参数)"""
    return f"用户创建成功:{username} ({email})"

agent = create_agent(
    model="gpt-4o",
    tools=[create_user],
)

print("实时显示工具调用参数解析...\n")

for stream_mode, chunk in agent.stream(
    {"messages": [{"role": "user", "content": "创建用户:张三,邮箱 [email protected],25岁,偏好深色主题"}]},
    stream_mode=["messages", "updates"]
):
    if stream_mode == "messages":
        # Token 流(包含部分 JSON)
        token, metadata = chunk
        if token:
            print(token, end="", flush=True)

    elif stream_mode == "updates":
        # 获取完整的工具调用
        for step, data in chunk.items():
            if step == "agent":
                messages = data.get("messages", [])
                for msg in messages:
                    if hasattr(msg, "tool_calls") and msg.tool_calls:
                        print("\n\n✅ 工具调用完整解析:")
                        for tc in msg.tool_calls:
                            print(f"   工具: {tc['name']}")
                            print(f"   参数: {tc['args']}")

print("\n")

🚫 禁用流式输出

在某些场景下(如子 Agent、特定工具),你可能想要禁用流式输出。

Python 🟡 中级
"""
禁用流式输出示例
功能:为特定模型禁用 Token 流
"""
from langchain_openai import ChatOpenAI
from langchain.agents import create_agent

# 方式 1:在模型初始化时禁用
model_no_streaming = ChatOpenAI(
    model="gpt-4o",
    streaming=False  # 禁用流式输出
)

agent = create_agent(
    model=model_no_streaming,
    tools=[],
)

# 即使使用 stream_mode="messages",也不会有 Token 流
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "你好"}]},
    stream_mode="messages"
):
    print(chunk)  # 只会返回完整响应,不会逐 Token 返回

# 方式 2:在特定调用时禁用
model = ChatOpenAI(model="gpt-4o")

# 对于某些调用,传递 config 禁用流式
response = model.invoke(
    "你好",
    config={"metadata": {"streaming": False}}
)
💡 何时禁用流式输出
  • 子 Agent:内部 Agent 不需要向用户暴露细节
  • 批量处理:处理大量请求时,流式输出增加开销
  • API 调用:某些模型提供商不支持流式
  • 测试环境:单元测试中简化输出验证

🎯 完整示例:智能客服流式响应

Python 🔴 高级 - 完整项目
"""
智能客服流式响应完整示例
功能:演示实际客服场景中的流式输出应用
"""
from langchain.agents import create_agent
from langchain.tools import tool
from langgraph.config import get_stream_writer
import time
import sys

# ==================== 工具定义 ====================

@tool
def search_orders(user_id: str) -> str:
    """搜索用户订单(带进度)"""
    writer = get_stream_writer()

    writer("🔍 正在连接订单数据库...")
    time.sleep(0.3)

    writer("📊 正在查询订单记录...")
    time.sleep(0.5)

    writer("✅ 查询完成")

    return f"用户 {user_id} 有 3 个订单:\n- 订单#001: iPhone 15 (已发货)\n- 订单#002: AirPods (配送中)\n- 订单#003: MacBook Pro (处理中)"

@tool
def check_inventory(product_id: str) -> str:
    """检查库存(带进度)"""
    writer = get_stream_writer()

    writer("📦 正在查询库存系统...")
    time.sleep(0.4)

    writer("🔢 正在计算可用库存...")
    time.sleep(0.3)

    return f"产品 {product_id} 库存充足:剩余 156 件"

# ==================== 创建 Agent ====================

customer_service_agent = create_agent(
    model="gpt-4o",
    tools=[search_orders, check_inventory],
    system_prompt="""你是专业的客服助手。

职责:
1. 查询用户订单状态
2. 检查产品库存
3. 提供友好、专业的服务

工作流程:
- 使用 search_orders 查询订单
- 使用 check_inventory 检查库存
- 始终保持礼貌和耐心"""
)

# ==================== 流式响应处理 ====================

def streaming_customer_service(user_query: str):
    """流式处理客服对话"""
    print("\n" + "="*60)
    print(f"用户: {user_query}")
    print("="*60 + "\n")

    current_section = None

    for stream_mode, chunk in customer_service_agent.stream(
        {"messages": [{"role": "user", "content": user_query}]},
        stream_mode=["updates", "messages", "custom"]
    ):
        if stream_mode == "custom":
            # 自定义进度信号
            print(f"  💬 {chunk}")

        elif stream_mode == "updates":
            # 步骤更新
            for step, data in chunk.items():
                if step != current_section:
                    if current_section == "agent" and step == "tools":
                        print()  # 换行
                    current_section = step

        elif stream_mode == "messages":
            # Token 流(打字机效果)
            token, metadata = chunk
            node = metadata.get("langgraph_node")

            if node == "agent" and token:
                if current_section != "agent_response":
                    print("\n🤖 客服: ", end="", flush=True)
                    current_section = "agent_response"
                print(token, end="", flush=True)

    print("\n" + "="*60 + "\n")

# ==================== 测试场景 ====================

if __name__ == "__main__":
    # 场景 1:查询订单
    streaming_customer_service("我想查看我的订单状态,我的用户ID是 USER_12345")

    time.sleep(1)

    # 场景 2:检查库存
    streaming_customer_service("iPhone 15 Pro Max 还有货吗?产品ID是 PROD_67890")

    time.sleep(1)

    # 场景 3:组合查询
    streaming_customer_service("查询我的订单,然后告诉我 MacBook Pro 的库存情况")

# 预期输出:
# ============================================================
# 用户: 我想查看我的订单状态,我的用户ID是 USER_12345
# ============================================================
#
#   💬 🔍 正在连接订单数据库...
#   💬 📊 正在查询订单记录...
#   💬 ✅查询完成
#
# 🤖 客服: 您好!我已为您查询到订单信息:
# - 订单#001: iPhone 15 (已发货)
# - 订单#002: AirPods (配送中)
# - 订单#003: MacBook Pro (处理中)
#
# 有什么其他可以帮您的吗?
# ============================================================

❓ 常见问题

Q1: 流式输出会增加延迟吗?

不会。流式输出不会增加总执行时间,反而降低了感知延迟。 用户在等待期间能看到进度,体验更好。

Q2: 所有模型都支持流式输出吗?

大多数主流 LLM 提供商支持流式输出,包括:

  • ✅ OpenAI (GPT-4, GPT-3.5)
  • ✅ Anthropic (Claude)
  • ✅ Google (Gemini)
  • ❌ 某些本地模型可能不支持

如果模型不支持,使用 stream_mode="messages" 会降级为批量返回。

Q3: 如何在 Web 应用中实现流式输出?

Python
# FastAPI + Server-Sent Events (SSE) 示例
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.post("/chat/stream")
async def chat_stream(message: str):
    """流式聊天端点"""
    async def event_generator():
        for chunk in agent.stream(
            {"messages": [{"role": "user", "content": message}]},
            stream_mode="messages"
        ):
            token, metadata = chunk
            # 发送 SSE 事件
            yield f"data: {token}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream"
    )

Q4: 流式输出可以中断吗?

可以。在 Python 中,使用 break 退出循环即可中断流式输出:

Python
for chunk in agent.stream(...):
    if some_condition:
        break  # 中断流式输出
    # 处理 chunk

Q5: 如何记录流式输出用于调试?

Python
# 同时显示和记录
import json

log_file = open("stream_log.jsonl", "w")

for stream_mode, chunk in agent.stream(..., stream_mode=["updates", "custom"]):
    # 记录到文件
    log_file.write(json.dumps({
        "mode": stream_mode,
        "chunk": str(chunk)
    }) + "\n")
    log_file.flush()

    # 同时显示
    print(chunk)

log_file.close()

✨ 最佳实践

1. 选择合适的 Stream Mode

场景 推荐模式 理由
聊天应用 messages 打字机效果,用户体验好
复杂任务 updates + custom 显示步骤和进度
后台任务 custom 仅发送关键进度信号
调试 全部组合 完整的执行可视化

2. 优化用户体验

  • 显示进度:对于耗时操作,始终提供进度反馈
  • 明确状态:使用清晰的图标和文字说明当前步骤
  • 允许中断:提供"停止"按钮让用户取消长时间任务
  • 保存历史:流式输出结束后,保留完整响应供用户查看

3. 性能优化

  • 批量发送:避免每个 Token 都触发 UI 更新(缓冲 50-100ms)
  • 限制频率:自定义信号不要发送太频繁(如每 100ms 一次进度更新)
  • 按需启用:不是所有场景都需要流式,简单任务用 invoke
  • 资源清理:确保流式连接正确关闭,避免内存泄漏

4. 错误处理

Python
# 流式调用的错误处理
try:
    for chunk in agent.stream(...):
        process_chunk(chunk)
except Exception as e:
    print(f"流式输出错误:{e}")
    # 清理资源、通知用户
finally:
    # 确保连接关闭
    pass

📖 参考资源