langchain_core.runnables.base.RunnableLambda

注意

RunnableLambda 实现了标准的 Runnable 接口。 🏃

Runnable 接口 具有在 runnables 上可用的其他方法,例如 with_typeswith_retryassignbindget_graph 等。

class langchain_core.runnables.base.RunnableLambda(func: Union[Union[Callable[[Input], Output], Callable[[Input], Iterator[Output]], Callable[[Input, RunnableConfig], Output], Callable[[Input, CallbackManagerForChainRun], Output], Callable[[Input, CallbackManagerForChainRun, RunnableConfig], Output]], Union[Callable[[Input], Awaitable[Output]], Callable[[Input], AsyncIterator[Output]], Callable[[Input, RunnableConfig], Awaitable[Output]], Callable[[Input, AsyncCallbackManagerForChainRun], Awaitable[Output]], Callable[[Input, AsyncCallbackManagerForChainRun, RunnableConfig], Awaitable[Output]]]], afunc: Optional[Union[Callable[[Input], Awaitable[Output]], Callable[[Input], AsyncIterator[Output]], Callable[[Input, RunnableConfig], Awaitable[Output]], Callable[[Input, AsyncCallbackManagerForChainRun], Awaitable[Output]], Callable[[Input, AsyncCallbackManagerForChainRun, RunnableConfig], Awaitable[Output]]]] = None, name: Optional[str] = None)[source]

RunnableLambda 将 Python 可调用对象转换为 Runnable。

将可调用对象包装在 RunnableLambda 中使得该可调用对象可以在同步或异步上下文中使用。

RunnableLambda 可以像任何其他 Runnable 一样组合,并提供与 LangChain 追踪的无缝集成。

RunnableLambda 最适合不需要支持流式处理的代码。如果需要支持流式处理(即,能够处理输入块并产生输出块),请改用 RunnableGenerator

请注意,如果 RunnableLambda 返回 Runnable 的实例,则该实例在执行期间被调用(或流式传输)。

示例

# This is a RunnableLambda
from langchain_core.runnables import RunnableLambda

def add_one(x: int) -> int:
    return x + 1

runnable = RunnableLambda(add_one)

runnable.invoke(1) # returns 2
runnable.batch([1, 2, 3]) # returns [2, 3, 4]

# Async is supported by default by delegating to the sync implementation
await runnable.ainvoke(1) # returns 2
await runnable.abatch([1, 2, 3]) # returns [2, 3, 4]


# Alternatively, can provide both synd and sync implementations
async def add_one_async(x: int) -> int:
    return x + 1

runnable = RunnableLambda(add_one, afunc=add_one_async)
runnable.invoke(1) # Uses add_one
await runnable.ainvoke(1) # Uses add_one_async

从可调用对象、异步可调用对象或两者创建一个 RunnableLambda。

接受同步和异步变体,以便为同步和异步执行提供高效的实现。

参数
Raises
  • TypeError – 如果 func 不是可调用类型。

  • TypeError – 如果同时提供了 func 和 afunc。

属性

InputType

此 Runnable 的输入类型。

OutputType

此 Runnable 的输出类型,作为类型注解。

config_specs

列出此 Runnable 的可配置字段。

deps

此 Runnable 的依赖项。

input_schema

此 Runnable 接受的输入类型,指定为 pydantic 模型。

name

Runnable 的名称。

output_schema

此 Runnable 生成的输出类型,指定为 pydantic 模型。

方法

__init__(func[, afunc, name])

从可调用对象、异步可调用对象或两者创建一个 RunnableLambda。

abatch(inputs[, config, return_exceptions])

默认实现使用 asyncio.gather 并行运行 ainvoke。

abatch_as_completed(inputs[, config, ...])

并行运行输入列表上的 ainvoke,并在结果完成时产生结果。

ainvoke(input[, config])

异步调用此 Runnable。

as_tool([args_schema, name, description, ...])

assign(**kwargs)

将新字段分配给此 Runnable 的字典输出。

astream(input[, config])

astream 的默认实现,它调用 ainvoke。

astream_events(input[, config, ...])

astream_log(input[, config, diff, ...])

流式传输来自 Runnable 的所有输出,如回调系统报告的那样。

atransform(input[, config])

atransform 的默认实现,它缓冲输入并调用 astream。

batch(inputs[, config, return_exceptions])

默认实现使用线程池执行器并行运行 invoke。

batch_as_completed(inputs[, config, ...])

并行运行输入列表上的 invoke,并在结果完成时产生结果。

bind(**kwargs)

将参数绑定到 Runnable,返回一个新的 Runnable。

config_schema(*[, include])

此 Runnable 接受的配置类型,指定为 pydantic 模型。

get_graph([config])

返回此 Runnable 的图形表示。

get_input_schema([config])

此 Runnable 输入的 pydantic 模式。

get_name([suffix, name])

获取 Runnable 的名称。

get_output_schema([config])

获取可用于验证 Runnable 输出的 pydantic 模型。

get_prompts([config])

返回此 Runnable 使用的提示列表。

invoke(input[, config])

同步调用此 Runnable。

map()

返回一个新的 Runnable,它通过对每个输入调用 invoke(),将输入列表映射到输出列表。

pick(keys)

从此 Runnable 的字典输出中选取键。

pipe(*others[, name])

将此 Runnable 与类似 Runnable 的对象组合以创建 RunnableSequence。

stream(input[, config])

stream 的默认实现,它调用 invoke。

transform(input[, config])

transform 的默认实现,它缓冲输入,然后调用 stream。

with_alisteners(*[, on_start, on_end, on_error])

将异步生命周期监听器绑定到 Runnable,返回一个新的 Runnable。

with_config([config])

将配置绑定到 Runnable,返回一个新的 Runnable。

with_fallbacks(fallbacks, *[, ...])

向 Runnable 添加回退,返回一个新的 Runnable。

with_listeners(*[, on_start, on_end, on_error])

将生命周期监听器绑定到 Runnable,返回一个新的 Runnable。

with_retry(*[, retry_if_exception_type, ...])

创建一个新的 Runnable,该 Runnable 在异常时重试原始 Runnable。

with_types(*[, input_type, output_type])

将输入和输出类型绑定到 Runnable,返回一个新的 Runnable。

__init__(func: Union[Union[Callable[[Input], Output], Callable[[Input], Iterator[Output]], Callable[[Input, RunnableConfig], Output], Callable[[Input, CallbackManagerForChainRun], Output], Callable[[Input, CallbackManagerForChainRun, RunnableConfig], Output]], Union[Callable[[Input], Awaitable[Output]], Callable[[Input], AsyncIterator[Output]], Callable[[Input, RunnableConfig], Awaitable[Output]], Callable[[Input, AsyncCallbackManagerForChainRun], Awaitable[Output]], Callable[[Input, AsyncCallbackManagerForChainRun, RunnableConfig], Awaitable[Output]]]], afunc: Optional[Union[Callable[[Input], Awaitable[Output]], Callable[[Input], AsyncIterator[Output]], Callable[[Input, RunnableConfig], Awaitable[Output]], Callable[[Input, AsyncCallbackManagerForChainRun], Awaitable[Output]], Callable[[Input, AsyncCallbackManagerForChainRun, RunnableConfig], Awaitable[Output]]]] = None, name: Optional[str] = None) None[source]

从可调用对象、异步可调用对象或两者创建一个 RunnableLambda。

接受同步和异步变体,以便为同步和异步执行提供高效的实现。

参数
Raises
  • TypeError – 如果 func 不是可调用类型。

  • TypeError – 如果同时提供了 func 和 afunc。

返回类型

None

async abatch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output]

默认实现使用 asyncio.gather 并行运行 ainvoke。

批处理的默认实现对于 IO 绑定的 Runnable 效果良好。

如果子类可以更有效地进行批处理,则应重写此方法;例如,如果底层的 Runnable 使用支持批处理模式的 API。

参数
  • inputs (List[Input]) – Runnable 的输入列表。

  • config (Optional[Union[RunnableConfig, List[RunnableConfig]]]) – 调用 Runnable 时使用的配置。该配置支持标准键,例如用于跟踪目的的 ‘tags’、‘metadata’,用于控制并行执行量的 ‘max_concurrency’ 以及其他键。有关更多详细信息,请参阅 RunnableConfig。默认为 None。

  • return_exceptions (bool) – 是否返回异常而不是引发异常。默认为 False。

  • kwargs (Optional[Any]) – 传递给 Runnable 的其他关键字参数。

返回

来自 Runnable 的输出列表。

返回类型

List[Output]

async abatch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) AsyncIterator[Tuple[int, Union[Output, Exception]]]

并行运行输入列表上的 ainvoke,并在结果完成时产生结果。

参数
  • inputs (Sequence[Input]) – Runnable 的输入列表。

  • config (Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]) – 调用 Runnable 时使用的配置。该配置支持标准键,例如用于跟踪目的的 ‘tags’、‘metadata’,用于控制并行执行量的 ‘max_concurrency’ 以及其他键。有关更多详细信息,请参阅 RunnableConfig。默认为 None。默认为 None。

  • return_exceptions (bool) – 是否返回异常而不是引发异常。默认为 False。

  • kwargs (Optional[Any]) – 传递给 Runnable 的其他关键字参数。

产生

包含输入索引和 Runnable 输出的元组。

返回类型

AsyncIterator[Tuple[int, Union[Output, Exception]]]

async ainvoke(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) Output[source]

异步调用此 Runnable。

参数
  • input (Input) – 此 Runnable 的输入。

  • config (Optional[RunnableConfig]) – 要使用的配置。默认为 None。

  • kwargs (Optional[Any]) – 附加关键字参数。

返回

此 Runnable 的输出。

返回类型

Output

as_tool(args_schema: Optional[Type[BaseModel]] = None, *, name: Optional[str] = None, description: Optional[str] = None, arg_types: Optional[Dict[str, Type]] = None) BaseTool

Beta

此 API 处于 Beta 阶段,将来可能会发生变化。

从 Runnable 创建一个 BaseTool。

as_tool 将从 Runnable 实例化一个具有名称、描述和 args_schema 的 BaseTool。如果可能,架构将从 runnable.get_input_schema 推断。或者(例如,如果 Runnable 接受字典作为输入,并且未键入特定的字典键),可以使用 args_schema 直接指定架构。您还可以传递 arg_types 以仅指定必需的参数及其类型。

参数
  • args_schema (Optional[Type[BaseModel]]) – 工具的架构。默认为 None。

  • name (Optional[str]) – 工具的名称。默认为 None。

  • description (Optional[str]) – 工具的描述。默认为 None。

  • arg_types (Optional[Dict[str, Type]]) – 参数名称到类型的字典。默认为 None。

返回

BaseTool 实例。

返回类型

BaseTool

类型化字典输入

from typing import List
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda

class Args(TypedDict):
    a: int
    b: List[int]

def f(x: Args) -> str:
    return str(x["a"] * max(x["b"]))

runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})

dict 输入,通过 args_schema 指定架构

from typing import Any, Dict, List
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.runnables import RunnableLambda

def f(x: Dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))

class FSchema(BaseModel):
    """Apply a function to an integer and list of integers."""

    a: int = Field(..., description="Integer")
    b: List[int] = Field(..., description="List of ints")

runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})

dict 输入,通过 arg_types 指定架构

from typing import Any, Dict, List
from langchain_core.runnables import RunnableLambda

def f(x: Dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))

runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": List[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})

字符串输入

from langchain_core.runnables import RunnableLambda

def f(x: str) -> str:
    return x + "a"

def g(x: str) -> str:
    return x + "z"

runnable = RunnableLambda(f) | g
as_tool = runnable.as_tool()
as_tool.invoke("b")

0.2.14 版本中的新功能。

assign(**kwargs: Union[Runnable[Dict[str, Any], Any], Callable[[Dict[str, Any]], Any], Mapping[str, Union[Runnable[Dict[str, Any], Any], Callable[[Dict[str, Any]], Any]]]]) RunnableSerializable[Any, Any]

将新字段分配给此 Runnable 的字典输出。返回一个新的 Runnable。

from langchain_community.llms.fake import FakeStreamingListLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import SystemMessagePromptTemplate
from langchain_core.runnables import Runnable
from operator import itemgetter

prompt = (
    SystemMessagePromptTemplate.from_template("You are a nice assistant.")
    + "{question}"
)
llm = FakeStreamingListLLM(responses=["foo-lish"])

chain: Runnable = prompt | llm | {"str": StrOutputParser()}

chain_with_assign = chain.assign(hello=itemgetter("str") | llm)

print(chain_with_assign.input_schema.schema())
# {'title': 'PromptInput', 'type': 'object', 'properties':
{'question': {'title': 'Question', 'type': 'string'}}}
print(chain_with_assign.output_schema.schema()) #
{'title': 'RunnableSequenceOutput', 'type': 'object', 'properties':
{'str': {'title': 'Str',
'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
参数

kwargs (Union[Runnable[Dict[str, Any], Any], Callable[[Dict[str, Any]], Any], Mapping[str, Union[Runnable[Dict[str, Any], Any], Callable[[Dict[str, Any]], Any]]]]) –

返回类型

RunnableSerializable[Any, Any]

async astream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) AsyncIterator[Output]

astream 的默认实现,它调用 ainvoke。如果子类支持流式输出,则应重写此方法。

参数
  • input (Input) – Runnable 的输入。

  • config (Optional[RunnableConfig]) – 用于 Runnable 的配置。默认为 None。

  • kwargs (Optional[Any]) – 传递给 Runnable 的其他关键字参数。

产生

Runnable 的输出。

返回类型

AsyncIterator[Output]

astream_events(input: Any, config: Optional[RunnableConfig] = None, *, version: Literal['v1', 'v2'], include_names: Optional[Sequence[str]] = None, include_types: Optional[Sequence[str]] = None, include_tags: Optional[Sequence[str]] = None, exclude_names: Optional[Sequence[str]] = None, exclude_types: Optional[Sequence[str]] = None, exclude_tags: Optional[Sequence[str]] = None, **kwargs: Any) AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]]

Beta

此 API 处于 Beta 阶段,将来可能会发生变化。

生成事件流。

用于创建 StreamEvents 的迭代器,该迭代器提供关于 Runnable 进度的实时信息,包括来自中间结果的 StreamEvents。

StreamEvent 是一个具有以下架构的字典

  • event: str - 事件名称的格式为:

    格式:on_[runnable_type]_(start|stream|end)。

  • name: str - 生成事件的 Runnable 的名称。

  • run_id: str - 与给定执行关联的随机生成的 ID

    发射事件的 Runnable。作为父级 Runnable 执行一部分而被调用的子级 Runnable 会被分配其自己唯一的 ID。

  • parent_ids: List[str] - 生成事件的父级 runnable 的 ID。

    根 Runnable 将拥有一个空列表。父级 ID 的顺序是从根到直接父级。仅适用于 API 的 v2 版本。API 的 v1 版本将返回一个空列表。

  • tags: Optional[List[str]] - 生成事件的 Runnable 的标签。

    事件。

  • metadata: Optional[Dict[str, Any]] - Runnable 的元数据

    生成事件的 Runnable 的元数据。

  • data: Dict[str, Any]

下面是一个表格,说明了各种链可能发出的一些事件。为了简洁起见,表格中省略了元数据字段。链定义已包含在表格之后。

注意 此参考表适用于模式的 V2 版本。

事件

name

输入

输出

on_chat_model_start

[模型名称]

{“messages”: [[SystemMessage, HumanMessage]]}

on_chat_model_stream

[模型名称]

AIMessageChunk(content=”hello”)

on_chat_model_end

[模型名称]

{“messages”: [[SystemMessage, HumanMessage]]}

AIMessageChunk(content=”hello world”)

on_llm_start

[模型名称]

{‘input’: ‘hello’}

on_llm_stream

[模型名称]

‘Hello’

on_llm_end

[模型名称]

‘Hello human!’

on_chain_start

format_docs

on_chain_stream

format_docs

“hello world!, goodbye world!”

on_chain_end

format_docs

[Document(…)]

“hello world!, goodbye world!”

on_tool_start

some_tool

{“x”: 1, “y”: “2”}

on_tool_end

some_tool

{“x”: 1, “y”: “2”}

on_retriever_start

[检索器名称]

{“query”: “hello”}

on_retriever_end

[检索器名称]

{“query”: “hello”}

[Document(…), ..]

on_prompt_start

[模板名称]

{“question”: “hello”}

on_prompt_end

[模板名称]

{“question”: “hello”}

ChatPromptValue(messages: [SystemMessage, …])

除了标准事件之外,用户还可以调度自定义事件(请参见下面的示例)。

自定义事件将仅在 v2 版本的 API 中显示!

自定义事件具有以下格式

属性

类型

描述

name

str

用户定义的事件名称。

数据

Any

与事件关联的数据。这可以是任何内容,但我们建议使其可 JSON 序列化。

以下是与上面显示的标准事件关联的声明

format_docs:

def format_docs(docs: List[Document]) -> str:
    '''Format the docs.'''
    return ", ".join([doc.page_content for doc in docs])

format_docs = RunnableLambda(format_docs)

some_tool:

@tool
def some_tool(x: int, y: str) -> dict:
    '''Some_tool.'''
    return {"x": x, "y": y}

prompt:

template = ChatPromptTemplate.from_messages(
    [("system", "You are Cat Agent 007"), ("human", "{question}")]
).with_config({"run_name": "my_template", "tags": ["my_template"]})

示例

from langchain_core.runnables import RunnableLambda

async def reverse(s: str) -> str:
    return s[::-1]

chain = RunnableLambda(func=reverse)

events = [
    event async for event in chain.astream_events("hello", version="v2")
]

# will produce the following events (run_id, and parent_ids
# has been omitted for brevity):
[
    {
        "data": {"input": "hello"},
        "event": "on_chain_start",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"chunk": "olleh"},
        "event": "on_chain_stream",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"output": "olleh"},
        "event": "on_chain_end",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
]

示例:调度自定义事件

from langchain_core.callbacks.manager import (
    adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio


async def slow_thing(some_input: str, config: RunnableConfig) -> str:
    """Do something that takes a long time."""
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 1 of 3"},
        config=config # Must be included for python < 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 2 of 3"},
        config=config # Must be included for python < 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    return "Done"

slow_thing = RunnableLambda(slow_thing)

async for event in slow_thing.astream_events("some_input", version="v2"):
    print(event)
参数
  • input (Any) – Runnable 的输入。

  • config (Optional[RunnableConfig]) – 用于 Runnable 的配置。

  • version (Literal['v1', 'v2']) – 要使用的模式版本,可以是 v2v1。用户应使用 v2v1 用于向后兼容,将在 0.4.0 中弃用。在 API 稳定之前,不会分配默认值。自定义事件将仅在 v2 中显示。

  • include_names (Optional[Sequence[str]]) – 仅包括来自具有匹配名称的 runnable 的事件。

  • include_types (Optional[Sequence[str]]) – 仅包括来自具有匹配类型的 runnable 的事件。

  • include_tags (Optional[Sequence[str]]) – 仅包括来自具有匹配标签的 runnable 的事件。

  • exclude_names (Optional[Sequence[str]]) – 排除来自具有匹配名称的 runnable 的事件。

  • exclude_types (Optional[Sequence[str]]) – 排除来自具有匹配类型的 runnable 的事件。

  • exclude_tags (Optional[Sequence[str]]) – 排除来自具有匹配标签的 runnable 的事件。

  • kwargs (Any) – 要传递给 Runnable 的其他关键字参数。这些将传递给 astream_log,因为 astream_events 的此实现是基于 astream_log 构建的。

产生

StreamEvents 的异步流。

Raises

NotImplementedError – 如果版本不是 v1v2

返回类型

AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]]

async astream_log(input: Any, config: Optional[RunnableConfig] = None, *, diff: bool = True, with_streamed_output_list: bool = True, include_names: Optional[Sequence[str]] = None, include_types: Optional[Sequence[str]] = None, include_tags: Optional[Sequence[str]] = None, exclude_names: Optional[Sequence[str]] = None, exclude_types: Optional[Sequence[str]] = None, exclude_tags: Optional[Sequence[str]] = None, **kwargs: Any) Union[AsyncIterator[RunLogPatch], AsyncIterator[RunLog]]

从 Runnable 流式传输所有输出,如向回调系统报告的那样。这包括 LLM、检索器、工具等的所有内部运行。

输出作为 Log 对象流式传输,其中包括 Jsonpatch 操作列表,这些操作描述了运行状态在每个步骤中是如何变化的,以及运行的最终状态。

Jsonpatch 操作可以按顺序应用以构造状态。

参数
  • input (Any) – Runnable 的输入。

  • config (Optional[RunnableConfig]) – 用于 Runnable 的配置。

  • diff (bool) – 是否产生每个步骤之间的差异或当前状态。

  • with_streamed_output_list (bool) – 是否产生 streamed_output 列表。

  • include_names (Optional[Sequence[str]]) – 仅包括具有这些名称的日志。

  • include_types (Optional[Sequence[str]]) – 仅包括具有这些类型的日志。

  • include_tags (Optional[Sequence[str]]) – 仅包括具有这些标签的日志。

  • exclude_names (Optional[Sequence[str]]) – 排除具有这些名称的日志。

  • exclude_types (Optional[Sequence[str]]) – 排除具有这些类型的日志。

  • exclude_tags (Optional[Sequence[str]]) – 排除具有这些标签的日志。

  • kwargs (Any) – 要传递给 Runnable 的其他关键字参数。

产生

RunLogPatch 或 RunLog 对象。

返回类型

Union[AsyncIterator[RunLogPatch], AsyncIterator[RunLog]]

async atransform(input: AsyncIterator[Input], config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) AsyncIterator[Output][source]

atransform 的默认实现,它缓冲输入并调用 astream。如果子类可以在输入仍在生成时开始生成输出,则应覆盖此方法。

参数
  • input (AsyncIterator[Input]) – Runnable 的输入异步迭代器。

  • config (Optional[RunnableConfig]) – 用于 Runnable 的配置。默认为 None。

  • kwargs (Optional[Any]) – 传递给 Runnable 的其他关键字参数。

产生

Runnable 的输出。

返回类型

AsyncIterator[Output]

batch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output]

默认实现使用线程池执行器并行运行 invoke。

批处理的默认实现对于 IO 绑定的 Runnable 效果良好。

如果子类可以更有效地进行批处理,则应重写此方法;例如,如果底层的 Runnable 使用支持批处理模式的 API。

参数
  • inputs (List[Input]) –

  • config (Optional[Union[RunnableConfig, List[RunnableConfig]]]) –

  • return_exceptions (bool) –

  • kwargs (Optional[Any]) –

返回类型

List[Output]

batch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) Iterator[Tuple[int, Union[Output, Exception]]]

并行运行输入列表上的 invoke,并在结果完成时产生结果。

参数
  • inputs (Sequence[Input]) –

  • config (Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]) –

  • return_exceptions (bool) –

  • kwargs (Optional[Any]) –

返回类型

Iterator[Tuple[int, Union[Output, Exception]]]

bind(**kwargs: Any) Runnable[Input, Output]

将参数绑定到 Runnable,返回一个新的 Runnable。

当链中的 Runnable 需要一个参数,而该参数不在前一个 Runnable 的输出中或未包含在用户输入中时,此方法很有用。

参数

kwargs (Any) – 要绑定到 Runnable 的参数。

返回

具有绑定参数的新 Runnable。

返回类型

Runnable[Input, Output]

示例

from langchain_community.chat_models import ChatOllama
from langchain_core.output_parsers import StrOutputParser

llm = ChatOllama(model='llama2')

# Without bind.
chain = (
    llm
    | StrOutputParser()
)

chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two three four five.'

# With bind.
chain = (
    llm.bind(stop=["three"])
    | StrOutputParser()
)

chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two'
config_schema(*, include: Optional[Sequence[str]] = None) Type[BaseModel]

此 Runnable 接受的配置类型,指定为 pydantic 模型。

要将字段标记为可配置,请参阅 configurable_fieldsconfigurable_alternatives 方法。

参数

include (Optional[Sequence[str]]) – 要包含在配置架构中的字段列表。

返回

可用于验证配置的 pydantic 模型。

返回类型

Type[BaseModel]

get_graph(config: Optional[RunnableConfig] = None) Graph[source]

返回此 Runnable 的图形表示。

参数

config (Optional[RunnableConfig]) –

返回类型

Graph

get_input_schema(config: Optional[RunnableConfig] = None) Type[BaseModel][source]

此 Runnable 输入的 pydantic 模式。

参数

config (Optional[RunnableConfig]) – 要使用的配置。默认为 None。

返回

此 Runnable 的输入架构。

返回类型

Type[BaseModel]

get_name(suffix: Optional[str] = None, *, name: Optional[str] = None) str

获取 Runnable 的名称。

参数
  • suffix (Optional[str]) –

  • name (Optional[str]) –

返回类型

str

get_output_schema(config: Optional[RunnableConfig] = None) Type[BaseModel]

获取可用于验证 Runnable 输出的 pydantic 模型。

利用 configurable_fields 和 configurable_alternatives 方法的 Runnables 将具有动态输出架构,该架构取决于调用 Runnable 时使用的配置。

此方法允许获取特定配置的输出架构。

参数

config (Optional[RunnableConfig]) – 生成架构时要使用的配置。

返回

可用于验证输出的 pydantic 模型。

返回类型

Type[BaseModel]

get_prompts(config: Optional[RunnableConfig] = None) List[BasePromptTemplate]

返回此 Runnable 使用的提示列表。

参数

config (Optional[RunnableConfig]) –

返回类型

List[BasePromptTemplate]

invoke(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) Output[source]

同步调用此 Runnable。

参数
  • input (Input) – 此 Runnable 的输入。

  • config (Optional[RunnableConfig]) – 要使用的配置。默认为 None。

  • kwargs (Optional[Any]) – 附加关键字参数。

返回

此 Runnable 的输出。

Raises

TypeError – 如果 Runnable 是协程函数。

返回类型

Output

map() Runnable[List[Input], List[Output]]

返回一个新的 Runnable,它通过对每个输入调用 invoke(),将输入列表映射到输出列表。

返回

一个新的 Runnable,它将输入列表映射到输出列表。

返回类型

Runnable[List[Input], List[Output]]

示例

from langchain_core.runnables import RunnableLambda

def _lambda(x: int) -> int:
    return x + 1

runnable = RunnableLambda(_lambda)
print(runnable.map().invoke([1, 2, 3])) # [2, 3, 4]
pick(keys: Union[str, List[str]]) RunnableSerializable[Any, Any]

从此 Runnable 的字典输出中选取键。

选择单个键
import json

from langchain_core.runnables import RunnableLambda, RunnableMap

as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
chain = RunnableMap(str=as_str, json=as_json)

chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3]}

json_only_chain = chain.pick("json")
json_only_chain.invoke("[1, 2, 3]")
# -> [1, 2, 3]
选择键列表
from typing import Any

import json

from langchain_core.runnables import RunnableLambda, RunnableMap

as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
def as_bytes(x: Any) -> bytes:
    return bytes(x, "utf-8")

chain = RunnableMap(
    str=as_str,
    json=as_json,
    bytes=RunnableLambda(as_bytes)
)

chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}

json_and_bytes_chain = chain.pick(["json", "bytes"])
json_and_bytes_chain.invoke("[1, 2, 3]")
# -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
参数

keys (Union[str, List[str]]) –

返回类型

RunnableSerializable[Any, Any]

pipe(**others: Union[Runnable[Any, Other], Callable[[Any], Other]], name: Optional[str] = None) RunnableSerializable[Input, Other]

将此 Runnable 与类似 Runnable 的对象组合以创建 RunnableSequence。

等效于 RunnableSequence(self, *others)self | others[0] | …

示例

from langchain_core.runnables import RunnableLambda

def add_one(x: int) -> int:
    return x + 1

def mul_two(x: int) -> int:
    return x * 2

runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1.pipe(runnable_2)
# Or equivalently:
# sequence = runnable_1 | runnable_2
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
# -> 4

sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
# -> [4, 6, 8]
参数
  • others (Union[Runnable[Any, Other], Callable[[Any], Other]]) –

  • name (Optional[str]) –

返回类型

RunnableSerializable[Input, Other]

stream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) Iterator[Output][source]

stream 的默认实现,它调用 invoke。如果子类支持流式输出,则应该重写此方法。

参数
  • input (Input) – Runnable 的输入。

  • config (Optional[RunnableConfig]) – 用于 Runnable 的配置。默认为 None。

  • kwargs (Optional[Any]) – 传递给 Runnable 的其他关键字参数。

产生

Runnable 的输出。

返回类型

Iterator[Output]

transform(input: Iterator[Input], config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) Iterator[Output][source]

transform 的默认实现,它会缓冲输入,然后调用 stream。如果子类可以在输入仍在生成时开始生成输出,则应该重写此方法。

参数
  • input (Iterator[Input]) – Runnable 的输入迭代器。

  • config (Optional[RunnableConfig]) – 用于 Runnable 的配置。默认为 None。

  • kwargs (Optional[Any]) – 传递给 Runnable 的其他关键字参数。

产生

Runnable 的输出。

返回类型

Iterator[Output]

with_alisteners(*, on_start: Optional[AsyncListener] = None, on_end: Optional[AsyncListener] = None, on_error: Optional[AsyncListener] = None) Runnable[Input, Output]

将异步生命周期监听器绑定到 Runnable,返回一个新的 Runnable。

on_start:在 Runnable 开始运行之前异步调用。 on_end:在 Runnable 运行结束后异步调用。 on_error:如果 Runnable 抛出错误,则异步调用。

Run 对象包含有关运行的信息,包括其 id、类型、输入、输出、错误、开始时间、结束时间和添加到运行的任何标签或元数据。

参数
  • on_start (Optional[AsyncListener]) – 在 Runnable 开始运行之前异步调用。默认为 None。

  • on_end (Optional[AsyncListener]) – 在 Runnable 运行结束后异步调用。默认为 None。

  • on_error (Optional[AsyncListener]) – 如果 Runnable 抛出错误,则异步调用。默认为 None。

返回

绑定了监听器的新 Runnable。

返回类型

Runnable[Input, Output]

示例

from langchain_core.runnables import RunnableLambda
import time

async def test_runnable(time_to_sleep : int):
    print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
    await asyncio.sleep(time_to_sleep)
    print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")

async def fn_start(run_obj : Runnable):
    print(f"on start callback starts at {format_t(time.time())}
    await asyncio.sleep(3)
    print(f"on start callback ends at {format_t(time.time())}")

async def fn_end(run_obj : Runnable):
    print(f"on end callback starts at {format_t(time.time())}
    await asyncio.sleep(2)
    print(f"on end callback ends at {format_t(time.time())}")

runnable = RunnableLambda(test_runnable).with_alisteners(
    on_start=fn_start,
    on_end=fn_end
)
async def concurrent_runs():
    await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))

asyncio.run(concurrent_runs())
Result:
on start callback starts at 2024-05-16T14:20:29.637053+00:00
on start callback starts at 2024-05-16T14:20:29.637150+00:00
on start callback ends at 2024-05-16T14:20:32.638305+00:00
on start callback ends at 2024-05-16T14:20:32.638383+00:00
Runnable[3s]: starts at 2024-05-16T14:20:32.638849+00:00
Runnable[5s]: starts at 2024-05-16T14:20:32.638999+00:00
Runnable[3s]: ends at 2024-05-16T14:20:35.640016+00:00
on end callback starts at 2024-05-16T14:20:35.640534+00:00
Runnable[5s]: ends at 2024-05-16T14:20:37.640169+00:00
on end callback starts at 2024-05-16T14:20:37.640574+00:00
on end callback ends at 2024-05-16T14:20:37.640654+00:00
on end callback ends at 2024-05-16T14:20:39.641751+00:00
with_config(config: Optional[RunnableConfig] = None, **kwargs: Any) Runnable[Input, Output]

将配置绑定到 Runnable,返回一个新的 Runnable。

参数
  • config (Optional[RunnableConfig]) – 要绑定到 Runnable 的配置。

  • kwargs (Any) – 要传递给 Runnable 的其他关键字参数。

返回

绑定了配置的新 Runnable。

返回类型

Runnable[Input, Output]

with_fallbacks(fallbacks: Sequence[Runnable[Input, Output]], *, exceptions_to_handle: Tuple[Type[BaseException], ...] = (<class 'Exception'>,), exception_key: Optional[str] = None) RunnableWithFallbacksT[Input, Output]

向 Runnable 添加回退,返回一个新的 Runnable。

新的 Runnable 将在失败时尝试原始 Runnable,然后按顺序尝试每个回退方案。

参数
  • fallbacks (Sequence[Runnable[Input, Output]]) – 如果原始 Runnable 失败,则尝试运行的回退方案序列。

  • exceptions_to_handle (Tuple[Type[BaseException], ...]) – 要处理的异常类型元组。默认为 (Exception,)。

  • exception_key (Optional[str]) – 如果指定了字符串,则处理的异常将作为输入的一部分传递给回退方案,键为指定的键。如果为 None,则异常将不会传递给回退方案。如果使用,则基本 Runnable 及其回退方案必须接受字典作为输入。默认为 None。

返回

一个新的 Runnable,它将在失败时尝试原始 Runnable,然后按顺序尝试每个回退方案。

返回类型

RunnableWithFallbacksT[Input, Output]

示例

from typing import Iterator

from langchain_core.runnables import RunnableGenerator


def _generate_immediate_error(input: Iterator) -> Iterator[str]:
    raise ValueError()
    yield ""


def _generate(input: Iterator) -> Iterator[str]:
    yield from "foo bar"


runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
    [RunnableGenerator(_generate)]
    )
print(''.join(runnable.stream({}))) #foo bar
参数
  • fallbacks (Sequence[Runnable[Input, Output]]) – 如果原始 Runnable 失败,则尝试运行的回退方案序列。

  • exceptions_to_handle (Tuple[Type[BaseException], ...]) – 要处理的异常类型元组。

  • exception_key (Optional[str]) – 如果指定了字符串,则处理的异常将作为输入的一部分传递给回退方案,键为指定的键。如果为 None,则异常将不会传递给回退方案。如果使用,则基本 Runnable 及其回退方案必须接受字典作为输入。

返回

一个新的 Runnable,它将在失败时尝试原始 Runnable,然后按顺序尝试每个回退方案。

返回类型

RunnableWithFallbacksT[Input, Output]

with_listeners(*, on_start: Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]] = None, on_end: Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]] = None, on_error: Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]] = None) Runnable[Input, Output]

将生命周期监听器绑定到 Runnable,返回一个新的 Runnable。

on_start:在 Runnable 开始运行之前调用,带有 Run 对象。 on_end:在 Runnable 运行结束后调用,带有 Run 对象。 on_error:如果 Runnable 抛出错误,则调用,带有 Run 对象。

Run 对象包含有关运行的信息,包括其 id、类型、输入、输出、错误、开始时间、结束时间和添加到运行的任何标签或元数据。

参数
  • on_start (Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]]) – 在 Runnable 开始运行之前调用。默认为 None。

  • on_end (Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]]) – 在 Runnable 运行结束后调用。默认为 None。

  • on_error (Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]]) – 如果 Runnable 抛出错误,则调用。默认为 None。

返回

绑定了监听器的新 Runnable。

返回类型

Runnable[Input, Output]

示例

from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run

import time

def test_runnable(time_to_sleep : int):
    time.sleep(time_to_sleep)

def fn_start(run_obj: Run):
    print("start_time:", run_obj.start_time)

def fn_end(run_obj: Run):
    print("end_time:", run_obj.end_time)

chain = RunnableLambda(test_runnable).with_listeners(
    on_start=fn_start,
    on_end=fn_end
)
chain.invoke(2)
with_retry(*, retry_if_exception_type: ~typing.Tuple[~typing.Type[BaseException], ...] = (<class 'Exception'>,), wait_exponential_jitter: bool = True, stop_after_attempt: int = 3) Runnable[Input, Output]

创建一个新的 Runnable,该 Runnable 在异常时重试原始 Runnable。

参数
  • retry_if_exception_type (Tuple[Type[BaseException], ...]) – 发生这些异常类型时进行重试。默认为 (Exception,)。

  • wait_exponential_jitter (bool) – 是否在重试之间的等待时间中添加抖动。默认为 True。

  • stop_after_attempt (int) – 放弃之前的最大尝试次数。默认为 3。

返回

一个新的 Runnable,它会在发生异常时重试原始 Runnable。

返回类型

Runnable[Input, Output]

示例

from langchain_core.runnables import RunnableLambda

count = 0


def _lambda(x: int) -> None:
    global count
    count = count + 1
    if x == 1:
        raise ValueError("x is 1")
    else:
         pass


runnable = RunnableLambda(_lambda)
try:
    runnable.with_retry(
        stop_after_attempt=2,
        retry_if_exception_type=(ValueError,),
    ).invoke(1)
except ValueError:
    pass

assert (count == 2)
参数
  • retry_if_exception_type (Tuple[Type[BaseException], ...]) – 发生这些异常类型时进行重试

  • wait_exponential_jitter (bool) – 是否在重试之间的等待时间中添加抖动

  • stop_after_attempt (int) – 放弃之前的最大尝试次数

返回

一个新的 Runnable,它会在发生异常时重试原始 Runnable。

返回类型

Runnable[Input, Output]

with_types(*, input_type: Optional[Type] = None, output_type: Optional[Type] = None) Runnable[Input, Output]

将输入和输出类型绑定到 Runnable,返回一个新的 Runnable。

参数
  • input_type (Optional[Type[Input]]) – 要绑定到 Runnable 的输入类型。默认为 None。

  • output_type (Optional[Type[Output]]) – 要绑定到 Runnable 的输出类型。默认为 None。

返回

绑定了类型的新 Runnable。

返回类型

Runnable[Input, Output]

RunnableLambda 的使用示例