Runnable Interface API

The core interface of the LangChain Expression Language (LCEL)

Overview

Runnable is the core interface in LangChain. Every component (LLMs, prompt templates, output parsers, and so on) implements it, giving all of them a uniform invocation API.
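
For example, a prompt template and an output parser expose exactly the same invoke method. A minimal sketch, assuming only langchain_core is installed:

python
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

prompt = ChatPromptTemplate.from_template("Tell me about {topic}")
parser = StrOutputParser()

# Both objects are Runnables with the same invoke() signature
prompt_value = prompt.invoke({"topic": "LangChain"})  # -> ChatPromptValue
text = parser.invoke("raw model output")              # -> "raw model output"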

graph TD
    A[Runnable] --> B[invoke]
    A --> C[stream]
    A --> D[batch]
    A --> E[ainvoke]
    A --> F[astream]
    A --> G[abatch]

    A --> H["| operator"]
    A --> I[pipe]
    A --> J[bind]
    A --> K[with_retry]
    A --> L[with_fallbacks]
    A --> M[map]

    style A fill:#e1f5fe
    style H fill:#c8e6c9

Runnable Interface Definition

Runnable

The base interface that all runnable components in LangChain implement.

from abc import ABC, abstractmethod
from typing import (
    Any, AsyncIterator, Generic, Iterator, List,
    Optional, Sequence, Tuple, Type, TypeVar, Union,
)

from langchain_core.runnables import RunnableConfig

Input = TypeVar("Input")
Output = TypeVar("Output")

class Runnable(Generic[Input, Output], ABC):
    """
    The interface for runnable components.

    Type Parameters:
        Input: the input type
        Output: the output type
    """

    # ========== Core synchronous methods ==========

    @abstractmethod
    def invoke(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> Output:
        """
        Synchronous invocation.

        Args:
            input: the input data
            config: run configuration (callbacks, metadata, tags)
            **kwargs: additional arguments

        Returns:
            the output data
        """

    def stream(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> Iterator[Output]:
        """
        Streaming invocation.

        Yields:
            output chunks
        """

    def batch(
        self,
        inputs: List[Input],
        config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
        **kwargs: Any,
    ) -> List[Output]:
        """
        Batch invocation.

        Args:
            inputs: a list of inputs
            config: a single config, or a list of configs matching inputs
            **kwargs: additional arguments

        Returns:
            a list of outputs
        """

    # ========== Core asynchronous methods ==========

    async def ainvoke(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> Output:
        """Asynchronous invocation. The default implementation delegates
        to invoke() in a thread pool, so only invoke() is abstract."""

    async def astream(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> AsyncIterator[Output]:
        """Asynchronous streaming invocation."""

    async def abatch(
        self,
        inputs: List[Input],
        config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
        **kwargs: Any,
    ) -> List[Output]:
        """Asynchronous batch invocation."""

    # ========== Composition methods ==========

    def pipe(
        self,
        *others: Runnable[Any, Any],
        name: Optional[str] = None,
    ) -> Runnable[Input, Any]:
        """
        Pipe composition (what the | operator calls).

        Args:
            *others: the Runnables to run next
            name: optional name for the composed Runnable

        Returns:
            the composed Runnable
        """

    def __or__(
        self,
        other: Runnable[Any, Any],
    ) -> Runnable[Input, Any]:
        """
        The | operator.

        expression: runnable1 | runnable2
        equivalent to: runnable1.pipe(runnable2)
        """

    def bind(
        self,
        **kwargs: Any,
    ) -> Runnable[Input, Output]:
        """
        Bind arguments.

        Args:
            **kwargs: the arguments to bind

        Returns:
            a new Runnable with the arguments bound
        """

    def with_retry(
        self,
        *,
        retry_if_exception_type: Tuple[Type[BaseException], ...] = (Exception,),
        wait_exponential_jitter: bool = True,
        stop_after_attempt: int = 3,
    ) -> Runnable[Input, Output]:
        """
        Add retry logic.

        Args:
            retry_if_exception_type: the exception types to retry on
            wait_exponential_jitter: add jitter to the exponential backoff
            stop_after_attempt: maximum number of attempts

        Returns:
            a Runnable with retries
        """

    def with_fallbacks(
        self,
        fallbacks: Sequence[Runnable[Input, Output]],
        *,
        exceptions_to_handle: Tuple[Type[BaseException], ...] = (Exception,),
    ) -> Runnable[Input, Output]:
        """
        Add a fallback strategy.

        Args:
            fallbacks: the fallback Runnables, tried in order
            exceptions_to_handle: the exception types that trigger a fallback

        Returns:
            a Runnable with fallbacks
        """

    def with_config(
        self,
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> Runnable[Input, Output]:
        """
        Bind a run configuration.

        Args:
            config: the run configuration
            **kwargs: individual config fields (run_name, tags, ...)

        Returns:
            a Runnable with the configuration bound
        """

    def with_types(
        self,
        input_type: Optional[Type[Input]] = None,
        output_type: Optional[Type[Output]] = None,
    ) -> Runnable[Input, Output]:
        """
        Override the declared types.

        Args:
            input_type: the input type
            output_type: the output type

        Returns:
            a Runnable with the types overridden
        """

    def map(self) -> Runnable[List[Input], List[Output]]:
        """
        Return a new Runnable that maps this Runnable over a list of
        inputs, invoking it once per element.
        """

Core Methods in Detail

RunnableConfig

The run-configuration type.

from typing import Any, Dict, List, Optional

from typing_extensions import TypedDict

from langchain_core.callbacks import Callbacks

class RunnableConfig(TypedDict, total=False):
    """The run-configuration dictionary"""

    callbacks: Callbacks
    """Callback handlers"""

    tags: List[str]
    """A list of tags"""

    metadata: Dict[str, Any]
    """A metadata dictionary"""

    run_name: str
    """The name of the run"""

    max_concurrency: Optional[int]
    """Maximum number of parallel calls (used by batch)"""

    recursion_limit: int
    """Maximum recursion depth for nested Runnable calls (default 25)"""

    configurable: Dict[str, Any]
    """Values for configurable fields (e.g. session_id)"""

Usage Example

python
from langchain_openai import ChatOpenAI
from langchain_core.callbacks import StdOutCallbackHandler

llm = ChatOpenAI(model="gpt-4o")

# Invoke with a config
response = llm.invoke(
    "Hello",
    config={
        "callbacks": [StdOutCallbackHandler()],
        "tags": ["greeting", "test"],
        "metadata": {"user_id": "user_123"},
        "run_name": "greeting_generation"
    }
)
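
The config propagates to every child run of a composed chain, so tags and metadata set at the top level are attached to each step. A small sketch reusing the llm above:

python
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_template("Summarize: {text}")
chain = prompt | llm

# Both the prompt run and the model run carry these tags and metadata
chain.invoke(
    {"text": "LCEL unifies sync, async, streaming and batch calls."},
    config={"tags": ["traced"], "metadata": {"env": "dev"}},
)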

invoke / ainvoke

Synchronous / asynchronous invocation.

python
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o")

# Synchronous invocation
response = llm.invoke("Tell me a joke")

# Asynchronous invocation
import asyncio

async def async_invoke():
    response = await llm.ainvoke("Tell me a joke")
    return response

asyncio.run(async_invoke())
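
ainvoke composes with the usual asyncio tools; for instance, several calls can run concurrently with asyncio.gather. A sketch reusing the llm above:

python
import asyncio

async def concurrent_invokes():
    # Fire both requests at once instead of awaiting them sequentially
    joke, fact = await asyncio.gather(
        llm.ainvoke("Tell me a joke"),
        llm.ainvoke("Tell me a fact"),
    )
    return joke, fact

asyncio.run(concurrent_invokes())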

stream / astream

Streaming invocation.

python
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o")

# Synchronous streaming
for chunk in llm.stream("Tell me a story"):
    print(chunk.content, end="", flush=True)

# Asynchronous streaming
async def async_stream():
    async for chunk in llm.astream("Tell me a story"):
        print(chunk.content, end="", flush=True)

import asyncio
asyncio.run(async_stream())
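
For composed chains where intermediate progress matters, Runnable also exposes astream_events. A minimal sketch filtering for chat-model token events, assuming the v2 event schema:

python
import asyncio

async def stream_events_example():
    async for event in llm.astream_events("Tell me a story", version="v2"):
        # Each event is a dict with "event", "name", "data", ... keys
        if event["event"] == "on_chat_model_stream":
            print(event["data"]["chunk"].content, end="", flush=True)

asyncio.run(stream_events_example())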

batch / abatch

Batch invocation.

python
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o")

# Batch processing
inputs = [
    "What is Python?",
    "What is JavaScript?",
    "What is Rust?"
]
responses = llm.batch(inputs)

# Batch with a per-input config
configs = [
    {"tags": ["question1"]},
    {"tags": ["question2"]},
    {"tags": ["question3"]},
]
responses = llm.batch(inputs, config=configs)

# Asynchronous batch
async def async_batch():
    responses = await llm.abatch(inputs)
    return responses

import asyncio
asyncio.run(async_batch())
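
batch runs its inputs in parallel by default; the number of simultaneous calls can be capped through the max_concurrency config field. Reusing the inputs above:

python
# At most 2 requests in flight at any time
responses = llm.batch(inputs, config={"max_concurrency": 2})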

Utility Functions

ensure_config / get_config_list

Configuration helper functions.

from langchain_core.runnables.config import ensure_config, get_config_list

def ensure_config(
    config: Optional[RunnableConfig] = None
) -> RunnableConfig:
    """Return a valid config, filling any missing keys with defaults"""

def get_config_list(
    config: Optional[Union[RunnableConfig, List[RunnableConfig]]],
    length: int,
) -> List[RunnableConfig]:
    """Expand a single config (or validate a list of configs) into one
    config per input, as batch does"""

RunnableParallel

Parallel execution (RunnableMap is its alias).

from langchain_core.runnables import RunnableParallel

# RunnableMap is an alias for RunnableParallel
RunnableMap = RunnableParallel
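
RunnableParallel runs each value on the same input and collects the results into a dict; plain callables used as values are coerced automatically. A small sketch:

python
from langchain_core.runnables import RunnableParallel, RunnablePassthrough

mapper = RunnableParallel(
    original=RunnablePassthrough(),  # passes the input through unchanged
    upper=lambda x: x.upper(),       # coerced to a RunnableLambda
)
mapper.invoke("hi")
# {'original': 'hi', 'upper': 'HI'}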

Coercing to Runnable

Converting plain objects into Runnables.

python
from langchain_core.runnables import RunnableLambda

# Wrap a plain function explicitly
to_upper = RunnableLambda(lambda x: x.upper())

# Functions are also coerced automatically when composed with |
chain = (lambda x: x.strip()) | to_upper
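
Dicts are coerced to a RunnableParallel in the same way when they appear in a pipe (sketch):

python
from langchain_core.runnables import RunnableLambda

# The dict becomes a RunnableParallel feeding the formatting step
chain = {
    "upper": lambda x: x.upper(),
    "length": lambda x: len(x),
} | RunnableLambda(lambda d: f"{d['upper']} ({d['length']})")

chain.invoke("hi")
# 'HI (2)'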

The chain Function

A decorator that turns a plain function into a Runnable chain.

from langchain_core.runnables import chain

@chain
def my_custom_chain(input: str) -> str:
    """自定义链"""
    return input.upper()

# 使用
result = my_custom_chain.invoke("hello")
# "HELLO"

RunnableSequence

Runs multiple Runnables in sequence (what the | operator builds).

from langchain_core.runnables import RunnableSequence

# The | operator produces a RunnableSequence
chain = step1 | step2 | step3
# equivalent to:
# chain = RunnableSequence(step1, step2, step3)
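
A quick sketch showing the type produced by | and the steps property for inspecting a sequence:

python
from langchain_core.runnables import RunnableLambda, RunnableSequence

seq = RunnableLambda(lambda x: x + 1) | RunnableLambda(lambda x: x * 2)

isinstance(seq, RunnableSequence)  # True
len(seq.steps)                     # 2
seq.invoke(3)                      # 8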

RunnableRetry

A Runnable wrapper that adds retry behavior. It is usually created
through with_retry() rather than constructed directly.

from langchain_core.runnables.retry import RunnableRetry

# with_retry() wraps original_chain in a RunnableRetry
reliable_chain = original_chain.with_retry(
    retry_if_exception_type=(ConnectionError, TimeoutError),
    wait_exponential_jitter=True,
    stop_after_attempt=3
)

RunnableWithMessageHistory

A Runnable wrapper that loads and records message history for a session.

from langchain_core.runnables.history import RunnableWithMessageHistory

chain_with_history = RunnableWithMessageHistory(
    runnable=base_chain,
    get_session_history=lambda session_id: chat_history_store[session_id],
    input_messages_key="input",
    history_messages_key="history"
)
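
A usage sketch with an in-memory store; the chat_history_store above is assumed to map session ids to chat histories, so here we build one explicitly:

python
from langchain_core.chat_history import InMemoryChatMessageHistory

chat_history_store = {}

def get_session_history(session_id: str) -> InMemoryChatMessageHistory:
    # Create a fresh history for unknown sessions
    if session_id not in chat_history_store:
        chat_history_store[session_id] = InMemoryChatMessageHistory()
    return chat_history_store[session_id]

# The session id is passed through the "configurable" config field
chain_with_history.invoke(
    {"input": "Hi, I'm Bob"},
    config={"configurable": {"session_id": "abc"}},
)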

Usage Examples

python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import (
    RunnableLambda,
    RunnableParallel,
    RunnablePassthrough,
    RunnableBranch
)
from langchain_core.output_parsers import StrOutputParser

llm = ChatOpenAI(model="gpt-4o")
prompt = ChatPromptTemplate.from_template("Tell me about {topic}")

# ========== Example 1: basic Runnable ==========
chain = ChatPromptTemplate.from_template("Tell me about {topic}") | llm | StrOutputParser()
result = chain.invoke({"topic": "LangChain"})

# ========== Example 2: RunnableLambda ==========
def process(text: str) -> str:
    return text.upper()

lambda_chain = RunnableLambda(process)
result = lambda_chain.invoke("hello")
# "HELLO"

# ========== Example 3: RunnableParallel ==========
parallel = RunnableParallel({
    "joke": ChatPromptTemplate.from_template("Tell me a joke") | llm | StrOutputParser(),
    "fact": ChatPromptTemplate.from_template("Tell me a fact") | llm | StrOutputParser(),
})
result = parallel.invoke({})

# ========== Example 4: RunnablePassthrough ==========
chain = (
    RunnablePassthrough.assign(
        processed=lambda x: x["input"].upper()
    )
)
result = chain.invoke({"input": "hello"})
# {"input": "hello", "processed": "HELLO"}

# ========== Example 5: RunnableBranch ==========
# joke_chain / fact_chain / default_chain are placeholder sub-chains
branch = RunnableBranch(
    (lambda x: x["type"] == "joke", joke_chain),
    (lambda x: x["type"] == "fact", fact_chain),
    default_chain
)

# ========== Example 6: with_retry ==========
flaky_chain = llm | StrOutputParser()
reliable_chain = flaky_chain.with_retry(
    retry_if_exception_type=(ConnectionError, TimeoutError),
    stop_after_attempt=3
)

# ========== Example 7: with_fallbacks ==========
from openai import APIError, RateLimitError

primary_chain = ChatOpenAI(model="gpt-4o") | StrOutputParser()
fallback_chain = ChatOpenAI(model="gpt-4o-mini") | StrOutputParser()

resilient_chain = primary_chain.with_fallbacks(
    fallbacks=[fallback_chain],
    exceptions_to_handle=(RateLimitError, APIError)
)

# ========== Example 8: bind ==========
llm = ChatOpenAI(model="gpt-4o")
bound_llm = llm.bind(temperature=0.7, max_tokens=500)

# ========== Example 9: with_config ==========
chain = prompt | llm
configured_chain = chain.with_config(
    run_name="my_chain",
    tags=["production"],
    metadata={"version": "1.0"}
)

# ========== Example 10: composing the pieces ==========
# retrieve() is a placeholder retrieval function; the chain takes a
# plain question string as input
complex_chain = (
    RunnableParallel({
        "input": RunnablePassthrough(),
        "context": RunnableLambda(lambda x: retrieve(x))
    })
    | (lambda x: {"input": f"Context: {x['context']}\nQuestion: {x['input']}"})
    | ChatPromptTemplate.from_template("{input}")
    | llm
    | StrOutputParser()
)

# ========== Example 11: async usage ==========
import asyncio

async def async_chain_example():
    chain = prompt | llm | StrOutputParser()

    # Asynchronous invoke
    result = await chain.ainvoke({"topic": "Python"})

    # Asynchronous streaming
    async for chunk in chain.astream({"topic": "Python"}):
        print(chunk, end="")

    # Asynchronous batch
    results = await chain.abatch([
        {"topic": "Python"},
        {"topic": "JavaScript"}
    ])

asyncio.run(async_chain_example())

Related APIs