langchain_core.runnables.base
.RunnableBinding¶
注意
RunnableBinding实现了标准Runnable Interface
。 🏃
Runnable(可运行)接口具有额外的可用方法,例如Runnable 接口
,包括:with_types
、with_retry
、assign
、bind
、get_graph
等。
- 类 langchain_core.runnables.base.RunnableBinding[源代码]¶
继承自:
RunnableBindingBase
oning [输入
], [输出
]包装一个Runnable,添加更多功能。
RunnableBinding可以被视为一个“可运行装饰器”,它保留了Runnable的必要功能;即,批处理、流处理和异步支持,同时增加了额外的功能。
任何从Runnable继承的类都可以绑定到RunnableBinding。Runnable公开了一组标准方法来创建RunnableBinding或RunnableBinding的子类(例如,RunnableRetry,RunnableWithFallbacks),它们增加了额外的功能。
这些方法包括:- bind:将kwargs绑定到在执行时传递给底层Runnable的参数。 - with_config:将配置绑定到在执行时传递给底层Runnable的配置。 - with_listeners:将生命周期监听器绑定到底层Runnable。 - with_types:重写底层Runnable的输入和输出类型。 - with_retry:将重试策略绑定到底层Runnable。 - with_fallbacks:将回退策略绑定到底层Runnable。
示例
bind:将kwargs绑定到在执行时传递给底层Runnable的参数。
# Create a Runnable binding that invokes the ChatModel with the # additional kwarg `stop=['-']` when running it. from langchain_community.chat_models import ChatOpenAI model = ChatOpenAI() model.invoke('Say "Parrot-MAGIC"', stop=['-']) # Should return `Parrot` # Using it the easy way via `bind` method which returns a new # RunnableBinding runnable_binding = model.bind(stop=['-']) runnable_binding.invoke('Say "Parrot-MAGIC"') # Should return `Parrot`
也可以直接实例化RunnableBinding(不推荐)
from langchain_core.runnables import RunnableBinding runnable_binding = RunnableBinding( bound=model, kwargs={'stop': ['-']} # <-- Note the additional kwargs ) runnable_binding.invoke('Say "Parrot-MAGIC"') # Should return `Parrot`
从Runnable和kwargs创建RunnableBinding。
- 参数
bound – 这是Runnable代理调用的底层Runnable。
kwargs – 传递给底层Runnable时可选的kwargs,默认为None。
config – 可选的要将配置绑定到底层Runnable配置。默认为None。
config_factories – 可选的列表,用于在绑定到底层Runnable之前应用配置工厂。默认为None。
custom_input_type –指定以覆盖底层Runnable的输入类型为自定义类型。默认为None。
custom_output_type –指定以覆盖底层Runnable的输出类型为自定义类型。默认为None。
**其他keywords – 解包到基类。
- 参数 config: RunnableConfig [可选]¶
绑定到下层可运行对象的配置。
- 参数 config_factories: List[Callable[[RunnableConfig], RunnableConfig]] [可选]¶
绑定到下层可运行对象的配置工厂。
- 参数 custom_input_type: Optional[Any] = None¶
使用自定义类型覆盖下层可运行对象的输入类型。
类型可以是pydantic模型,或类型注解(例如,List[str])。
- 参数 custom_output_type: Optional[Any] = None¶
使用自定义类型覆盖下层可运行对象的输出类型。
类型可以是pydantic模型,或类型注解(例如,List[str])。
- 参数 kwargs: Mapping[str, Any] [可选]¶
在运行时传递给下层可运行对象的kwargs。
例如,当调用Runnable绑定时,底层的Runnable将以相同的输入调用,并带有这些额外的kwargs参数。
- async abatch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output]¶
默认实现使用asyncio.gather并行运行ainvoke。
默认的批处理实现适用于IO密集型Runnable。
子类如果能够更有效地批量处理的话,应重写此方法;例如,如果底层的Runnable使用支持批处理模式的API。
- 参数
inputs (List[Input]) – Runnable的输入列表。
config (Optional[Union[RunnableConfig, List[RunnableConfig]]]) – 调用Runnable时要使用的配置。该配置支持标准的键,如'tags'、'metadata'(用于跟踪目的)、'max_concurrency'(用于控制并行执行的工作量),以及其他键。请参阅RunnableConfig获取更多详细信息。默认值为None。
return_exceptions (bool) – 是否返回异常而不是抛出它们。默认为False。
kwargs (Optional[Any]) – 传递给Runnable的额外关键字参数。
- 返回值
Runnable的输出列表。
- 返回类型
List[Output]
- async abatch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) AsyncIterator[Tuple[int, Union[Output, Exception]]]¶
在输入列表上并行运行ainvoke,按完成顺序产生结果。
- 参数
inputs (Sequence[Input]) – Runnable的输入列表。
config (Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]) – 调用Runnable时使用的配置。该配置支持标准键,如‘tags’,‘metadata’(用于跟踪目的),‘max_concurrency’(用于控制并行工作数量)和其他键。请参阅RunnableConfig以获取更多详细信息。默认为None。
return_exceptions (bool) – 是否返回异常而不是抛出它们。默认为False。
kwargs (Optional[Any]) – 传递给Runnable的额外关键字参数。
- 产生
输入索引和Runnable的输出的元组。
- 返回类型
AsyncIterator[Tuple[int, Union[Output, Exception]]]
- async ainvoke(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) Output ¶
ainvoke 的默认实现,从线程中调用 invoke。
默认实现允许即使 Runnable 没有实现 invoke 的本地异步版本,也可以使用异步代码。
子类如果可以异步运行,则应重写此方法。
- 参数
input (Input) –
config (Optional[RunnableConfig]) –
kwargs (Optional[Any]) –
- 返回类型
输出
- as_tool(args_schema: Optional[Type[BaseModel]] = None, *, name: Optional[str] = None, description: Optional[str] = None, arg_types: Optional[Dict[str, Type]]) BaseTool ¶
测试版
此 API 处于测试版,将来可能会有所改变。
从 Runnable 创建 BaseTool。
as_tool
将从一个Runnable实例化一个具有名字、描述和args_schema
的 BaseTool。尽可能从runnable.get_input_schema
推断模式。或者(例如,如果Runnable接受一个字典作为输入且字典的特定键没有指定类型),可以通过args_schema
直接指定模式。您还可以传递arg_types
来仅指定必需的参数及其类型。- 参数
args_schema (可选[类型[BaseModel]]) – 工具的模式。默认为 None。
name (可选[str]) – 工具的名字。默认为 None。
description (可选[str]) – 工具的描述。默认为 None。
arg_types (可选[Dict[str, 类型]]) – 参数名字到类型的字典。默认为 None。
- 返回值
BaseTool 实例。
- 返回类型
类型化字典输入
from typing import List from typing_extensions import TypedDict from langchain_core.runnables import RunnableLambda class Args(TypedDict): a: int b: List[int] def f(x: Args) -> str: return str(x["a"] * max(x["b"])) runnable = RunnableLambda(f) as_tool = runnable.as_tool() as_tool.invoke({"a": 3, "b": [1, 2]})
dict
输入,通过args_schema
指定模式from typing import Any, Dict, List from langchain_core.pydantic_v1 import BaseModel, Field from langchain_core.runnables import RunnableLambda def f(x: Dict[str, Any]) -> str: return str(x["a"] * max(x["b"])) class FSchema(BaseModel): """Apply a function to an integer and list of integers.""" a: int = Field(..., description="Integer") b: List[int] = Field(..., description="List of ints") runnable = RunnableLambda(f) as_tool = runnable.as_tool(FSchema) as_tool.invoke({"a": 3, "b": [1, 2]})
dict
输入,通过arg_types
指定模式from typing import Any, Dict, List from langchain_core.runnables import RunnableLambda def f(x: Dict[str, Any]) -> str: return str(x["a"] * max(x["b"])) runnable = RunnableLambda(f) as_tool = runnable.as_tool(arg_types={"a": int, "b": List[int]}) as_tool.invoke({"a": 3, "b": [1, 2]})
字符串输入
from langchain_core.runnables import RunnableLambda def f(x: str) -> str: return x + "a" def g(x: str) -> str: return x + "z" runnable = RunnableLambda(f) | g as_tool = runnable.as_tool() as_tool.invoke("b")
新版本 0.2.14 中加入。
- async astream(input: Input, config: Optional[RunnableConfig], **kwargs: Optional[Any]) AsyncIterator[Output] ¶
astream 的默认实现,调用 ainvoke。子类应根据是否支持流式输出覆盖此方法。
- 参数
input (输入) – Runnable 的输入。
config (可选[RunnableConfig]) – 为 Runnable 使用的配置。默认为 None。
kwargs (Optional[Any]) – 传递给Runnable的额外关键字参数。
- 产生
Runnable 的输出。
- 返回类型
AsyncIterator[输出]
- async astream_events(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]] ¶
测试版
此 API 处于测试版,将来可能会有所改变。
生成事件流。
用于创建一个迭代器,遍历StreamEvents,提供Runnable的实时进度信息,包括中间结果生成的StreamEvents。
StreamEvent是一个包含以下模式的字典
event
: str - 事件名称格式为:on_[runnable_type]_(start|stream|end)。
name
: str - 生成事件的Runnable的名称。run_id
: str - 与发出事件的Runnable的给定执行相关联的随机生成的ID。子Runnable作为父Runnable执行的一部分调用的。一个唯一的ID。
parent_ids
: List[str] - 生成事件的父Runnable的ID。根Runnable将有一个空列表。父ID的顺序是从根到直接父级。仅适用于API的v2版本。API的v1版本将返回一个空列表。
tags
: Optional[List[str]] - 生成事件的Runnable的标签。
metadata
: Optional[Dict[str, Any]] - 生成事件的Runnable的元数据。
data
: Dict[str, Any]
以下是一个表格,说明了可能由各种链发出的某些事件。为了简洁,省略了元数据字段。在表格之后包括链定义。
注意 此参考表是为schema的v2版本制定的。
事件
名称
块
输入
输出
on_chat_model_start
[模型名称]
{“messages”: [[SystemMessage, HumanMessage]]}
on_chat_model_stream
[模型名称]
AIMessageChunk(content="hello")
on_chat_model_end
[模型名称]
{“messages”: [[SystemMessage, HumanMessage]]}
AIMessageChunk(content="hello world")
on_llm_start
[模型名称]
{‘input’: ‘hello’}
on_llm_stream
[模型名称]
‘Hello’
on_llm_end
[模型名称]
‘Hello human!’
on_chain_start
format_docs
on_chain_stream
format_docs
“hello world!, goodbye world!”
on_chain_end
format_docs
文档[...]
“hello world!, goodbye world!”
工具启动
某些工具
{“x”: 1, “y”: “2”}
工具结束
某些工具
{“x”: 1, “y”: “2”}
检索器启动
[检索器名称]
{“查询”: “你好”}
检索器结束
[检索器名称]
{“查询”: “你好”}
[文档[...], ..]
提示开始
[模板名称]
{“问题”: “你好”}
提示结束
[模板名称]
{“问题”: “你好”}
ChatPromptValue(消息: [系统消息, …])
除了标准事件外,用户还可以分配自定义事件(以下为示例)。
自定义事件仅在API的v2版本中显示!
自定义事件有以下格式
属性
类型
描述
名称
str
为事件定义的用户名称。
data
Any
与事件相关联的数据。这可以是任何东西,但我们建议使其可序列化为JSON。
以下是与上面显示的标准事件相关的声明
format_docs:
def format_docs(docs: List[Document]) -> str: '''Format the docs.''' return ", ".join([doc.page_content for doc in docs]) format_docs = RunnableLambda(format_docs)
某些工具:
@tool def some_tool(x: int, y: str) -> dict: '''Some_tool.''' return {"x": x, "y": y}
提示:
template = ChatPromptTemplate.from_messages( [("system", "You are Cat Agent 007"), ("human", "{question}")] ).with_config({"run_name": "my_template", "tags": ["my_template"]})
示例
from langchain_core.runnables import RunnableLambda async def reverse(s: str) -> str: return s[::-1] chain = RunnableLambda(func=reverse) events = [ event async for event in chain.astream_events("hello", version="v2") ] # will produce the following events (run_id, and parent_ids # has been omitted for brevity): [ { "data": {"input": "hello"}, "event": "on_chain_start", "metadata": {}, "name": "reverse", "tags": [], }, { "data": {"chunk": "olleh"}, "event": "on_chain_stream", "metadata": {}, "name": "reverse", "tags": [], }, { "data": {"output": "olleh"}, "event": "on_chain_end", "metadata": {}, "name": "reverse", "tags": [], }, ]
示例:分配自定义事件
from langchain_core.callbacks.manager import ( adispatch_custom_event, ) from langchain_core.runnables import RunnableLambda, RunnableConfig import asyncio async def slow_thing(some_input: str, config: RunnableConfig) -> str: """Do something that takes a long time.""" await asyncio.sleep(1) # Placeholder for some slow operation await adispatch_custom_event( "progress_event", {"message": "Finished step 1 of 3"}, config=config # Must be included for python < 3.10 ) await asyncio.sleep(1) # Placeholder for some slow operation await adispatch_custom_event( "progress_event", {"message": "Finished step 2 of 3"}, config=config # Must be included for python < 3.10 ) await asyncio.sleep(1) # Placeholder for some slow operation return "Done" slow_thing = RunnableLambda(slow_thing) async for event in slow_thing.astream_events("some_input", version="v2"): print(event)
- 参数
input (输入) – Runnable 的输入。
config (可选[RunnableConfig]) – 使用Runnable的配置。
version – 要使用的架构版本,可以是v2或v1。用户应使用v2。 v1用于向后兼容,将在0.4.0中弃用。默认值将在API稳定后分配。自定义事件仅在v2中显示。
include_names – 只包括具有匹配名称的runnables的事件。
include_types – 只包括具有匹配类型的runnables的事件。
include_tags – 只包括具有匹配标签的runnables的事件。
exclude_names – 排除具有匹配名称的runnables的事件。
exclude_types – 排除具有匹配类型的runnables的事件。
exclude_tags – 排除具有匹配标签的runnables的事件。
kwargs (可选[Any]) – 要传递给Runnable的附加关键字参数。这些将被传递到astream_log,因为astream_events的实现是基于astream_log构建的。
- 产生
StreamEvents的异步流。
- 引发
NotImplementedError – 如果版本不是v1或v2。
- 返回类型
AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]]
- batch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output]¶
默认实现使用线程池执行器并行执行调用。
默认的批处理实现适用于IO密集型Runnable。
子类如果能够更有效地批量处理的话,应重写此方法;例如,如果底层的Runnable使用支持批处理模式的API。
- batch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) Iterator[Tuple[int, Union[Output, Exception]]] ¶
并行运行输入列表上的 invoke,按完成顺序生成结果。
- 参数
inputs (Sequence[Input]) –
config (Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]) –
return_exceptions (布尔型) –
kwargs (Optional[Any]) –
- 返回类型
Iterator[Tuple[int, Union[Output, Exception]]]
- configurable_alternatives(which: ConfigurableField, *, default_key: str = 'default', prefix_keys: bool = False, **kwargs: Union[Runnable[Input, Output], Callable[[], Runnable[Input, Output]]]) RunnableSerializable[Input, Output] ¶
运行时可以设置的Runnable的可配置替代方案配置。
- 参数
which (ConfigurableField) – 用于选择替代方案的ConfigurableField实例。
default_key (str) – 如果未选择替代方案,则使用默认键。默认为“default”。
prefix_keys (bool) – 是否将键前缀为ConfigurableField id。默认为False。
**kwargs (Union[Runnable[Input, Output], Callable[[], Runnable[Input, Output]]]) – 键到Runnable实例或返回Runnable实例的可调用对象的字典。
- 返回值
配置了替代方案的新Runnable。
- 返回类型
RunnableSerializable[Input, Output]
from langchain_anthropic import ChatAnthropic from langchain_core.runnables.utils import ConfigurableField from langchain_openai import ChatOpenAI model = ChatAnthropic( model_name="claude-3-sonnet-20240229" ).configurable_alternatives( ConfigurableField(id="llm"), default_key="anthropic", openai=ChatOpenAI() ) # uses the default model ChatAnthropic print(model.invoke("which organization created you?").content) # uses ChatOpenAI print( model.with_config( configurable={"llm": "openai"} ).invoke("which organization created you?").content )
- configurable_fields(**kwargs: Union[ConfigurableField, ConfigurableFieldSingleOption, ConfigurableFieldMultiOption]) RunnableSerializable[Input, Output]¶
在运行时配置特定的Runnable字段。
- 参数
**kwargs (Union[ConfigurableField, ConfigurableFieldSingleOption, ConfigurableFieldMultiOption]) – 要配置的ConfigurableField实例的字典。
- 返回值
配置字段后的新Runnable。
- 返回类型
RunnableSerializable[Input, Output]
from langchain_core.runnables import ConfigurableField from langchain_openai import ChatOpenAI model = ChatOpenAI(max_tokens=20).configurable_fields( max_tokens=ConfigurableField( id="output_token_number", name="Max tokens in the output", description="The maximum number of tokens in the output", ) ) # max_tokens = 20 print( "max_tokens_20: ", model.invoke("tell me something about chess").content ) # max_tokens = 200 print("max_tokens_200: ", model.with_config( configurable={"output_token_number": 200} ).invoke("tell me something about chess").content )
- invoke(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) Output¶
将单个输入转换为输出。覆盖以实现。
- 参数
input (输入) – Runnable 的输入。
config (Optional[RunnableConfig]) – 在调用Runnable时使用的配置。该配置支持标准键如’tags’、‘metadata’,用于跟踪目的,‘max_concurrency’用于控制并行执行的工作量,以及其他键。请参阅RunnableConfig获取更多详细信息。
kwargs (Optional[Any]) –
- 返回值
Runnable 的输出。
- 返回类型
输出
- stream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) Iterator[Output]¶
stream 的默认实现,它调用 invoke。子类应覆盖此方法以支持流式输出。
- 参数
input (输入) – Runnable 的输入。
config (可选[RunnableConfig]) – 为 Runnable 使用的配置。默认为 None。
kwargs (Optional[Any]) – 传递给Runnable的额外关键字参数。
- 产生
Runnable 的输出。
- 返回类型
Iterator[Output]
- to_json() Union[SerializedConstructor, SerializedNotImplemented] ¶
将 Runnable 序列化为 JSON。
- 返回值
Runnable 的 JSON 序列化表示。
- 返回类型