回调 API
BaseCallbackHandler 与事件处理
概述
回调系统允许你在 LangChain 运行的各个阶段插入自定义逻辑,用于日志、监控、调试等。
graph TD
A[BaseCallbackHandler] --> B[AsyncCallbackHandler]
A --> C[StdOutCallbackHandler]
A --> D[TokenCallbackHandler]
A --> E[FileCallbackHandler]
A --> F[自定义 Handler]
F --> G[日志记录]
F --> H[性能监控]
F --> I[自定义逻辑]
style A fill:#e1f5fe
style F fill:#c8e6c9
基类
BaseCallbackHandler
回调处理器抽象基类。
from langchain_core.callbacks import BaseCallbackHandler
class BaseCallbackHandler:
    """Abstract base class for callback handlers.

    Override any of the hooks below to run custom logic (logging,
    monitoring, debugging, ...) at the corresponding stage of a run.
    Every hook is a no-op by default, so subclasses implement only
    what they need.
    """

    # ========== Run start / end ==========
    def on_llm_start(
        self,
        serialized: Dict[str, Any],
        prompts: List[str],
        **kwargs: Any,
    ) -> Any:
        """Called when an LLM run starts.

        Args:
            serialized: Serialized representation of the model.
            prompts: List of input prompts.
            **kwargs: Additional keyword arguments.
        """

    def on_llm_end(
        self,
        response: LLMResult,
        **kwargs: Any,
    ) -> Any:
        """Called when an LLM run ends.

        Args:
            response: The LLM result object.
            **kwargs: Additional keyword arguments.
        """

    def on_llm_error(
        self,
        error: Exception,
        **kwargs: Any,
    ) -> Any:
        """Called when an LLM run errors.

        Args:
            error: The raised exception.
            **kwargs: Additional keyword arguments.
        """

    def on_llm_new_token(
        self,
        token: str,
        **kwargs: Any,
    ) -> Any:
        """Called on each newly generated token (streaming only).

        Args:
            token: The newly generated token.
            **kwargs: Additional keyword arguments.
        """

    # ========== Chain events ==========
    def on_chain_start(
        self,
        serialized: Dict[str, Any],
        inputs: Dict[str, Any],
        **kwargs: Any,
    ) -> Any:
        """Called when a chain run starts."""

    def on_chain_end(
        self,
        outputs: Dict[str, Any],
        **kwargs: Any,
    ) -> Any:
        """Called when a chain run ends."""

    def on_chain_error(
        self,
        error: Exception,
        **kwargs: Any,
    ) -> Any:
        """Called when a chain run errors."""

    # ========== Tool events ==========
    def on_tool_start(
        self,
        serialized: Dict[str, Any],
        input_str: str,
        **kwargs: Any,
    ) -> Any:
        """Called when a tool run starts."""

    def on_tool_end(
        self,
        output: str,
        **kwargs: Any,
    ) -> Any:
        """Called when a tool run ends."""

    def on_tool_error(
        self,
        error: Exception,
        **kwargs: Any,
    ) -> Any:
        """Called when a tool run errors."""

    # ========== Agent events ==========
    def on_agent_action(
        self,
        action: AgentAction,
        **kwargs: Any,
    ) -> Any:
        """Called when the agent takes an action."""

    def on_agent_finish(
        self,
        finish: AgentFinish,
        **kwargs: Any,
    ) -> Any:
        """Called when the agent finishes."""

    # ========== Retriever events ==========
    def on_retriever_start(
        self,
        query: str,
        **kwargs: Any,
    ) -> Any:
        """Called when a retriever run starts."""

    def on_retriever_end(
        self,
        documents: List[Document],
        **kwargs: Any,
    ) -> Any:
        """Called when a retriever run ends."""

    def on_retriever_error(
        self,
        error: Exception,
        **kwargs: Any,
    ) -> Any:
        """Called when a retriever run errors."""

    # ========== Custom events ==========
    def on_custom_event(
        self,
        name: str,
        data: Any,
        run_id: UUID,
        tags: List[str],
        **kwargs: Any,
    ) -> Any:
        """Called when a custom event is dispatched.

        Args:
            name: Event name.
            data: Event payload.
            run_id: Run ID.
            tags: List of tags.
            **kwargs: Additional keyword arguments.
        """

    # ========== Properties ==========
    @property
    def ignore_llm(self) -> bool:
        """Whether to ignore LLM events."""

    @property
    def ignore_chain(self) -> bool:
        """Whether to ignore chain events."""

    @property
    def ignore_agent(self) -> bool:
        """Whether to ignore agent events."""

    @property
    def ignore_retriever(self) -> bool:
        """Whether to ignore retriever events."""

    @property
    def raise_error(self) -> bool:
        """Whether to raise the exception when an error occurs."""
内置处理器
StdOutCallbackHandler
标准输出回调处理器。
from langchain.callbacks import StdOutCallbackHandler
class StdOutCallbackHandler(BaseCallbackHandler):
    """Callback handler that prints run events to standard output."""

    def __init__(
        self,
        color: Optional[str] = None,
    ):
        """Initialize the handler.

        Args:
            color: Output color (terminal color codes supported).
        """
使用示例
python
from langchain.callbacks import StdOutCallbackHandler
from langchain_openai import ChatOpenAI
# Attach the handler to a single call via the `callbacks` key of `config`.
handler = StdOutCallbackHandler()
llm = ChatOpenAI(model="gpt-4o")
response = llm.invoke(
    "你好",
    config={"callbacks": [handler]}
)
# Example output (the run input is echoed back):
# [chain/start] Entering Chain run with input: '你好'
# [llm/start] Entering LLM run with input: '你好'
# [llm/end] Finished LLM.
# [chain/end] Finished Chain.
TokenCallbackHandler
Token 计数回调处理器。
from langchain.callbacks import TokenCallbackHandler
class TokenCallbackHandler(BaseCallbackHandler):
    """Callback handler that accumulates token usage across LLM calls.

    Attributes (all running totals, starting at 0):
        total_tokens: Total tokens used.
        prompt_tokens: Prompt (input) tokens used.
        completion_tokens: Completion (output) tokens used.
    """

    def __init__(self):
        self.total_tokens = 0
        self.prompt_tokens = 0
        self.completion_tokens = 0

    def on_llm_end(self, response, **kwargs):
        """Record token usage from the LLM response.

        Accumulates with `+=` (the original assignment overwrote the
        counters, losing usage from earlier calls in the same run) and
        tolerates providers that report no usage: a missing or None
        `llm_output` / `token_usage` simply contributes nothing.
        """
        usage = (getattr(response, "llm_output", None) or {}).get("token_usage", {})
        self.total_tokens += usage.get("total_tokens", 0)
        self.prompt_tokens += usage.get("prompt_tokens", 0)
        self.completion_tokens += usage.get("completion_tokens", 0)
使用示例
python
from langchain.callbacks import TokenCallbackHandler
# Count token usage for a call by passing the handler in `config`.
token_handler = TokenCallbackHandler()
llm.invoke(
    "写一首诗",
    config={"callbacks": [token_handler]}
)
print(f"总 tokens: {token_handler.total_tokens}")
print(f"输入 tokens: {token_handler.prompt_tokens}")
print(f"输出 tokens: {token_handler.completion_tokens}")
FileCallbackHandler
文件输出回调处理器。
from langchain.callbacks import FileCallbackHandler
class FileCallbackHandler(BaseCallbackHandler):
    """Callback handler that writes run events to a file."""

    def __init__(
        self,
        filename: str,
        mode: str = "a",
    ):
        """Initialize the handler.

        Args:
            filename: Path of the output file.
            mode: File open mode (default "a", i.e. append).
        """
使用示例
python
from langchain.callbacks import FileCallbackHandler
# Log run events to a file instead of stdout.
handler = FileCallbackHandler("langchain.log")
llm.invoke(
    "你好",
    config={"callbacks": [handler]}
)
# Events are written to langchain.log
StreamingStdOutCallbackHandler
流式标准输出处理器。
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
handler = StreamingStdOutCallbackHandler()
# Streaming output: the handler prints each token as it arrives,
# so the loop body itself has nothing to do.
for chunk in llm.stream("讲一个故事", config={"callbacks": [handler]}):
    pass
VerboseCallbackHandler
详细输出回调处理器。
# NOTE(review): `verbose` is not an importable name in `langchain.callbacks`;
# verbose output is normally toggled globally via
# `langchain.globals.set_verbose(True)` — confirm and fix this import.
from langchain.callbacks import verbose
# Use the verbose parameter
chain.invoke(input, verbose=True)
使用示例
python
# ========== 示例1: 自定义回调处理器 ==========
from langchain_core.callbacks import BaseCallbackHandler
class MyCallbackHandler(BaseCallbackHandler):
    """Example handler that logs LLM lifecycle events to stdout."""

    def on_llm_start(self, serialized, prompts, **kwargs):
        # Guard against an empty prompt list before indexing/slicing.
        first = prompts[0][:50] if prompts else ""
        print(f"LLM 开始,提示: {first}...")

    def on_llm_end(self, response, **kwargs):
        print(f"LLM 结束")
        # `llm_output` can exist but be None, so hasattr alone is not
        # enough — `.get` on None would raise AttributeError.
        llm_output = getattr(response, "llm_output", None)
        if llm_output:
            token_usage = llm_output.get("token_usage", {})
            print(f"Token 使用: {token_usage}")

    def on_llm_error(self, error, **kwargs):
        print(f"LLM 错误: {error}")
handler = MyCallbackHandler()
llm.invoke("你好", config={"callbacks": [handler]})
# ========== Example 2: multiple callbacks ==========
# Every handler in the list receives every event.
from langchain.callbacks import StdOutCallbackHandler, FileCallbackHandler
handlers = [
    StdOutCallbackHandler(),
    FileCallbackHandler("output.log")
]
llm.invoke("你好", config={"callbacks": handlers})
# ========== 示例3: 在链中使用 ==========
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
class TimingCallback(BaseCallbackHandler):
    """Callback that reports the wall-clock duration of a chain run."""

    def __init__(self):
        # Set when the chain starts; None until then.
        self.start_time = None

    def on_chain_start(self, serialized, inputs, **kwargs):
        # Local import keeps the snippet self-contained (the original put
        # `import time` in the class body and reached it via `self.time`,
        # which works but is fragile and unidiomatic).
        import time
        self.start_time = time.time()

    def on_chain_end(self, outputs, **kwargs):
        import time
        if self.start_time is None:
            # End without a matching start: nothing to report.
            return
        duration = time.time() - self.start_time
        print(f"Chain 耗时: {duration:.2f}s")
# NOTE(review): `prompt` and `llm` are assumed to be defined earlier.
timing = TimingCallback()
chain = prompt | llm
result = chain.invoke({"input": "你好"}, config={"callbacks": [timing]})
# ========== Example 4: global callbacks ==========
import os
from langchain.globals import set_debug
set_debug(True)  # Enable global debugging (every run prints its events)
# NOTE(review): `os` is imported but unused in this example.
# ========== Example 5: callback manager ==========
# A CallbackManager bundles several handlers and can itself be passed
# as the `callbacks` value. `handler1`/`handler2` are placeholders.
from langchain_core.callbacks import CallbackManager
manager = CallbackManager(handlers=[handler1, handler2])
config = {"callbacks": manager}