langchain_core.runnables.retry.RunnableRetry

注意

RunnableRetry实现了标准Runnable Interface。 🏃

Runnable接口(《Runnable Interface》)具有额外的在runnables上可用的方法,例如《with_types》 ,《with_retry》 ,《assign》 ,《bind》 ,《get_graph》 等等。

class langchain_core.runnables.retry.RunnableRetry[source]

基类: RunnableBindingBase[Input, Output]

如果Runnable失败,则重试Runnable。

RunnableRetry可以用于向任何子类Base Runnable的任一对象添加重试逻辑。

这种重试对于可能因瞬时错误而失败的网络调用特别有用。

RunnableRetry实现为RunnableBinding。最简单的方法是通过所有Runnable上的.with_retry()方法使用它。

示例:

这里有一个使用RunnableLambda引发异常的示例

import time

def foo(input) -> None:
    '''Fake function that raises an exception.'''
    raise ValueError(f"Invoking foo failed. At time {time.time()}")

runnable = RunnableLambda(foo)

runnable_with_retries = runnable.with_retry(
    retry_if_exception_type=(ValueError,), # Retry only on ValueError
    wait_exponential_jitter=True, # Add jitter to the exponential backoff
    stop_after_attempt=2, # Try twice
)

# The method invocation above is equivalent to the longer form below:

runnable_with_retries = RunnableRetry(
    bound=runnable,
    retry_exception_types=(ValueError,),
    max_attempt_number=2,
    wait_exponential_jitter=True
)

这种逻辑可以用于重试任何Runnable,包括Runnable链,但通常最好将重试的范围尽量保持得尽可能小。例如,如果您有一个Runnable链,您应该只重试可能失败的Runnable,而不是整个链。

示例:

from langchain_core.chat_models import ChatOpenAI
from langchain_core.prompts import PromptTemplate

template = PromptTemplate.from_template("tell me a joke about {topic}.")
model = ChatOpenAI(temperature=0.5)

# Good
chain = template | model.with_retry()

# Bad
chain = template | model
retryable_chain = chain.with_retry()

从Runnable和kwargs创建RunnableBinding。

参数:
  • bound – 本Runnable将委托调用到其下方的Runnable。

  • kwargs – 传递给下方Runnable的可选kwargs,当运行下方Runnable时(例如,使用invokebatchtransformstream 或异步变体)。默认为None。

  • config – 可选的可绑定到下方Runnable的配置。默认为None。

  • config_factories – 可选的配置工厂列表,在绑定到下方Runnable之前应用于配置。默认为None。

  • custom_input_type – 指定以覆盖下方Runnable的输入类型。默认为None。

  • custom_output_type – 指定以覆盖下方Runnable的输出类型。默认为None。

  • **other_kwargs – 解包到基类中。

参数 bound: Runnable[输入, 输出] [必需]

此 Runnable 转委托的基本 Runnable。

参数 config: RunnableConfig [可选]

绑定到基本 Runnable 的配置。

参数 config_factories: List[Callable[[RunnableConfig], RunnableConfig]] [可选]

绑定到基本 Runnable 的配置工厂。

参数 custom_input_type: Optional[任何] = None

使用自定义类型覆盖基本 Runnable 的输入类型。

类型可以是 pydantic 模型或类型注解(例如,List[str])。

参数 custom_output_type: Optional[任何] = None

使用自定义类型覆盖基本 Runnable 的输出类型。

类型可以是 pydantic 模型或类型注解(例如,List[str])。

参数 kwargs: Mapping[str, 任何] [可选]

在运行时传递给基本 Runnable 的 kwargs。

例如,当调用 Runnable 绑定时,基本 Runnable 将以相同的输入调用,但带有这些额外的 kwargs。

参数max_attempt_number: int= 3

重试Runnable的最大尝试次数。

参数retry_exception_types: Tuple[Type[BaseException], ...]= (<class 'Exception'>,)

在出现以下异常类型时进行重试。默认重试所有异常。

通常,你应该只重试那些可能是瞬态的异常,例如网络错误。

良好的重试异常包括所有服务器错误(5xx)和选定的客户端错误(4xx),例如429请求过多。

参数wait_exponential_jitter: bool= True

是否在指数退避中加入抖动。

asyncabatch(inputs: List[Input], config: Optional[Union[RunnableConfig,List[RunnableConfig]] = None, *, return_exceptions: bool= False, **kwargs: Any) List[Output][源代码]

默认实现使用asyncio.gather并行运行ainvoke。

默认的批量实现适用于IO密集型的Runnable。

子类应该覆盖此方法,如果它们可以更有效地批量操作;例如,如果底层的Runnable使用支持批量模式的API。

参数:
  • inputs (列表(Input)) – Runnable的输入列表。

  • config (Optional(Union(RunnableConfig, RunnableConfig列表))) – 调用Runnable时使用的配置。配置支持标准键,如‘tags’、‘metadata’用于跟踪目的,‘max_concurrency’用于控制并行执行的工作量,以及其他键。请参阅RunnableConfig获取更多详细信息。默认值为None。

  • return_exceptions (布尔值) – 是否返回异常而不是引发异常。默认值为False。

  • kwargs (任何) – 传递给Runnable的额外关键字参数。

返回

Runnable的输出列表。

返回类型

列表(Output)

asyncabatch_as_completed(inputs: Input序列, config: Optional[Union[RunnableConfig,RunnableConfig序列]] = None, *, return_exceptions: bool = False, **kwargs: Optional[任何]) asynciterator[tuple[int, Union[Output, Exception]]]

并行在输入列表上执行run方法,以完成顺序输出结果。

参数:
  • inputs (Input序列) – Runnable的输入列表。

  • config (可选[联合[RunnableConfig, Sequence[RunnableConfig]]]) – 调用Runnable时使用的配置。配置支持标准键如‘tags’、‘metadata’(用于追踪目的),‘max_concurrency’(用于控制并行执行的工作量),以及其他键。请参阅RunnableConfig以获取更多详情。默认值为None。默认值为None。

  • return_exceptions (布尔值) – 是否返回异常而不是引发异常。默认值为False。

  • kwargs (可选[任何类型]) – 传递给Runnable的额外关键字参数。

生成

Runnable输出的输入和输出索引的元组。

返回类型

AsyncIterator[Tuple[int, 联合[Output, Exception]]]

async ainvoke(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Any) Output[source]

ainvoke的默认实现,从线程中调用invoke。

默认实现允许在Runnable没有实现原生异步版本的invoke时使用异步代码。

子类应在不支持异步运行时重写此方法。

参数:
  • input (输入) –

  • config (可选[RunnableConfig]) –

  • kwargs (任何类型) –

返回类型

输出

as_tool(args_schema: Optional[Type[BaseModel]] = None, *, name: Optional[str] = None, description: Optional[str] = None, arg_types: Optional[Dict[str, Type]] = None) BaseTool

测试版

此API为测试版,未来可能会发生变化。

从可运行对象创建一个BaseTool。

as_tool将从一个可运行对象中使用名称、描述和args_schema实例化BaseTool。尽可能情况下,模式通过runnable.get_input_schema推断。或者(例如,如果可运行对象接受一个字典作为输入且特定字典的键没有类型),可以直接通过args_schema指定模式。您还可以通过传递arg_types只指定必需的参数及其类型。

参数:
  • args_schema (可选[Type[BaseModel]]) – 工具的模式。默认为None。

  • name (可选[str]) – 工具的名称。默认为None。

  • description (可选[str]) – 工具的描述。默认为None。

  • arg_types (可选[Dict[str, Type]]) – 参数名称到类型的字典。默认为None。

返回

BaseTool实例。

返回类型

BaseTool

类型化字典输入

from typing import List
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda

class Args(TypedDict):
    a: int
    b: List[int]

def f(x: Args) -> str:
    return str(x["a"] * max(x["b"]))

runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})

dict输入,通过args_schema指定方案

from typing import Any, Dict, List
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.runnables import RunnableLambda

def f(x: Dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))

class FSchema(BaseModel):
    """Apply a function to an integer and list of integers."""

    a: int = Field(..., description="Integer")
    b: List[int] = Field(..., description="List of ints")

runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})

dict输入,通过arg_types指定方案

from typing import Any, Dict, List
from langchain_core.runnables import RunnableLambda

def f(x: Dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))

runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": List[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})

字符串输入

from langchain_core.runnables import RunnableLambda

def f(x: str) -> str:
    return x + "a"

def g(x: str) -> str:
    return x + "z"

runnable = RunnableLambda(f) | g
as_tool = runnable.as_tool()
as_tool.invoke("b")

自0.2.14版开始提供。

async astream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) AsyncIterator[Output]

astream的默认实现,调用ainvoke。子类应覆盖此方法以支持流式输出。

参数:
  • 输入 (Input) – Runnable的输入。

  • 配置 (Optional[RunnableConfig]) – 用于Runnable的配置。默认为None。

  • kwargs (可选[任何类型]) – 传递给Runnable的额外关键字参数。

生成

Runnable的输出。

返回类型

AsyncIterator[Output]

async astream_events(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]]

测试版

此API为测试版,未来可能会发生变化。

生成事件流。

用于创建一个迭代器,遍历提供Runnable进度的StreamEvents,包括来自中间结果的事件。

StreamEvent是一个具有以下模式的字典

  • event: str - 事件名称的格式为

    : on_[runnable_type]_(start|stream|end)。

  • name: str - 生成事件的Runnable的名称。

  • run_id: str - 与给定的执行关联的随机生成的ID。

    触发事件的Runnable。作为父Runnable执行一部分调用而执行的孩子Runnable会被分配一个唯一的ID。

  • parent_ids: 字符串列表 - 生成事件的父Runnable的ID。

    根Runnable将有一个空列表。父ID的顺序是从根到直接父级。仅在API的v2版本中可用。API的v1版本将返回一个空列表。

  • tags: 可选的字符串列表 - 生成事件的Runnable的标签。

  • metadata: 可选的字符串到任意类型字典 - 生成事件的Runnable的元数据。

  • data: 字符串到任意类型字典

以下表格说明了可能由各种链产生的一些事件。出于简洁考虑,省略了元数据字段。链定义在表格之后。

注意 此参考表适用于方案的V2版本。

事件

名称

输入

输出

on_chat_model_start

[模型名称]

{“messages”: [[SystemMessage, HumanMessage]]}

on_chat_model_stream

[模型名称]

AIMessageChunk(content="hello")

on_chat_model_end

[模型名称]

{“messages”: [[SystemMessage, HumanMessage]]}

AIMessageChunk(content="hello world")

on_llm_start

[模型名称]

{‘input’: ‘hello’}

on_llm_stream

[模型名称]

'Hello'

on_llm_end

[模型名称]

'Hello human!'

on_chain_start

format_docs

on_chain_stream

format_docs

"hello world!, goodbye world!"

on_chain_end

format_docs

[Document(…)]

"hello world!, goodbye world!"

on_tool_start

some_tool

{“x”: 1, “y”: “2”}

on_tool_end

some_tool

{“x”: 1, “y”: “2”}

on_retriever_start

[检索器名称]

{“query”: “hello”}

on_retriever_end

[检索器名称]

{“query”: “hello”}

[Document(…), ..]

on_prompt_start

[模板名称]

{“question”: “hello”}

on_prompt_end

[模板名称]

{“question”: “hello”}

ChatPromptValue(messages: [SystemMessage, …])

除了标准事件外,用户还可以调度自定义事件(请参见下面的示例)。

自定义事件仅在API的v2版本中可见!

自定义事件的格式如下

属性

类型

描述

名称

str

事件的用户定义名称。

data

Any

与事件关联的数据。这可以是任何内容,尽管我们建议使其可序列化为JSON。

以下是与上述标准事件相关的声明

format_docs:

def format_docs(docs: List[Document]) -> str:
    '''Format the docs.'''
    return ", ".join([doc.page_content for doc in docs])

format_docs = RunnableLambda(format_docs)

some_tool:

@tool
def some_tool(x: int, y: str) -> dict:
    '''Some_tool.'''
    return {"x": x, "y": y}

提示:

template = ChatPromptTemplate.from_messages(
    [("system", "You are Cat Agent 007"), ("human", "{question}")]
).with_config({"run_name": "my_template", "tags": ["my_template"]})

示例:

from langchain_core.runnables import RunnableLambda

async def reverse(s: str) -> str:
    return s[::-1]

chain = RunnableLambda(func=reverse)

events = [
    event async for event in chain.astream_events("hello", version="v2")
]

# will produce the following events (run_id, and parent_ids
# has been omitted for brevity):
[
    {
        "data": {"input": "hello"},
        "event": "on_chain_start",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"chunk": "olleh"},
        "event": "on_chain_stream",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"output": "olleh"},
        "event": "on_chain_end",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
]

示例:调度自定义事件

from langchain_core.callbacks.manager import (
    adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio


async def slow_thing(some_input: str, config: RunnableConfig) -> str:
    """Do something that takes a long time."""
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 1 of 3"},
        config=config # Must be included for python < 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 2 of 3"},
        config=config # Must be included for python < 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    return "Done"

slow_thing = RunnableLambda(slow_thing)

async for event in slow_thing.astream_events("some_input", version="v2"):
    print(event)
参数:
  • 输入 (Input) – Runnable的输入。

  • config (可选[RunnableConfig]) – 要用于Runnable的配置。

  • version – 要使用的方案的版本,可以是v2v1。用户应使用v2v1是向后兼容的,将在0.4.0中弃用。在API稳定之前不会分配默认值。自定义事件仅在v2中可见。

  • include_names – 仅包含具有匹配名称的Runnable的事件。

  • include_types – 仅包含具有匹配类型的Runnable的事件。

  • include_tags – 仅包含具有匹配标签的Runnable的事件。

  • exclude_names – 排除具有匹配名称的Runnable的事件。

  • exclude_types – 排除具有匹配类型的Runnable的事件。

  • exclude_tags – 排除具有匹配标签的Runnable的事件。

  • kwargs (可选[任何]) –传递给Runnable的额外关键字参数。这些将被传递给astream_log,因为astream_events的实现建立在astream_log之上。

生成

StreamEvents的异步流。

抛出

NotImplementedError – 如果版本不是v1v2

返回类型

AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]]

batch(inputs: 列表[输入], config: 可选[Union[RunnableConfig, 列表[RunnableConfig]]], *, return_exceptions: bool = False, **kwargs: 任何) 列表[输出][source]

默认实现使用线程池执行器并行运行调用。

默认的批量实现适用于IO密集型的Runnable。

子类应该覆盖此方法,如果它们可以更有效地批量操作;例如,如果底层的Runnable使用支持批量模式的API。

参数:
  • inputs (列表[输入]) –

  • config (可选[Union[RunnableConfig, 列表[RunnableConfig]]]) –

  • return_exceptions (bool) –

  • kwargs (任何类型) –

返回类型

列表(Output)

batch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) Iterator[Tuple[int, Union[Output, Exception]]]

并行地对输入列表运行invoke,按完成顺序返回结果。

参数:
  • inputs (Sequence[Input]) –

  • config (Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]) –

  • return_exceptions (bool) –

  • kwargs (Optional[Any]) –

返回类型

Iterator[Tuple[int, Union[Output, Exception]]]

configurable_alternatives(which: ConfigurableField, *args, default_key: str = 'default', prefix_keys: bool = False, **kwargs) RunnableSerializable[Input, Output]

为可以运行时设置的Runnables配置替代方案。

参数:
  • whichConfigurableField)- 用来选择替代方案的ConfigurableField实例。

  • default_keystr)- 如果没有选择替代方案时使用的默认键。默认为“default”。

  • prefix_keysbool)- 是否在键前添加ConfigurableField id。默认为False。

  • **kwargsUnion[Runnable[Input, Output], Callable[[], Runnable[Input, Output]]])- 键到Runnable实例或返回Runnable实例的调用的字典。

返回

一个新的具有配置替代方案的Runnable。

返回类型

RunnableSerializable[Input, Output]

from langchain_anthropic import ChatAnthropic
from langchain_core.runnables.utils import ConfigurableField
from langchain_openai import ChatOpenAI

model = ChatAnthropic(
    model_name="claude-3-sonnet-20240229"
).configurable_alternatives(
    ConfigurableField(id="llm"),
    default_key="anthropic",
    openai=ChatOpenAI()
)

# uses the default model ChatAnthropic
print(model.invoke("which organization created you?").content)

# uses ChatOpenAI
print(
    model.with_config(
        configurable={"llm": "openai"}
    ).invoke("which organization created you?").content
)
configurable_fields(**kwargs: Union[ConfigurableField, ConfigurableFieldSingleOption, ConfigurableFieldMultiOption]) RunnableSerializable[Input, Output]

在运行时设置特定的Runnable字段。

参数:

**kwargs (Union[ConfigurableField, ConfigurableFieldSingleOption, ConfigurableFieldMultiOption]) – 要设置的ConfigurableField实例的字典。

返回

配置字段后的新Runnable。

返回类型

RunnableSerializable[Input, Output]

from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI

model = ChatOpenAI(max_tokens=20).configurable_fields(
    max_tokens=ConfigurableField(
        id="output_token_number",
        name="Max tokens in the output",
        description="The maximum number of tokens in the output",
    )
)

# max_tokens = 20
print(
    "max_tokens_20: ",
    model.invoke("tell me something about chess").content
)

# max_tokens = 200
print("max_tokens_200: ", model.with_config(
    configurable={"output_token_number": 200}
    ).invoke("tell me something about chess").content
)
invoke(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Any) Output[source]

将单个输入转换为输出。覆盖以实现。

参数:
  • 输入 (Input) – Runnable的输入。

  • config (Optional[RunnableConfig]) – 在调用Runnable时使用的配置。配置支持标准键,如用于跟踪的‘tags’,‘metadata’,用于控制并行工作量的‘max_concurrency’以及其他键。请参阅RunnableConfig获取更多详细信息。

  • kwargs (任何类型) –

返回

Runnable的输出。

返回类型

输出

stream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) Iterator[Output]

stream 的默认实现,调用 invoke。子类应覆盖此方法以支持流式输出。

参数:
  • 输入 (Input) – Runnable的输入。

  • 配置 (Optional[RunnableConfig]) – 用于Runnable的配置。默认为None。

  • kwargs (可选[任何类型]) – 传递给Runnable的额外关键字参数。

生成

Runnable的输出。

返回类型

Output 类型的迭代器

to_json() Union[SerializedConstructor, SerializedNotImplemented]

将 Runnable 序列化为 JSON。

返回

Runnable 的 JSON 序列表示。

返回类型

Union[SerializedConstructor, SerializedNotImplemented]