langchain_core.callbacks.manager
.dispatch_custom_event¶
- langchain_core.callbacks.manager.dispatch_custom_event(name: str, data: Any, *, config: Optional[RunnableConfig] = None) None [source]¶
分发临时事件。
- 参数:
name (字符串) – 临时事件的名字。
data (任何类型) – 临时事件的数据。自由格式数据。理想上是应该能够被 JSON 序列化,以避免后续序列化问题,但这不是强制性的。
config (可选[RunnableConfig]) – 可选配置对象。与异步 API 类似,但不是必须的。
- 返回类型:
None
示例:
from langchain_core.callbacks import BaseCallbackHandler from langchain_core.callbacks import dispatch_custom_event from langchain_core.runnable import RunnableLambda class CustomCallbackManager(BaseCallbackHandler): def on_custom_event( self, name: str, data: Any, *, run_id: UUID, tags: Optional[List[str]] = None, metadata: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> None: print(f"Received custom event: {name} with data: {data}") def foo(inputs): dispatch_custom_event("my_event", {"bar": "buzz}) return inputs foo_ = RunnableLambda(foo) foo_.invoke({"a": "1"}, {"callbacks": [CustomCallbackManager()]})
自 0.2.15 版本开始新加入。