langchain_core.runnables.base.RunnableSequence

注意

RunnableSequence 实现了标准的 Runnable Interface。 🏃

Runnable Interface 具有在 runnables 上可用的附加方法,例如 with_types, with_retry, assign, bind, get_graph 等。

class langchain_core.runnables.base.RunnableSequence[source]

基类: RunnableSerializable[Input, Output]

Runnable 序列,其中每个的输出是下一个的输入。

RunnableSequence 是 LangChain 中最重要的组合运算符,因为它几乎用于每个链。

RunnableSequence 可以直接实例化,或者更常见的是使用 | 运算符,其中左侧或右侧操作数(或两者)必须是 Runnable。

任何 RunnableSequence 自动支持同步、异步、批量处理。

batchabatch 的默认实现利用线程池和 asyncio gather,对于 IO 密集型 Runnables,将比 invoke 或 ainvoke 的幼稚调用更快。

批量处理是通过按顺序在 RunnableSequence 的每个组件上调用 batch 方法来实现的。

RunnableSequence 保留了其组件的流式处理属性,因此如果序列的所有组件都实现了 transform 方法——该方法实现了将流式输入映射到流式输出的逻辑——那么该序列将能够将输入流式传输到输出!

如果序列的任何组件未实现 transform,则流式处理仅在该组件运行后开始。 如果有多个阻塞组件,则流式处理在最后一个组件之后开始。

请注意:RunnableLambdas 默认不支持 transform! 因此,如果

您需要使用 RunnableLambdas,请注意将它们放置在 RunnableSequence 中的位置(如果您需要使用 .stream()/.astream() 方法)。

如果您需要任意逻辑并需要流式处理,您可以子类化 Runnable,并为您需要的任何逻辑实现 transform

这是一个简单的示例,它使用简单的函数来说明 RunnableSequence 的用法

from langchain_core.runnables import RunnableLambda

def add_one(x: int) -> int:
    return x + 1

def mul_two(x: int) -> int:
    return x * 2

runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1 | runnable_2
# Or equivalently:
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)

sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])

这是一个示例,它使用 LLM 生成的流式 JSON 输出

from langchain_core.output_parsers.json import SimpleJsonOutputParser
from langchain_openai import ChatOpenAI

prompt = PromptTemplate.from_template(
    'In JSON format, give me a list of {topic} and their '
    'corresponding names in French, Spanish and in a '
    'Cat Language.'
)

model = ChatOpenAI()
chain = prompt | model | SimpleJsonOutputParser()

async for chunk in chain.astream({'topic': 'colors'}):
    print('-')  # noqa: T201
    print(chunk, sep='', flush=True)  # noqa: T201

创建一个新的 RunnableSequence。

参数
  • steps – 要包含在序列中的步骤。

  • name – Runnable 的名称。默认为 None。

  • first – 序列中的第一个 Runnable。默认为 None。

  • middle – 序列中的中间 Runnables。默认为 None。

  • last – 序列中的最后一个 Runnable。默认为 None。

Raises

ValueError – 如果序列的步骤少于 2 个。

param first: Runnable[Input, Any] [Required]

序列中的第一个 Runnable。

param last: Runnable[Any, Output] [Required]

序列中的最后一个 Runnable。

param middle: List[Runnable[Any, Any]] [Optional]

序列中的中间 Runnables。

async abatch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output][source]

默认实现使用 asyncio.gather 并行运行 ainvoke。

batch 的默认实现对于 IO 密集型 runnables 效果良好。

子类如果可以更有效地进行批量处理,则应覆盖此方法; 例如,如果底层 Runnable 使用支持批量模式的 API。

参数
  • inputs (List[Input]) – Runnable 的输入列表。

  • config (Optional[Union[RunnableConfig, List[RunnableConfig]]]) – 调用 Runnable 时要使用的配置。 该配置支持标准键,如“tags”、“metadata”(用于跟踪目的)、“max_concurrency”(用于控制并行执行的工作量)以及其他键。 有关更多详细信息,请参阅 RunnableConfig。 默认为 None。

  • return_exceptions (bool) – 是否返回异常而不是引发异常。 默认为 False。

  • kwargs (Optional[Any]) – 要传递给 Runnable 的其他关键字参数。

Returns

来自 Runnable 的输出列表。

Return type

List[Output]

async abatch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) AsyncIterator[Tuple[int, Union[Output, Exception]]]

并行运行列表中输入的 ainvoke,并在它们完成时生成结果。

参数
  • inputs (Sequence[Input]) – Runnable 的输入列表。

  • config (Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]) – 调用 Runnable 时要使用的配置。 该配置支持标准键,如“tags”、“metadata”(用于跟踪目的)、“max_concurrency”(用于控制并行执行的工作量)以及其他键。 有关更多详细信息,请参阅 RunnableConfig。 默认为 None。 默认为 None。

  • return_exceptions (bool) – 是否返回异常而不是引发异常。 默认为 False。

  • kwargs (Optional[Any]) – 要传递给 Runnable 的其他关键字参数。

Yields

输入索引和来自 Runnable 的输出的元组。

Return type

AsyncIterator[Tuple[int, Union[Output, Exception]]]

async ainvoke(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) Output[source]

ainvoke 的默认实现,从线程调用 invoke。

即使 Runnable 没有实现 invoke 的原生异步版本,默认实现也允许使用异步代码。

如果子类可以异步运行,则应覆盖此方法。

参数
  • input (Input) –

  • config (Optional[RunnableConfig]) –

  • kwargs (Optional[Any]) –

Return type

Output

as_tool(args_schema: Optional[Type[BaseModel]] = None, *, name: Optional[str] = None, description: Optional[str] = None, arg_types: Optional[Dict[str, Type]] = None) BaseTool

Beta

此 API 处于 beta 阶段,将来可能会更改。

从 Runnable 创建一个 BaseTool。

as_tool 将从 Runnable 实例化一个具有名称、描述和 args_schema 的 BaseTool。 如果可能,模式会从 runnable.get_input_schema 推断。 或者(例如,如果 Runnable 接受 dict 作为输入,并且未键入特定的 dict 键),则可以使用 args_schema 直接指定模式。 您也可以传递 arg_types 以仅指定必需的参数及其类型。

参数
  • args_schema (Optional[Type[BaseModel]]) – 该工具的模式。 默认为 None。

  • name (Optional[str]) – 该工具的名称。 默认为 None。

  • description (Optional[str]) – 该工具的描述。 默认为 None。

  • arg_types (Optional[Dict[str, Type]]) – 参数名称到类型的字典。 默认为 None。

Returns

BaseTool 实例。

Return type

BaseTool

类型化 dict 输入

from typing import List
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda

class Args(TypedDict):
    a: int
    b: List[int]

def f(x: Args) -> str:
    return str(x["a"] * max(x["b"]))

runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})

dict 输入,通过 args_schema 指定模式

from typing import Any, Dict, List
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.runnables import RunnableLambda

def f(x: Dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))

class FSchema(BaseModel):
    """Apply a function to an integer and list of integers."""

    a: int = Field(..., description="Integer")
    b: List[int] = Field(..., description="List of ints")

runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})

dict 输入,通过 arg_types 指定模式

from typing import Any, Dict, List
from langchain_core.runnables import RunnableLambda

def f(x: Dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))

runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": List[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})

字符串输入

from langchain_core.runnables import RunnableLambda

def f(x: str) -> str:
    return x + "a"

def g(x: str) -> str:
    return x + "z"

runnable = RunnableLambda(f) | g
as_tool = runnable.as_tool()
as_tool.invoke("b")

0.2.14 版本中的新增功能。

async astream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) AsyncIterator[Output][source]

astream 的默认实现,它调用 ainvoke。 如果子类支持流式输出,则应覆盖此方法。

参数
  • input (Input) – Runnable 的输入。

  • config (Optional[RunnableConfig]) – 用于 Runnable 的配置。 默认为 None。

  • kwargs (Optional[Any]) – 要传递给 Runnable 的其他关键字参数。

Yields

Runnable 的输出。

Return type

AsyncIterator[Output]

astream_events(input: Any, config: Optional[RunnableConfig] = None, *, version: Literal['v1', 'v2'], include_names: Optional[Sequence[str]] = None, include_types: Optional[Sequence[str]] = None, include_tags: Optional[Sequence[str]] = None, exclude_names: Optional[Sequence[str]] = None, exclude_types: Optional[Sequence[str]] = None, exclude_tags: Optional[Sequence[str]] = None, **kwargs: Any) AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]]

Beta

此 API 处于 beta 阶段,将来可能会更改。

生成事件流。

用于创建 StreamEvents 的迭代器,该迭代器提供关于 Runnable 进度的实时信息,包括来自中间结果的 StreamEvents。

StreamEvent 是一个具有以下模式的字典

  • event: str - 事件名称的格式为:

    格式:on_[runnable_type]_(start|stream|end)。

  • name: str - 生成事件的 Runnable 的名称。

  • run_id: str - 随机生成的 ID,与给定 Runnable 执行的关联,

    Runnable 发出事件。 作为父 Runnable 执行的一部分调用的子 Runnable 会被分配自己的唯一 ID。

  • parent_ids: List[str] - 生成事件的父 runnables 的 ID。

    生成事件。 根 Runnable 将具有一个空列表。 父 ID 的顺序是从根到直接父级。 仅适用于 API 的 v2 版本。 API 的 v1 版本将返回一个空列表。

  • tags: Optional[List[str]] - 生成事件的 Runnable 的标签。

    事件。

  • metadata: Optional[Dict[str, Any]] - Runnable 的元数据

    生成事件的元数据。

  • data: Dict[str, Any]

下面是一个表格,说明了各种链可能发出的一些事件。 为了简洁起见,元数据字段已从表格中省略。 链定义已包含在表格之后。

注意 此参考表适用于模式的 V2 版本。

事件

名称

输入

输出

on_chat_model_start

[模型名称]

{“messages”: [[SystemMessage, HumanMessage]]}

on_chat_model_stream

[模型名称]

AIMessageChunk(content=”hello”)

on_chat_model_end

[模型名称]

{“messages”: [[SystemMessage, HumanMessage]]}

AIMessageChunk(content=”hello world”)

on_llm_start

[模型名称]

{‘input’: ‘hello’}

on_llm_stream

[模型名称]

‘Hello’

on_llm_end

[模型名称]

‘Hello human!’

on_chain_start

format_docs

on_chain_stream

format_docs

“hello world!, goodbye world!”

on_chain_end

format_docs

[Document(…)]

“hello world!, goodbye world!”

on_tool_start

some_tool

{“x”: 1, “y”: “2”}

on_tool_end

some_tool

{“x”: 1, “y”: “2”}

on_retriever_start

[retriever name]

{“query”: “hello”}

on_retriever_end

[retriever name]

{“query”: “hello”}

[Document(…), ..]

on_prompt_start

[template_name]

{“question”: “你好”}

on_prompt_end

[template_name]

{“question”: “你好”}

ChatPromptValue(messages: [SystemMessage, …])

除了标准事件,用户还可以分派自定义事件(见下方示例)。

自定义事件仅在 API 的 v2 版本中公开!

自定义事件具有以下格式

属性

类型

描述

名称

str

事件的用户定义名称。

data

Any

与事件关联的数据。这可以是任何内容,但我们建议使其可序列化为 JSON。

以下是与上面显示的标准事件相关的声明

format_docs:

def format_docs(docs: List[Document]) -> str:
    '''Format the docs.'''
    return ", ".join([doc.page_content for doc in docs])

format_docs = RunnableLambda(format_docs)

some_tool:

@tool
def some_tool(x: int, y: str) -> dict:
    '''Some_tool.'''
    return {"x": x, "y": y}

prompt:

template = ChatPromptTemplate.from_messages(
    [("system", "You are Cat Agent 007"), ("human", "{question}")]
).with_config({"run_name": "my_template", "tags": ["my_template"]})

示例

from langchain_core.runnables import RunnableLambda

async def reverse(s: str) -> str:
    return s[::-1]

chain = RunnableLambda(func=reverse)

events = [
    event async for event in chain.astream_events("hello", version="v2")
]

# will produce the following events (run_id, and parent_ids
# has been omitted for brevity):
[
    {
        "data": {"input": "hello"},
        "event": "on_chain_start",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"chunk": "olleh"},
        "event": "on_chain_stream",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"output": "olleh"},
        "event": "on_chain_end",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
]

示例:分派自定义事件

from langchain_core.callbacks.manager import (
    adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio


async def slow_thing(some_input: str, config: RunnableConfig) -> str:
    """Do something that takes a long time."""
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 1 of 3"},
        config=config # Must be included for python < 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 2 of 3"},
        config=config # Must be included for python < 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    return "Done"

slow_thing = RunnableLambda(slow_thing)

async for event in slow_thing.astream_events("some_input", version="v2"):
    print(event)
参数
  • input (Any) – Runnable 的输入。

  • config (Optional[RunnableConfig]) – 用于 Runnable 的配置。

  • version (Literal['v1', 'v2']) – 要使用的模式版本,可以是 v2v1。用户应使用 v2v1 用于向后兼容,将在 0.4.0 版本中弃用。在 API 稳定之前,不会分配默认值。自定义事件仅在 v2 中公开。

  • include_names (Optional[Sequence[str]]) – 仅包括来自具有匹配名称的 runnable 的事件。

  • include_types (Optional[Sequence[str]]) – 仅包括来自具有匹配类型的 runnable 的事件。

  • include_tags (Optional[Sequence[str]]) – 仅包括来自具有匹配标签的 runnable 的事件。

  • exclude_names (Optional[Sequence[str]]) – 排除来自具有匹配名称的 runnable 的事件。

  • exclude_types (Optional[Sequence[str]]) – 排除来自具有匹配类型的 runnable 的事件。

  • exclude_tags (Optional[Sequence[str]]) – 排除来自具有匹配标签的 runnable 的事件。

  • kwargs (Any) – 要传递给 Runnable 的其他关键字参数。这些将传递给 astream_log,因为 astream_events 的此实现构建于 astream_log 之上。

Yields

StreamEvents 的异步流。

Raises

NotImplementedError – 如果版本不是 v1v2

Return type

AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]]

batch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output][source]

默认实现使用线程池执行器并行运行 invoke。

batch 的默认实现对于 IO 密集型 runnables 效果良好。

子类如果可以更有效地进行批量处理,则应覆盖此方法; 例如,如果底层 Runnable 使用支持批量模式的 API。

参数
  • inputs (List[Input]) –

  • config (Optional[Union[RunnableConfig, List[RunnableConfig]]]) –

  • return_exceptions (bool) –

  • kwargs (Optional[Any]) –

Return type

List[Output]

batch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) Iterator[Tuple[int, Union[Output, Exception]]]

并行运行 invoke 在输入列表上,并在结果完成时产生结果。

参数
  • inputs (Sequence[Input]) –

  • config (Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]) –

  • return_exceptions (bool) –

  • kwargs (Optional[Any]) –

Return type

Iterator[Tuple[int, Union[Output, Exception]]]

configurable_alternatives(which: ConfigurableField, *, default_key: str = 'default', prefix_keys: bool = False, **kwargs: Union[Runnable[Input, Output], Callable[[], Runnable[Input, Output]]]) RunnableSerializable[Input, Output]

配置可在运行时设置的 Runnable 的备选项。

参数
  • which (ConfigurableField) – 将用于选择备选项的 ConfigurableField 实例。

  • default_key (str) – 如果未选择备选项,则使用的默认键。默认为“default”。

  • prefix_keys (bool) – 是否用 ConfigurableField id 作为键的前缀。默认为 False。

  • **kwargs (Union[Runnable[Input, Output], Callable[[], Runnable[Input, Output]]]) – 键到 Runnable 实例或返回 Runnable 实例的可调用对象的字典。

Returns

配置了备选项的新 Runnable。

Return type

RunnableSerializable[Input, Output]

from langchain_anthropic import ChatAnthropic
from langchain_core.runnables.utils import ConfigurableField
from langchain_openai import ChatOpenAI

model = ChatAnthropic(
    model_name="claude-3-sonnet-20240229"
).configurable_alternatives(
    ConfigurableField(id="llm"),
    default_key="anthropic",
    openai=ChatOpenAI()
)

# uses the default model ChatAnthropic
print(model.invoke("which organization created you?").content)

# uses ChatOpenAI
print(
    model.with_config(
        configurable={"llm": "openai"}
    ).invoke("which organization created you?").content
)
configurable_fields(**kwargs: Union[ConfigurableField, ConfigurableFieldSingleOption, ConfigurableFieldMultiOption]) RunnableSerializable[Input, Output]

在运行时配置特定的 Runnable 字段。

参数

**kwargs (Union[ConfigurableField, ConfigurableFieldSingleOption, ConfigurableFieldMultiOption]) – 要配置的 ConfigurableField 实例的字典。

Returns

配置了字段的新 Runnable。

Return type

RunnableSerializable[Input, Output]

from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI

model = ChatOpenAI(max_tokens=20).configurable_fields(
    max_tokens=ConfigurableField(
        id="output_token_number",
        name="Max tokens in the output",
        description="The maximum number of tokens in the output",
    )
)

# max_tokens = 20
print(
    "max_tokens_20: ",
    model.invoke("tell me something about chess").content
)

# max_tokens = 200
print("max_tokens_200: ", model.with_config(
    configurable={"output_token_number": 200}
    ).invoke("tell me something about chess").content
)
invoke(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Any) Output[source]

将单个输入转换为输出。覆盖以实现。

参数
  • input (Input) – Runnable 的输入。

  • config (Optional[RunnableConfig]) – 调用 Runnable 时要使用的配置。该配置支持标准键,如“tags”、“metadata”(用于跟踪目的)、“max_concurrency”(用于控制并行执行的工作量)和其他键。有关更多详细信息,请参阅 RunnableConfig。

  • kwargs (Any) –

Returns

Runnable 的输出。

Return type

Output

stream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) Iterator[Output][source]

stream 的默认实现,它调用 invoke。如果子类支持流式输出,则应覆盖此方法。

参数
  • input (Input) – Runnable 的输入。

  • config (Optional[RunnableConfig]) – 用于 Runnable 的配置。 默认为 None。

  • kwargs (Optional[Any]) – 要传递给 Runnable 的其他关键字参数。

Yields

Runnable 的输出。

Return type

Iterator[Output]

to_json() Union[SerializedConstructor, SerializedNotImplemented]

将 Runnable 序列化为 JSON。

Returns

Runnable 的 JSON 可序列化表示。

Return type

Union[SerializedConstructor, SerializedNotImplemented]

property steps: List[Runnable[Any, Any]]

按顺序构成序列的所有 Runnables。

Returns

Runnable 列表。