🔗 LangChain Expression Language (LCEL)
声明式组合语言,使用管道操作符构建强大的数据处理链和 AI 工作流。
📚 什么是 LCEL?
LCEL (LangChain Expression Language) 是 LangChain 的声明式组合语言,
允许你使用管道操作符 | 轻松组合各种组件。
| 特性 | 说明 | 示例 |
|---|---|---|
| 简洁语法 | 使用 | 操作符链式组合 | prompt | model | parser |
| 流式优先 | 自动支持流式输出 | 无需额外配置即可流式处理 |
| 并行执行 | RunnableParallel 并发处理 | 同时调用多个模型或工具 |
| 类型安全 | 编译时类型检查 | IDE 自动补全和错误提示 |
| 可追踪性 | LangSmith 自动追踪 | 无需手动插桩 |
"""
LCEL 基础示例 - 简单的链式调用
"""
from langchain_core.prompts import ChatPromptTemplate
from langchain import init_chat_model
# 创建提示词模板
prompt = ChatPromptTemplate.from_template("翻译成英文:{text}")
# 创建模型
model = init_chat_model("gpt-4o", model_provider="openai")
# 使用管道操作符组合
chain = prompt | model
# 调用链
result = chain.invoke({"text": "你好世界"})
print(result.content)
# 输出:Hello World
🆚 LCEL vs 传统链
LCEL 在 LangChain 1.0 中替代了传统的 LLMChain, 提供更简洁、更强大的组合方式:
| 对比项 | 传统链(0.x) | LCEL(1.0) |
|---|---|---|
| 语法 | 显式类实例化 | 管道操作符 | |
| 流式输出 | 需要手动配置 | 自动支持 |
| 并行执行 | 不支持 | RunnableParallel |
| 代码量 | 较多(5-10 行) | 极简(1-3 行) |
| 调试追踪 | 手动配置回调 | 自动 LangSmith 集成 |
"""
对比:传统链 vs LCEL
"""
from langchain_core.prompts import ChatPromptTemplate
from langchain import init_chat_model
model = init_chat_model("gpt-4o", model_provider="openai")
prompt = ChatPromptTemplate.from_template("总结:{text}")
# ❌ 传统方式(0.x,已废弃)
from langchain_classic.chains import LLMChain
chain = LLMChain(llm=model, prompt=prompt)
result = chain.run(text="LangChain 是一个...") # 复杂且不灵活
# ✅ LCEL 方式(1.0,推荐)
chain = prompt | model
result = chain.invoke({"text": "LangChain 是一个..."}) # 简洁且强大
🔀 管道操作符 | 详解
管道操作符 | 是 LCEL 的核心,它将前一个组件的输出
传递给下一个组件的输入。
dict: text"] --> Prompt["ChatPromptTemplate
prompt"] Prompt --> Model["ChatModel
model"] Model --> Parser["OutputParser
parser"] Parser --> Output["输出
解析后的结果"] Input -.->|invoke| Prompt Prompt -.->|messages| Model Model -.->|AIMessage| Parser Parser -.->|parsed data| Output style Input fill:#e0e0e0,color:#000 style Prompt fill:#3b82f6,color:#fff style Model fill:#10b981,color:#fff style Parser fill:#f59e0b,color:#fff style Output fill:#8b5cf6,color:#fff
"""
管道操作符 - 多步骤链
"""
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain import init_chat_model
# 步骤 1:提示词
prompt = ChatPromptTemplate.from_template(
"将以下文本翻译成{language}:\n\n{text}"
)
# 步骤 2:模型
model = init_chat_model("gpt-4o", model_provider="openai")
# 步骤 3:输出解析器
parser = StrOutputParser()
# 组合成链(3 个步骤)
translation_chain = prompt | model | parser
# 执行链
result = translation_chain.invoke({
"language": "英文",
"text": "今天天气不错"
})
print(result)
# 输出:The weather is nice today
管道操作符背后的原理是 Runnable 协议。
所有实现了 Runnable 接口的组件都可以使用 | 组合。
ChatPromptTemplate→ RunnableChatModel→ RunnableOutputParser→ RunnableRunnableLambda→ 自定义 Runnable
🔧 RunnableLambda - 自定义逻辑
RunnableLambda 允许你在链中插入自定义 Python 函数,
用于数据转换、条件逻辑或副作用操作。
"""
RunnableLambda - 在链中插入自定义函数
"""
from langchain_core.runnables import RunnableLambda
from langchain_core.prompts import ChatPromptTemplate
from langchain import init_chat_model
model = init_chat_model("gpt-4o", model_provider="openai")
# 自定义函数:将模型输出转换为大写
def to_uppercase(ai_message):
"""将 AIMessage 的 content 转换为大写"""
return ai_message.content.upper()
# 自定义函数:添加前缀
def add_prefix(text):
"""添加前缀"""
return f"📌 处理结果:{text}"
# 构建链
prompt = ChatPromptTemplate.from_template("用一句话介绍:{topic}")
chain = (
prompt
| model
| RunnableLambda(to_uppercase) # 转大写
| RunnableLambda(add_prefix) # 添加前缀
)
result = chain.invoke({"topic": "Python"})
print(result)
# 输出:📌 处理结果:PYTHON IS A HIGH-LEVEL PROGRAMMING LANGUAGE...
RunnableLambda 常见用途
- 数据转换:格式化、清洗、提取字段
- 日志记录:打印中间结果用于调试
- 条件分支:根据输入选择不同的处理路径
- 副作用:保存到数据库、发送通知
🔄 RunnablePassthrough - 数据传递
RunnablePassthrough 用于在链中透传数据,
常用于需要保留原始输入或合并多个数据源的场景。
"""
RunnablePassthrough - 保留原始输入
"""
from langchain_core.runnables import RunnablePassthrough
from langchain_core.prompts import ChatPromptTemplate
from langchain import init_chat_model
model = init_chat_model("gpt-4o", model_provider="openai")
# 场景:我们想在最终结果中同时包含原始问题和 AI 回答
prompt = ChatPromptTemplate.from_template("回答问题:{question}")
chain = (
# 使用字典保留原始输入
{
"question": RunnablePassthrough(), # 透传原始问题
"answer": prompt | model # 生成答案
}
)
result = chain.invoke("什么是量子计算?")
print(f"问题:{result['question']}")
print(f"答案:{result['answer'].content}")
# 输出:
# 问题:什么是量子计算?
# 答案:量子计算是利用量子力学原理...
最常见的用法是在 RAG 场景中,同时保留用户问题和检索到的文档:
chain = {
"context": retriever, # 检索文档
"question": RunnablePassthrough() # 保留原始问题
} | prompt | model
⚡ RunnableParallel - 并行执行
RunnableParallel 允许并发执行多个 Runnable,
显著提升性能,特别适用于:
- 同时调用多个 LLM(对比不同模型的输出)
- 并行检索多个数据源
- 同时执行多个独立的数据转换
翻译"] Parallel --> Task2["任务 2
总结"] Parallel --> Task3["任务 3
关键词提取"] Task1 --> Output1["输出 1"] Task2 --> Output2["输出 2"] Task3 --> Output3["输出 3"] Output1 --> Merge["合并结果"] Output2 --> Merge Output3 --> Merge Merge --> Final["最终输出
包含所有结果"] style Parallel fill:#f59e0b,color:#fff style Merge fill:#8b5cf6,color:#fff
"""
RunnableParallel - 并发执行多个任务
"""
from langchain_core.runnables import RunnableParallel
from langchain_core.prompts import ChatPromptTemplate
from langchain import init_chat_model
model = init_chat_model("gpt-4o", model_provider="openai")
# 定义 3 个独立的链
translate_chain = (
ChatPromptTemplate.from_template("翻译成英文:{text}")
| model
)
summarize_chain = (
ChatPromptTemplate.from_template("用一句话总结:{text}")
| model
)
keywords_chain = (
ChatPromptTemplate.from_template("提取 3 个关键词:{text}")
| model
)
# 使用 RunnableParallel 并行执行
parallel_chain = RunnableParallel(
translation=translate_chain,
summary=summarize_chain,
keywords=keywords_chain
)
# 执行(3 个任务并发)
result = parallel_chain.invoke({
"text": "人工智能正在改变世界,从医疗到教育,AI 的应用无处不在。"
})
print("翻译:", result["translation"].content)
print("总结:", result["summary"].content)
print("关键词:", result["keywords"].content)
# 输出示例:
# 翻译:Artificial intelligence is changing the world...
# 总结:AI 广泛应用于医疗和教育等领域
# 关键词:人工智能、医疗、教育
- 成本增加:并行调用多个 LLM 会同时消耗 Token
- 速率限制:注意 API 的并发限制(每分钟请求数)
- 错误处理:任何一个任务失败都会导致整体失败
🏗️ 复杂 LCEL 链实战
将所有技术组合起来,构建一个完整的 RAG 问答链:
"""
复杂 LCEL 链 - 完整的 RAG 问答系统
"""
from langchain_core.runnables import RunnablePassthrough, RunnableParallel
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain import init_chat_model
from langchain_chroma import Chroma
from langchain_openai import OpenAIEmbeddings
# 1. 初始化组件
model = init_chat_model("gpt-4o", model_provider="openai")
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
# 2. 创建向量存储(假设已有数据)
vectorstore = Chroma(
collection_name="knowledge_base",
embedding_function=embeddings
)
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})
# 3. 定义提示词模板
rag_prompt = ChatPromptTemplate.from_template("""
基于以下上下文回答问题。如果上下文中没有相关信息,请说"我不知道"。
上下文:
{context}
问题:{question}
答案:
""")
# 4. 自定义函数:格式化检索到的文档
def format_docs(docs):
"""将 Document 列表格式化为字符串"""
return "\n\n".join([doc.page_content for doc in docs])
# 5. 构建完整的 LCEL 链
rag_chain = (
# 步骤 1:并行准备数据
RunnableParallel(
context=retriever | format_docs, # 检索并格式化文档
question=RunnablePassthrough() # 透传原始问题
)
# 步骤 2:生成提示词
| rag_prompt
# 步骤 3:调用模型
| model
# 步骤 4:解析输出
| StrOutputParser()
)
# 6. 使用链
question = "什么是 LangChain?"
answer = rag_chain.invoke(question)
print(f"问题:{question}")
print(f"答案:{answer}")
# 7. 流式输出(LCEL 自动支持)
print("\n流式输出:")
for chunk in rag_chain.stream(question):
print(chunk, end="", flush=True)
链的执行流程
retriever"] Parallel --> R2["透传问题
RunnablePassthrough"] R1 --> Format["格式化文档
format_docs"] Format --> Context["context"] R2 --> Question["question"] Context --> Prompt["RAG Prompt
rag_prompt"] Question --> Prompt Prompt --> Model["GPT-4o
model"] Model --> Parser["StrOutputParser
parser"] Parser --> Answer["答案"] style Parallel fill:#f59e0b,color:#fff style Model fill:#10b981,color:#fff style Answer fill:#8b5cf6,color:#fff
✨ LCEL 最佳实践
1. 优先使用 LCEL 而非传统链
在 LangChain 1.0 中,始终使用 LCEL 来组合组件,
避免使用已废弃的 LLMChain。
2. 善用并行执行
当多个任务之间没有依赖关系时,使用 RunnableParallel 并行执行:
# ✅ 好的做法:并行执行独立任务
parallel = RunnableParallel(
task1=chain1,
task2=chain2,
task3=chain3
)
# ❌ 不好的做法:串行执行(浪费时间)
result1 = chain1.invoke(input)
result2 = chain2.invoke(input)
result3 = chain3.invoke(input)
3. 使用 RunnableLambda 添加调试日志
def debug_log(x):
"""打印中间结果用于调试"""
print(f"🔍 Debug: {x}")
return x
# 在链中插入调试日志
chain = (
prompt
| RunnableLambda(debug_log) # 查看提示词
| model
| RunnableLambda(debug_log) # 查看模型输出
| parser
)
4. 保持链的可读性
对于复杂的链,使用变量和注释提升可读性:
# ✅ 好的做法:拆分为多个步骤
retrieval_step = retriever | format_docs
prompt_step = rag_prompt
generation_step = model | parser
full_chain = (
{"context": retrieval_step, "question": RunnablePassthrough()}
| prompt_step
| generation_step
)
# ❌ 不好的做法:一行写完(难以理解和维护)
chain = ({"context": retriever | (lambda docs: "\n".join([d.page_content for d in docs])), "question": RunnablePassthrough()} | rag_prompt | model | StrOutputParser())
❓ 常见问题
Q1: LCEL 和 Agent 有什么关系?
create_agent() 内部使用 LangGraph(不是 LCEL)来编排工具调用。
但你可以在 Agent 的工具内部使用 LCEL 链。
Q2: 如何在 LCEL 链中处理错误?
def safe_invoke(chain, input_data):
"""带错误处理的链调用"""
try:
return chain.invoke(input_data)
except Exception as e:
print(f"链执行失败:{e}")
return None
result = safe_invoke(chain, {"text": "..."})
Q3: LCEL 链可以保存和加载吗?
不推荐。LCEL 链是代码,建议通过版本控制(Git)管理, 而不是序列化保存。
Q4: RunnableParallel 会等待所有任务完成吗?
是的。RunnableParallel 会等待所有子任务完成后才返回结果。
如果任何一个任务失败,整个并行执行会失败。
Q5: 如何在 LCEL 中使用条件分支?
使用 RunnableLambda 结合条件逻辑:
def route_based_on_length(text):
"""根据文本长度选择不同的处理路径"""
if len(text) > 100:
return long_text_chain.invoke(text)
else:
return short_text_chain.invoke(text)
chain = RunnableLambda(route_based_on_length)