langchain_core.runnables.base
.RunnableParallel¶
Note
RunnableParallel 实现了标准的 Runnable Interface
。 🏃
Runnable Interface
具有在 runnables 上可用的其他方法,例如 with_types
、 with_retry
、 assign
、 bind
、 get_graph
等。
- class langchain_core.runnables.base.RunnableParallel[source]¶
Bases:
RunnableSerializable
[Input
,Dict
[str
,Any
]]Runnable,并行运行 Runnables 的映射,并返回其输出的映射。
RunnableParallel 是 LCEL 的两个主要组合原语之一,另一个是 RunnableSequence。它并发调用 Runnables,为每个 Runnable 提供相同的输入。
RunnableParallel 可以直接实例化,也可以在序列中使用字典字面量来实例化。
这是一个使用函数的简单示例,用于说明 RunnableParallel 的用法
from langchain_core.runnables import RunnableLambda def add_one(x: int) -> int: return x + 1 def mul_two(x: int) -> int: return x * 2 def mul_three(x: int) -> int: return x * 3 runnable_1 = RunnableLambda(add_one) runnable_2 = RunnableLambda(mul_two) runnable_3 = RunnableLambda(mul_three) sequence = runnable_1 | { # this dict is coerced to a RunnableParallel "mul_two": runnable_2, "mul_three": runnable_3, } # Or equivalently: # sequence = runnable_1 | RunnableParallel( # {"mul_two": runnable_2, "mul_three": runnable_3} # ) # Also equivalently: # sequence = runnable_1 | RunnableParallel( # mul_two=runnable_2, # mul_three=runnable_3, # ) sequence.invoke(1) await sequence.ainvoke(1) sequence.batch([1, 2, 3]) await sequence.abatch([1, 2, 3])
RunnableParallel 使并行运行 Runnables 变得容易。在下面的示例中,我们同时从两个不同的 Runnables 流式传输输出
from langchain_core.prompts import ChatPromptTemplate from langchain_core.runnables import RunnableParallel from langchain_openai import ChatOpenAI model = ChatOpenAI() joke_chain = ( ChatPromptTemplate.from_template("tell me a joke about {topic}") | model ) poem_chain = ( ChatPromptTemplate.from_template("write a 2-line poem about {topic}") | model ) runnable = RunnableParallel(joke=joke_chain, poem=poem_chain) # Display stream output = {key: "" for key, _ in runnable.output_schema()} for chunk in runnable.stream({"topic": "bear"}): for key in chunk: output[key] = output[key] + chunk[key].content print(output) # noqa: T201
- async abatch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs*: Optional[Any]) List[Output] ¶
默认实现使用 asyncio.gather 并行运行 ainvoke。
batch 的默认实现适用于 IO 绑定的 runnables。
如果子类可以更有效地进行批量处理,则应覆盖此方法;例如,如果底层 Runnable 使用支持批量模式的 API。
- 参数
inputs (List[Input]) – Runnable 的输入列表。
config (Optional[Union[RunnableConfig, List[RunnableConfig]]]) – 调用 Runnable 时要使用的配置。 此配置支持标准键,例如用于跟踪目的的 ‘tags’、‘metadata’,用于控制并行执行量的 ‘max_concurrency’ 以及其他键。 有关更多详细信息,请参阅 RunnableConfig。 默认为 None。
return_exceptions (bool) – 是否返回异常而不是引发异常。 默认为 False。
kwargs (Optional[Any]) – 传递给 Runnable 的其他关键字参数。
- 返回
来自 Runnable 的输出列表。
- 返回类型
List[Output]
- async abatch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs*: Optional[Any]) AsyncIterator[Tuple[int, Union[Output, Exception]]] ¶
在输入列表上并行运行 ainvoke,并在结果完成时产生结果。
- 参数
inputs (Sequence[Input]) – Runnable 的输入列表。
config (Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]) – 调用 Runnable 时要使用的配置。 此配置支持标准键,例如用于跟踪目的的 ‘tags’、‘metadata’,用于控制并行执行量的 ‘max_concurrency’ 以及其他键。 有关更多详细信息,请参阅 RunnableConfig。 默认为 None。 默认为 None。
return_exceptions (bool) – 是否返回异常而不是引发异常。 默认为 False。
kwargs (Optional[Any]) – 传递给 Runnable 的其他关键字参数。
- Yields
输入索引和 Runnable 输出的元组。
- 返回类型
AsyncIterator[Tuple[int, Union[Output, Exception]]]
- async ainvoke(input: Input, config: Optional[RunnableConfig] = None, **kwargs*: Optional[Any]) Dict[str, Any] [source]¶
ainvoke 的默认实现,从线程调用 invoke。
即使 Runnable 没有实现 invoke 的原生异步版本,默认实现也允许使用异步代码。
如果子类可以异步运行,则应覆盖此方法。
- 参数
input (Input) –
config (Optional[RunnableConfig]) –
kwargs (Optional[Any]) –
- 返回类型
Dict[str, Any]
- as_tool(args_schema: Optional[Type[BaseModel]] = None, *, name: Optional[str] = None, description: Optional[str] = None, arg_types: Optional[Dict[str, Type]] = None) BaseTool ¶
Beta
此 API 处于 Beta 阶段,将来可能会发生变化。
从 Runnable 创建 BaseTool。
as_tool
将从 Runnable 实例化具有名称、描述和args_schema
的 BaseTool。 在可能的情况下,模式是从runnable.get_input_schema
推断出来的。 或者(例如,如果 Runnable 接受字典作为输入,并且未键入特定的字典键),可以使用args_schema
直接指定模式。 您还可以传递arg_types
以仅指定必需的参数及其类型。- 参数
args_schema (Optional[Type[BaseModel]]) – 该工具的模式。 默认为 None。
name (Optional[str]) – 工具的名称。 默认为 None。
description (Optional[str]) – 工具的描述。 默认为 None。
arg_types (Optional[Dict[str, Type]]) – 参数名称到类型的字典。 默认为 None。
- 返回
一个 BaseTool 实例。
- 返回类型
Typed dict input
from typing import List from typing_extensions import TypedDict from langchain_core.runnables import RunnableLambda class Args(TypedDict): a: int b: List[int] def f(x: Args) -> str: return str(x["a"] * max(x["b"])) runnable = RunnableLambda(f) as_tool = runnable.as_tool() as_tool.invoke({"a": 3, "b": [1, 2]})
dict
输入,通过args_schema
指定模式from typing import Any, Dict, List from langchain_core.pydantic_v1 import BaseModel, Field from langchain_core.runnables import RunnableLambda def f(x: Dict[str, Any]) -> str: return str(x["a"] * max(x["b"])) class FSchema(BaseModel): """Apply a function to an integer and list of integers.""" a: int = Field(..., description="Integer") b: List[int] = Field(..., description="List of ints") runnable = RunnableLambda(f) as_tool = runnable.as_tool(FSchema) as_tool.invoke({"a": 3, "b": [1, 2]})
dict
输入,通过arg_types
指定模式from typing import Any, Dict, List from langchain_core.runnables import RunnableLambda def f(x: Dict[str, Any]) -> str: return str(x["a"] * max(x["b"])) runnable = RunnableLambda(f) as_tool = runnable.as_tool(arg_types={"a": int, "b": List[int]}) as_tool.invoke({"a": 3, "b": [1, 2]})
String input
from langchain_core.runnables import RunnableLambda def f(x: str) -> str: return x + "a" def g(x: str) -> str: return x + "z" runnable = RunnableLambda(f) | g as_tool = runnable.as_tool() as_tool.invoke("b")
0.2.14 版本中的新增功能。
- async astream(input: Input, config: Optional[RunnableConfig] = None, **kwargs*: Optional[Any]) AsyncIterator[Dict[str, Any]] [source]¶
astream 的默认实现,它调用 ainvoke。 如果子类支持流式输出,则应覆盖此方法。
- 参数
input (Input) – Runnable 的输入。
config (Optional[RunnableConfig]) – 用于 Runnable 的配置。 默认为 None。
kwargs (Optional[Any]) – 传递给 Runnable 的其他关键字参数。
- Yields
Runnable 的输出。
- 返回类型
AsyncIterator[Dict[str, Any]]
- astream_events(input: Any, config: Optional[RunnableConfig] = None, *, version: Literal['v1', 'v2'], include_names: Optional[Sequence[str]] = None, include_types: Optional[Sequence[str]] = None, include_tags: Optional[Sequence[str]] = None, exclude_names: Optional[Sequence[str]] = None, exclude_types: Optional[Sequence[str]] = None, exclude_tags: Optional[Sequence[str]] = None, **kwargs*: Any) AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]] ¶
Beta
此 API 处于 Beta 阶段,将来可能会发生变化。
生成事件流。
用于创建 StreamEvents 的迭代器,该迭代器提供有关 Runnable 进度的实时信息,包括来自中间结果的 StreamEvents。
StreamEvent 是一个具有以下模式的字典
event
: str - 事件名称的格式为:on_[runnable_type]_(start|stream|end)。format: on_[runnable_type]_(start|stream|end).
name
: str - 生成事件的 Runnable 的名称。run_id
: str - 与发出事件的 Runnable 的给定执行关联的随机生成的 ID。 作为父 Runnable 执行的一部分调用的子 Runnable 将被分配其自己的唯一 ID。the Runnable that emitted the event. A child Runnable that gets invoked as part of the execution of a parent Runnable is assigned its own unique ID.
parent_ids
: List[str] - 生成事件的父 runnables 的 ID。 根 Runnable 将有一个空列表。 父 ID 的顺序是从根到直接父级。 仅适用于 API 的 v2 版本。 API 的 v1 版本将返回一个空列表。generated the event. The root Runnable will have an empty list. The order of the parent IDs is from the root to the immediate parent. Only available for v2 version of the API. The v1 version of the API will return an empty list.
tags
: Optional[List[str]] - 生成事件的 Runnable 的标签。the event.
metadata
: Optional[Dict[str, Any]] - 生成事件的 Runnable 的元数据。that generated the event.
data
: Dict[str, Any]
下面是一个表格,说明了各种链可能发出的一些事件。 为了简洁起见,元数据字段已从表中省略。 链定义已包含在表格之后。
注意 此参考表适用于模式的 V2 版本。
event
name
chunk
input
output
on_chat_model_start
[model name]
{“messages”: [[SystemMessage, HumanMessage]]}
on_chat_model_stream
[model name]
AIMessageChunk(content=”hello”)
on_chat_model_end
[model name]
{“messages”: [[SystemMessage, HumanMessage]]}
AIMessageChunk(content=”hello world”)
on_llm_start
[model name]
{‘input’: ‘hello’}
on_llm_stream
[model name]
‘Hello’
on_llm_end
[model name]
‘Hello human!’
on_chain_start
format_docs
on_chain_stream
format_docs
“hello world!, goodbye world!”
on_chain_end
format_docs
[Document(…)]
“hello world!, goodbye world!”
on_tool_start
some_tool
{“x”: 1, “y”: “2”}
on_tool_end
some_tool
{“x”: 1, “y”: “2”}
on_retriever_start
[retriever name]
{“query”: “hello”}
on_retriever_end
[retriever name]
{“query”: “hello”}
[Document(…), ..]
on_prompt_start
[template_name]
{“question”: “hello”}
on_prompt_end
[template_name]
{“question”: “hello”}
ChatPromptValue(messages: [SystemMessage, …])
除了标准事件外,用户还可以调度自定义事件(请参见下面的示例)。
自定义事件将仅在 API 的 v2 版本中显示!
自定义事件具有以下格式
Attribute
Type
Description
name
str
事件的用户定义名称。
data
Any
与事件关联的数据。 这可以是任何内容,尽管我们建议使其为 JSON 可序列化的。
以下是与上面显示的标准事件关联的声明
format_docs:
def format_docs(docs: List[Document]) -> str: '''Format the docs.''' return ", ".join([doc.page_content for doc in docs]) format_docs = RunnableLambda(format_docs)
some_tool:
@tool def some_tool(x: int, y: str) -> dict: '''Some_tool.''' return {"x": x, "y": y}
prompt:
template = ChatPromptTemplate.from_messages( [("system", "You are Cat Agent 007"), ("human", "{question}")] ).with_config({"run_name": "my_template", "tags": ["my_template"]})
Example
from langchain_core.runnables import RunnableLambda async def reverse(s: str) -> str: return s[::-1] chain = RunnableLambda(func=reverse) events = [ event async for event in chain.astream_events("hello", version="v2") ] # will produce the following events (run_id, and parent_ids # has been omitted for brevity): [ { "data": {"input": "hello"}, "event": "on_chain_start", "metadata": {}, "name": "reverse", "tags": [], }, { "data": {"chunk": "olleh"}, "event": "on_chain_stream", "metadata": {}, "name": "reverse", "tags": [], }, { "data": {"output": "olleh"}, "event": "on_chain_end", "metadata": {}, "name": "reverse", "tags": [], }, ]
Example: Dispatch Custom Event
from langchain_core.callbacks.manager import ( adispatch_custom_event, ) from langchain_core.runnables import RunnableLambda, RunnableConfig import asyncio async def slow_thing(some_input: str, config: RunnableConfig) -> str: """Do something that takes a long time.""" await asyncio.sleep(1) # Placeholder for some slow operation await adispatch_custom_event( "progress_event", {"message": "Finished step 1 of 3"}, config=config # Must be included for python < 3.10 ) await asyncio.sleep(1) # Placeholder for some slow operation await adispatch_custom_event( "progress_event", {"message": "Finished step 2 of 3"}, config=config # Must be included for python < 3.10 ) await asyncio.sleep(1) # Placeholder for some slow operation return "Done" slow_thing = RunnableLambda(slow_thing) async for event in slow_thing.astream_events("some_input", version="v2"): print(event)
- 参数
input (Any) – Runnable 的输入。
config (Optional[RunnableConfig]) – 用于 Runnable 的配置。
version (Literal['v1', 'v2']) – 要使用的模式版本,可以是 v2 或 v1。 用户应使用 v2。v1 用于向后兼容,将在 0.4.0 中弃用。 在 API 稳定之前,不会分配默认值。 自定义事件将仅在 v2 中显示。
include_names (Optional[Sequence[str]]) – 仅包括来自具有匹配名称的 runnables 的事件。
include_types (Optional[Sequence[str]]) – 仅包括来自具有匹配类型的 runnables 的事件。
include_tags (Optional[Sequence[str]]) – 仅包括来自具有匹配标签的 runnables 的事件。
exclude_names (可选[Sequence[str]]) – 排除来自名称匹配的可运行对象的事件。
exclude_types (可选[Sequence[str]]) – 排除来自类型匹配的可运行对象的事件。
exclude_tags (可选[Sequence[str]]) – 排除来自标签匹配的可运行对象的事件。
kwargs (Any) – 传递给 Runnable 的额外关键字参数。 这些参数将传递给 astream_log,因为 astream_events 的此实现是基于 astream_log 构建的。
- Yields
StreamEvents 的异步流。
- Raises
NotImplementedError – 如果版本不是 v1 或 v2。
- 返回类型
AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]]
- batch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output] ¶
默认实现使用线程池执行器并行运行 invoke。
batch 的默认实现适用于 IO 绑定的 runnables。
如果子类可以更有效地进行批量处理,则应覆盖此方法;例如,如果底层 Runnable 使用支持批量模式的 API。
- 参数
inputs (List[Input]) –
config (可选[Union[RunnableConfig, List[RunnableConfig]]]) –
return_exceptions (bool) –
kwargs (Optional[Any]) –
- 返回类型
List[Output]
- batch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) Iterator[Tuple[int, Union[Output, Exception]]] ¶
并行运行列表中输入的 invoke,并在完成时产生结果。
- 参数
inputs (Sequence[Input]) –
config (可选[Union[RunnableConfig, Sequence[RunnableConfig]]]) –
return_exceptions (bool) –
kwargs (Optional[Any]) –
- 返回类型
Iterator[Tuple[int, Union[Output, Exception]]]
- configurable_alternatives(which: ConfigurableField, *, default_key: str = 'default', prefix_keys: bool = False, **kwargs: Union[Runnable[Input, Output], Callable[[], Runnable[Input, Output]]]) RunnableSerializable[Input, Output] ¶
配置可在运行时设置的 Runnable 的备选项。
- 参数
which (ConfigurableField) – 将用于选择备选项的 ConfigurableField 实例。
default_key (str) – 如果未选择备选项,则使用的默认键。默认为 “default”。
prefix_keys (bool) – 是否使用 ConfigurableField id 作为键的前缀。默认为 False。
**kwargs (Union[Runnable[Input, Output], Callable[[], Runnable[Input, Output]]]) – 键到 Runnable 实例或返回 Runnable 实例的可调用对象的字典。
- 返回
配置了备选项的新 Runnable。
- 返回类型
RunnableSerializable[Input, Output]
from langchain_anthropic import ChatAnthropic from langchain_core.runnables.utils import ConfigurableField from langchain_openai import ChatOpenAI model = ChatAnthropic( model_name="claude-3-sonnet-20240229" ).configurable_alternatives( ConfigurableField(id="llm"), default_key="anthropic", openai=ChatOpenAI() ) # uses the default model ChatAnthropic print(model.invoke("which organization created you?").content) # uses ChatOpenAI print( model.with_config( configurable={"llm": "openai"} ).invoke("which organization created you?").content )
- configurable_fields(**kwargs: Union[ConfigurableField, ConfigurableFieldSingleOption, ConfigurableFieldMultiOption]) RunnableSerializable">[Input, Output] ¶
配置运行时特定的 Runnable 字段。
- 参数
**kwargs (Union[ConfigurableField, ConfigurableFieldSingleOption, ConfigurableFieldMultiOption]) – 要配置的 ConfigurableField 实例的字典。
- 返回
配置了字段的新 Runnable。
- 返回类型
RunnableSerializable[Input, Output]
from langchain_core.runnables import ConfigurableField from langchain_openai import ChatOpenAI model = ChatOpenAI(max_tokens=20).configurable_fields( max_tokens=ConfigurableField( id="output_token_number", name="Max tokens in the output", description="The maximum number of tokens in the output", ) ) # max_tokens = 20 print( "max_tokens_20: ", model.invoke("tell me something about chess").content ) # max_tokens = 200 print("max_tokens_200: ", model.with_config( configurable={"output_token_number": 200} ).invoke("tell me something about chess").content )
- invoke(input: Input, config: Optional[RunnableConfig] = None) Dict[str, Any] [source]¶
将单个输入转换为输出。覆盖此方法以实现。
- 参数
input (Input) – Runnable 的输入。
config (可选[RunnableConfig]) – 调用 Runnable 时使用的配置。 此配置支持标准键,如 ‘tags’、‘metadata’ 用于跟踪目的,‘max_concurrency’ 用于控制并行执行的工作量,以及其他键。 请参阅 RunnableConfig 以获取更多详细信息。
- 返回
Runnable 的输出。
- 返回类型
Dict[str, Any]
- stream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) Iterator[Dict[str, Any]] [source]¶
stream 的默认实现,它调用 invoke。 如果子类支持流式输出,则应覆盖此方法。
- 参数
input (Input) – Runnable 的输入。
config (Optional[RunnableConfig]) – 用于 Runnable 的配置。 默认为 None。
kwargs (Optional[Any]) – 传递给 Runnable 的其他关键字参数。
- Yields
Runnable 的输出。
- 返回类型
Iterator[Dict[str, Any]]
- to_json() Union[SerializedConstructor, SerializedNotImplemented] ¶
将 Runnable 序列化为 JSON。
- 返回
Runnable 的 JSON 可序列化表示形式。
- 返回类型