langchain_core.runnables.base
.Runnable¶
- class langchain_core.runnables.base.Runnable[source]¶
可以被调用、批量处理、流式处理、转换和组合的工作单元。
invoke/ainvoke: 将单个输入转换为输出。
batch/abatch: 高效地将多个输入转换为输出。
stream/astream: 从单个输入流式传输输出,并在生成时输出。
astream_log: 从输入流式传输输出和选定的中间结果。
内置优化
Batch: 默认情况下,批量运行使用线程池执行器并行调用 invoke()。覆盖此方法以优化批处理。
Async: 带有“a”后缀的方法是异步的。默认情况下,它们使用 asyncio 的线程池执行同步对应项。覆盖此方法以实现原生异步。
所有方法都接受可选的 config 参数,该参数可用于配置执行、添加标签和元数据以进行跟踪和调试等。
Runnables 通过 input_schema 属性、output_schema 属性和 config_schema 方法公开有关其输入、输出和配置的示意信息。
LCEL 和组合¶
LangChain 表达式语言 (LCEL) 是一种声明式方式,用于将 Runnables 组合成链。以这种方式构建的任何链都将自动具有同步、异步、批量和流式处理支持。
主要的组合原语是 RunnableSequence 和 RunnableParallel。
RunnableSequence 顺序调用一系列 runnables,其中一个 Runnable 的输出作为下一个的输入。使用 | 运算符或将 runnables 列表传递给 RunnableSequence 来构造。
RunnableParallel 并发调用 runnables,为每个 runnable 提供相同的输入。使用序列中的字典字面量或将字典传递给 RunnableParallel 来构造它。
例如,
from langchain_core.runnables import RunnableLambda # A RunnableSequence constructed using the `|` operator sequence = RunnableLambda(lambda x: x + 1) | RunnableLambda(lambda x: x * 2) sequence.invoke(1) # 4 sequence.batch([1, 2, 3]) # [4, 6, 8] # A sequence that contains a RunnableParallel constructed using a dict literal sequence = RunnableLambda(lambda x: x + 1) | { 'mul_2': RunnableLambda(lambda x: x * 2), 'mul_5': RunnableLambda(lambda x: x * 5) } sequence.invoke(1) # {'mul_2': 4, 'mul_5': 10}
标准方法¶
所有 Runnables 都公开了可用于修改其行为的其他方法(例如,添加重试策略、添加生命周期监听器、使其可配置等)。
这些方法适用于任何 Runnable,包括通过组合其他 Runnables 构建的 Runnable 链。有关详细信息,请参阅各个方法。
例如,
from langchain_core.runnables import RunnableLambda import random def add_one(x: int) -> int: return x + 1 def buggy_double(y: int) -> int: '''Buggy code that will fail 70% of the time''' if random.random() > 0.3: print('This code failed, and will probably be retried!') # noqa: T201 raise ValueError('Triggered buggy code') return y * 2 sequence = ( RunnableLambda(add_one) | RunnableLambda(buggy_double).with_retry( # Retry on failure stop_after_attempt=10, wait_exponential_jitter=False ) ) print(sequence.input_schema.schema()) # Show inferred input schema print(sequence.output_schema.schema()) # Show inferred output schema print(sequence.invoke(2)) # invoke the sequence (note the retry above!!)
调试和追踪¶
随着链变长,能够查看中间结果来调试和追踪链会很有用。
您可以将全局调试标志设置为 True,以启用所有链的调试输出
from langchain_core.globals import set_debug set_debug(True)
或者,您可以将现有或自定义的回调传递给任何给定的链
from langchain_core.tracers import ConsoleCallbackHandler chain.invoke( ..., config={'callbacks': [ConsoleCallbackHandler()]} )
对于 UI(以及更多功能),请查看 LangSmith: https://langsmith.langchain.ac.cn/
属性
InputType
此 Runnable 接受的输入类型,指定为类型注解。
OutputType
此 Runnable 生成的输出类型,指定为类型注解。
config_specs
列出此 Runnable 的可配置字段。
input_schema
此 Runnable 接受的输入类型,指定为 pydantic 模型。
name
Runnable 的名称。
output_schema
此 Runnable 生成的输出类型,指定为 pydantic 模型。
方法
__init__
()abatch
(inputs[, config, return_exceptions])默认实现使用 asyncio.gather 并行运行 ainvoke。
并行运行输入列表上的 ainvoke,并在完成时生成结果。
ainvoke
(input[, config])ainvoke 的默认实现,从线程调用 invoke。
as_tool
([args_schema, name, description, ...])assign
(**kwargs)将新字段分配给此 Runnable 的字典输出。
astream
(input[, config])astream 的默认实现,它调用 ainvoke。
astream_events
(input[, config, ...])流式传输来自 Runnable 的所有输出,如回调系统报告的那样。
atransform
(input[, config])atransform 的默认实现,它缓冲输入并调用 astream。
batch
(inputs[, config, return_exceptions])默认实现使用线程池执行器并行运行 invoke。
并行运行输入列表上的 invoke,并在完成时生成结果。
bind
(**kwargs)将参数绑定到 Runnable,返回一个新的 Runnable。
config_schema
(*[, include])此 Runnable 接受的配置类型,指定为 pydantic 模型。
get_graph
([config])返回此 Runnable 的图形表示。
get_input_schema
([config])获取可用于验证 Runnable 输入的 pydantic 模型。
get_name
([suffix, name])获取 Runnable 的名称。
get_output_schema
([config])获取可用于验证 Runnable 输出的 pydantic 模型。
get_prompts
([config])返回此 Runnable 使用的提示列表。
invoke
(input[, config])将单个输入转换为输出。
map
()返回一个新的 Runnable,它通过使用每个输入调用 invoke(),将输入列表映射到输出列表。
pick
(keys)从此 Runnable 的字典输出中选择键。
pipe
(*others[, name])将此 Runnable 与类似 Runnable 的对象组合以创建 RunnableSequence。
stream
(input[, config])stream 的默认实现,它调用 invoke。
transform
(input[, config])transform 的默认实现,它缓冲输入,然后调用 stream。
with_alisteners
(*[, on_start, on_end, on_error])将异步生命周期监听器绑定到 Runnable,返回一个新的 Runnable。
with_config
([config])将配置绑定到 Runnable,返回一个新的 Runnable。
with_fallbacks
(fallbacks, *[, ...])向 Runnable 添加回退,返回一个新的 Runnable。
with_listeners
(*[, on_start, on_end, on_error])将生命周期监听器绑定到 Runnable,返回一个新的 Runnable。
with_retry
(*[, retry_if_exception_type, ...])创建一个新的 Runnable,它在异常时重试原始 Runnable。
with_types
(*[, input_type, output_type])将输入和输出类型绑定到 Runnable,返回一个新的 Runnable。
- __init__()¶
- async abatch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output] [source]¶
默认实现使用 asyncio.gather 并行运行 ainvoke。
batch 的默认实现对于 IO 绑定的 runnables 工作良好。
如果子类可以更有效地进行批处理,则应覆盖此方法;例如,如果底层 Runnable 使用支持批处理模式的 API。
- 参数
inputs (List[Input]) – Runnable 的输入列表。
config (Optional[Union[RunnableConfig, List[RunnableConfig]]]) – 调用 Runnable 时要使用的配置。该配置支持标准键,如用于跟踪目的的“tags”、“metadata”,用于控制并行执行多少工作的“max_concurrency”以及其他键。有关更多详细信息,请参阅 RunnableConfig。默认为 None。
return_exceptions (bool) – 是否返回异常而不是引发异常。默认为 False。
kwargs (Optional[Any]) – 要传递给 Runnable 的其他关键字参数。
- 返回
来自 Runnable 的输出列表。
- 返回类型
List[Output]
- async abatch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]] = None, *, return_exceptions: Literal[False] = False, **kwargs: Optional[Any]) AsyncIterator[Tuple[int, Output]] [source]¶
- async abatch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]] = None, *, return_exceptions: Literal[True], **kwargs: Optional[Any]) AsyncIterator[Tuple[int, Union[Output, Exception]]]
并行运行输入列表上的 ainvoke,并在完成时生成结果。
- 参数
inputs – Runnable 的输入列表。
config – 调用 Runnable 时要使用的配置。该配置支持标准键,如用于跟踪目的的“tags”、“metadata”,用于控制并行执行多少工作的“max_concurrency”以及其他键。有关更多详细信息,请参阅 RunnableConfig。默认为 None。默认为 None。
return_exceptions – 是否返回异常而不是引发异常。默认为 False。
kwargs – 要传递给 Runnable 的其他关键字参数。
- 产出
输入索引和 Runnable 输出的元组。
- async ainvoke(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Any) Output [source]¶
ainvoke 的默认实现,从线程调用 invoke。
即使 Runnable 没有实现 invoke 的原生异步版本,默认实现也允许使用异步代码。
如果子类可以异步运行,则应覆盖此方法。
- 参数
input (Input) –
config (Optional[RunnableConfig]) –
kwargs (Any) –
- 返回类型
输出
- as_tool(args_schema: Optional[Type[BaseModel]] = None, *, name: Optional[str] = None, description: Optional[str] = None, arg_types: Optional[Dict[str, Type]] = None) BaseTool [source]¶
Beta
此 API 处于 beta 阶段,将来可能会更改。
从 Runnable 创建 BaseTool。
as_tool
将从 Runnable 实例化具有名称、描述和args_schema
的 BaseTool。如果可能,架构是从runnable.get_input_schema
推断出来的。或者(例如,如果 Runnable 接受字典作为输入,并且未键入特定的字典键),可以使用args_schema
直接指定架构。您还可以传递arg_types
以仅指定必需的参数及其类型。- 参数
args_schema (Optional[Type[BaseModel]]) – 工具的架构。默认为 None。
name (Optional[str]) – 工具的名称。默认为 None。
description (Optional[str]) – 工具的描述。默认为 None。
arg_types (Optional[Dict[str, Type]]) – 参数名称到类型的字典。默认为 None。
- 返回
BaseTool 实例。
- 返回类型
类型化字典输入
from typing import List from typing_extensions import TypedDict from langchain_core.runnables import RunnableLambda class Args(TypedDict): a: int b: List[int] def f(x: Args) -> str: return str(x["a"] * max(x["b"])) runnable = RunnableLambda(f) as_tool = runnable.as_tool() as_tool.invoke({"a": 3, "b": [1, 2]})
dict
输入,通过args_schema
指定架构from typing import Any, Dict, List from langchain_core.pydantic_v1 import BaseModel, Field from langchain_core.runnables import RunnableLambda def f(x: Dict[str, Any]) -> str: return str(x["a"] * max(x["b"])) class FSchema(BaseModel): """Apply a function to an integer and list of integers.""" a: int = Field(..., description="Integer") b: List[int] = Field(..., description="List of ints") runnable = RunnableLambda(f) as_tool = runnable.as_tool(FSchema) as_tool.invoke({"a": 3, "b": [1, 2]})
dict
输入,通过arg_types
指定架构from typing import Any, Dict, List from langchain_core.runnables import RunnableLambda def f(x: Dict[str, Any]) -> str: return str(x["a"] * max(x["b"])) runnable = RunnableLambda(f) as_tool = runnable.as_tool(arg_types={"a": int, "b": List[int]}) as_tool.invoke({"a": 3, "b": [1, 2]})
字符串输入
from langchain_core.runnables import RunnableLambda def f(x: str) -> str: return x + "a" def g(x: str) -> str: return x + "z" runnable = RunnableLambda(f) | g as_tool = runnable.as_tool() as_tool.invoke("b")
0.2.14 版本中的新增功能。
- assign(**kwargs: Union[Runnable[Dict[str, Any], Any], Callable[[Dict[str, Any]], Any], Mapping[str, Union[Runnable[Dict[str, Any], Any], Callable[[Dict[str, Any]], Any]]]]) RunnableSerializable[Any, Any] [source]¶
将新字段分配给此 Runnable 的字典输出。返回一个新的 Runnable。
from langchain_community.llms.fake import FakeStreamingListLLM from langchain_core.output_parsers import StrOutputParser from langchain_core.prompts import SystemMessagePromptTemplate from langchain_core.runnables import Runnable from operator import itemgetter prompt = ( SystemMessagePromptTemplate.from_template("You are a nice assistant.") + "{question}" ) llm = FakeStreamingListLLM(responses=["foo-lish"]) chain: Runnable = prompt | llm | {"str": StrOutputParser()} chain_with_assign = chain.assign(hello=itemgetter("str") | llm) print(chain_with_assign.input_schema.schema()) # {'title': 'PromptInput', 'type': 'object', 'properties': {'question': {'title': 'Question', 'type': 'string'}}} print(chain_with_assign.output_schema.schema()) # {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties': {'str': {'title': 'Str', 'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
- 参数
kwargs (Union[Runnable[Dict[str, Any], Any], Callable[[Dict[str, Any]], Any], Mapping[str, Union[Runnable[Dict[str, Any], Any], Callable[[Dict[str, Any]], Any]]]]) –
- 返回类型
RunnableSerializable[Any, Any]
- async astream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) AsyncIterator[Output] [source]¶
Default implementation of astream, which calls ainvoke. Subclasses should override this method if they support streaming output.
- 参数
input (Input) – The input to the Runnable.
config (Optional[RunnableConfig]) – The config to use for the Runnable. Defaults to None.
kwargs (Optional[Any]) – 要传递给 Runnable 的其他关键字参数。
- 产出
The output of the Runnable.
- 返回类型
AsyncIterator[Output]
- astream_events(input: Any, config: Optional[RunnableConfig] = None, *, version: Literal['v1', 'v2'], include_names: Optional[Sequence[str]] = None, include_types: Optional[Sequence[str]] = None, include_tags: Optional[Sequence[str]] = None, exclude_names: Optional[Sequence[str]] = None, exclude_types: Optional[Sequence[str]] = None, exclude_tags: Optional[Sequence[str]] = None, **kwargs: Any) AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]] [source]¶
Beta
此 API 处于 beta 阶段,将来可能会更改。
Generate a stream of events.
Use to create an iterator over StreamEvents that provide real-time information about the progress of the Runnable, including StreamEvents from intermediate results.
A StreamEvent is a dictionary with the following schema
event
: str - Event names are of theformat: on_[runnable_type]_(start|stream|end).
name
: str - The name of the Runnable that generated the event.run_id
: str - randomly generated ID associated with the given execution ofthe Runnable that emitted the event. A child Runnable that gets invoked as part of the execution of a parent Runnable is assigned its own unique ID.
parent_ids
: List[str] - The IDs of the parent runnables thatgenerated the event. The root Runnable will have an empty list. The order of the parent IDs is from the root to the immediate parent. Only available for v2 version of the API. The v1 version of the API will return an empty list.
tags
: Optional[List[str]] - The tags of the Runnable that generatedthe event.
metadata
: Optional[Dict[str, Any]] - The metadata of the Runnablethat generated the event.
data
: Dict[str, Any]
Below is a table that illustrates some evens that might be emitted by various chains. Metadata fields have been omitted from the table for brevity. Chain definitions have been included after the table.
ATTENTION This reference table is for the V2 version of the schema.
event
name
chunk
input
output
on_chat_model_start
[model name]
{“messages”: [[SystemMessage, HumanMessage]]}
on_chat_model_stream
[model name]
AIMessageChunk(content=”hello”)
on_chat_model_end
[model name]
{“messages”: [[SystemMessage, HumanMessage]]}
AIMessageChunk(content=”hello world”)
on_llm_start
[model name]
{‘input’: ‘hello’}
on_llm_stream
[model name]
‘Hello’
on_llm_end
[model name]
‘Hello human!’
on_chain_start
format_docs
on_chain_stream
format_docs
“hello world!, goodbye world!”
on_chain_end
format_docs
[Document(…)]
“hello world!, goodbye world!”
on_tool_start
some_tool
{“x”: 1, “y”: “2”}
on_tool_end
some_tool
{“x”: 1, “y”: “2”}
on_retriever_start
[retriever name]
{“query”: “hello”}
on_retriever_end
[retriever name]
{“query”: “hello”}
[Document(…), ..]
on_prompt_start
[template_name]
{“question”: “hello”}
on_prompt_end
[template_name]
{“question”: “hello”}
ChatPromptValue(messages: [SystemMessage, …])
In addition to the standard events, users can also dispatch custom events (see example below).
Custom events will be only be surfaced with in the v2 version of the API!
A custom event has following format
Attribute
Type
Description
name
str
A user defined name for the event.
data
Any
The data associated with the event. This can be anything, though we suggest making it JSON serializable.
Here are declarations associated with the standard events shown above
format_docs:
def format_docs(docs: List[Document]) -> str: '''Format the docs.''' return ", ".join([doc.page_content for doc in docs]) format_docs = RunnableLambda(format_docs)
some_tool:
@tool def some_tool(x: int, y: str) -> dict: '''Some_tool.''' return {"x": x, "y": y}
prompt:
template = ChatPromptTemplate.from_messages( [("system", "You are Cat Agent 007"), ("human", "{question}")] ).with_config({"run_name": "my_template", "tags": ["my_template"]})
Example
from langchain_core.runnables import RunnableLambda async def reverse(s: str) -> str: return s[::-1] chain = RunnableLambda(func=reverse) events = [ event async for event in chain.astream_events("hello", version="v2") ] # will produce the following events (run_id, and parent_ids # has been omitted for brevity): [ { "data": {"input": "hello"}, "event": "on_chain_start", "metadata": {}, "name": "reverse", "tags": [], }, { "data": {"chunk": "olleh"}, "event": "on_chain_stream", "metadata": {}, "name": "reverse", "tags": [], }, { "data": {"output": "olleh"}, "event": "on_chain_end", "metadata": {}, "name": "reverse", "tags": [], }, ]
Example: Dispatch Custom Event
from langchain_core.callbacks.manager import ( adispatch_custom_event, ) from langchain_core.runnables import RunnableLambda, RunnableConfig import asyncio async def slow_thing(some_input: str, config: RunnableConfig) -> str: """Do something that takes a long time.""" await asyncio.sleep(1) # Placeholder for some slow operation await adispatch_custom_event( "progress_event", {"message": "Finished step 1 of 3"}, config=config # Must be included for python < 3.10 ) await asyncio.sleep(1) # Placeholder for some slow operation await adispatch_custom_event( "progress_event", {"message": "Finished step 2 of 3"}, config=config # Must be included for python < 3.10 ) await asyncio.sleep(1) # Placeholder for some slow operation return "Done" slow_thing = RunnableLambda(slow_thing) async for event in slow_thing.astream_events("some_input", version="v2"): print(event)
- 参数
input (Any) – The input to the Runnable.
config (Optional[RunnableConfig]) – The config to use for the Runnable.
version (Literal['v1', 'v2']) – The version of the schema to use either v2 or v1. Users should use v2. v1 is for backwards compatibility and will be deprecated in 0.4.0. No default will be assigned until the API is stabilized. custom events will only be surfaced in v2.
include_names (Optional[Sequence[str]]) – Only include events from runnables with matching names.
include_types (Optional[Sequence[str]]) – Only include events from runnables with matching types.
include_tags (Optional[Sequence[str]]) – Only include events from runnables with matching tags.
exclude_names (Optional[Sequence[str]]) – Exclude events from runnables with matching names.
exclude_types (Optional[Sequence[str]]) – Exclude events from runnables with matching types.
exclude_tags (Optional[Sequence[str]]) – Exclude events from runnables with matching tags.
kwargs (Any) – Additional keyword arguments to pass to the Runnable. These will be passed to astream_log as this implementation of astream_events is built on top of astream_log.
- 产出
An async stream of StreamEvents.
- Raises
NotImplementedError – If the version is not v1 or v2.
- 返回类型
AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]]
- async astream_log(input: Any, config: Optional[RunnableConfig] = None, *, diff: Literal[True] = True, with_streamed_output_list: bool = True, include_names: Optional[Sequence[str]] = None, include_types: Optional[Sequence[str]] = None, include_tags: Optional[Sequence[str]] = None, exclude_names: Optional[Sequence[str]] = None, exclude_types: Optional[Sequence[str]] = None, exclude_tags: Optional[Sequence[str]] = None, **kwargs: Any) AsyncIterator[RunLogPatch] [source]¶
- async astream_log(input: Any, config: Optional[RunnableConfig] = None, *, diff: Literal[False], with_streamed_output_list: bool = True, include_names: Optional[Sequence[str]] = None, include_types: Optional[Sequence[str]] = None, include_tags: Optional[Sequence[str]] = None, exclude_names: Optional[Sequence[str]] = None, exclude_types: Optional[Sequence[str]] = None, exclude_tags: Optional[Sequence[str]] = None, **kwargs: Any) AsyncIterator[RunLog]
Stream all output from a Runnable, as reported to the callback system. This includes all inner runs of LLMs, Retrievers, Tools, etc.
Output is streamed as Log objects, which include a list of Jsonpatch ops that describe how the state of the run has changed in each step, and the final state of the run.
The Jsonpatch ops can be applied in order to construct state.
- 参数
input – The input to the Runnable.
config – The config to use for the Runnable.
diff – Whether to yield diffs between each step or the current state.
with_streamed_output_list – Whether to yield the streamed_output list.
include_names – Only include logs with these names.
include_types – Only include logs with these types.
include_tags – Only include logs with these tags.
exclude_names – Exclude logs with these names.
exclude_types – Exclude logs with these types.
exclude_tags – Exclude logs with these tags.
kwargs – 要传递给 Runnable 的其他关键字参数。
- 产出
A RunLogPatch or RunLog object.
- async atransform(input: AsyncIterator[Input], config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) AsyncIterator[Output] [source]¶
Default implementation of atransform, which buffers input and calls astream. Subclasses should override this method if they can start producing output while input is still being generated.
- 参数
input (AsyncIterator[Input]) – An async iterator of inputs to the Runnable.
config (Optional[RunnableConfig]) – The config to use for the Runnable. Defaults to None.
kwargs (Optional[Any]) – 要传递给 Runnable 的其他关键字参数。
- 产出
The output of the Runnable.
- 返回类型
AsyncIterator[Output]
- batch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output] [source]¶
默认实现使用线程池执行器并行运行 invoke。
batch 的默认实现对于 IO 绑定的 runnables 工作良好。
如果子类可以更有效地进行批处理,则应覆盖此方法;例如,如果底层 Runnable 使用支持批处理模式的 API。
- 参数
inputs (List[Input]) –
config (Optional[Union[RunnableConfig, List[RunnableConfig]]]) –
return_exceptions (bool) –
kwargs (Optional[Any]) –
- 返回类型
List[Output]
- batch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]] = None, *, return_exceptions: Literal[False] = False, **kwargs: Any) Iterator[Tuple[int, Output]] [source]¶
- batch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]] = None, *, return_exceptions: Literal[True], **kwargs: Any) Iterator[Tuple[int, Union[Output, Exception]]]
并行运行输入列表上的 invoke,并在完成时生成结果。
- bind(**kwargs: Any) Runnable[Input, Output] [source]¶
将参数绑定到 Runnable,返回一个新的 Runnable。
当链式 (chain) 中的 Runnable 需要一个参数,而该参数不在前一个 Runnable 的输出中,也不包含在用户输入中时,此方法非常有用。
- 参数
kwargs (Any) – 要绑定到 Runnable 的参数。
- 返回
返回绑定了参数的新 Runnable。
- 返回类型
Runnable[Input, Output]
Example
from langchain_community.chat_models import ChatOllama from langchain_core.output_parsers import StrOutputParser llm = ChatOllama(model='llama2') # Without bind. chain = ( llm | StrOutputParser() ) chain.invoke("Repeat quoted words exactly: 'One two three four five.'") # Output is 'One two three four five.' # With bind. chain = ( llm.bind(stop=["three"]) | StrOutputParser() ) chain.invoke("Repeat quoted words exactly: 'One two three four five.'") # Output is 'One two'
- config_schema(*, include: Optional[Sequence[str]] = None) Type[BaseModel] [source]¶
此 Runnable 接受的配置类型,指定为 pydantic 模型。
要将字段标记为可配置,请参阅 configurable_fields 和 configurable_alternatives 方法。
- 参数
include (Optional[Sequence[str]]) – 要包含在配置模式中的字段列表。
- 返回
一个 pydantic 模型,可用于验证配置。
- 返回类型
Type[BaseModel]
- get_graph(config: Optional[RunnableConfig] = None) Graph [source]¶
返回此 Runnable 的图形表示。
- 参数
config (Optional[RunnableConfig]) –
- 返回类型
- get_input_schema(config: Optional[RunnableConfig] = None) Type[BaseModel] [source]¶
获取可用于验证 Runnable 输入的 pydantic 模型。
利用 configurable_fields 和 configurable_alternatives 方法的 Runnables 将具有动态输入模式,该模式取决于调用 Runnable 时使用的配置。
此方法允许获取特定配置的输入模式。
- 参数
config (Optional[RunnableConfig]) – 生成模式时使用的配置。
- 返回
一个 pydantic 模型,可用于验证输入。
- 返回类型
Type[BaseModel]
- get_name(suffix: Optional[str] = None, *, name: Optional[str] = None) str [source]¶
获取 Runnable 的名称。
- 参数
suffix (Optional[str]) –
name (Optional[str]) –
- 返回类型
str
- get_output_schema(config: Optional[RunnableConfig] = None) Type[BaseModel] [source]¶
获取可用于验证 Runnable 输出的 pydantic 模型。
利用 configurable_fields 和 configurable_alternatives 方法的 Runnables 将具有动态输出模式,该模式取决于调用 Runnable 时使用的配置。
此方法允许获取特定配置的输出模式。
- 参数
config (Optional[RunnableConfig]) – 生成模式时使用的配置。
- 返回
一个 pydantic 模型,可用于验证输出。
- 返回类型
Type[BaseModel]
- get_prompts(config: Optional[RunnableConfig] = None) List[BasePromptTemplate] [source]¶
返回此 Runnable 使用的提示列表。
- 参数
config (Optional[RunnableConfig]) –
- 返回类型
List[BasePromptTemplate]
- abstract invoke(input: Input, config: Optional[RunnableConfig] = None) Output [source]¶
将单个输入转换为输出。覆盖此方法以实现。
- 参数
input (Input) – The input to the Runnable.
config (Optional[RunnableConfig]) – 调用 Runnable 时使用的配置。该配置支持标准键,例如用于追踪目的的 'tags'、'metadata',用于控制并行执行量的 'max_concurrency' 以及其他键。有关更多详细信息,请参阅 RunnableConfig。
- 返回
The output of the Runnable.
- 返回类型
输出
- map() Runnable[List[Input], List[Output]] [source]¶
返回一个新的 Runnable,它通过使用每个输入调用 invoke(),将输入列表映射到输出列表。
- 返回
返回一个新的 Runnable,它将输入列表映射到输出列表。
- 返回类型
Runnable[List[Input], List[Output]]
Example
from langchain_core.runnables import RunnableLambda def _lambda(x: int) -> int: return x + 1 runnable = RunnableLambda(_lambda) print(runnable.map().invoke([1, 2, 3])) # [2, 3, 4]
- pick(keys: Union[str, List[str]]) RunnableSerializable[Any, Any] [source]¶
从此 Runnable 的字典输出中选择键。
- 选取单个键
import json from langchain_core.runnables import RunnableLambda, RunnableMap as_str = RunnableLambda(str) as_json = RunnableLambda(json.loads) chain = RunnableMap(str=as_str, json=as_json) chain.invoke("[1, 2, 3]") # -> {"str": "[1, 2, 3]", "json": [1, 2, 3]} json_only_chain = chain.pick("json") json_only_chain.invoke("[1, 2, 3]") # -> [1, 2, 3]
- 选取键列表
from typing import Any import json from langchain_core.runnables import RunnableLambda, RunnableMap as_str = RunnableLambda(str) as_json = RunnableLambda(json.loads) def as_bytes(x: Any) -> bytes: return bytes(x, "utf-8") chain = RunnableMap( str=as_str, json=as_json, bytes=RunnableLambda(as_bytes) ) chain.invoke("[1, 2, 3]") # -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"} json_and_bytes_chain = chain.pick(["json", "bytes"]) json_and_bytes_chain.invoke("[1, 2, 3]") # -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
- 参数
keys (Union[str, List[str]]) –
- 返回类型
RunnableSerializable[Any, Any]
- pipe(*others: Union[Runnable[Any, Other], Callable[[Any], Other]], name: Optional[str] = None) RunnableSerializable[Input, Other] [source]¶
将此 Runnable 与类似 Runnable 的对象组合以创建 RunnableSequence。
等效于 RunnableSequence(self, *others) 或 self | others[0] | …
Example
from langchain_core.runnables import RunnableLambda def add_one(x: int) -> int: return x + 1 def mul_two(x: int) -> int: return x * 2 runnable_1 = RunnableLambda(add_one) runnable_2 = RunnableLambda(mul_two) sequence = runnable_1.pipe(runnable_2) # Or equivalently: # sequence = runnable_1 | runnable_2 # sequence = RunnableSequence(first=runnable_1, last=runnable_2) sequence.invoke(1) await sequence.ainvoke(1) # -> 4 sequence.batch([1, 2, 3]) await sequence.abatch([1, 2, 3]) # -> [4, 6, 8]
- 参数
others (Union[Runnable[Any, Other], Callable[[Any], Other]]) –
name (Optional[str]) –
- 返回类型
RunnableSerializable[Input, Other]
- stream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) Iterator[Output] [source]¶
stream 的默认实现,它调用 invoke。如果子类支持流式输出,则应覆盖此方法。
- 参数
input (Input) – The input to the Runnable.
config (Optional[RunnableConfig]) – The config to use for the Runnable. Defaults to None.
kwargs (Optional[Any]) – 要传递给 Runnable 的其他关键字参数。
- 产出
The output of the Runnable.
- 返回类型
Iterator[Output]
- transform(input: Iterator[Input], config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) Iterator[Output] [source]¶
transform 的默认实现,它缓冲输入,然后调用 stream。如果子类可以在输入仍在生成时开始生成输出,则应覆盖此方法。
- 参数
input (Iterator[Input]) – Runnable 的输入迭代器。
config (Optional[RunnableConfig]) – The config to use for the Runnable. Defaults to None.
kwargs (Optional[Any]) – 要传递给 Runnable 的其他关键字参数。
- 产出
The output of the Runnable.
- 返回类型
Iterator[Output]
- with_alisteners(*, on_start: Optional[AsyncListener] = None, on_end: Optional[AsyncListener] = None, on_error: Optional[AsyncListener] = None) Runnable[Input, Output] [source]¶
将异步生命周期监听器绑定到 Runnable,返回一个新的 Runnable。
on_start: 在 Runnable 开始运行之前异步调用。 on_end: 在 Runnable 完成运行后异步调用。 on_error: 如果 Runnable 抛出错误,则异步调用。
Run 对象包含有关运行的信息,包括其 id、类型、输入、输出、错误、开始时间、结束时间以及添加到运行的任何标签或元数据。
- 参数
on_start (Optional[AsyncListener]) – 在 Runnable 开始运行之前异步调用。 默认为 None。
on_end (Optional[AsyncListener]) – 在 Runnable 完成运行后异步调用。 默认为 None。
on_error (Optional[AsyncListener]) – 如果 Runnable 抛出错误,则异步调用。 默认为 None。
- 返回
返回绑定了监听器的新 Runnable。
- 返回类型
Runnable[Input, Output]
Example
from langchain_core.runnables import RunnableLambda import time async def test_runnable(time_to_sleep : int): print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}") await asyncio.sleep(time_to_sleep) print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}") async def fn_start(run_obj : Runnable): print(f"on start callback starts at {format_t(time.time())} await asyncio.sleep(3) print(f"on start callback ends at {format_t(time.time())}") async def fn_end(run_obj : Runnable): print(f"on end callback starts at {format_t(time.time())} await asyncio.sleep(2) print(f"on end callback ends at {format_t(time.time())}") runnable = RunnableLambda(test_runnable).with_alisteners( on_start=fn_start, on_end=fn_end ) async def concurrent_runs(): await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3)) asyncio.run(concurrent_runs()) Result: on start callback starts at 2024-05-16T14:20:29.637053+00:00 on start callback starts at 2024-05-16T14:20:29.637150+00:00 on start callback ends at 2024-05-16T14:20:32.638305+00:00 on start callback ends at 2024-05-16T14:20:32.638383+00:00 Runnable[3s]: starts at 2024-05-16T14:20:32.638849+00:00 Runnable[5s]: starts at 2024-05-16T14:20:32.638999+00:00 Runnable[3s]: ends at 2024-05-16T14:20:35.640016+00:00 on end callback starts at 2024-05-16T14:20:35.640534+00:00 Runnable[5s]: ends at 2024-05-16T14:20:37.640169+00:00 on end callback starts at 2024-05-16T14:20:37.640574+00:00 on end callback ends at 2024-05-16T14:20:37.640654+00:00 on end callback ends at 2024-05-16T14:20:39.641751+00:00
- with_config(config: Optional[RunnableConfig] = None, **kwargs: Any) Runnable[Input, Output] [source]¶
将配置绑定到 Runnable,返回一个新的 Runnable。
- 参数
config (Optional[RunnableConfig]) – 要绑定到 Runnable 的配置。
kwargs (Any) – 要传递给 Runnable 的其他关键字参数。
- 返回
返回绑定了配置的新 Runnable。
- 返回类型
Runnable[Input, Output]
- with_fallbacks(fallbacks: Sequence[Runnable[Input, Output]], *, exceptions_to_handle: Tuple[Type[BaseException], ...] = (<class 'Exception'>,), exception_key: Optional[str] = None) RunnableWithFallbacksT[Input, Output] [source]¶
向 Runnable 添加回退,返回一个新的 Runnable。
新的 Runnable 将尝试原始 Runnable,然后在失败时按顺序尝试每个回退 (fallback)。
- 参数
fallbacks (Sequence[Runnable[Input, Output]]) – 如果原始 Runnable 失败,则尝试运行的 Runnables 序列。
exceptions_to_handle (Tuple[Type[BaseException], ...]) – 要处理的异常类型元组。 默认为 (Exception,)。
exception_key (Optional[str]) – 如果指定了字符串,则处理的异常将作为输入的一部分传递给回退 (fallbacks),并使用指定的键。 如果为 None,则异常不会传递给回退 (fallbacks)。 如果使用此参数,则基本 Runnable 及其回退 (fallbacks) 必须接受字典作为输入。 默认为 None。
- 返回
返回一个新的 Runnable,它将尝试原始 Runnable,然后在失败时按顺序尝试每个回退 (fallback)。
- 返回类型
RunnableWithFallbacksT[Input, Output]
Example
from typing import Iterator from langchain_core.runnables import RunnableGenerator def _generate_immediate_error(input: Iterator) -> Iterator[str]: raise ValueError() yield "" def _generate(input: Iterator) -> Iterator[str]: yield from "foo bar" runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks( [RunnableGenerator(_generate)] ) print(''.join(runnable.stream({}))) #foo bar
- 参数
fallbacks (Sequence[Runnable[Input, Output]]) – 如果原始 Runnable 失败,则尝试运行的 Runnables 序列。
exceptions_to_handle (Tuple[Type[BaseException], ...]) – 要处理的异常类型元组。
exception_key (Optional[str]) – 如果指定了字符串,则处理的异常将作为输入的一部分传递给回退 (fallbacks),并使用指定的键。 如果为 None,则异常不会传递给回退 (fallbacks)。 如果使用此参数,则基本 Runnable 及其回退 (fallbacks) 必须接受字典作为输入。
- 返回
返回一个新的 Runnable,它将尝试原始 Runnable,然后在失败时按顺序尝试每个回退 (fallback)。
- 返回类型
RunnableWithFallbacksT[Input, Output]
- with_listeners(*, on_start: Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]] = None, on_end: Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]] = None, on_error: Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]] = None) Runnable[Input, Output] [source]¶
将生命周期监听器绑定到 Runnable,返回一个新的 Runnable。
on_start:在 Runnable 开始运行之前调用,带有 Run 对象。 on_end:在 Runnable 完成运行之后调用,带有 Run 对象。 on_error:如果 Runnable 抛出错误时调用,带有 Run 对象。
Run 对象包含有关运行的信息,包括其 id、类型、输入、输出、错误、开始时间、结束时间以及添加到运行的任何标签或元数据。
- 参数
on_start (Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]]) – 在 Runnable 开始运行之前调用。默认为 None。
on_end (Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]]) – 在 Runnable 完成运行之后调用。默认为 None。
on_error (Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]]) – 如果 Runnable 抛出错误时调用。默认为 None。
- 返回
返回绑定了监听器的新 Runnable。
- 返回类型
Runnable[Input, Output]
Example
from langchain_core.runnables import RunnableLambda from langchain_core.tracers.schemas import Run import time def test_runnable(time_to_sleep : int): time.sleep(time_to_sleep) def fn_start(run_obj: Run): print("start_time:", run_obj.start_time) def fn_end(run_obj: Run): print("end_time:", run_obj.end_time) chain = RunnableLambda(test_runnable).with_listeners( on_start=fn_start, on_end=fn_end ) chain.invoke(2)
- with_retry(*, retry_if_exception_type: ~typing.Tuple[~typing.Type[BaseException], ...] = (<class 'Exception'>,), wait_exponential_jitter: bool = True, stop_after_attempt: int = 3) Runnable[Input, Output] [source]¶
创建一个新的 Runnable,它在异常时重试原始 Runnable。
- 参数
retry_if_exception_type (Tuple[Type[BaseException], ...]) – 发生这些异常类型时进行重试。默认为 (Exception,)。
wait_exponential_jitter (bool) – 是否在重试之间的等待时间中添加抖动。默认为 True。
stop_after_attempt (int) – 在放弃之前尝试的最大次数。默认为 3。
- 返回
一个新的 Runnable,它在发生异常时重试原始的 Runnable。
- 返回类型
Runnable[Input, Output]
Example
from langchain_core.runnables import RunnableLambda count = 0 def _lambda(x: int) -> None: global count count = count + 1 if x == 1: raise ValueError("x is 1") else: pass runnable = RunnableLambda(_lambda) try: runnable.with_retry( stop_after_attempt=2, retry_if_exception_type=(ValueError,), ).invoke(1) except ValueError: pass assert (count == 2)
- 参数
retry_if_exception_type (Tuple[Type[BaseException], ...]) – 发生这些异常类型时进行重试
wait_exponential_jitter (bool) – 是否在重试之间的等待时间中添加抖动
stop_after_attempt (int) – 在放弃之前尝试的最大次数
- 返回
一个新的 Runnable,它在发生异常时重试原始的 Runnable。
- 返回类型
Runnable[Input, Output]
- with_types(*, input_type: Optional[Type[Input]] = None, output_type: Optional[Type[Output]] = None) Runnable[Input, Output] [source]¶
将输入和输出类型绑定到 Runnable,返回一个新的 Runnable。
- 参数
input_type (Optional[Type[Input]]) – 要绑定到 Runnable 的输入类型。默认为 None。
output_type (Optional[Type[Output]]) – 要绑定到 Runnable 的输出类型。默认为 None。
- 返回
一个新的 Runnable,绑定了类型。
- 返回类型
Runnable[Input, Output]