Runnable 接口 API
LangChain 表达式语言核心接口
概述
Runnable 是 LangChain 中的核心接口,所有组件(LLM、提示模板、输出解析器等)都实现了这个接口,提供统一的调用方式。
graph TD
A[Runnable] --> B[invoke]
A --> C[stream]
A --> D[batch]
A --> E[ainvoke]
A --> F[astream]
A --> G[abatch]
A --> H["| 操作符"]
A --> I[pipe]
A --> J[bind]
A --> K[with_retry]
A --> L[with_fallbacks]
A --> M[map]
style A fill:#e1f5fe
style H fill:#c8e6c9
Runnable 接口定义
Runnable
LangChain 中所有可运行组件的基类接口。
class Runnable(Generic[Input, Output], ABC):
    """Base interface implemented by every runnable LangChain component.

    Type Parameters:
        Input: Type of the value the runnable accepts.
        Output: Type of the value the runnable produces.
    """

    # ========== Core synchronous methods ==========

    @abstractmethod
    def invoke(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> Output:
        """Transform a single input into an output.

        This is the only abstract method; every other method has a default
        implementation built on top of it.

        Args:
            input: The input value.
            config: Run configuration (callbacks, metadata, tags, ...).
            **kwargs: Extra keyword arguments for the implementation.

        Returns:
            The output value.
        """

    def stream(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> Iterator[Output]:
        """Stream the output for a single input.

        The default implementation yields the result of ``invoke`` as a
        single chunk; components that support streaming override it.

        Yields:
            Output chunks.
        """

    def batch(
        self,
        inputs: List[Input],
        config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
        *,
        return_exceptions: bool = False,
        **kwargs: Any,
    ) -> List[Output]:
        """Run the runnable on a list of inputs.

        The default implementation calls ``invoke`` in parallel using a
        thread pool.

        Args:
            inputs: The input values.
            config: One config shared by all inputs, or a list with one
                config per input.
            return_exceptions: When True, exceptions are returned in the
                result list instead of being raised.
            **kwargs: Extra keyword arguments for the implementation.

        Returns:
            One output per input, in the same order.
        """

    # ========== Core asynchronous methods ==========

    # Not abstract: the default implementation runs ``invoke`` in an executor,
    # so subclasses only override it when they have native async support.
    async def ainvoke(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> Output:
        """Asynchronous counterpart of ``invoke``."""

    async def astream(
        self,
        input: Input,
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> AsyncIterator[Output]:
        """Asynchronous counterpart of ``stream``."""

    async def abatch(
        self,
        inputs: List[Input],
        config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
        *,
        return_exceptions: bool = False,
        **kwargs: Any,
    ) -> List[Output]:
        """Asynchronous counterpart of ``batch``."""

    # ========== Composition methods ==========

    def pipe(
        self,
        *others: Union[Runnable[Any, OtherOutput], Callable[[Any], OtherOutput]],
        name: Optional[str] = None,
    ) -> Runnable[Input, OtherOutput]:
        """Chain this runnable with one or more following steps.

        Args:
            *others: The next runnables (or plain callables), applied in order.
            name: Optional name for the resulting sequence (keyword-only).

        Returns:
            The composed runnable.
        """

    def __or__(
        self,
        other: Union[Runnable[Any, Any], Callable[[Any], Any], Mapping[str, Any]],
    ) -> Runnable[Input, Any]:
        """Implement the ``|`` operator.

        ``runnable1 | runnable2`` is equivalent to
        ``runnable1.pipe(runnable2)``. Plain callables and dicts on the
        right-hand side are coerced to runnables.
        """

    def bind(
        self,
        **kwargs: Any,
    ) -> Runnable[Input, Output]:
        """Bind keyword arguments to every call of this runnable.

        Args:
            **kwargs: Arguments to bind.

        Returns:
            A new runnable with the arguments bound.
        """

    def with_retry(
        self,
        *,
        retry_if_exception_type: Tuple[Type[BaseException], ...] = (Exception,),
        wait_exponential_jitter: bool = True,
        stop_after_attempt: int = 3,
    ) -> Runnable[Input, Output]:
        """Wrap this runnable so that failed calls are retried.

        Args:
            retry_if_exception_type: Exception types that trigger a retry.
            wait_exponential_jitter: Whether to add jitter to the exponential
                back-off between attempts.
            stop_after_attempt: Maximum number of attempts before giving up.

        Returns:
            A new runnable that retries on the given exceptions.
        """

    def with_fallbacks(
        self,
        fallbacks: Sequence[Runnable[Input, Output]],
        *,
        exceptions_to_handle: Tuple[Type[BaseException], ...] = (Exception,),
    ) -> Runnable[Input, Output]:
        """Add fallback runnables that are tried when this one fails.

        Args:
            fallbacks: Runnables to try, in order, after a failure.
            exceptions_to_handle: Exception types that trigger a fallback.

        Returns:
            A new runnable with fallbacks attached.
        """

    def with_config(
        self,
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> Runnable[Input, Output]:
        """Bind a run configuration to this runnable.

        Args:
            config: The configuration to bind.
            **kwargs: Individual config entries (for example ``run_name=...``
                or ``tags=[...]``) merged into ``config``.

        Returns:
            A new runnable with the configuration bound.
        """

    def with_types(
        self,
        *,
        input_type: Optional[Type[Input]] = None,
        output_type: Optional[Type[Output]] = None,
    ) -> Runnable[Input, Output]:
        """Override the declared input/output types.

        Args:
            input_type: The input type to declare (keyword-only).
            output_type: The output type to declare (keyword-only).

        Returns:
            A new runnable with the types bound.
        """

    def map(self) -> Runnable[List[Input], List[Output]]:
        """Lift this runnable to operate on lists.

        Returns:
            A new runnable that takes a list of inputs and returns the list of
            outputs obtained by calling ``invoke`` on each element.
        """
核心方法详解
RunnableConfig
运行配置类型。
from typing import Any, Dict, List, Optional, TypedDict, Union
from uuid import UUID

from langchain_core.callbacks import Callbacks


class RunnableConfig(TypedDict, total=False):
    """Configuration dictionary for a run; every key is optional."""

    # Callback handlers (or a callback manager) for this call and its sub-calls.
    callbacks: Callbacks
    # Tags attached to this call and its sub-calls; useful for filtering traces.
    tags: List[str]
    # Arbitrary metadata attached to this call and its sub-calls.
    metadata: Dict[str, Any]
    # Name of the tracer run for this call.
    run_name: str
    # Maximum number of parallel calls; None uses the executor's default.
    max_concurrency: Optional[int]
    # Maximum number of times a call may recurse (applies to any runnable,
    # not only RunnableBranch); the library default is 25.
    recursion_limit: int
    # Runtime values for fields made configurable via configurable_fields()
    # or configurable_alternatives().
    configurable: Dict[str, Any]
    # Unique identifier of the tracer run; generated when not provided.
    run_id: Optional[UUID]
使用示例
python
from langchain_openai import ChatOpenAI
from langchain_core.callbacks import StdOutCallbackHandler
llm = ChatOpenAI(model="gpt-4o")

# Build the per-call configuration first, then pass it to invoke().
run_config = {
    "callbacks": [StdOutCallbackHandler()],
    "tags": ["greeting", "test"],
    "metadata": {"user_id": "user_123"},
    "run_name": "greeting_generation",
}
response = llm.invoke("你好", config=run_config)
invoke / ainvoke
同步/异步调用方法。
python
from langchain_openai import ChatOpenAI
import asyncio

llm = ChatOpenAI(model="gpt-4o")

# Synchronous call.
response = llm.invoke("Tell me a joke")


async def async_invoke():
    """Send the same prompt through the async API and return the reply."""
    return await llm.ainvoke("Tell me a joke")


# Asynchronous call, driven from synchronous code.
asyncio.run(async_invoke())
stream / astream
流式调用方法。
python
from langchain_openai import ChatOpenAI
import asyncio

llm = ChatOpenAI(model="gpt-4o")

# Synchronous streaming: print each chunk as soon as it arrives.
for chunk in llm.stream("Tell me a story"):
    print(chunk.content, end="", flush=True)


async def async_stream():
    """Print the streamed chunks of the reply using the async API."""
    async for piece in llm.astream("Tell me a story"):
        print(piece.content, end="", flush=True)


# Asynchronous streaming.
asyncio.run(async_stream())
batch / abatch
批量调用方法。
python
from langchain_openai import ChatOpenAI
import asyncio

llm = ChatOpenAI(model="gpt-4o")

# Batch processing: one call, several prompts.
inputs = ["What is Python?", "What is JavaScript?", "What is Rust?"]
responses = llm.batch(inputs)

# Batch with a separate config per input (the list lines up with `inputs`).
configs = [
    {"tags": ["question1"]},
    {"tags": ["question2"]},
    {"tags": ["question3"]},
]
responses = llm.batch(inputs, config=configs)


async def async_batch():
    """Run the same batch through the async API and return the replies."""
    return await llm.abatch(inputs)


# Asynchronous batch.
asyncio.run(async_batch())
工具函数
RunnableConfig 辅助函数
与运行配置(RunnableConfig)相关的辅助函数。
from typing import List, Optional, Sequence, Union

from langchain_core.runnables import RunnableConfig, ensure_config, get_config_list

# The definitions below only illustrate the signatures of the imported helpers;
# the real implementations live in langchain_core.runnables.config.
# NOTE(review): langchain_core.runnables does not export a `get_config`
# function. LangGraph appears to provide one (`langgraph.config.get_config`)
# — confirm before relying on it.


def ensure_config(
    config: Optional[RunnableConfig] = None,
) -> RunnableConfig:
    """Return a complete config with defaults filled in.

    Values from the config of the currently running parent runnable (if any)
    are merged in, followed by the values of ``config``.
    """


def get_config_list(
    config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]],
    length: int,
) -> List[RunnableConfig]:
    """Expand one config (or validate a sequence of configs) to ``length`` items."""
RunnableParallel
并行执行多个 Runnable,并把各自的结果收集到一个字典中;RunnableParallel 与 RunnableMap 是同一个类的两个名称。
from langchain_core.runnables import RunnableMap, RunnableParallel

# RunnableMap is the backward-compatible alias; langchain_core defines it as
# `RunnableMap = RunnableParallel`, so both names refer to the same class.
assert RunnableMap is RunnableParallel
coerce_to_runnable
将函数、字典等类 Runnable 对象转换为 Runnable。
python
from langchain_core.runnables import RunnableLambda
from langchain_core.runnables.base import coerce_to_runnable

# Both lines produce a RunnableLambda wrapping the same function:
# coerce_to_runnable() converts a plain callable, RunnableLambda() wraps it
# explicitly.
coerce_to_runnable(lambda x: x.upper())
RunnableLambda(lambda x: x.upper())
chain 函数
快速创建链的工具函数。
from langchain_core.runnables import chain
@chain
def my_custom_chain(input: str) -> str:
    """Return the input text converted to upper case."""
    uppercased = input.upper()
    return uppercased


# Usage: the decorated function is called through the Runnable interface.
result = my_custom_chain.invoke("hello")
# "HELLO"
RunnableSequence
按顺序依次执行多个 Runnable,前一步的输出作为后一步的输入(使用 | 操作符)。
from langchain_core.runnables import RunnableSequence
# The | operator builds a RunnableSequence.
# step1, step2 and step3 are placeholders for runnables defined elsewhere.
chain = step1 | step2 | step3
# Equivalent to the following (the steps are passed positionally; there is no
# `steps=` keyword argument):
# chain = RunnableSequence(step1, step2, step3)
RunnableRetry
带重试的 Runnable 包装器。
# RunnableRetry is not re-exported from langchain_core.runnables; import it
# from the retry submodule.
from langchain_core.runnables.retry import RunnableRetry

# `original_chain` is a placeholder for an existing runnable.
reliable_chain = RunnableRetry(
    bound=original_chain,
    # Maximum number of attempts before giving up.
    max_attempt_number=3,
    # Add jitter to the exponential back-off between attempts.
    wait_exponential_jitter=True,
)
# NOTE(review): the original snippet passed wait_exponential_multiplier=1.0 and
# wait_exponential_max=10.0, which are not RunnableRetry fields. Recent
# langchain-core releases appear to accept
# exponential_jitter_params={"initial": 1.0, "max": 10.0} for the same purpose
# — confirm against the installed version.
RunnableWithMessageHistory
带消息历史的 Runnable。
from langchain_core.runnables.history import RunnableWithMessageHistory
def get_session_history(session_id):
    """Return the chat history stored under ``session_id``."""
    return chat_history_store[session_id]


# base_chain and chat_history_store are placeholders defined elsewhere.
chain_with_history = RunnableWithMessageHistory(
    runnable=base_chain,
    get_session_history=get_session_history,
    input_messages_key="input",
    history_messages_key="history",
)
使用示例
python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import (
RunnableLambda,
RunnableParallel,
RunnablePassthrough,
RunnableBranch
)
from langchain_core.output_parsers import StrOutputParser
llm = ChatOpenAI(model="gpt-4o")

# ========== Example 1: basic Runnable ==========
topic_prompt = ChatPromptTemplate.from_template("Tell me about {topic}")
chain = topic_prompt | llm | StrOutputParser()
result = chain.invoke({"topic": "LangChain"})


# ========== Example 2: RunnableLambda ==========
def process(text: str) -> str:
    """Return ``text`` converted to upper case."""
    return text.upper()


lambda_chain = RunnableLambda(process)
result = lambda_chain.invoke("hello")
# "HELLO"
# ========== Example 3: RunnableParallel ==========
# Keyword arguments are equivalent to passing a dict of named steps.
parallel = RunnableParallel(
    joke=ChatPromptTemplate.from_template("Tell me a joke") | llm | StrOutputParser(),
    fact=ChatPromptTemplate.from_template("Tell me a fact") | llm | StrOutputParser(),
)
result = parallel.invoke({})

# ========== Example 4: RunnablePassthrough ==========
# assign() keeps the incoming dict and adds the computed key to it.
chain = RunnablePassthrough.assign(processed=lambda x: x["input"].upper())
result = chain.invoke({"input": "hello"})
# {"input": "hello", "processed": "HELLO"}
# ========== Example 5: RunnableBranch ==========
# joke_chain, fact_chain and default_chain are placeholders for runnables
# defined elsewhere. Each (condition, runnable) pair is listed in order and
# the last positional argument is the default branch.
branch = RunnableBranch(
    (lambda x: x["type"] == "joke", joke_chain),
    (lambda x: x["type"] == "fact", fact_chain),
    default_chain
)
# ========== Example 6: with_retry ==========
# RunnableRetry (the wrapper class behind with_retry) lives in the retry
# submodule; it is not re-exported from langchain_core.runnables.
from langchain_core.runnables.retry import RunnableRetry

flaky_chain = llm | StrOutputParser()
reliable_chain = flaky_chain.with_retry(
    retry_if_exception_type=(ConnectionError, TimeoutError),
    # with_retry() has no `max_retries` keyword; the attempt limit is
    # `stop_after_attempt`.
    stop_after_attempt=3,
)
# ========== Example 7: with_fallbacks ==========
# NOTE(review): RateLimitError and APIError are not imported anywhere in this
# snippet; presumably they are the OpenAI SDK exception classes
# (e.g. `from openai import APIError, RateLimitError`) — confirm and add the
# import before running.
primary_chain = ChatOpenAI(model="gpt-4o") | StrOutputParser()
fallback_chain = ChatOpenAI(model="gpt-4o-mini") | StrOutputParser()
resilient_chain = primary_chain.with_fallbacks(
    fallbacks=[fallback_chain],
    exceptions_to_handle=(RateLimitError, APIError)
)
# ========== Example 8: bind ==========
llm = ChatOpenAI(model="gpt-4o")
bound_llm = llm.bind(temperature=0.7, max_tokens=500)

# ========== Example 9: with_config ==========
# Define the prompt used by the chain; the original snippet referenced
# `prompt` without ever defining it.
prompt = ChatPromptTemplate.from_template("Tell me about {topic}")
chain = prompt | llm
# Config entries can be passed directly as keyword arguments.
configured_chain = chain.with_config(
    run_name="my_chain",
    tags=["production"],
    metadata={"version": "1.0"},
)
# ========== Example 10: combined usage ==========
from operator import itemgetter

# The chain is invoked with a dict such as {"input": "..."}.
# `retrieve` is a placeholder for a retrieval function defined elsewhere.
complex_chain = (
    RunnableParallel({
        # Pick the question text out of the input dict. RunnablePassthrough()
        # would forward the whole dict, so the formatted question below would
        # render as a dict repr instead of the text.
        "input": itemgetter("input"),
        "context": RunnableLambda(lambda x: retrieve(x["input"])),
    })
    | (lambda x: f"Context: {x['context']}\nQuestion: {x['input']}")
    | ChatPromptTemplate.from_template("{input}")
    | llm
    | StrOutputParser()
)
# ========== Example 11: asynchronous usage ==========
import asyncio


async def async_chain_example():
    """Demonstrate ainvoke, astream and abatch on a simple chain."""
    # Defined locally so the example is self-contained; the original snippet
    # referenced `prompt` without defining it.
    prompt = ChatPromptTemplate.from_template("Tell me about {topic}")
    chain = prompt | llm | StrOutputParser()

    # Asynchronous call.
    result = await chain.ainvoke({"topic": "Python"})

    # Asynchronous streaming.
    async for chunk in chain.astream({"topic": "Python"}):
        print(chunk, end="")

    # Asynchronous batch.
    results = await chain.abatch([
        {"topic": "Python"},
        {"topic": "JavaScript"},
    ])


asyncio.run(async_chain_example())