langchain_core.runnables.base
.RunnableLambda¶
注意
RunnableLambda 实现了标准的 Runnable 接口
。 🏃
Runnable 接口
具有在 runnables 上可用的其他方法,例如 with_types
、 with_retry
、 assign
、 bind
、 get_graph
等。
- class langchain_core.runnables.base.RunnableLambda(func: Union[Union[Callable[[Input], Output], Callable[[Input], Iterator[Output]], Callable[[Input, RunnableConfig], Output], Callable[[Input, CallbackManagerForChainRun], Output], Callable[[Input, CallbackManagerForChainRun, RunnableConfig], Output]], Union[Callable[[Input], Awaitable[Output]], Callable[[Input], AsyncIterator[Output]], Callable[[Input, RunnableConfig], Awaitable[Output]], Callable[[Input, AsyncCallbackManagerForChainRun], Awaitable[Output]], Callable[[Input, AsyncCallbackManagerForChainRun, RunnableConfig], Awaitable[Output]]]], afunc: Optional[Union[Callable[[Input], Awaitable[Output]], Callable[[Input], AsyncIterator[Output]], Callable[[Input, RunnableConfig], Awaitable[Output]], Callable[[Input, AsyncCallbackManagerForChainRun], Awaitable[Output]], Callable[[Input, AsyncCallbackManagerForChainRun, RunnableConfig], Awaitable[Output]]]] = None, name: Optional[str] = None)[source]¶
RunnableLambda 将 Python 可调用对象转换为 Runnable。
将可调用对象包装在 RunnableLambda 中使得该可调用对象可以在同步或异步上下文中使用。
RunnableLambda 可以像任何其他 Runnable 一样组合,并提供与 LangChain 追踪的无缝集成。
RunnableLambda
最适合不需要支持流式处理的代码。如果需要支持流式处理(即,能够处理输入块并产生输出块),请改用RunnableGenerator
。请注意,如果
RunnableLambda
返回Runnable
的实例,则该实例在执行期间被调用(或流式传输)。示例
# This is a RunnableLambda from langchain_core.runnables import RunnableLambda def add_one(x: int) -> int: return x + 1 runnable = RunnableLambda(add_one) runnable.invoke(1) # returns 2 runnable.batch([1, 2, 3]) # returns [2, 3, 4] # Async is supported by default by delegating to the sync implementation await runnable.ainvoke(1) # returns 2 await runnable.abatch([1, 2, 3]) # returns [2, 3, 4] # Alternatively, can provide both synd and sync implementations async def add_one_async(x: int) -> int: return x + 1 runnable = RunnableLambda(add_one, afunc=add_one_async) runnable.invoke(1) # Uses add_one await runnable.ainvoke(1) # Uses add_one_async
从可调用对象、异步可调用对象或两者创建一个 RunnableLambda。
接受同步和异步变体,以便为同步和异步执行提供高效的实现。
- 参数
func (Union[Union[Callable[[Input], Output], Callable[[Input], Iterator[Output]], Callable[[Input, RunnableConfig], Output], Callable[[Input, CallbackManagerForChainRun], Output], Callable[[Input, CallbackManagerForChainRun, RunnableConfig], Output]], Union[Callable[[Input], Awaitable[Output]], Callable[[Input], AsyncIterator[Output]], Callable[[Input, RunnableConfig], Awaitable[Output]], Callable[[Input, AsyncCallbackManagerForChainRun], Awaitable[Output]], Callable[[Input, AsyncCallbackManagerForChainRun, RunnableConfig], Awaitable[Output]]]]) – 同步或异步可调用对象
afunc (Optional[Union[Callable[[Input], Awaitable[Output]], Callable[[Input], AsyncIterator[Output]], Callable[[Input, RunnableConfig], Awaitable[Output]], Callable[[Input, AsyncCallbackManagerForChainRun], Awaitable[Output]], Callable[[Input, AsyncCallbackManagerForChainRun, RunnableConfig], Awaitable[Output]]]]) – 异步可调用对象,它接受输入并返回输出。默认为 None。
name (Optional[str]) – Runnable 的名称。默认为 None。
- Raises
TypeError – 如果 func 不是可调用类型。
TypeError – 如果同时提供了 func 和 afunc。
属性
InputType
此 Runnable 的输入类型。
OutputType
此 Runnable 的输出类型,作为类型注解。
config_specs
列出此 Runnable 的可配置字段。
deps
此 Runnable 的依赖项。
input_schema
此 Runnable 接受的输入类型,指定为 pydantic 模型。
name
Runnable 的名称。
output_schema
此 Runnable 生成的输出类型,指定为 pydantic 模型。
方法
__init__
(func[, afunc, name])从可调用对象、异步可调用对象或两者创建一个 RunnableLambda。
abatch
(inputs[, config, return_exceptions])默认实现使用 asyncio.gather 并行运行 ainvoke。
abatch_as_completed
(inputs[, config, ...])并行运行输入列表上的 ainvoke,并在结果完成时产生结果。
ainvoke
(input[, config])异步调用此 Runnable。
as_tool
([args_schema, name, description, ...])assign
(**kwargs)将新字段分配给此 Runnable 的字典输出。
astream
(input[, config])astream 的默认实现,它调用 ainvoke。
astream_events
(input[, config, ...])astream_log
(input[, config, diff, ...])流式传输来自 Runnable 的所有输出,如回调系统报告的那样。
atransform
(input[, config])atransform 的默认实现,它缓冲输入并调用 astream。
batch
(inputs[, config, return_exceptions])默认实现使用线程池执行器并行运行 invoke。
batch_as_completed
(inputs[, config, ...])并行运行输入列表上的 invoke,并在结果完成时产生结果。
bind
(**kwargs)将参数绑定到 Runnable,返回一个新的 Runnable。
config_schema
(*[, include])此 Runnable 接受的配置类型,指定为 pydantic 模型。
get_graph
([config])返回此 Runnable 的图形表示。
get_input_schema
([config])此 Runnable 输入的 pydantic 模式。
get_name
([suffix, name])获取 Runnable 的名称。
get_output_schema
([config])获取可用于验证 Runnable 输出的 pydantic 模型。
get_prompts
([config])返回此 Runnable 使用的提示列表。
invoke
(input[, config])同步调用此 Runnable。
map
()返回一个新的 Runnable,它通过对每个输入调用 invoke(),将输入列表映射到输出列表。
pick
(keys)从此 Runnable 的字典输出中选取键。
pipe
(*others[, name])将此 Runnable 与类似 Runnable 的对象组合以创建 RunnableSequence。
stream
(input[, config])stream 的默认实现,它调用 invoke。
transform
(input[, config])transform 的默认实现,它缓冲输入,然后调用 stream。
with_alisteners
(*[, on_start, on_end, on_error])将异步生命周期监听器绑定到 Runnable,返回一个新的 Runnable。
with_config
([config])将配置绑定到 Runnable,返回一个新的 Runnable。
with_fallbacks
(fallbacks, *[, ...])向 Runnable 添加回退,返回一个新的 Runnable。
with_listeners
(*[, on_start, on_end, on_error])将生命周期监听器绑定到 Runnable,返回一个新的 Runnable。
with_retry
(*[, retry_if_exception_type, ...])创建一个新的 Runnable,该 Runnable 在异常时重试原始 Runnable。
with_types
(*[, input_type, output_type])将输入和输出类型绑定到 Runnable,返回一个新的 Runnable。
- __init__(func: Union[Union[Callable[[Input], Output], Callable[[Input], Iterator[Output]], Callable[[Input, RunnableConfig], Output], Callable[[Input, CallbackManagerForChainRun], Output], Callable[[Input, CallbackManagerForChainRun, RunnableConfig], Output]], Union[Callable[[Input], Awaitable[Output]], Callable[[Input], AsyncIterator[Output]], Callable[[Input, RunnableConfig], Awaitable[Output]], Callable[[Input, AsyncCallbackManagerForChainRun], Awaitable[Output]], Callable[[Input, AsyncCallbackManagerForChainRun, RunnableConfig], Awaitable[Output]]]], afunc: Optional[Union[Callable[[Input], Awaitable[Output]], Callable[[Input], AsyncIterator[Output]], Callable[[Input, RunnableConfig], Awaitable[Output]], Callable[[Input, AsyncCallbackManagerForChainRun], Awaitable[Output]], Callable[[Input, AsyncCallbackManagerForChainRun, RunnableConfig], Awaitable[Output]]]] = None, name: Optional[str] = None) None [source]¶
从可调用对象、异步可调用对象或两者创建一个 RunnableLambda。
接受同步和异步变体,以便为同步和异步执行提供高效的实现。
- 参数
func (Union[Union[Callable[[Input], Output], Callable[[Input], Iterator[Output]], Callable[[Input, RunnableConfig], Output], Callable[[Input, CallbackManagerForChainRun], Output], Callable[[Input, CallbackManagerForChainRun, RunnableConfig], Output]], Union[Callable[[Input], Awaitable[Output]], Callable[[Input], AsyncIterator[Output]], Callable[[Input, RunnableConfig], Awaitable[Output]], Callable[[Input, AsyncCallbackManagerForChainRun], Awaitable[Output]], Callable[[Input, AsyncCallbackManagerForChainRun, RunnableConfig], Awaitable[Output]]]]) – 同步或异步可调用对象
afunc (Optional[Union[Callable[[Input], Awaitable[Output]], Callable[[Input], AsyncIterator[Output]], Callable[[Input, RunnableConfig], Awaitable[Output]], Callable[[Input, AsyncCallbackManagerForChainRun], Awaitable[Output]], Callable[[Input, AsyncCallbackManagerForChainRun, RunnableConfig], Awaitable[Output]]]]) – 异步可调用对象,它接受输入并返回输出。默认为 None。
name (Optional[str]) – Runnable 的名称。默认为 None。
- Raises
TypeError – 如果 func 不是可调用类型。
TypeError – 如果同时提供了 func 和 afunc。
- 返回类型
None
- async abatch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output] ¶
默认实现使用 asyncio.gather 并行运行 ainvoke。
批处理的默认实现对于 IO 绑定的 Runnable 效果良好。
如果子类可以更有效地进行批处理,则应重写此方法;例如,如果底层的 Runnable 使用支持批处理模式的 API。
- 参数
inputs (List[Input]) – Runnable 的输入列表。
config (Optional[Union[RunnableConfig, List[RunnableConfig]]]) – 调用 Runnable 时使用的配置。该配置支持标准键,例如用于跟踪目的的 ‘tags’、‘metadata’,用于控制并行执行量的 ‘max_concurrency’ 以及其他键。有关更多详细信息,请参阅 RunnableConfig。默认为 None。
return_exceptions (bool) – 是否返回异常而不是引发异常。默认为 False。
kwargs (Optional[Any]) – 传递给 Runnable 的其他关键字参数。
- 返回
来自 Runnable 的输出列表。
- 返回类型
List[Output]
- async abatch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) AsyncIterator[Tuple[int, Union[Output, Exception]]] ¶
并行运行输入列表上的 ainvoke,并在结果完成时产生结果。
- 参数
inputs (Sequence[Input]) – Runnable 的输入列表。
config (Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]) – 调用 Runnable 时使用的配置。该配置支持标准键,例如用于跟踪目的的 ‘tags’、‘metadata’,用于控制并行执行量的 ‘max_concurrency’ 以及其他键。有关更多详细信息,请参阅 RunnableConfig。默认为 None。默认为 None。
return_exceptions (bool) – 是否返回异常而不是引发异常。默认为 False。
kwargs (Optional[Any]) – 传递给 Runnable 的其他关键字参数。
- 产生
包含输入索引和 Runnable 输出的元组。
- 返回类型
AsyncIterator[Tuple[int, Union[Output, Exception]]]
- async ainvoke(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) Output [source]¶
异步调用此 Runnable。
- 参数
input (Input) – 此 Runnable 的输入。
config (Optional[RunnableConfig]) – 要使用的配置。默认为 None。
kwargs (Optional[Any]) – 附加关键字参数。
- 返回
此 Runnable 的输出。
- 返回类型
Output
- as_tool(args_schema: Optional[Type[BaseModel]] = None, *, name: Optional[str] = None, description: Optional[str] = None, arg_types: Optional[Dict[str, Type]] = None) BaseTool ¶
Beta
此 API 处于 Beta 阶段,将来可能会发生变化。
从 Runnable 创建一个 BaseTool。
as_tool
将从 Runnable 实例化一个具有名称、描述和args_schema
的 BaseTool。如果可能,架构将从runnable.get_input_schema
推断。或者(例如,如果 Runnable 接受字典作为输入,并且未键入特定的字典键),可以使用args_schema
直接指定架构。您还可以传递arg_types
以仅指定必需的参数及其类型。- 参数
args_schema (Optional[Type[BaseModel]]) – 工具的架构。默认为 None。
name (Optional[str]) – 工具的名称。默认为 None。
description (Optional[str]) – 工具的描述。默认为 None。
arg_types (Optional[Dict[str, Type]]) – 参数名称到类型的字典。默认为 None。
- 返回
BaseTool 实例。
- 返回类型
类型化字典输入
from typing import List from typing_extensions import TypedDict from langchain_core.runnables import RunnableLambda class Args(TypedDict): a: int b: List[int] def f(x: Args) -> str: return str(x["a"] * max(x["b"])) runnable = RunnableLambda(f) as_tool = runnable.as_tool() as_tool.invoke({"a": 3, "b": [1, 2]})
dict
输入,通过args_schema
指定架构from typing import Any, Dict, List from langchain_core.pydantic_v1 import BaseModel, Field from langchain_core.runnables import RunnableLambda def f(x: Dict[str, Any]) -> str: return str(x["a"] * max(x["b"])) class FSchema(BaseModel): """Apply a function to an integer and list of integers.""" a: int = Field(..., description="Integer") b: List[int] = Field(..., description="List of ints") runnable = RunnableLambda(f) as_tool = runnable.as_tool(FSchema) as_tool.invoke({"a": 3, "b": [1, 2]})
dict
输入,通过arg_types
指定架构from typing import Any, Dict, List from langchain_core.runnables import RunnableLambda def f(x: Dict[str, Any]) -> str: return str(x["a"] * max(x["b"])) runnable = RunnableLambda(f) as_tool = runnable.as_tool(arg_types={"a": int, "b": List[int]}) as_tool.invoke({"a": 3, "b": [1, 2]})
字符串输入
from langchain_core.runnables import RunnableLambda def f(x: str) -> str: return x + "a" def g(x: str) -> str: return x + "z" runnable = RunnableLambda(f) | g as_tool = runnable.as_tool() as_tool.invoke("b")
0.2.14 版本中的新功能。
- assign(**kwargs: Union[Runnable[Dict[str, Any], Any], Callable[[Dict[str, Any]], Any], Mapping[str, Union[Runnable[Dict[str, Any], Any], Callable[[Dict[str, Any]], Any]]]]) RunnableSerializable[Any, Any] ¶
将新字段分配给此 Runnable 的字典输出。返回一个新的 Runnable。
from langchain_community.llms.fake import FakeStreamingListLLM from langchain_core.output_parsers import StrOutputParser from langchain_core.prompts import SystemMessagePromptTemplate from langchain_core.runnables import Runnable from operator import itemgetter prompt = ( SystemMessagePromptTemplate.from_template("You are a nice assistant.") + "{question}" ) llm = FakeStreamingListLLM(responses=["foo-lish"]) chain: Runnable = prompt | llm | {"str": StrOutputParser()} chain_with_assign = chain.assign(hello=itemgetter("str") | llm) print(chain_with_assign.input_schema.schema()) # {'title': 'PromptInput', 'type': 'object', 'properties': {'question': {'title': 'Question', 'type': 'string'}}} print(chain_with_assign.output_schema.schema()) # {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties': {'str': {'title': 'Str', 'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
- 参数
kwargs (Union[Runnable[Dict[str, Any], Any], Callable[[Dict[str, Any]], Any], Mapping[str, Union[Runnable[Dict[str, Any], Any], Callable[[Dict[str, Any]], Any]]]]) –
- 返回类型
RunnableSerializable[Any, Any]
- async astream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) AsyncIterator[Output] ¶
astream 的默认实现,它调用 ainvoke。如果子类支持流式输出,则应重写此方法。
- 参数
input (Input) – Runnable 的输入。
config (Optional[RunnableConfig]) – 用于 Runnable 的配置。默认为 None。
kwargs (Optional[Any]) – 传递给 Runnable 的其他关键字参数。
- 产生
Runnable 的输出。
- 返回类型
AsyncIterator[Output]
- astream_events(input: Any, config: Optional[RunnableConfig] = None, *, version: Literal['v1', 'v2'], include_names: Optional[Sequence[str]] = None, include_types: Optional[Sequence[str]] = None, include_tags: Optional[Sequence[str]] = None, exclude_names: Optional[Sequence[str]] = None, exclude_types: Optional[Sequence[str]] = None, exclude_tags: Optional[Sequence[str]] = None, **kwargs: Any) AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]] ¶
Beta
此 API 处于 Beta 阶段,将来可能会发生变化。
生成事件流。
用于创建 StreamEvents 的迭代器,该迭代器提供关于 Runnable 进度的实时信息,包括来自中间结果的 StreamEvents。
StreamEvent 是一个具有以下架构的字典
event
: str - 事件名称的格式为:格式:on_[runnable_type]_(start|stream|end)。
name
: str - 生成事件的 Runnable 的名称。run_id
: str - 与给定执行关联的随机生成的 ID发射事件的 Runnable。作为父级 Runnable 执行一部分而被调用的子级 Runnable 会被分配其自己唯一的 ID。
parent_ids
: List[str] - 生成事件的父级 runnable 的 ID。根 Runnable 将拥有一个空列表。父级 ID 的顺序是从根到直接父级。仅适用于 API 的 v2 版本。API 的 v1 版本将返回一个空列表。
tags
: Optional[List[str]] - 生成事件的 Runnable 的标签。事件。
metadata
: Optional[Dict[str, Any]] - Runnable 的元数据生成事件的 Runnable 的元数据。
data
: Dict[str, Any]
下面是一个表格,说明了各种链可能发出的一些事件。为了简洁起见,表格中省略了元数据字段。链定义已包含在表格之后。
注意 此参考表适用于模式的 V2 版本。
事件
name
块
输入
输出
on_chat_model_start
[模型名称]
{“messages”: [[SystemMessage, HumanMessage]]}
on_chat_model_stream
[模型名称]
AIMessageChunk(content=”hello”)
on_chat_model_end
[模型名称]
{“messages”: [[SystemMessage, HumanMessage]]}
AIMessageChunk(content=”hello world”)
on_llm_start
[模型名称]
{‘input’: ‘hello’}
on_llm_stream
[模型名称]
‘Hello’
on_llm_end
[模型名称]
‘Hello human!’
on_chain_start
format_docs
on_chain_stream
format_docs
“hello world!, goodbye world!”
on_chain_end
format_docs
[Document(…)]
“hello world!, goodbye world!”
on_tool_start
some_tool
{“x”: 1, “y”: “2”}
on_tool_end
some_tool
{“x”: 1, “y”: “2”}
on_retriever_start
[检索器名称]
{“query”: “hello”}
on_retriever_end
[检索器名称]
{“query”: “hello”}
[Document(…), ..]
on_prompt_start
[模板名称]
{“question”: “hello”}
on_prompt_end
[模板名称]
{“question”: “hello”}
ChatPromptValue(messages: [SystemMessage, …])
除了标准事件之外,用户还可以调度自定义事件(请参见下面的示例)。
自定义事件将仅在 v2 版本的 API 中显示!
自定义事件具有以下格式
属性
类型
描述
name
str
用户定义的事件名称。
数据
Any
与事件关联的数据。这可以是任何内容,但我们建议使其可 JSON 序列化。
以下是与上面显示的标准事件关联的声明
format_docs:
def format_docs(docs: List[Document]) -> str: '''Format the docs.''' return ", ".join([doc.page_content for doc in docs]) format_docs = RunnableLambda(format_docs)
some_tool:
@tool def some_tool(x: int, y: str) -> dict: '''Some_tool.''' return {"x": x, "y": y}
prompt:
template = ChatPromptTemplate.from_messages( [("system", "You are Cat Agent 007"), ("human", "{question}")] ).with_config({"run_name": "my_template", "tags": ["my_template"]})
示例
from langchain_core.runnables import RunnableLambda async def reverse(s: str) -> str: return s[::-1] chain = RunnableLambda(func=reverse) events = [ event async for event in chain.astream_events("hello", version="v2") ] # will produce the following events (run_id, and parent_ids # has been omitted for brevity): [ { "data": {"input": "hello"}, "event": "on_chain_start", "metadata": {}, "name": "reverse", "tags": [], }, { "data": {"chunk": "olleh"}, "event": "on_chain_stream", "metadata": {}, "name": "reverse", "tags": [], }, { "data": {"output": "olleh"}, "event": "on_chain_end", "metadata": {}, "name": "reverse", "tags": [], }, ]
示例:调度自定义事件
from langchain_core.callbacks.manager import ( adispatch_custom_event, ) from langchain_core.runnables import RunnableLambda, RunnableConfig import asyncio async def slow_thing(some_input: str, config: RunnableConfig) -> str: """Do something that takes a long time.""" await asyncio.sleep(1) # Placeholder for some slow operation await adispatch_custom_event( "progress_event", {"message": "Finished step 1 of 3"}, config=config # Must be included for python < 3.10 ) await asyncio.sleep(1) # Placeholder for some slow operation await adispatch_custom_event( "progress_event", {"message": "Finished step 2 of 3"}, config=config # Must be included for python < 3.10 ) await asyncio.sleep(1) # Placeholder for some slow operation return "Done" slow_thing = RunnableLambda(slow_thing) async for event in slow_thing.astream_events("some_input", version="v2"): print(event)
- 参数
input (Any) – Runnable 的输入。
config (Optional[RunnableConfig]) – 用于 Runnable 的配置。
version (Literal['v1', 'v2']) – 要使用的模式版本,可以是 v2 或 v1。用户应使用 v2。v1 用于向后兼容,将在 0.4.0 中弃用。在 API 稳定之前,不会分配默认值。自定义事件将仅在 v2 中显示。
include_names (Optional[Sequence[str]]) – 仅包括来自具有匹配名称的 runnable 的事件。
include_types (Optional[Sequence[str]]) – 仅包括来自具有匹配类型的 runnable 的事件。
include_tags (Optional[Sequence[str]]) – 仅包括来自具有匹配标签的 runnable 的事件。
exclude_names (Optional[Sequence[str]]) – 排除来自具有匹配名称的 runnable 的事件。
exclude_types (Optional[Sequence[str]]) – 排除来自具有匹配类型的 runnable 的事件。
exclude_tags (Optional[Sequence[str]]) – 排除来自具有匹配标签的 runnable 的事件。
kwargs (Any) – 要传递给 Runnable 的其他关键字参数。这些将传递给 astream_log,因为 astream_events 的此实现是基于 astream_log 构建的。
- 产生
StreamEvents 的异步流。
- Raises
NotImplementedError – 如果版本不是 v1 或 v2。
- 返回类型
AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]]
- async astream_log(input: Any, config: Optional[RunnableConfig] = None, *, diff: bool = True, with_streamed_output_list: bool = True, include_names: Optional[Sequence[str]] = None, include_types: Optional[Sequence[str]] = None, include_tags: Optional[Sequence[str]] = None, exclude_names: Optional[Sequence[str]] = None, exclude_types: Optional[Sequence[str]] = None, exclude_tags: Optional[Sequence[str]] = None, **kwargs: Any) Union[AsyncIterator[RunLogPatch], AsyncIterator[RunLog]] ¶
从 Runnable 流式传输所有输出,如向回调系统报告的那样。这包括 LLM、检索器、工具等的所有内部运行。
输出作为 Log 对象流式传输,其中包括 Jsonpatch 操作列表,这些操作描述了运行状态在每个步骤中是如何变化的,以及运行的最终状态。
Jsonpatch 操作可以按顺序应用以构造状态。
- 参数
input (Any) – Runnable 的输入。
config (Optional[RunnableConfig]) – 用于 Runnable 的配置。
diff (bool) – 是否产生每个步骤之间的差异或当前状态。
with_streamed_output_list (bool) – 是否产生 streamed_output 列表。
include_names (Optional[Sequence[str]]) – 仅包括具有这些名称的日志。
include_types (Optional[Sequence[str]]) – 仅包括具有这些类型的日志。
include_tags (Optional[Sequence[str]]) – 仅包括具有这些标签的日志。
exclude_names (Optional[Sequence[str]]) – 排除具有这些名称的日志。
exclude_types (Optional[Sequence[str]]) – 排除具有这些类型的日志。
exclude_tags (Optional[Sequence[str]]) – 排除具有这些标签的日志。
kwargs (Any) – 要传递给 Runnable 的其他关键字参数。
- 产生
RunLogPatch 或 RunLog 对象。
- 返回类型
Union[AsyncIterator[RunLogPatch], AsyncIterator[RunLog]]
- async atransform(input: AsyncIterator[Input], config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) AsyncIterator[Output] [source]¶
atransform 的默认实现,它缓冲输入并调用 astream。如果子类可以在输入仍在生成时开始生成输出,则应覆盖此方法。
- 参数
input (AsyncIterator[Input]) – Runnable 的输入异步迭代器。
config (Optional[RunnableConfig]) – 用于 Runnable 的配置。默认为 None。
kwargs (Optional[Any]) – 传递给 Runnable 的其他关键字参数。
- 产生
Runnable 的输出。
- 返回类型
AsyncIterator[Output]
- batch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output] ¶
默认实现使用线程池执行器并行运行 invoke。
批处理的默认实现对于 IO 绑定的 Runnable 效果良好。
如果子类可以更有效地进行批处理,则应重写此方法;例如,如果底层的 Runnable 使用支持批处理模式的 API。
- 参数
inputs (List[Input]) –
config (Optional[Union[RunnableConfig, List[RunnableConfig]]]) –
return_exceptions (bool) –
kwargs (Optional[Any]) –
- 返回类型
List[Output]
- batch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) Iterator[Tuple[int, Union[Output, Exception]]] ¶
并行运行输入列表上的 invoke,并在结果完成时产生结果。
- 参数
inputs (Sequence[Input]) –
config (Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]) –
return_exceptions (bool) –
kwargs (Optional[Any]) –
- 返回类型
Iterator[Tuple[int, Union[Output, Exception]]]
- bind(**kwargs: Any) Runnable[Input, Output] ¶
将参数绑定到 Runnable,返回一个新的 Runnable。
当链中的 Runnable 需要一个参数,而该参数不在前一个 Runnable 的输出中或未包含在用户输入中时,此方法很有用。
- 参数
kwargs (Any) – 要绑定到 Runnable 的参数。
- 返回
具有绑定参数的新 Runnable。
- 返回类型
Runnable[Input, Output]
示例
from langchain_community.chat_models import ChatOllama from langchain_core.output_parsers import StrOutputParser llm = ChatOllama(model='llama2') # Without bind. chain = ( llm | StrOutputParser() ) chain.invoke("Repeat quoted words exactly: 'One two three four five.'") # Output is 'One two three four five.' # With bind. chain = ( llm.bind(stop=["three"]) | StrOutputParser() ) chain.invoke("Repeat quoted words exactly: 'One two three four five.'") # Output is 'One two'
- config_schema(*, include: Optional[Sequence[str]] = None) Type[BaseModel] ¶
此 Runnable 接受的配置类型,指定为 pydantic 模型。
要将字段标记为可配置,请参阅 configurable_fields 和 configurable_alternatives 方法。
- 参数
include (Optional[Sequence[str]]) – 要包含在配置架构中的字段列表。
- 返回
可用于验证配置的 pydantic 模型。
- 返回类型
Type[BaseModel]
- get_graph(config: Optional[RunnableConfig] = None) Graph [source]¶
返回此 Runnable 的图形表示。
- 参数
config (Optional[RunnableConfig]) –
- 返回类型
- get_input_schema(config: Optional[RunnableConfig] = None) Type[BaseModel] [source]¶
此 Runnable 输入的 pydantic 模式。
- 参数
config (Optional[RunnableConfig]) – 要使用的配置。默认为 None。
- 返回
此 Runnable 的输入架构。
- 返回类型
Type[BaseModel]
- get_name(suffix: Optional[str] = None, *, name: Optional[str] = None) str ¶
获取 Runnable 的名称。
- 参数
suffix (Optional[str]) –
name (Optional[str]) –
- 返回类型
str
- get_output_schema(config: Optional[RunnableConfig] = None) Type[BaseModel] ¶
获取可用于验证 Runnable 输出的 pydantic 模型。
利用 configurable_fields 和 configurable_alternatives 方法的 Runnables 将具有动态输出架构,该架构取决于调用 Runnable 时使用的配置。
此方法允许获取特定配置的输出架构。
- 参数
config (Optional[RunnableConfig]) – 生成架构时要使用的配置。
- 返回
可用于验证输出的 pydantic 模型。
- 返回类型
Type[BaseModel]
- get_prompts(config: Optional[RunnableConfig] = None) List[BasePromptTemplate] ¶
返回此 Runnable 使用的提示列表。
- 参数
config (Optional[RunnableConfig]) –
- 返回类型
List[BasePromptTemplate]
- invoke(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) Output [source]¶
同步调用此 Runnable。
- 参数
input (Input) – 此 Runnable 的输入。
config (Optional[RunnableConfig]) – 要使用的配置。默认为 None。
kwargs (Optional[Any]) – 附加关键字参数。
- 返回
此 Runnable 的输出。
- Raises
TypeError – 如果 Runnable 是协程函数。
- 返回类型
Output
- map() Runnable[List[Input], List[Output]] ¶
返回一个新的 Runnable,它通过对每个输入调用 invoke(),将输入列表映射到输出列表。
- 返回
一个新的 Runnable,它将输入列表映射到输出列表。
- 返回类型
Runnable[List[Input], List[Output]]
示例
from langchain_core.runnables import RunnableLambda def _lambda(x: int) -> int: return x + 1 runnable = RunnableLambda(_lambda) print(runnable.map().invoke([1, 2, 3])) # [2, 3, 4]
- pick(keys: Union[str, List[str]]) RunnableSerializable[Any, Any] ¶
从此 Runnable 的字典输出中选取键。
- 选择单个键
import json from langchain_core.runnables import RunnableLambda, RunnableMap as_str = RunnableLambda(str) as_json = RunnableLambda(json.loads) chain = RunnableMap(str=as_str, json=as_json) chain.invoke("[1, 2, 3]") # -> {"str": "[1, 2, 3]", "json": [1, 2, 3]} json_only_chain = chain.pick("json") json_only_chain.invoke("[1, 2, 3]") # -> [1, 2, 3]
- 选择键列表
from typing import Any import json from langchain_core.runnables import RunnableLambda, RunnableMap as_str = RunnableLambda(str) as_json = RunnableLambda(json.loads) def as_bytes(x: Any) -> bytes: return bytes(x, "utf-8") chain = RunnableMap( str=as_str, json=as_json, bytes=RunnableLambda(as_bytes) ) chain.invoke("[1, 2, 3]") # -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"} json_and_bytes_chain = chain.pick(["json", "bytes"]) json_and_bytes_chain.invoke("[1, 2, 3]") # -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
- 参数
keys (Union[str, List[str]]) –
- 返回类型
RunnableSerializable[Any, Any]
- pipe(**others: Union[Runnable[Any, Other], Callable[[Any], Other]], name: Optional[str] = None) RunnableSerializable[Input, Other] ¶
将此 Runnable 与类似 Runnable 的对象组合以创建 RunnableSequence。
等效于 RunnableSequence(self, *others) 或 self | others[0] | …
示例
from langchain_core.runnables import RunnableLambda def add_one(x: int) -> int: return x + 1 def mul_two(x: int) -> int: return x * 2 runnable_1 = RunnableLambda(add_one) runnable_2 = RunnableLambda(mul_two) sequence = runnable_1.pipe(runnable_2) # Or equivalently: # sequence = runnable_1 | runnable_2 # sequence = RunnableSequence(first=runnable_1, last=runnable_2) sequence.invoke(1) await sequence.ainvoke(1) # -> 4 sequence.batch([1, 2, 3]) await sequence.abatch([1, 2, 3]) # -> [4, 6, 8]
- 参数
others (Union[Runnable[Any, Other], Callable[[Any], Other]]) –
name (Optional[str]) –
- 返回类型
RunnableSerializable[Input, Other]
- stream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) Iterator[Output] [source]¶
stream 的默认实现,它调用 invoke。如果子类支持流式输出,则应该重写此方法。
- 参数
input (Input) – Runnable 的输入。
config (Optional[RunnableConfig]) – 用于 Runnable 的配置。默认为 None。
kwargs (Optional[Any]) – 传递给 Runnable 的其他关键字参数。
- 产生
Runnable 的输出。
- 返回类型
Iterator[Output]
- transform(input: Iterator[Input], config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) Iterator[Output] [source]¶
transform 的默认实现,它会缓冲输入,然后调用 stream。如果子类可以在输入仍在生成时开始生成输出,则应该重写此方法。
- 参数
input (Iterator[Input]) – Runnable 的输入迭代器。
config (Optional[RunnableConfig]) – 用于 Runnable 的配置。默认为 None。
kwargs (Optional[Any]) – 传递给 Runnable 的其他关键字参数。
- 产生
Runnable 的输出。
- 返回类型
Iterator[Output]
- with_alisteners(*, on_start: Optional[AsyncListener] = None, on_end: Optional[AsyncListener] = None, on_error: Optional[AsyncListener] = None) Runnable[Input, Output] ¶
将异步生命周期监听器绑定到 Runnable,返回一个新的 Runnable。
on_start:在 Runnable 开始运行之前异步调用。 on_end:在 Runnable 运行结束后异步调用。 on_error:如果 Runnable 抛出错误,则异步调用。
Run 对象包含有关运行的信息,包括其 id、类型、输入、输出、错误、开始时间、结束时间和添加到运行的任何标签或元数据。
- 参数
on_start (Optional[AsyncListener]) – 在 Runnable 开始运行之前异步调用。默认为 None。
on_end (Optional[AsyncListener]) – 在 Runnable 运行结束后异步调用。默认为 None。
on_error (Optional[AsyncListener]) – 如果 Runnable 抛出错误,则异步调用。默认为 None。
- 返回
绑定了监听器的新 Runnable。
- 返回类型
Runnable[Input, Output]
示例
from langchain_core.runnables import RunnableLambda import time async def test_runnable(time_to_sleep : int): print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}") await asyncio.sleep(time_to_sleep) print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}") async def fn_start(run_obj : Runnable): print(f"on start callback starts at {format_t(time.time())} await asyncio.sleep(3) print(f"on start callback ends at {format_t(time.time())}") async def fn_end(run_obj : Runnable): print(f"on end callback starts at {format_t(time.time())} await asyncio.sleep(2) print(f"on end callback ends at {format_t(time.time())}") runnable = RunnableLambda(test_runnable).with_alisteners( on_start=fn_start, on_end=fn_end ) async def concurrent_runs(): await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3)) asyncio.run(concurrent_runs()) Result: on start callback starts at 2024-05-16T14:20:29.637053+00:00 on start callback starts at 2024-05-16T14:20:29.637150+00:00 on start callback ends at 2024-05-16T14:20:32.638305+00:00 on start callback ends at 2024-05-16T14:20:32.638383+00:00 Runnable[3s]: starts at 2024-05-16T14:20:32.638849+00:00 Runnable[5s]: starts at 2024-05-16T14:20:32.638999+00:00 Runnable[3s]: ends at 2024-05-16T14:20:35.640016+00:00 on end callback starts at 2024-05-16T14:20:35.640534+00:00 Runnable[5s]: ends at 2024-05-16T14:20:37.640169+00:00 on end callback starts at 2024-05-16T14:20:37.640574+00:00 on end callback ends at 2024-05-16T14:20:37.640654+00:00 on end callback ends at 2024-05-16T14:20:39.641751+00:00
- with_config(config: Optional[RunnableConfig] = None, **kwargs: Any) Runnable[Input, Output] ¶
将配置绑定到 Runnable,返回一个新的 Runnable。
- 参数
config (Optional[RunnableConfig]) – 要绑定到 Runnable 的配置。
kwargs (Any) – 要传递给 Runnable 的其他关键字参数。
- 返回
绑定了配置的新 Runnable。
- 返回类型
Runnable[Input, Output]
- with_fallbacks(fallbacks: Sequence[Runnable[Input, Output]], *, exceptions_to_handle: Tuple[Type[BaseException], ...] = (<class 'Exception'>,), exception_key: Optional[str] = None) RunnableWithFallbacksT[Input, Output] ¶
向 Runnable 添加回退,返回一个新的 Runnable。
新的 Runnable 将在失败时尝试原始 Runnable,然后按顺序尝试每个回退方案。
- 参数
fallbacks (Sequence[Runnable[Input, Output]]) – 如果原始 Runnable 失败,则尝试运行的回退方案序列。
exceptions_to_handle (Tuple[Type[BaseException], ...]) – 要处理的异常类型元组。默认为 (Exception,)。
exception_key (Optional[str]) – 如果指定了字符串,则处理的异常将作为输入的一部分传递给回退方案,键为指定的键。如果为 None,则异常将不会传递给回退方案。如果使用,则基本 Runnable 及其回退方案必须接受字典作为输入。默认为 None。
- 返回
一个新的 Runnable,它将在失败时尝试原始 Runnable,然后按顺序尝试每个回退方案。
- 返回类型
RunnableWithFallbacksT[Input, Output]
示例
from typing import Iterator from langchain_core.runnables import RunnableGenerator def _generate_immediate_error(input: Iterator) -> Iterator[str]: raise ValueError() yield "" def _generate(input: Iterator) -> Iterator[str]: yield from "foo bar" runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks( [RunnableGenerator(_generate)] ) print(''.join(runnable.stream({}))) #foo bar
- 参数
fallbacks (Sequence[Runnable[Input, Output]]) – 如果原始 Runnable 失败,则尝试运行的回退方案序列。
exceptions_to_handle (Tuple[Type[BaseException], ...]) – 要处理的异常类型元组。
exception_key (Optional[str]) – 如果指定了字符串,则处理的异常将作为输入的一部分传递给回退方案,键为指定的键。如果为 None,则异常将不会传递给回退方案。如果使用,则基本 Runnable 及其回退方案必须接受字典作为输入。
- 返回
一个新的 Runnable,它将在失败时尝试原始 Runnable,然后按顺序尝试每个回退方案。
- 返回类型
RunnableWithFallbacksT[Input, Output]
- with_listeners(*, on_start: Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]] = None, on_end: Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]] = None, on_error: Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]] = None) Runnable[Input, Output] ¶
将生命周期监听器绑定到 Runnable,返回一个新的 Runnable。
on_start:在 Runnable 开始运行之前调用,带有 Run 对象。 on_end:在 Runnable 运行结束后调用,带有 Run 对象。 on_error:如果 Runnable 抛出错误,则调用,带有 Run 对象。
Run 对象包含有关运行的信息,包括其 id、类型、输入、输出、错误、开始时间、结束时间和添加到运行的任何标签或元数据。
- 参数
on_start (Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]]) – 在 Runnable 开始运行之前调用。默认为 None。
on_end (Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]]) – 在 Runnable 运行结束后调用。默认为 None。
on_error (Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]]) – 如果 Runnable 抛出错误,则调用。默认为 None。
- 返回
绑定了监听器的新 Runnable。
- 返回类型
Runnable[Input, Output]
示例
from langchain_core.runnables import RunnableLambda from langchain_core.tracers.schemas import Run import time def test_runnable(time_to_sleep : int): time.sleep(time_to_sleep) def fn_start(run_obj: Run): print("start_time:", run_obj.start_time) def fn_end(run_obj: Run): print("end_time:", run_obj.end_time) chain = RunnableLambda(test_runnable).with_listeners( on_start=fn_start, on_end=fn_end ) chain.invoke(2)
- with_retry(*, retry_if_exception_type: ~typing.Tuple[~typing.Type[BaseException], ...] = (<class 'Exception'>,), wait_exponential_jitter: bool = True, stop_after_attempt: int = 3) Runnable[Input, Output] ¶
创建一个新的 Runnable,该 Runnable 在异常时重试原始 Runnable。
- 参数
retry_if_exception_type (Tuple[Type[BaseException], ...]) – 发生这些异常类型时进行重试。默认为 (Exception,)。
wait_exponential_jitter (bool) – 是否在重试之间的等待时间中添加抖动。默认为 True。
stop_after_attempt (int) – 放弃之前的最大尝试次数。默认为 3。
- 返回
一个新的 Runnable,它会在发生异常时重试原始 Runnable。
- 返回类型
Runnable[Input, Output]
示例
from langchain_core.runnables import RunnableLambda count = 0 def _lambda(x: int) -> None: global count count = count + 1 if x == 1: raise ValueError("x is 1") else: pass runnable = RunnableLambda(_lambda) try: runnable.with_retry( stop_after_attempt=2, retry_if_exception_type=(ValueError,), ).invoke(1) except ValueError: pass assert (count == 2)
- 参数
retry_if_exception_type (Tuple[Type[BaseException], ...]) – 发生这些异常类型时进行重试
wait_exponential_jitter (bool) – 是否在重试之间的等待时间中添加抖动
stop_after_attempt (int) – 放弃之前的最大尝试次数
- 返回
一个新的 Runnable,它会在发生异常时重试原始 Runnable。
- 返回类型
Runnable[Input, Output]
- with_types(*, input_type: Optional[Type] = None, output_type: Optional[Type] = None) Runnable[Input, Output] ¶
将输入和输出类型绑定到 Runnable,返回一个新的 Runnable。
- 参数
input_type (Optional[Type[Input]]) – 要绑定到 Runnable 的输入类型。默认为 None。
output_type (Optional[Type[Output]]) – 要绑定到 Runnable 的输出类型。默认为 None。
- 返回
绑定了类型的新 Runnable。
- 返回类型
Runnable[Input, Output]