链式调用 API
LCEL 操作符与链组合
概述
LangChain 表达式语言(LCEL)提供了一套声明式的方法来组合链。所有组件都实现了 Runnable 接口,可以使用统一的操作符进行组合。
graph LR
A[Runnable] --> B["| 操作符"]
B --> C[链组合]
D[RunnableLambda] --> A
E[RunnableParallel] --> A
F[RunnableBranch] --> A
G[RunnablePassthrough] --> A
style A fill:#e1f5fe
style B fill:#c8e6c9
LCEL 操作符
| (管道操作符)
串行连接两个 Runnable。
def __or__(
self,
other: Runnable[Any, Any],
) -> Runnable[InputType, OtherOutputType]:
"""
管道连接
expression: runnable1 | runnable2
等价于: runnable1.pipe(runnable2)
Args:
other: 下一个 Runnable
Returns:
组合后的 Runnable
"""
使用示例
python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
prompt = ChatPromptTemplate.from_template("Tell me a joke about {topic}")
llm = ChatOpenAI(model="gpt-4o")
parser = StrOutputParser()
# 使用 | 操作符组合
chain = prompt | llm | parser
# 等价于:
# chain = prompt.pipe(llm).pipe(parser)
result = chain.invoke({"topic": "Python"})
组合函数
RunnableLambda
将函数转换为 Runnable。
class RunnableLambda(Serializable, Runnable[Input, Output]):
"""函数包装器"""
func: Callable[[Input], Output]
"""要包装的函数"""
afunc: Optional[Callable[[Input], Awaitable[Output]]] = None
"""异步版本函数"""
构造方法
python
from langchain_core.runnables import RunnableLambda
# 包装普通函数
def uppercase(text: str) -> str:
return text.upper()
uppercase_runnable = RunnableLambda(uppercase)
# 使用 lambda
runnable = RunnableLambda(lambda x: x["text"].upper())
# 异步函数
import asyncio
async def auppercase(text: str) -> str:
await asyncio.sleep(0.1)
return text.upper()
async_runnable = RunnableLambda(func=uppercase, afunc=aucppercase)
RunnableParallel
并行执行多个 Runnable,合并结果。
class RunnableParallel(
Runnable[Input, Dict[str, Any]],
Serializable,
):
"""并行执行"""
steps: Dict[str, Runnable[Any, Any]]
"""要并行执行的 Runnable 映射"""
# 也可以使用别名: RunnableParallel = RunnableParallel
使用方式
python
from langchain_core.runnables import RunnableParallel
from langchain_openai import ChatOpenAI
# 方式1: 使用字典
chain = RunnableParallel({
"joke": ChatOpenAI().pipe(lambda x: x.content),
"poem": ChatOpenAI().pipe(lambda x: x.content),
})
result = chain.invoke({"input": "Tell me a joke and a poem"})
# 方式2: 使用 ** 解包
joke_chain = ChatOpenAI() | (lambda x: x.content)
poem_chain = ChatOpenAI() | (lambda x: x.content)
chain = RunnableParallel(
joke=joke_chain,
poem=poem_chain
)
# 方式3: 分配输入
parallel = RunnableParallel(
input=lambda x: x["input"],
output=lambda x: x["output"]
)
RunnableBranch
条件分支,根据输入路由到不同的 Runnable。
class RunnableBranch(
Runnable[Input, Output],
Serializable,
):
"""条件分支"""
branches: List[
Tuple[
Callable[[Input], bool], # 条件函数
Runnable[Input, Output] # 对应的 Runnable
]
]
"""(条件, Runnable) 列表"""
使用示例
python
from langchain_core.runnables import RunnableBranch
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o")
# 定义不同的处理链
joke_chain = ChatPromptTemplate.from_template("Tell me a joke about {topic}") | llm
poem_chain = ChatPromptTemplate.from_template("Write a poem about {topic}") | llm
fact_chain = ChatPromptTemplate.from_template("Tell me a fact about {topic}") | llm
# 创建分支
branch = RunnableBranch(
(lambda x: x["style"] == "funny", joke_chain),
(lambda x: x["style"] == "creative", poem_chain),
fact_chain # 默认分支
)
result = branch.invoke({"style": "funny", "topic": "Python"})
RunnablePassthrough
原样传递输入。
class RunnablePassthrough(Runnable[Input, Input]):
"""原样传递"""
@staticmethod
def assign(**kwargs: Union[Runnable[Any, Any], Any]) -> Runnable:
"""
添加额外字段
Args:
**kwargs: 要添加的字段或 Runnable
Returns:
添加字段后的 Runnable
"""
使用示例
python
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI
llm = ChatOpenAI()
# 原样传递
passthrough = RunnablePassthrough()
result = passthrough.invoke({"a": 1, "b": 2})
# {"a": 1, "b": 2}
# 添加字段
chain = RunnablePassthrough.assign(
output=lambda x: x["input"].upper()
)
result = chain.invoke({"input": "hello"})
# {"input": "hello", "output": "HELLO"}
# 在链中使用
from langchain_core.prompts import ChatPromptTemplate
prompt = ChatPromptTemplate.from_messages([
("user", "{input}")
])
chain = (
{"input": RunnablePassthrough()} |
prompt |
llm |
RunnablePassthrough.assign(
original_input=lambda x: x["input"]
)
)
RunnableMap
别名:RunnableParallel
RunnableMap = RunnableParallel
RunnableGenerator
包装生成器函数为 Runnable。
class RunnableGenerator(Runnable[Input, Output]):
"""生成器包装器"""
transform: Callable[
[Iterator[Input]],
Iterator[Output]
]
"""转换函数(迭代器到迭代器)"""
atransform: Optional[Callable[
[AsyncIterator[Input]],
AsyncIterator[Output]
]] = None
"""异步版本"""
RunnableEach
对列表中的每个元素应用 Runnable。
class RunnableEach(Runnable[List[Input], List[Output]]):
"""批量处理"""
bound: Runnable[Input, Output]
"""要应用到每个元素的 Runnable"""
使用示例
python
from langchain_core.runnables import RunnableEach
from langchain_openai import ChatOpenAI
llm = ChatOpenAI()
# 批量处理
chain = RunnableEach(llm)
results = chain.invoke([
"What is Python?",
"What is JavaScript?",
"What is Rust?"
])
RunnableItemPicker
从字典中选择特定字段。
class RunnableItemPicker(Runnable[Dict[str, Any], Any]):
"""字段选择器"""
keys: Union[str, List[str]]
"""要选择的键"""
使用示例
python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import (
RunnableLambda,
RunnableParallel,
RunnablePassthrough,
RunnableBranch
)
from langchain_core.output_parsers import StrOutputParser
llm = ChatOpenAI(model="gpt-4o")
# ========== 示例1: 基础链 ==========
prompt = ChatPromptTemplate.from_template("Tell me a {adjective} joke about {topic}")
chain = prompt | llm | StrOutputParser()
result = chain.invoke({"adjective": "funny", "topic": "Python"})
# ========== 示例2: 并行执行 ==========
parallel = RunnableParallel({
"joke": ChatPromptTemplate.from_template("Tell me a joke about {topic}") | llm | StrOutputParser(),
"fact": ChatPromptTemplate.from_template("Tell me a fact about {topic}") | llm | StrOutputParser(),
})
result = parallel.invoke({"topic": "Python"})
# ========== 示例3: 数据流转换 ==========
chain = (
{"topic": RunnablePassthrough()} |
ChatPromptTemplate.from_template("Tell me about {topic}") |
llm |
StrOutputParser()
)
result = chain.invoke("LangChain")
# ========== 示例4: 条件分支 ==========
def route_function(x):
if "joke" in x["input"].lower():
return "joke_chain"
else:
return "answer_chain"
joke_chain = ChatPromptTemplate.from_template("{input}") | llm | StrOutputParser()
answer_chain = ChatPromptTemplate.from_template("Answer: {input}") | llm | StrOutputParser()
branch = RunnableBranch(
(lambda x: "joke" in x["input"].lower(), joke_chain),
answer_chain
)
# ========== 示例5: 自定义处理 ==========
def custom_process(x):
# 自定义处理逻辑
text = x["text"]
return f"Processed: {text.upper()}"
chain = (
RunnableLambda(lambda x: {"text": x}) |
RunnableLambda(custom_process)
)
# ========== 示例6: 复杂组合 ==========
analysis_chain = (
RunnableParallel({
"topic": RunnablePassthrough(),
"sentiment": (lambda x: ChatPromptTemplate.from_template(f"Analyze sentiment: {x}") | llm | StrOutputParser()),
}) |
RunnableLambda(lambda x: f"Topic: {x['topic']}, Sentiment: {x['sentiment']}")
)
# ========== 示例7: 错误处理 ==========
from langchain_core.runnables import RunnableRetry
flaky_chain = ChatOpenAI() | StrOutputParser()
reliable_chain = RunnableRetry(
bound=flaky_chain,
max_attempts=3
)
相关 API
- Runnables API - Runnable 接口详解
- Prompts API
- 教程:链式调用