链式调用 API

LCEL 操作符与链组合

概述

LangChain 表达式语言(LCEL)提供了一套声明式的方法来组合链。所有组件都实现了 Runnable 接口,可以使用统一的操作符进行组合。

graph LR
                    A[Runnable] --> B["| 操作符"]
                    B --> C[链组合]

                    D[RunnableLambda] --> A
                    E[RunnableParallel] --> A
                    F[RunnableBranch] --> A
                    G[RunnablePassthrough] --> A

                    style A fill:#e1f5fe
                    style B fill:#c8e6c9

LCEL 操作符

| (管道操作符)

串行连接两个 Runnable。

def __or__(
    self,
    other: Runnable[Any, Any],
) -> Runnable[InputType, OtherOutputType]:
    """
    Pipe operator: chain this Runnable into ``other``.

    expression: runnable1 | runnable2
    equivalent to: runnable1.pipe(runnable2)

    Args:
        other: the next Runnable in the sequence

    Returns:
        The composed Runnable
    """

使用示例

python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

prompt = ChatPromptTemplate.from_template("Tell me a joke about {topic}")
llm = ChatOpenAI(model="gpt-4o")
parser = StrOutputParser()

# Compose with the | (pipe) operator: prompt -> llm -> parser
chain = prompt | llm | parser

# Equivalent to:
# chain = prompt.pipe(llm).pipe(parser)

result = chain.invoke({"topic": "Python"})

组合函数

RunnableLambda

将函数转换为 Runnable。

class RunnableLambda(Serializable, Runnable[Input, Output]):
    """函数包装器"""

    func: Callable[[Input], Output]
    """要包装的函数"""

    afunc: Optional[Callable[[Input], Awaitable[Output]]] = None
    """异步版本函数"""

构造方法

python
from langchain_core.runnables import RunnableLambda

# Wrap a plain function
def uppercase(text: str) -> str:
    """Return *text* upper-cased."""
    return text.upper()

uppercase_runnable = RunnableLambda(uppercase)

# Wrap a lambda
runnable = RunnableLambda(lambda x: x["text"].upper())

# Async function
import asyncio

async def auppercase(text: str) -> str:
    await asyncio.sleep(0.1)
    return text.upper()

# BUGFIX: the async variant was misspelled ("aucppercase"), which would raise
# NameError; pass the `auppercase` coroutine function defined above.
async_runnable = RunnableLambda(func=uppercase, afunc=auppercase)

RunnableParallel

并行执行多个 Runnable,合并结果。

class RunnableParallel(
    Runnable[Input, Dict[str, Any]],
    Serializable,
):
    """并行执行"""

    steps: Dict[str, Runnable[Any, Any]]
    """要并行执行的 Runnable 映射"""

    # 也可以使用别名: RunnableParallel = RunnableParallel

使用方式

python
from langchain_core.runnables import RunnableParallel
from langchain_openai import ChatOpenAI

# Way 1: pass a dict of named steps
chain = RunnableParallel({
    "joke": ChatOpenAI().pipe(lambda x: x.content),
    "poem": ChatOpenAI().pipe(lambda x: x.content),
})

result = chain.invoke({"input": "Tell me a joke and a poem"})

# Way 2: pass the steps as keyword arguments
# (the original comment said "** unpacking", but no dict is unpacked here)
joke_chain = ChatOpenAI() | (lambda x: x.content)
poem_chain = ChatOpenAI() | (lambda x: x.content)

chain = RunnableParallel(
    joke=joke_chain,
    poem=poem_chain
)

# Way 3: reshape the input into named fields
parallel = RunnableParallel(
    input=lambda x: x["input"],
    output=lambda x: x["output"]
)

RunnableBranch

条件分支,根据输入路由到不同的 Runnable。

class RunnableBranch(
    Runnable[Input, Output],
    Serializable,
):
    """条件分支"""

    branches: List[
        Tuple[
            Callable[[Input], bool],  # 条件函数
            Runnable[Input, Output]    # 对应的 Runnable
        ]
    ]
    """(条件, Runnable) 列表"""

使用示例

python
from langchain_core.prompts import ChatPromptTemplate  # BUGFIX: was missing; used below
from langchain_core.runnables import RunnableBranch
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o")

# One chain per response style
joke_chain = ChatPromptTemplate.from_template("Tell me a joke about {topic}") | llm
poem_chain = ChatPromptTemplate.from_template("Write a poem about {topic}") | llm
fact_chain = ChatPromptTemplate.from_template("Tell me a fact about {topic}") | llm

# First matching condition wins; the bare last entry is the default branch.
branch = RunnableBranch(
    (lambda x: x["style"] == "funny", joke_chain),
    (lambda x: x["style"] == "creative", poem_chain),
    fact_chain  # default branch
)

result = branch.invoke({"style": "funny", "topic": "Python"})

RunnablePassthrough

原样传递输入。

class RunnablePassthrough(Runnable[Input, Input]):
    """Returns its input unchanged."""

    @staticmethod
    def assign(**kwargs: Union[Runnable[Any, Any], Any]) -> Runnable:
        """
        Return a Runnable that passes the input dict through while
        adding (or overwriting) the given keys.

        Args:
            **kwargs: extra fields, as plain values or Runnables

        Returns:
            A Runnable that emits the enriched dict
        """

使用示例

python
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI

llm = ChatOpenAI()

# Pass the input through unchanged
passthrough = RunnablePassthrough()
result = passthrough.invoke({"a": 1, "b": 2})
# {"a": 1, "b": 2}

# Add fields to a dict input
chain = RunnablePassthrough.assign(
    output=lambda x: x["input"].upper()
)
result = chain.invoke({"input": "hello"})
# {"input": "hello", "output": "HELLO"}

# In a chain
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_messages([
    ("user", "{input}")
])

# BUGFIX: .assign() requires a dict input, but the original example placed it
# after `llm` (whose output is a message, not a dict) and then read x["input"],
# which no longer existed. Keep the dict shape and attach the model response.
chain = (
    {"input": RunnablePassthrough()} |
    RunnablePassthrough.assign(
        response=prompt | llm
    )
)
# chain.invoke("hi") -> {"input": "hi", "response": <model message>}

RunnableMap

别名:RunnableParallel

RunnableMap = RunnableParallel

RunnableGenerator

包装生成器函数为 Runnable。

class RunnableGenerator(Runnable[Input, Output]):
    """生成器包装器"""

    transform: Callable[
        [Iterator[Input]],
        Iterator[Output]
    ]
    """转换函数(迭代器到迭代器)"""

    atransform: Optional[Callable[
        [AsyncIterator[Input]],
        AsyncIterator[Output]
    ]] = None
    """异步版本"""

RunnableEach

对列表中的每个元素应用 Runnable。

class RunnableEach(Runnable[List[Input], List[Output]]):
    """批量处理"""

    bound: Runnable[Input, Output]
    """要应用到每个元素的 Runnable"""

使用示例

python
from langchain_core.runnables import RunnableEach
from langchain_openai import ChatOpenAI

llm = ChatOpenAI()

# Batch processing: apply `llm` to every element of the input list.
# BUGFIX: RunnableEach takes the wrapped Runnable via its `bound` field;
# positional construction fails model validation.
chain = RunnableEach(bound=llm)

results = chain.invoke([
    "What is Python?",
    "What is JavaScript?",
    "What is Rust?"
])

RunnablePick

从字典中选择特定字段。

class RunnablePick(Runnable[Dict[str, Any], Any]):
    """Selects the given key(s) from a dict input.

    NOTE(review): langchain_core ships this as ``RunnablePick``; there is no
    ``RunnableItemPicker`` class — confirm against the installed version.
    """

    keys: Union[str, List[str]]
    """Key (or list of keys) to pick from the input dict."""

综合使用示例

python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import (
    RunnableLambda,
    RunnableParallel,
    RunnablePassthrough,
    RunnableBranch
)
from langchain_core.output_parsers import StrOutputParser

llm = ChatOpenAI(model="gpt-4o")

# ========== Example 1: basic chain ==========
prompt = ChatPromptTemplate.from_template("Tell me a {adjective} joke about {topic}")
chain = prompt | llm | StrOutputParser()
result = chain.invoke({"adjective": "funny", "topic": "Python"})

# ========== Example 2: parallel execution ==========
parallel = RunnableParallel({
    "joke": ChatPromptTemplate.from_template("Tell me a joke about {topic}") | llm | StrOutputParser(),
    "fact": ChatPromptTemplate.from_template("Tell me a fact about {topic}") | llm | StrOutputParser(),
})
result = parallel.invoke({"topic": "Python"})

# ========== Example 3: reshaping the data flow ==========
# The dict step maps the raw string input into {"topic": ...} for the prompt.
chain = (
    {"topic": RunnablePassthrough()} |
    ChatPromptTemplate.from_template("Tell me about {topic}") |
    llm |
    StrOutputParser()
)
result = chain.invoke("LangChain")

# ========== Example 4: conditional branching ==========
# NOTE(review): route_function is defined but never used below — the branch
# uses an inline lambda instead. Presumably left over from an earlier routing
# style; confirm whether it should be removed or wired into the branch.
def route_function(x):
    if "joke" in x["input"].lower():
        return "joke_chain"
    else:
        return "answer_chain"

joke_chain = ChatPromptTemplate.from_template("{input}") | llm | StrOutputParser()
answer_chain = ChatPromptTemplate.from_template("Answer: {input}") | llm | StrOutputParser()

# First matching condition wins; the last bare entry is the default branch.
branch = RunnableBranch(
    (lambda x: "joke" in x["input"].lower(), joke_chain),
    answer_chain
)

# ========== Example 5: custom processing ==========
def custom_process(x):
    # Custom processing logic: expects {"text": ...} from the previous step
    text = x["text"]
    return f"Processed: {text.upper()}"

chain = (
    RunnableLambda(lambda x: {"text": x}) |
    RunnableLambda(custom_process)
)

# ========== Example 6: complex composition ==========
# BUGFIX: the original wrapped the sentiment chain in a lambda, so the
# parallel step produced an un-invoked Runnable object instead of the
# sentiment text. Build the sub-chain directly and map the raw string input
# into the dict shape the prompt expects.
analysis_chain = (
    RunnableParallel({
        "topic": RunnablePassthrough(),
        "sentiment": (
            RunnableLambda(lambda x: {"text": x})
            | ChatPromptTemplate.from_template("Analyze sentiment: {text}")
            | llm
            | StrOutputParser()
        ),
    }) |
    RunnableLambda(lambda x: f"Topic: {x['topic']}, Sentiment: {x['sentiment']}")
)

# ========== Example 7: error handling (retries) ==========
# BUGFIX: `from langchain_core.runnables import RunnableRetry` fails —
# RunnableRetry is not re-exported there — and its field is
# `max_attempt_number`, not `max_attempts`. The supported API is
# Runnable.with_retry(), which wraps the chain in a RunnableRetry.
flaky_chain = ChatOpenAI() | StrOutputParser()
reliable_chain = flaky_chain.with_retry(
    stop_after_attempt=3
)

相关 API