langchain_core.runnables.base.RunnableParallel

Note

RunnableParallel 实现了标准的 Runnable Interface。 🏃

Runnable Interface 具有在 runnables 上可用的其他方法,例如 with_typeswith_retryassignbindget_graph 等。

class langchain_core.runnables.base.RunnableParallel[source]

Bases: RunnableSerializable[Input, Dict[str, Any]]

Runnable,并行运行 Runnables 的映射,并返回其输出的映射。

RunnableParallel 是 LCEL 的两个主要组合原语之一,另一个是 RunnableSequence。它并发调用 Runnables,为每个 Runnable 提供相同的输入。

RunnableParallel 可以直接实例化,也可以在序列中使用字典字面量来实例化。

这是一个使用函数的简单示例,用于说明 RunnableParallel 的用法

from langchain_core.runnables import RunnableLambda

def add_one(x: int) -> int:
    return x + 1

def mul_two(x: int) -> int:
    return x * 2

def mul_three(x: int) -> int:
    return x * 3

runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
runnable_3 = RunnableLambda(mul_three)

sequence = runnable_1 | {  # this dict is coerced to a RunnableParallel
    "mul_two": runnable_2,
    "mul_three": runnable_3,
}
# Or equivalently:
# sequence = runnable_1 | RunnableParallel(
#     {"mul_two": runnable_2, "mul_three": runnable_3}
# )
# Also equivalently:
# sequence = runnable_1 | RunnableParallel(
#     mul_two=runnable_2,
#     mul_three=runnable_3,
# )

sequence.invoke(1)
await sequence.ainvoke(1)

sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])

RunnableParallel 使并行运行 Runnables 变得容易。在下面的示例中,我们同时从两个不同的 Runnables 流式传输输出

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel
from langchain_openai import ChatOpenAI

model = ChatOpenAI()
joke_chain = (
    ChatPromptTemplate.from_template("tell me a joke about {topic}")
    | model
)
poem_chain = (
    ChatPromptTemplate.from_template("write a 2-line poem about {topic}")
    | model
)

runnable = RunnableParallel(joke=joke_chain, poem=poem_chain)

# Display stream
output = {key: "" for key, _ in runnable.output_schema()}
for chunk in runnable.stream({"topic": "bear"}):
    for key in chunk:
        output[key] = output[key] + chunk[key].content
    print(output)  # noqa: T201
param steps__: Mapping[str, Runnable[Input, Any]] [Required]
async abatch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs*: Optional[Any]) List[Output]

默认实现使用 asyncio.gather 并行运行 ainvoke。

batch 的默认实现适用于 IO 绑定的 runnables。

如果子类可以更有效地进行批量处理,则应覆盖此方法;例如,如果底层 Runnable 使用支持批量模式的 API。

参数
  • inputs (List[Input]) – Runnable 的输入列表。

  • config (Optional[Union[RunnableConfig, List[RunnableConfig]]]) – 调用 Runnable 时要使用的配置。 此配置支持标准键,例如用于跟踪目的的 ‘tags’、‘metadata’,用于控制并行执行量的 ‘max_concurrency’ 以及其他键。 有关更多详细信息,请参阅 RunnableConfig。 默认为 None。

  • return_exceptions (bool) – 是否返回异常而不是引发异常。 默认为 False。

  • kwargs (Optional[Any]) – 传递给 Runnable 的其他关键字参数。

返回

来自 Runnable 的输出列表。

返回类型

List[Output]

async abatch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs*: Optional[Any]) AsyncIterator[Tuple[int, Union[Output, Exception]]]

在输入列表上并行运行 ainvoke,并在结果完成时产生结果。

参数
  • inputs (Sequence[Input]) – Runnable 的输入列表。

  • config (Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]) – 调用 Runnable 时要使用的配置。 此配置支持标准键,例如用于跟踪目的的 ‘tags’、‘metadata’,用于控制并行执行量的 ‘max_concurrency’ 以及其他键。 有关更多详细信息,请参阅 RunnableConfig。 默认为 None。 默认为 None。

  • return_exceptions (bool) – 是否返回异常而不是引发异常。 默认为 False。

  • kwargs (Optional[Any]) – 传递给 Runnable 的其他关键字参数。

Yields

输入索引和 Runnable 输出的元组。

返回类型

AsyncIterator[Tuple[int, Union[Output, Exception]]]

async ainvoke(input: Input, config: Optional[RunnableConfig] = None, **kwargs*: Optional[Any]) Dict[str, Any][source]

ainvoke 的默认实现,从线程调用 invoke。

即使 Runnable 没有实现 invoke 的原生异步版本,默认实现也允许使用异步代码。

如果子类可以异步运行,则应覆盖此方法。

参数
  • input (Input) –

  • config (Optional[RunnableConfig]) –

  • kwargs (Optional[Any]) –

返回类型

Dict[str, Any]

as_tool(args_schema: Optional[Type[BaseModel]] = None, *, name: Optional[str] = None, description: Optional[str] = None, arg_types: Optional[Dict[str, Type]] = None) BaseTool

Beta

此 API 处于 Beta 阶段,将来可能会发生变化。

从 Runnable 创建 BaseTool。

as_tool 将从 Runnable 实例化具有名称、描述和 args_schema 的 BaseTool。 在可能的情况下,模式是从 runnable.get_input_schema 推断出来的。 或者(例如,如果 Runnable 接受字典作为输入,并且未键入特定的字典键),可以使用 args_schema 直接指定模式。 您还可以传递 arg_types 以仅指定必需的参数及其类型。

参数
  • args_schema (Optional[Type[BaseModel]]) – 该工具的模式。 默认为 None。

  • name (Optional[str]) – 工具的名称。 默认为 None。

  • description (Optional[str]) – 工具的描述。 默认为 None。

  • arg_types (Optional[Dict[str, Type]]) – 参数名称到类型的字典。 默认为 None。

返回

一个 BaseTool 实例。

返回类型

BaseTool

Typed dict input

from typing import List
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda

class Args(TypedDict):
    a: int
    b: List[int]

def f(x: Args) -> str:
    return str(x["a"] * max(x["b"]))

runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})

dict 输入,通过 args_schema 指定模式

from typing import Any, Dict, List
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.runnables import RunnableLambda

def f(x: Dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))

class FSchema(BaseModel):
    """Apply a function to an integer and list of integers."""

    a: int = Field(..., description="Integer")
    b: List[int] = Field(..., description="List of ints")

runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})

dict 输入,通过 arg_types 指定模式

from typing import Any, Dict, List
from langchain_core.runnables import RunnableLambda

def f(x: Dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))

runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": List[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})

String input

from langchain_core.runnables import RunnableLambda

def f(x: str) -> str:
    return x + "a"

def g(x: str) -> str:
    return x + "z"

runnable = RunnableLambda(f) | g
as_tool = runnable.as_tool()
as_tool.invoke("b")

0.2.14 版本中的新增功能。

async astream(input: Input, config: Optional[RunnableConfig] = None, **kwargs*: Optional[Any]) AsyncIterator[Dict[str, Any]][source]

astream 的默认实现,它调用 ainvoke。 如果子类支持流式输出,则应覆盖此方法。

参数
  • input (Input) – Runnable 的输入。

  • config (Optional[RunnableConfig]) – 用于 Runnable 的配置。 默认为 None。

  • kwargs (Optional[Any]) – 传递给 Runnable 的其他关键字参数。

Yields

Runnable 的输出。

返回类型

AsyncIterator[Dict[str, Any]]

astream_events(input: Any, config: Optional[RunnableConfig] = None, *, version: Literal['v1', 'v2'], include_names: Optional[Sequence[str]] = None, include_types: Optional[Sequence[str]] = None, include_tags: Optional[Sequence[str]] = None, exclude_names: Optional[Sequence[str]] = None, exclude_types: Optional[Sequence[str]] = None, exclude_tags: Optional[Sequence[str]] = None, **kwargs*: Any) AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]]

Beta

此 API 处于 Beta 阶段,将来可能会发生变化。

生成事件流。

用于创建 StreamEvents 的迭代器,该迭代器提供有关 Runnable 进度的实时信息,包括来自中间结果的 StreamEvents。

StreamEvent 是一个具有以下模式的字典

  • event: str - 事件名称的格式为:on_[runnable_type]_(start|stream|end)。

    format: on_[runnable_type]_(start|stream|end).

  • name: str - 生成事件的 Runnable 的名称。

  • run_id: str - 与发出事件的 Runnable 的给定执行关联的随机生成的 ID。 作为父 Runnable 执行的一部分调用的子 Runnable 将被分配其自己的唯一 ID。

    the Runnable that emitted the event. A child Runnable that gets invoked as part of the execution of a parent Runnable is assigned its own unique ID.

  • parent_ids: List[str] - 生成事件的父 runnables 的 ID。 根 Runnable 将有一个空列表。 父 ID 的顺序是从根到直接父级。 仅适用于 API 的 v2 版本。 API 的 v1 版本将返回一个空列表。

    generated the event. The root Runnable will have an empty list. The order of the parent IDs is from the root to the immediate parent. Only available for v2 version of the API. The v1 version of the API will return an empty list.

  • tags: Optional[List[str]] - 生成事件的 Runnable 的标签。

    the event.

  • metadata: Optional[Dict[str, Any]] - 生成事件的 Runnable 的元数据。

    that generated the event.

  • data: Dict[str, Any]

下面是一个表格,说明了各种链可能发出的一些事件。 为了简洁起见,元数据字段已从表中省略。 链定义已包含在表格之后。

注意 此参考表适用于模式的 V2 版本。

event

name

chunk

input

output

on_chat_model_start

[model name]

{“messages”: [[SystemMessage, HumanMessage]]}

on_chat_model_stream

[model name]

AIMessageChunk(content=”hello”)

on_chat_model_end

[model name]

{“messages”: [[SystemMessage, HumanMessage]]}

AIMessageChunk(content=”hello world”)

on_llm_start

[model name]

{‘input’: ‘hello’}

on_llm_stream

[model name]

‘Hello’

on_llm_end

[model name]

‘Hello human!’

on_chain_start

format_docs

on_chain_stream

format_docs

“hello world!, goodbye world!”

on_chain_end

format_docs

[Document(…)]

“hello world!, goodbye world!”

on_tool_start

some_tool

{“x”: 1, “y”: “2”}

on_tool_end

some_tool

{“x”: 1, “y”: “2”}

on_retriever_start

[retriever name]

{“query”: “hello”}

on_retriever_end

[retriever name]

{“query”: “hello”}

[Document(…), ..]

on_prompt_start

[template_name]

{“question”: “hello”}

on_prompt_end

[template_name]

{“question”: “hello”}

ChatPromptValue(messages: [SystemMessage, …])

除了标准事件外,用户还可以调度自定义事件(请参见下面的示例)。

自定义事件将仅在 API 的 v2 版本中显示!

自定义事件具有以下格式

Attribute

Type

Description

name

str

事件的用户定义名称。

data

Any

与事件关联的数据。 这可以是任何内容,尽管我们建议使其为 JSON 可序列化的。

以下是与上面显示的标准事件关联的声明

format_docs:

def format_docs(docs: List[Document]) -> str:
    '''Format the docs.'''
    return ", ".join([doc.page_content for doc in docs])

format_docs = RunnableLambda(format_docs)

some_tool:

@tool
def some_tool(x: int, y: str) -> dict:
    '''Some_tool.'''
    return {"x": x, "y": y}

prompt:

template = ChatPromptTemplate.from_messages(
    [("system", "You are Cat Agent 007"), ("human", "{question}")]
).with_config({"run_name": "my_template", "tags": ["my_template"]})

Example

from langchain_core.runnables import RunnableLambda

async def reverse(s: str) -> str:
    return s[::-1]

chain = RunnableLambda(func=reverse)

events = [
    event async for event in chain.astream_events("hello", version="v2")
]

# will produce the following events (run_id, and parent_ids
# has been omitted for brevity):
[
    {
        "data": {"input": "hello"},
        "event": "on_chain_start",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"chunk": "olleh"},
        "event": "on_chain_stream",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"output": "olleh"},
        "event": "on_chain_end",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
]

Example: Dispatch Custom Event

from langchain_core.callbacks.manager import (
    adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio


async def slow_thing(some_input: str, config: RunnableConfig) -> str:
    """Do something that takes a long time."""
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 1 of 3"},
        config=config # Must be included for python < 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 2 of 3"},
        config=config # Must be included for python < 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    return "Done"

slow_thing = RunnableLambda(slow_thing)

async for event in slow_thing.astream_events("some_input", version="v2"):
    print(event)
参数
  • input (Any) – Runnable 的输入。

  • config (Optional[RunnableConfig]) – 用于 Runnable 的配置。

  • version (Literal['v1', 'v2']) – 要使用的模式版本,可以是 v2v1。 用户应使用 v2v1 用于向后兼容,将在 0.4.0 中弃用。 在 API 稳定之前,不会分配默认值。 自定义事件将仅在 v2 中显示。

  • include_names (Optional[Sequence[str]]) – 仅包括来自具有匹配名称的 runnables 的事件。

  • include_types (Optional[Sequence[str]]) – 仅包括来自具有匹配类型的 runnables 的事件。

  • include_tags (Optional[Sequence[str]]) – 仅包括来自具有匹配标签的 runnables 的事件。

  • exclude_names (可选[Sequence[str]]) – 排除来自名称匹配的可运行对象的事件。

  • exclude_types (可选[Sequence[str]]) – 排除来自类型匹配的可运行对象的事件。

  • exclude_tags (可选[Sequence[str]]) – 排除来自标签匹配的可运行对象的事件。

  • kwargs (Any) – 传递给 Runnable 的额外关键字参数。 这些参数将传递给 astream_log,因为 astream_events 的此实现是基于 astream_log 构建的。

Yields

StreamEvents 的异步流。

Raises

NotImplementedError – 如果版本不是 v1v2

返回类型

AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]]

batch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output]

默认实现使用线程池执行器并行运行 invoke。

batch 的默认实现适用于 IO 绑定的 runnables。

如果子类可以更有效地进行批量处理,则应覆盖此方法;例如,如果底层 Runnable 使用支持批量模式的 API。

参数
  • inputs (List[Input]) –

  • config (可选[Union[RunnableConfig, List[RunnableConfig]]]) –

  • return_exceptions (bool) –

  • kwargs (Optional[Any]) –

返回类型

List[Output]

batch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) Iterator[Tuple[int, Union[Output, Exception]]]

并行运行列表中输入的 invoke,并在完成时产生结果。

参数
  • inputs (Sequence[Input]) –

  • config (可选[Union[RunnableConfig, Sequence[RunnableConfig]]]) –

  • return_exceptions (bool) –

  • kwargs (Optional[Any]) –

返回类型

Iterator[Tuple[int, Union[Output, Exception]]]

configurable_alternatives(which: ConfigurableField, *, default_key: str = 'default', prefix_keys: bool = False, **kwargs: Union[Runnable[Input, Output], Callable[[], Runnable[Input, Output]]]) RunnableSerializable[Input, Output]

配置可在运行时设置的 Runnable 的备选项。

参数
  • which (ConfigurableField) – 将用于选择备选项的 ConfigurableField 实例。

  • default_key (str) – 如果未选择备选项,则使用的默认键。默认为 “default”。

  • prefix_keys (bool) – 是否使用 ConfigurableField id 作为键的前缀。默认为 False。

  • **kwargs (Union[Runnable[Input, Output], Callable[[], Runnable[Input, Output]]]) – 键到 Runnable 实例或返回 Runnable 实例的可调用对象的字典。

返回

配置了备选项的新 Runnable。

返回类型

RunnableSerializable[Input, Output]

from langchain_anthropic import ChatAnthropic
from langchain_core.runnables.utils import ConfigurableField
from langchain_openai import ChatOpenAI

model = ChatAnthropic(
    model_name="claude-3-sonnet-20240229"
).configurable_alternatives(
    ConfigurableField(id="llm"),
    default_key="anthropic",
    openai=ChatOpenAI()
)

# uses the default model ChatAnthropic
print(model.invoke("which organization created you?").content)

# uses ChatOpenAI
print(
    model.with_config(
        configurable={"llm": "openai"}
    ).invoke("which organization created you?").content
)
configurable_fields(**kwargs: Union[ConfigurableField, ConfigurableFieldSingleOption, ConfigurableFieldMultiOption]) RunnableSerializable">[Input, Output]

配置运行时特定的 Runnable 字段。

参数

**kwargs (Union[ConfigurableField, ConfigurableFieldSingleOption, ConfigurableFieldMultiOption]) – 要配置的 ConfigurableField 实例的字典。

返回

配置了字段的新 Runnable。

返回类型

RunnableSerializable[Input, Output]

from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI

model = ChatOpenAI(max_tokens=20).configurable_fields(
    max_tokens=ConfigurableField(
        id="output_token_number",
        name="Max tokens in the output",
        description="The maximum number of tokens in the output",
    )
)

# max_tokens = 20
print(
    "max_tokens_20: ",
    model.invoke("tell me something about chess").content
)

# max_tokens = 200
print("max_tokens_200: ", model.with_config(
    configurable={"output_token_number": 200}
    ).invoke("tell me something about chess").content
)
invoke(input: Input, config: Optional[RunnableConfig] = None) Dict[str, Any][source]

将单个输入转换为输出。覆盖此方法以实现。

参数
  • input (Input) – Runnable 的输入。

  • config (可选[RunnableConfig]) – 调用 Runnable 时使用的配置。 此配置支持标准键,如 ‘tags’、‘metadata’ 用于跟踪目的,‘max_concurrency’ 用于控制并行执行的工作量,以及其他键。 请参阅 RunnableConfig 以获取更多详细信息。

返回

Runnable 的输出。

返回类型

Dict[str, Any]

stream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) Iterator[Dict[str, Any]][source]

stream 的默认实现,它调用 invoke。 如果子类支持流式输出,则应覆盖此方法。

参数
  • input (Input) – Runnable 的输入。

  • config (Optional[RunnableConfig]) – 用于 Runnable 的配置。 默认为 None。

  • kwargs (Optional[Any]) – 传递给 Runnable 的其他关键字参数。

Yields

Runnable 的输出。

返回类型

Iterator[Dict[str, Any]]

to_json() Union[SerializedConstructor, SerializedNotImplemented]

将 Runnable 序列化为 JSON。

返回

Runnable 的 JSON 可序列化表示形式。

返回类型

Union[SerializedConstructor, SerializedNotImplemented]

使用 RunnableParallel 的示例