langchain_core.runnables.base.RunnableGenerator

注意

RunnableGenerator 实现了标准的 Runnable 接口。🏃

Runnable 接口 具有在 runnables 上可用的附加方法,例如 with_typeswith_retryassignbindget_graph 等。

class langchain_core.runnables.base.RunnableGenerator(transform: Union[Callable[[Iterator[Input]], Iterator[Output]], Callable[[AsyncIterator[Input]], AsyncIterator[Output]]], atransform: Optional[Callable[[AsyncIterator[Input]], AsyncIterator[Output]]] = None)[source]

运行生成器函数的 Runnable。

RunnableGenerators 可以直接实例化,也可以在序列中使用生成器来实例化。

RunnableGenerators 可用于实现自定义行为,例如自定义输出解析器,同时保持流式处理能力。给定一个签名 Iterator[A] -> Iterator[B] 的生成器函数,将其包装在 RunnableGenerator 中,使其能够在从上一步流式传输输入后立即发出输出块。

请注意,如果生成器函数的签名是 A -> Iterator[B],以至于它需要在发出块之前完成来自上一步的输入(例如,大多数 LLM 需要完整的提示才能开始生成),则可以将其包装在 RunnableLambda 中。

这是一个示例,展示 RunnableGenerator 的基本机制

from typing import Any, AsyncIterator, Iterator

from langchain_core.runnables import RunnableGenerator


def gen(input: Iterator[Any]) -> Iterator[str]:
    for token in ["Have", " a", " nice", " day"]:
        yield token


runnable = RunnableGenerator(gen)
runnable.invoke(None)  # "Have a nice day"
list(runnable.stream(None))  # ["Have", " a", " nice", " day"]
runnable.batch([None, None])  # ["Have a nice day", "Have a nice day"]


# Async version:
async def agen(input: AsyncIterator[Any]) -> AsyncIterator[str]:
    for token in ["Have", " a", " nice", " day"]:
        yield token

runnable = RunnableGenerator(agen)
await runnable.ainvoke(None)  # "Have a nice day"
[p async for p in runnable.astream(None)] # ["Have", " a", " nice", " day"]

RunnableGenerator 使在流式处理上下文中实现自定义行为变得容易。下面我们展示一个例子

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableGenerator, RunnableLambda
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser


model = ChatOpenAI()
chant_chain = (
    ChatPromptTemplate.from_template("Give me a 3 word chant about {topic}")
    | model
    | StrOutputParser()
)

def character_generator(input: Iterator[str]) -> Iterator[str]:
    for token in input:
        if "," in token or "." in token:
            yield "👏" + token
        else:
            yield token


runnable = chant_chain | character_generator
assert type(runnable.last) is RunnableGenerator
"".join(runnable.stream({"topic": "waste"})) # Reduce👏, Reuse👏, Recycle👏.

# Note that RunnableLambda can be used to delay streaming of one step in a
# sequence until the previous step is finished:
def reverse_generator(input: str) -> Iterator[str]:
    # Yield characters of input in reverse order.
    for character in input[::-1]:
        yield character

runnable = chant_chain | RunnableLambda(reverse_generator)
"".join(runnable.stream({"topic": "waste"}))  # ".elcycer ,esuer ,ecudeR"

初始化 RunnableGenerator。

参数
  • transform (Union[Callable[[Iterator[Input]], Iterator[Output]], Callable[[AsyncIterator[Input]], AsyncIterator[Output]]]) – 转换函数。

  • atransform (Optional[Callable[[AsyncIterator[Input]], AsyncIterator[Output]]]) – 异步转换函数。默认为 None。

引发

TypeError – 如果 transform 不是生成器函数。

属性

InputType

此 Runnable 接受的输入类型,指定为类型注解。

OutputType

此 Runnable 生成的输出类型,指定为类型注解。

config_specs

列出此 Runnable 的可配置字段。

input_schema

此 Runnable 接受的输入类型,指定为 pydantic 模型。

名称

Runnable 的名称。

output_schema

此 Runnable 生成的输出类型,指定为 pydantic 模型。

方法

__init__(transform[, atransform])

初始化 RunnableGenerator。

abatch(inputs[, config, return_exceptions])

默认实现使用 asyncio.gather 并行运行 ainvoke。

abatch_as_completed(inputs[, config, ...])

在一系列输入上并行运行 ainvoke,并在完成时生成结果。

ainvoke(input[, config])

ainvoke 的默认实现,从线程调用 invoke。

as_tool([args_schema, name, description, ...])

assign(**kwargs)

为此 Runnable 的字典输出分配新字段。

astream(input[, config])

astream 的默认实现,它调用 ainvoke。

astream_events(input[, config, ...])

astream_log(input[, config, diff, ...])

流式传输来自 Runnable 的所有输出,如回调系统报告的那样。

atransform(input[, config])

atransform 的默认实现,它缓冲输入并调用 astream。

batch(inputs[, config, return_exceptions])

默认实现使用线程池执行器并行运行 invoke。

batch_as_completed(inputs[, config, ...])

在一系列输入上并行运行 invoke,并在完成时生成结果。

bind(**kwargs)

将参数绑定到 Runnable,返回一个新的 Runnable。

config_schema(*[, include])

此 Runnable 接受的配置类型,指定为 pydantic 模型。

get_graph([config])

返回此 Runnable 的图形表示。

get_input_schema([config])

获取可用于验证 Runnable 输入的 pydantic 模型。

get_name([suffix, name])

获取 Runnable 的名称。

get_output_schema([config])

获取可用于验证 Runnable 输出的 pydantic 模型。

get_prompts([config])

返回此 Runnable 使用的提示列表。

invoke(input[, config])

将单个输入转换为输出。

map()

返回一个新的 Runnable,它通过对每个输入调用 invoke(),将输入列表映射到输出列表。

pick(keys)

从此 Runnable 的字典输出中选取键。

pipe(*others[, name])

将此 Runnable 与类似 Runnable 的对象组合以创建 RunnableSequence。

stream(input[, config])

stream 的默认实现,它调用 invoke。

transform(input[, config])

transform 的默认实现,它缓冲输入,然后调用 stream。

with_alisteners(*[, on_start, on_end, on_error])

将异步生命周期监听器绑定到 Runnable,返回一个新的 Runnable。

with_config([config])

将配置绑定到 Runnable,返回一个新的 Runnable。

with_fallbacks(fallbacks, *[, ...])

向 Runnable 添加回退,返回一个新的 Runnable。

with_listeners(*[, on_start, on_end, on_error])

将生命周期监听器绑定到 Runnable,返回一个新的 Runnable。

with_retry(*[, retry_if_exception_type, ...])

创建一个新的 Runnable,它在发生异常时重试原始 Runnable。

with_types(*[, input_type, output_type])

将输入和输出类型绑定到 Runnable,返回一个新的 Runnable。

__init__(transform: Union[Callable[[Iterator[Input]], Iterator[Output]], Callable[[AsyncIterator[Input]], AsyncIterator[Output]]], atransform: Optional[Callable[[AsyncIterator[Input]], AsyncIterator[Output]]] = None) None[source]

初始化 RunnableGenerator。

参数
  • transform (Union[Callable[[Iterator[Input]], Iterator[Output]], Callable[[AsyncIterator[Input]], AsyncIterator[Output]]]) – 转换函数。

  • atransform (Optional[Callable[[AsyncIterator[Input]], AsyncIterator[Output]]]) – 异步转换函数。默认为 None。

引发

TypeError – 如果 transform 不是生成器函数。

返回类型

None

async abatch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output]

默认实现使用 asyncio.gather 并行运行 ainvoke。

批处理的默认实现对于 IO 绑定 runnables 效果良好。

如果子类可以更有效地进行批处理,则应重写此方法;例如,如果底层 Runnable 使用支持批处理模式的 API。

参数
  • inputs (List[Input]) – Runnable 的输入列表。

  • config (Optional[Union[RunnableConfig, List[RunnableConfig]]]) – 调用 Runnable 时要使用的配置。配置支持标准键,如用于跟踪目的的“tags”、“metadata”,用于控制并行执行多少工作的“max_concurrency”以及其他键。有关更多详细信息,请参阅 RunnableConfig。默认为 None。

  • return_exceptions (bool) – 是否返回异常而不是引发异常。默认为 False。

  • kwargs (Optional[Any]) – 传递给 Runnable 的其他关键字参数。

返回

来自 Runnable 的输出列表。

返回类型

List[Output]

async abatch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) AsyncIterator[Tuple[int, Union[Output, Exception]]]

在一系列输入上并行运行 ainvoke,并在完成时生成结果。

参数
  • inputs (Sequence[Input]) – Runnable 的输入列表。

  • config (Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]) – 调用 Runnable 时要使用的配置。配置支持标准键,如用于跟踪目的的“tags”、“metadata”,用于控制并行执行多少工作的“max_concurrency”以及其他键。有关更多详细信息,请参阅 RunnableConfig。默认为 None。默认为 None。

  • return_exceptions (bool) – 是否返回异常而不是引发异常。默认为 False。

  • kwargs (Optional[Any]) – 传递给 Runnable 的其他关键字参数。

产生

一个元组,包含输入的索引和来自 Runnable 的输出。

返回类型

AsyncIterator[Tuple[int, Union[Output, Exception]]]

async ainvoke(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Any) Output[source]

ainvoke 的默认实现,从线程调用 invoke。

即使 Runnable 没有实现 invoke 的原生异步版本,默认实现也允许使用异步代码。

如果子类可以异步运行,则应重写此方法。

参数
  • input (Input) –

  • config (Optional[RunnableConfig]) –

  • kwargs (Any) –

返回类型

输出

as_tool(args_schema: Optional[Type[BaseModel]] = None, *, name: Optional[str] = None, description: Optional[str] = None, arg_types: Optional[Dict[str, Type]] = None) BaseTool

Beta

此 API 处于 Beta 阶段,未来可能会发生变化。

从 Runnable 创建 BaseTool。

as_tool 将从 Runnable 实例化一个带有名称、描述和 args_schema 的 BaseTool。如果可能,模式将从 runnable.get_input_schema 推断。或者(例如,如果 Runnable 接受 dict 作为输入,并且未对特定的 dict 键进行类型化),则可以使用 args_schema 直接指定模式。您也可以传递 arg_types 以仅指定必需的参数及其类型。

参数
  • args_schema (Optional[Type[BaseModel]]) – 工具的模式。默认为 None。

  • name (Optional[str]) – 工具的名称。默认为 None。

  • description (Optional[str]) – 工具的描述。默认为 None。

  • arg_types (Optional[Dict[str, Type]]) – 参数名称到类型的字典。默认为 None。

返回

一个 BaseTool 实例。

返回类型

BaseTool

类型化字典输入

from typing import List
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda

class Args(TypedDict):
    a: int
    b: List[int]

def f(x: Args) -> str:
    return str(x["a"] * max(x["b"]))

runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})

dict 输入,通过 args_schema 指定模式

from typing import Any, Dict, List
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.runnables import RunnableLambda

def f(x: Dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))

class FSchema(BaseModel):
    """Apply a function to an integer and list of integers."""

    a: int = Field(..., description="Integer")
    b: List[int] = Field(..., description="List of ints")

runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})

dict 输入,通过 arg_types 指定模式

from typing import Any, Dict, List
from langchain_core.runnables import RunnableLambda

def f(x: Dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))

runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": List[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})

字符串输入

from langchain_core.runnables import RunnableLambda

def f(x: str) -> str:
    return x + "a"

def g(x: str) -> str:
    return x + "z"

runnable = RunnableLambda(f) | g
as_tool = runnable.as_tool()
as_tool.invoke("b")

0.2.14 版本新增。

assign(**kwargs: Union[Runnable[Dict[str, Any], Any], Callable[[Dict[str, Any]], Any], Mapping[str, Union[Runnable]]]) RunnableSerializable[Any, Any]

将新字段分配给此 Runnable 的字典输出。返回一个新的 Runnable。

from langchain_community.llms.fake import FakeStreamingListLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import SystemMessagePromptTemplate
from langchain_core.runnables import Runnable
from operator import itemgetter

prompt = (
    SystemMessagePromptTemplate.from_template("You are a nice assistant.")
    + "{question}"
)
llm = FakeStreamingListLLM(responses=["foo-lish"])

chain: Runnable = prompt | llm | {"str": StrOutputParser()}

chain_with_assign = chain.assign(hello=itemgetter("str") | llm)

print(chain_with_assign.input_schema.schema())
# {'title': 'PromptInput', 'type': 'object', 'properties':
{'question': {'title': 'Question', 'type': 'string'}}}
print(chain_with_assign.output_schema.schema()) #
{'title': 'RunnableSequenceOutput', 'type': 'object', 'properties':
{'str': {'title': 'Str',
'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
参数

kwargs (Union[Runnable[Dict[str, Any], Any], Callable[[Dict[str, Any]], Any], Mapping[str, Union[Runnable[Dict[str, Any], Any], Callable[[Dict[str, Any]], Any]]]]) –

返回类型

RunnableSerializable[Any, Any]

astream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Any) AsyncIterator[Output][source]

astream 的默认实现,它调用 ainvoke。如果子类支持流式输出,则应覆盖此方法。

参数
  • input (Input) – Runnable 的输入。

  • config (Optional[RunnableConfig]) – 用于 Runnable 的配置。默认为 None。

  • kwargs (Any) – 传递给 Runnable 的其他关键字参数。

产生

Runnable 的输出。

返回类型

AsyncIterator[Output]

astream_events(input: Any, config: Optional[RunnableConfig] = None, *, version: Literal['v1', 'v2'], include_names: Optional[Sequence[str]] = None, include_types: Optional[Sequence[str]] = None, include_tags: Optional[Sequence[str]] = None, exclude_names: Optional[Sequence[str]] = None, exclude_types: Optional[Sequence[str]] = None, exclude_tags: Optional[Sequence[str]] = None, **kwargs: Any) AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]]

Beta

此 API 处于 Beta 阶段,未来可能会发生变化。

生成事件流。

用于创建 StreamEvents 的迭代器,该迭代器提供关于 Runnable 进度的实时信息,包括来自中间结果的 StreamEvents。

StreamEvent 是一个具有以下模式的字典

  • event: str - 事件名称的格式为:

    格式:on_[runnable_type]_(start|stream|end)。

  • name: str - 生成事件的 Runnable 的名称。

  • run_id: str - 随机生成的 ID,与给定 Runnable 执行的关联,

    该 Runnable 发出事件。作为父 Runnable 执行一部分而调用的子 Runnable 将被分配其自己的唯一 ID。

  • parent_ids: List[str] - 生成事件的父 runnable 的 ID。

    根 Runnable 将有一个空列表。父 ID 的顺序是从根到直接父级。仅适用于 API 的 v2 版本。API 的 v1 版本将返回一个空列表。

  • tags: Optional[List[str]] - 生成事件的 Runnable 的标签。

    事件。

  • metadata: Optional[Dict[str, Any]] - 生成事件的 Runnable 的元数据。

    生成事件的 Runnable 的元数据。

  • data: Dict[str, Any]

下面是一个表格,说明了各种链可能发出的一些事件。为了简洁起见,元数据字段已从表格中省略。链定义已包含在表格之后。

注意 此参考表适用于模式的 V2 版本。

事件

名称

输入

输出

on_chat_model_start

[模型名称]

{“messages”: [[SystemMessage, HumanMessage]]}

on_chat_model_stream

[模型名称]

AIMessageChunk(content=”hello”)

on_chat_model_end

[模型名称]

{“messages”: [[SystemMessage, HumanMessage]]}

AIMessageChunk(content=”hello world”)

on_llm_start

[模型名称]

{‘input’: ‘hello’}

on_llm_stream

[模型名称]

‘Hello’

on_llm_end

[模型名称]

‘Hello human!’

on_chain_start

format_docs

on_chain_stream

format_docs

“hello world!, goodbye world!”

on_chain_end

format_docs

[Document(…)]

“hello world!, goodbye world!”

on_tool_start

some_tool

{“x”: 1, “y”: “2”}

on_tool_end

some_tool

{“x”: 1, “y”: “2”}

on_retriever_start

[检索器名称]

{“query”: “hello”}

on_retriever_end

[检索器名称]

{“query”: “hello”}

[Document(…), ..]

on_prompt_start

[模板名称]

{“question”: “hello”}

on_prompt_end

[模板名称]

{“question”: “hello”}

ChatPromptValue(messages: [SystemMessage, …])

除了标准事件外,用户还可以调度自定义事件(请参阅下面的示例)。

自定义事件将仅在 API 的 v2 版本中显示!

自定义事件具有以下格式

属性

类型

描述

名称

str

用户定义的事件名称。

data

Any

与事件关联的数据。这可以是任何内容,但我们建议使其可 JSON 序列化。

以下是与上面显示的标准事件关联的声明

format_docs:

def format_docs(docs: List[Document]) -> str:
    '''Format the docs.'''
    return ", ".join([doc.page_content for doc in docs])

format_docs = RunnableLambda(format_docs)

some_tool:

@tool
def some_tool(x: int, y: str) -> dict:
    '''Some_tool.'''
    return {"x": x, "y": y}

prompt:

template = ChatPromptTemplate.from_messages(
    [("system", "You are Cat Agent 007"), ("human", "{question}")]
).with_config({"run_name": "my_template", "tags": ["my_template"]})

示例

from langchain_core.runnables import RunnableLambda

async def reverse(s: str) -> str:
    return s[::-1]

chain = RunnableLambda(func=reverse)

events = [
    event async for event in chain.astream_events("hello", version="v2")
]

# will produce the following events (run_id, and parent_ids
# has been omitted for brevity):
[
    {
        "data": {"input": "hello"},
        "event": "on_chain_start",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"chunk": "olleh"},
        "event": "on_chain_stream",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"output": "olleh"},
        "event": "on_chain_end",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
]

示例:调度自定义事件

from langchain_core.callbacks.manager import (
    adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio


async def slow_thing(some_input: str, config: RunnableConfig) -> str:
    """Do something that takes a long time."""
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 1 of 3"},
        config=config # Must be included for python < 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 2 of 3"},
        config=config # Must be included for python < 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    return "Done"

slow_thing = RunnableLambda(slow_thing)

async for event in slow_thing.astream_events("some_input", version="v2"):
    print(event)
参数
  • input (Any) – Runnable 的输入。

  • config (Optional[RunnableConfig]) – 用于 Runnable 的配置。

  • version (Literal['v1', 'v2']) – 要使用的模式版本,v2v1。用户应使用 v2v1 用于向后兼容,将在 0.4.0 中弃用。在 API 稳定之前,不会分配默认值。自定义事件将仅在 v2 中显示。

  • include_names (Optional[Sequence[str]]) – 仅包括来自具有匹配名称的 runnable 的事件。

  • include_types (Optional[Sequence[str]]) – 仅包括来自具有匹配类型的 runnable 的事件。

  • include_tags (Optional[Sequence[str]]) – 仅包括来自具有匹配标签的 runnable 的事件。

  • exclude_names (Optional[Sequence[str]]) – 排除来自具有匹配名称的 runnable 的事件。

  • exclude_types (Optional[Sequence[str]]) – 排除来自具有匹配类型的 runnable 的事件。

  • exclude_tags (Optional[Sequence[str]]) – 排除来自具有匹配标签的 runnable 的事件。

  • kwargs (Any) – 传递给 Runnable 的其他关键字参数。这些参数将传递给 astream_log,因为 astream_events 的此实现是基于 astream_log 构建的。

产生

StreamEvents 的异步流。

引发

NotImplementedError – 如果版本不是 v1v2

返回类型

AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]]

async astream_log(input: Any, config: Optional[RunnableConfig] = None, *, diff: bool = True, with_streamed_output_list: bool = True, include_names: Optional[Sequence[str]] = None, include_types: Optional[Sequence[str]] = None, include_tags: Optional[Sequence[str]] = None, exclude_names: Optional[Sequence[str]] = None, exclude_types: Optional[Sequence[str]] = None, exclude_tags: Optional[Sequence[str]] = None, **kwargs: Any) Union[AsyncIterator[RunLogPatch], AsyncIterator[RunLog]]

流式传输来自 Runnable 的所有输出,如回调系统报告的那样。这包括 LLM、检索器、工具等的所有内部运行。

输出作为 Log 对象进行流式传输,其中包括 Jsonpatch 操作列表,这些操作描述了运行状态在每个步骤中的变化方式,以及运行的最终状态。

可以应用 Jsonpatch 操作以构造状态。

参数
  • input (Any) – Runnable 的输入。

  • config (Optional[RunnableConfig]) – 用于 Runnable 的配置。

  • diff (bool) – 是否生成每个步骤之间的差异或当前状态。

  • with_streamed_output_list (bool) – 是否生成 streamed_output 列表。

  • include_names (Optional[Sequence[str]]) – 仅包括具有这些名称的日志。

  • include_types (Optional[Sequence[str]]) – 仅包括具有这些类型的日志。

  • include_tags (Optional[Sequence[str]]) – 仅包括具有这些标签的日志。

  • exclude_names (Optional[Sequence[str]]) – 排除具有这些名称的日志。

  • exclude_types (Optional[Sequence[str]]) – 排除具有这些类型的日志。

  • exclude_tags (Optional[Sequence[str]]) – 排除具有这些标签的日志。

  • kwargs (Any) – 传递给 Runnable 的其他关键字参数。

产生

一个 RunLogPatch 或 RunLog 对象。

返回类型

Union[AsyncIterator[RunLogPatch], AsyncIterator[RunLog]]

atransform(input: AsyncIterator[Input], config: Optional[RunnableConfig] = None, **kwargs: Any) AsyncIterator[Output][source]

atransform 的默认实现,它缓冲输入并调用 astream。如果子类可以在输入仍在生成时开始生成输出,则应覆盖此方法。

参数
  • input (AsyncIterator[Input]) – Runnable 的异步输入迭代器。

  • config (Optional[RunnableConfig]) – 用于 Runnable 的配置。默认为 None。

  • kwargs (Any) – 传递给 Runnable 的其他关键字参数。

产生

Runnable 的输出。

返回类型

AsyncIterator[Output]

batch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, **, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output]

默认实现使用线程池执行器并行运行 invoke。

批处理的默认实现对于 IO 绑定 runnables 效果良好。

如果子类可以更有效地进行批处理,则应重写此方法;例如,如果底层 Runnable 使用支持批处理模式的 API。

参数
  • inputs (List[Input]) –

  • config (Optional[Union[RunnableConfig, List[RunnableConfig]]]) –

  • return_exceptions (bool) –

  • kwargs (Optional[Any]) –

返回类型

List[Output]

batch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) Iterator[Tuple[int, Union[Output, Exception]]]

在一系列输入上并行运行 invoke,并在完成时生成结果。

参数
  • inputs (Sequence[Input]) –

  • config (Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]) –

  • return_exceptions (bool) –

  • kwargs (Optional[Any]) –

返回类型

Iterator[Tuple[int, Union[Output, Exception]]]

bind(**kwargs: Any) Runnable[Input, Output]

将参数绑定到 Runnable,返回一个新的 Runnable。

Useful when a Runnable in a chain requires an argument that is not in the output of the previous Runnable or included in the user input.

参数

kwargs (Any) – The arguments to bind to the Runnable.

返回

A new Runnable with the arguments bound.

返回类型

Runnable[Input, Output]

示例

from langchain_community.chat_models import ChatOllama
from langchain_core.output_parsers import StrOutputParser

llm = ChatOllama(model='llama2')

# Without bind.
chain = (
    llm
    | StrOutputParser()
)

chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two three four five.'

# With bind.
chain = (
    llm.bind(stop=["three"])
    | StrOutputParser()
)

chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two'
config_schema(*, include: Optional[Sequence[str]] = None) Type[BaseModel]

此 Runnable 接受的配置类型,指定为 pydantic 模型。

To mark a field as configurable, see the configurable_fields and configurable_alternatives methods.

参数

include (Optional[Sequence[str]]) – A list of fields to include in the config schema.

返回

A pydantic model that can be used to validate config.

返回类型

Type[BaseModel]

get_graph(config: Optional[RunnableConfig] = None) Graph

返回此 Runnable 的图形表示。

参数

config (Optional[RunnableConfig]) –

返回类型

Graph

get_input_schema(config: Optional[RunnableConfig] = None) Type[BaseModel]

获取可用于验证 Runnable 输入的 pydantic 模型。

Runnables that leverage the configurable_fields and configurable_alternatives methods will have a dynamic input schema that depends on which configuration the Runnable is invoked with.

This method allows to get an input schema for a specific configuration.

参数

config (Optional[RunnableConfig]) – A config to use when generating the schema.

返回

A pydantic model that can be used to validate input.

返回类型

Type[BaseModel]

get_name(suffix: Optional[str] = None, *, name: Optional[str] = None) str

获取 Runnable 的名称。

参数
  • suffix (Optional[str]) –

  • name (Optional[str]) –

返回类型

str

get_output_schema(config: Optional[RunnableConfig] = None) Type[BaseModel]

获取可用于验证 Runnable 输出的 pydantic 模型。

Runnables that leverage the configurable_fields and configurable_alternatives methods will have a dynamic output schema that depends on which configuration the Runnable is invoked with.

This method allows to get an output schema for a specific configuration.

参数

config (Optional[RunnableConfig]) – A config to use when generating the schema.

返回

A pydantic model that can be used to validate output.

返回类型

Type[BaseModel]

get_prompts(config: Optional[RunnableConfig] = None) List[BasePromptTemplate]

返回此 Runnable 使用的提示列表。

参数

config (Optional[RunnableConfig]) –

返回类型

List[BasePromptTemplate]

invoke(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Any) Output[source]

Transform a single input into an output. Override to implement.

参数
  • input (Input) – Runnable 的输入。

  • config (Optional[RunnableConfig]) – A config to use when invoking the Runnable. The config supports standard keys like ‘tags’, ‘metadata’ for tracing purposes, ‘max_concurrency’ for controlling how much work to do in parallel, and other keys. Please refer to the RunnableConfig for more details.

  • kwargs (Any) –

返回

Runnable 的输出。

返回类型

输出

map() Runnable[List[Input], List[Output]]

返回一个新的 Runnable,它通过对每个输入调用 invoke(),将输入列表映射到输出列表。

返回

A new Runnable that maps a list of inputs to a list of outputs.

返回类型

Runnable[List[Input], List[Output]]

示例

from langchain_core.runnables import RunnableLambda

def _lambda(x: int) -> int:
    return x + 1

runnable = RunnableLambda(_lambda)
print(runnable.map().invoke([1, 2, 3])) # [2, 3, 4]
pick(keys: Union[str, List[str]]) RunnableSerializable[Any, Any]

从此 Runnable 的字典输出中选取键。

Pick single key
import json

from langchain_core.runnables import RunnableLambda, RunnableMap

as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
chain = RunnableMap(str=as_str, json=as_json)

chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3]}

json_only_chain = chain.pick("json")
json_only_chain.invoke("[1, 2, 3]")
# -> [1, 2, 3]
Pick list of keys
from typing import Any

import json

from langchain_core.runnables import RunnableLambda, RunnableMap

as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
def as_bytes(x: Any) -> bytes:
    return bytes(x, "utf-8")

chain = RunnableMap(
    str=as_str,
    json=as_json,
    bytes=RunnableLambda(as_bytes)
)

chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}

json_and_bytes_chain = chain.pick(["json", "bytes"])
json_and_bytes_chain.invoke("[1, 2, 3]")
# -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
参数

keys (Union[str, List[str]]) –

返回类型

RunnableSerializable[Any, Any]

pipe(*others: Union[Runnable[Any, Other], Callable[[Any], Other]], name: Optional[str] = None) RunnableSerializable[Input, Other]

将此 Runnable 与类似 Runnable 的对象组合以创建 RunnableSequence。

Equivalent to RunnableSequence(self, *others) or self | others[0] | …

示例

from langchain_core.runnables import RunnableLambda

def add_one(x: int) -> int:
    return x + 1

def mul_two(x: int) -> int:
    return x * 2

runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1.pipe(runnable_2)
# Or equivalently:
# sequence = runnable_1 | runnable_2
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
# -> 4

sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
# -> [4, 6, 8]
参数
  • others (Union[Runnable[Any, Other], Callable[[Any], Other]]) –

  • name (Optional[str]) –

返回类型

RunnableSerializable[Input, Other]

stream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Any) Iterator[Output][source]

参数
  • input (Input) – Runnable 的输入。

  • config (Optional[RunnableConfig]) – 用于 Runnable 的配置。默认为 None。

  • kwargs (Any) – 传递给 Runnable 的其他关键字参数。

产生

Runnable 的输出。

返回类型

Iterator[Output]

transform(input: Iterator[Input], config: Optional[RunnableConfig] = None, **kwargs: Any) Iterator[Output][source]

Default implementation of transform, which buffers input and then calls stream. Subclasses should override this method if they can start producing output while input is still being generated.

参数
  • input (Iterator[Input]) – An iterator of inputs to the Runnable.

  • config (Optional[RunnableConfig]) – 用于 Runnable 的配置。默认为 None。

  • kwargs (Any) – 传递给 Runnable 的其他关键字参数。

产生

Runnable 的输出。

返回类型

Iterator[Output]

with_alisteners(*, on_start: Optional[AsyncListener] = None, on_end: Optional[AsyncListener] = None, on_error: Optional[AsyncListener] = None) Runnable[Input, Output]

将异步生命周期监听器绑定到 Runnable,返回一个新的 Runnable。

on_start: Asynchronously called before the Runnable starts running. on_end: Asynchronously called after the Runnable finishes running. on_error: Asynchronously called if the Runnable throws an error.

The Run object contains information about the run, including its id, type, input, output, error, start_time, end_time, and any tags or metadata added to the run.

参数
  • on_start (Optional[AsyncListener]) – Asynchronously called before the Runnable starts running. Defaults to None.

  • on_end (Optional[AsyncListener]) – Asynchronously called after the Runnable finishes running. Defaults to None.

  • on_error (Optional[AsyncListener]) – Asynchronously called if the Runnable throws an error. Defaults to None.

返回

A new Runnable with the listeners bound.

返回类型

Runnable[Input, Output]

示例

from langchain_core.runnables import RunnableLambda
import time

async def test_runnable(time_to_sleep : int):
    print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
    await asyncio.sleep(time_to_sleep)
    print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")

async def fn_start(run_obj : Runnable):
    print(f"on start callback starts at {format_t(time.time())}
    await asyncio.sleep(3)
    print(f"on start callback ends at {format_t(time.time())}")

async def fn_end(run_obj : Runnable):
    print(f"on end callback starts at {format_t(time.time())}
    await asyncio.sleep(2)
    print(f"on end callback ends at {format_t(time.time())}")

runnable = RunnableLambda(test_runnable).with_alisteners(
    on_start=fn_start,
    on_end=fn_end
)
async def concurrent_runs():
    await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))

asyncio.run(concurrent_runs())
Result:
on start callback starts at 2024-05-16T14:20:29.637053+00:00
on start callback starts at 2024-05-16T14:20:29.637150+00:00
on start callback ends at 2024-05-16T14:20:32.638305+00:00
on start callback ends at 2024-05-16T14:20:32.638383+00:00
Runnable[3s]: starts at 2024-05-16T14:20:32.638849+00:00
Runnable[5s]: starts at 2024-05-16T14:20:32.638999+00:00
Runnable[3s]: ends at 2024-05-16T14:20:35.640016+00:00
on end callback starts at 2024-05-16T14:20:35.640534+00:00
Runnable[5s]: ends at 2024-05-16T14:20:37.640169+00:00
on end callback starts at 2024-05-16T14:20:37.640574+00:00
on end callback ends at 2024-05-16T14:20:37.640654+00:00
on end callback ends at 2024-05-16T14:20:39.641751+00:00
with_config(config: Optional[RunnableConfig] = None, **kwargs: Any) Runnable[Input, Output]

将配置绑定到 Runnable,返回一个新的 Runnable。

参数
  • config (Optional[RunnableConfig]) – The config to bind to the Runnable.

  • kwargs (Any) – 传递给 Runnable 的其他关键字参数。

返回

A new Runnable with the config bound.

返回类型

Runnable[Input, Output]

with_fallbacks(fallbacks: Sequence[Runnable[Input, Output]], *, exceptions_to_handle: Tuple[Type[BaseException], ...] = (<class 'Exception'>,), exception_key: Optional[str] = None) RunnableWithFallbacksT[Input, Output]

向 Runnable 添加回退,返回一个新的 Runnable。

The new Runnable will try the original Runnable, and then each fallback in order, upon failures.

参数
  • fallbacks (Sequence[Runnable[Input, Output]]) – A sequence of runnables to try if the original Runnable fails.

  • exceptions_to_handle (Tuple[Type[BaseException], ...]) – A tuple of exception types to handle. Defaults to (Exception,).

  • exception_key (Optional[str]) – If string is specified then handled exceptions will be passed to fallbacks as part of the input under the specified key. If None, exceptions will not be passed to fallbacks. If used, the base Runnable and its fallbacks must accept a dictionary as input. Defaults to None.

返回

A new Runnable that will try the original Runnable, and then each fallback in order, upon failures.

返回类型

RunnableWithFallbacksT[Input, Output]

示例

from typing import Iterator

from langchain_core.runnables import RunnableGenerator


def _generate_immediate_error(input: Iterator) -> Iterator[str]:
    raise ValueError()
    yield ""


def _generate(input: Iterator) -> Iterator[str]:
    yield from "foo bar"


runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
    [RunnableGenerator(_generate)]
    )
print(''.join(runnable.stream({}))) #foo bar
参数
  • fallbacks (Sequence[Runnable[Input, Output]]) – A sequence of runnables to try if the original Runnable fails.

  • exceptions_to_handle (Tuple[Type[BaseException], ...]) – A tuple of exception types to handle.

  • exception_key (Optional[str]) – If string is specified then handled exceptions will be passed to fallbacks as part of the input under the specified key. If None, exceptions will not be passed to fallbacks. If used, the base Runnable and its fallbacks must accept a dictionary as input.

返回

A new Runnable that will try the original Runnable, and then each fallback in order, upon failures.

返回类型

RunnableWithFallbacksT[Input, Output]

with_listeners(*, on_start: Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]] = None, on_end: Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]] = None, on_error: Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]] = None) Runnable[Input, Output]

将生命周期监听器绑定到 Runnable,返回一个新的 Runnable。

on_start: Called before the Runnable starts running, with the Run object. on_end: Called after the Runnable finishes running, with the Run object. on_error: Called if the Runnable throws an error, with the Run object.

The Run object contains information about the run, including its id, type, input, output, error, start_time, end_time, and any tags or metadata added to the run.

参数
  • on_start (Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]]) – Called before the Runnable starts running. Defaults to None.

  • on_end (Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]]) – Called after the Runnable finishes running. Defaults to None.

  • on_error (Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]]) – Called if the Runnable throws an error. Defaults to None.

返回

A new Runnable with the listeners bound.

返回类型

Runnable[Input, Output]

示例

from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run

import time

def test_runnable(time_to_sleep : int):
    time.sleep(time_to_sleep)

def fn_start(run_obj: Run):
    print("start_time:", run_obj.start_time)

def fn_end(run_obj: Run):
    print("end_time:", run_obj.end_time)

chain = RunnableLambda(test_runnable).with_listeners(
    on_start=fn_start,
    on_end=fn_end
)
chain.invoke(2)
with_retry(*, retry_if_exception_type: ~typing.Tuple[~typing.Type[BaseException], ...] = (<class 'Exception'>,), wait_exponential_jitter: bool = True, stop_after_attempt: int = 3) Runnable[Input, Output]

创建一个新的 Runnable,它在发生异常时重试原始 Runnable。

参数
  • retry_if_exception_type (Tuple[Type[BaseException], ...]) – A tuple of exception types to retry on. Defaults to (Exception,).

  • wait_exponential_jitter (bool) – Whether to add jitter to the wait time between retries. Defaults to True.

  • stop_after_attempt (int) – The maximum number of attempts to make before giving up. Defaults to 3.

返回

A new Runnable that retries the original Runnable on exceptions.

返回类型

Runnable[Input, Output]

示例

from langchain_core.runnables import RunnableLambda

count = 0


def _lambda(x: int) -> None:
    global count
    count = count + 1
    if x == 1:
        raise ValueError("x is 1")
    else:
         pass


runnable = RunnableLambda(_lambda)
try:
    runnable.with_retry(
        stop_after_attempt=2,
        retry_if_exception_type=(ValueError,),
    ).invoke(1)
except ValueError:
    pass

assert (count == 2)
参数
  • retry_if_exception_type (Tuple[Type[BaseException], ...]) – A tuple of exception types to retry on

  • wait_exponential_jitter (bool) – Whether to add jitter to the wait time between retries

  • stop_after_attempt (int) – The maximum number of attempts to make before giving up

返回

A new Runnable that retries the original Runnable on exceptions.

返回类型

Runnable[Input, Output]

with_types(*, input_type: Optional[Type[Input]] = None, output_type: Optional[Type[Output]] = None) Runnable[Input, Output]

将输入和输出类型绑定到 Runnable,返回一个新的 Runnable。

参数
  • input_type (Optional[Type[Input]]) – 绑定到 Runnable 的输入类型。默认为 None。

  • output_type (Optional[Type[Output]]) – 绑定到 Runnable 的输出类型。默认为 None。

返回

一个绑定了类型的新 Runnable 对象。

返回类型

Runnable[Input, Output]

使用 RunnableGenerator 的示例