langchain_core.callbacks.manager.dispatch_custom_event

langchain_core.callbacks.manager.dispatch_custom_event(name: str, data: Any, *, config: Optional[RunnableConfig] = None) None[source]

分发临时事件。

参数:
  • name (字符串) – 临时事件的名字。

  • data (任何类型) – 临时事件的数据。自由格式数据。理想上是应该能够被 JSON 序列化,以避免后续序列化问题,但这不是强制性的。

  • config (可选[RunnableConfig]) – 可选配置对象。与异步 API 类似,但不是必须的。

返回类型:

None

示例:

from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.callbacks import dispatch_custom_event
from langchain_core.runnable import RunnableLambda

class CustomCallbackManager(BaseCallbackHandler):
    def on_custom_event(
        self,
        name: str,
        data: Any,
        *,
        run_id: UUID,
        tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> None:
        print(f"Received custom event: {name} with data: {data}")

def foo(inputs):
    dispatch_custom_event("my_event", {"bar": "buzz})
    return inputs

foo_ = RunnableLambda(foo)
foo_.invoke({"a": "1"}, {"callbacks": [CustomCallbackManager()]})

自 0.2.15 版本开始新加入。