langchain_nvidia_ai_endpoints.tools
.ServerToolsMixin¶
注意
ServerToolsMixin 实现了标准的 Runnable 接口
。 🏃
Runnable 接口
具有在 runnables 上可用的其他方法,例如 with_types
、 with_retry
、 assign
、 bind
、 get_graph
等。
- class langchain_nvidia_ai_endpoints.tools.ServerToolsMixin[source]¶
属性
InputType
此 Runnable 接受的输入类型,指定为类型注解。
OutputType
此 Runnable 生成的输出类型,指定为类型注解。
config_specs
此 Runnable 的可配置字段列表。
input_schema
此 Runnable 接受的输入类型,指定为 pydantic 模型。
name
Runnable 的名称。
output_schema
此 Runnable 生成的输出类型,指定为 pydantic 模型。
方法
__init__
()abatch
(inputs[, config, return_exceptions])默认实现使用 asyncio.gather 并行运行 ainvoke。
abatch_as_completed
(inputs[, config, ...])在一系列输入上并行运行 ainvoke,并在完成时产生结果。
ainvoke
(input[, config])ainvoke 的默认实现,从线程调用 invoke。
as_tool
([args_schema, name, description, ...])assign
(**kwargs)将新字段分配给此 Runnable 的字典输出。
astream
(input[, config])astream 的默认实现,它调用 ainvoke。
astream_events
(input[, config, ...])astream_log
(input[, config, diff, ...])流式传输来自 Runnable 的所有输出,如报告给回调系统。
atransform
(input[, config])atransform 的默认实现,它缓冲输入并调用 astream。
batch
(inputs[, config, return_exceptions])默认实现使用线程池执行器并行运行 invoke。
batch_as_completed
(inputs[, config, ...])在一系列输入上并行运行 invoke,并在完成时产生结果。
bind
(**kwargs)将参数绑定到 Runnable,返回一个新的 Runnable。
bind_tools
(tools[, tool_arg, conversion_fn])将类似工具的对象绑定到此聊天模型。
config_schema
(*[, include])此 Runnable 接受的配置类型,指定为 pydantic 模型。
get_graph
([config])返回此 Runnable 的图形表示。
get_input_schema
([config])获取可用于验证 Runnable 输入的 pydantic 模型。
get_name
([suffix, name])获取 Runnable 的名称。
get_output_schema
([config])获取可用于验证 Runnable 输出的 pydantic 模型。
get_prompts
([config])返回由此 Runnable 使用的提示列表。
invoke
(input[, config])将单个输入转换为输出。
map
()返回一个新的 Runnable,它通过对每个输入调用 invoke(),将输入列表映射到输出列表。
pick
(keys)从此 Runnable 的字典输出中选取键。
pipe
(*others[, name])将此 Runnable 与类似 Runnable 的对象组合以创建 RunnableSequence。
stream
(input[, config])stream 的默认实现,它调用 invoke。
transform
(input[, config])transform 的默认实现,它缓冲输入,然后调用 stream。
with_alisteners
(*[, on_start, on_end, on_error])将异步生命周期监听器绑定到 Runnable,返回一个新的 Runnable。
with_config
([config])将配置绑定到 Runnable,返回一个新的 Runnable。
with_fallbacks
(fallbacks, *[, ...])向 Runnable 添加回退,返回一个新的 Runnable。
with_listeners
(*[, on_start, on_end, on_error])将生命周期监听器绑定到 Runnable,返回一个新的 Runnable。
with_retry
(*[, retry_if_exception_type, ...])创建一个新的 Runnable,它在出现异常时重试原始 Runnable。
with_structured_output
(schema, *[, ...])模型包装器,返回格式化为匹配给定模式的输出。
with_types
(*[, input_type, output_type])将输入和输出类型绑定到 Runnable,返回一个新的 Runnable。
- __init__()¶
- async abatch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output] ¶
默认实现使用 asyncio.gather 并行运行 ainvoke。
batch 的默认实现非常适合 IO 绑定 runnable。
如果子类可以更有效地进行批量处理,则应覆盖此方法;例如,如果底层 Runnable 使用支持批量模式的 API。
- 参数
inputs (List[Input]) – Runnable 的输入列表。
config (Optional[Union[RunnableConfig, List[RunnableConfig]]]) – 调用 Runnable 时要使用的配置。 此配置支持标准键,如用于跟踪目的的“tags”、“metadata”,用于控制并行执行多少工作的“max_concurrency”以及其他键。 有关更多详细信息,请参阅 RunnableConfig。 默认为 None。
return_exceptions (bool) – 是否返回异常而不是引发异常。 默认为 False。
kwargs (Optional[Any]) – 要传递给 Runnable 的其他关键字参数。
- 返回
来自 Runnable 的输出列表。
- 返回类型
List[Output]
- async abatch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) AsyncIterator[Tuple[int, Union[Output, Exception]]] ¶
在一系列输入上并行运行 ainvoke,并在完成时产生结果。
- 参数
inputs (Sequence[Input]) – Runnable 的输入列表。
config (Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]) – 调用 Runnable 时要使用的配置。 此配置支持标准键,如用于跟踪目的的“tags”、“metadata”,用于控制并行执行多少工作的“max_concurrency”以及其他键。 有关更多详细信息,请参阅 RunnableConfig。 默认为 None。 默认为 None。
return_exceptions (bool) – 是否返回异常而不是引发异常。 默认为 False。
kwargs (Optional[Any]) – 要传递给 Runnable 的其他关键字参数。
- 产生
输入索引和 Runnable 输出的元组。
- 返回类型
AsyncIterator[Tuple[int, Union[Output, Exception]]]
- async ainvoke(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Any) Output ¶
ainvoke 的默认实现,从线程调用 invoke。
即使 Runnable 没有实现 invoke 的原生异步版本,默认实现也允许使用异步代码。
如果子类可以异步运行,则应覆盖此方法。
- 参数
input (Input) –
config (Optional[RunnableConfig]) –
kwargs (Any) –
- 返回类型
Output
- as_tool(args_schema: Optional[Type[BaseModel]] = None, *, name: Optional[str] = None, description: Optional[str] = None, arg_types: Optional[Dict[str, Type]] = None) BaseTool ¶
Beta
此 API 处于 Beta 阶段,将来可能会更改。
从 Runnable 创建 BaseTool。
as_tool
将从 Runnable 实例化具有名称、描述和args_schema
的 BaseTool。 如果可能,模式会从runnable.get_input_schema
推断。 或者(例如,如果 Runnable 接受字典作为输入,并且未键入特定字典键),可以使用args_schema
直接指定模式。 您也可以传递arg_types
以仅指定必需的参数及其类型。- 参数
args_schema (Optional[Type[BaseModel]]) – 工具的模式。 默认为 None。
name (Optional[str]) – 工具的名称。 默认为 None。
description (Optional[str]) – 工具的描述。 默认为 None。
arg_types (Optional[Dict[str, Type]]) – 参数名称到类型的字典。 默认为 None。
- 返回
BaseTool 实例。
- 返回类型
类型化字典输入
from typing import List from typing_extensions import TypedDict from langchain_core.runnables import RunnableLambda class Args(TypedDict): a: int b: List[int] def f(x: Args) -> str: return str(x["a"] * max(x["b"])) runnable = RunnableLambda(f) as_tool = runnable.as_tool() as_tool.invoke({"a": 3, "b": [1, 2]})
dict
输入,通过args_schema
指定模式from typing import Any, Dict, List from langchain_core.pydantic_v1 import BaseModel, Field from langchain_core.runnables import RunnableLambda def f(x: Dict[str, Any]) -> str: return str(x["a"] * max(x["b"])) class FSchema(BaseModel): """Apply a function to an integer and list of integers.""" a: int = Field(..., description="Integer") b: List[int] = Field(..., description="List of ints") runnable = RunnableLambda(f) as_tool = runnable.as_tool(FSchema) as_tool.invoke({"a": 3, "b": [1, 2]})
dict
输入,通过arg_types
指定模式from typing import Any, Dict, List from langchain_core.runnables import RunnableLambda def f(x: Dict[str, Any]) -> str: return str(x["a"] * max(x["b"])) runnable = RunnableLambda(f) as_tool = runnable.as_tool(arg_types={"a": int, "b": List[int]}) as_tool.invoke({"a": 3, "b": [1, 2]})
字符串输入
from langchain_core.runnables import RunnableLambda def f(x: str) -> str: return x + "a" def g(x: str) -> str: return x + "z" runnable = RunnableLambda(f) | g as_tool = runnable.as_tool() as_tool.invoke("b")
0.2.14 版本新增功能。
- assign(**kwargs: Union[Runnable[Dict[str, Any], Any], Callable[[Dict[str, Any]], Any], Mapping[str, Union[Runnable]Dict[str, Any], Any], Callable[[Dict[str, Any]], Any]]]]) RunnableSerializable[Any, Any] ¶
向此 Runnable 的字典输出分配新字段。返回一个新的 Runnable。
from langchain_community.llms.fake import FakeStreamingListLLM from langchain_core.output_parsers import StrOutputParser from langchain_core.prompts import SystemMessagePromptTemplate from langchain_core.runnables import Runnable from operator import itemgetter prompt = ( SystemMessagePromptTemplate.from_template("You are a nice assistant.") + "{question}" ) llm = FakeStreamingListLLM(responses=["foo-lish"]) chain: Runnable = prompt | llm | {"str": StrOutputParser()} chain_with_assign = chain.assign(hello=itemgetter("str") | llm) print(chain_with_assign.input_schema.schema()) # {'title': 'PromptInput', 'type': 'object', 'properties': {'question': {'title': 'Question', 'type': 'string'}}} print(chain_with_assign.output_schema.schema()) # {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties': {'str': {'title': 'Str', 'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
- 参数
kwargs (Union[Runnable[Dict[str, Any], Any], Callable[[Dict[str, Any]], Any], Mapping[str, Union[Runnable[Dict[str, Any], Any], Callable[[Dict[str, Any]], Any]]]]) –
- 返回类型
RunnableSerializable[Any, Any]
- async astream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) AsyncIterator[Output] ¶
astream 的默认实现,它调用 ainvoke。如果子类支持流式输出,则应重写此方法。
- 参数
input (Input) – Runnable 的输入。
config (Optional[RunnableConfig]) – 用于 Runnable 的配置。默认为 None。
kwargs (Optional[Any]) – 要传递给 Runnable 的其他关键字参数。
- 产生
Runnable 的输出。
- 返回类型
AsyncIterator[Output]
- astream_events(input: Any, config: Optional[RunnableConfig] = None, *, version: Literal['v1', 'v2'], include_names: Optional[Sequence[str]] = None, include_types: Optional[Sequence[str]] = None, include_tags: Optional[Sequence[str]] = None, exclude_names: Optional[Sequence[str]] = None, exclude_types: Optional[Sequence[str]] = None, exclude_tags: Optional[Sequence[str]] = None, **kwargs: Any) AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]] ¶
Beta
此 API 处于 Beta 阶段,将来可能会更改。
生成事件流。
用于创建一个迭代器,遍历 StreamEvents,这些事件提供有关 Runnable 进度的实时信息,包括来自中间结果的 StreamEvents。
StreamEvent 是一个字典,具有以下模式
event
: str - 事件名称的格式为格式:on_[runnable_type]_(start|stream|end)。
name
: str - 生成事件的 Runnable 的名称。run_id
: str - 随机生成的 ID,与给定 Runnable 执行的事件相关联。作为父 Runnable 执行的一部分而调用的子 Runnable 将被分配其自己的唯一 ID。
parent_ids
: List[str] - 生成事件的父 runnables 的 ID。根 Runnable 将有一个空列表。父 ID 的顺序是从根到直接父级。仅适用于 API 的 v2 版本。API 的 v1 版本将返回一个空列表。
tags
: Optional[List[str]] - 生成事件的 Runnable 的标签。事件。
metadata
: Optional[Dict[str, Any]] - Runnable 的元数据生成了事件。
data
: Dict[str, Any]
下面是一个表格,说明了各种链可能发出的一些事件。为了简洁起见,元数据字段已从表格中省略。链定义已包含在表格之后。
注意 此参考表适用于 V2 版本的模式。
事件
name
块
输入
输出
on_chat_model_start
[模型名称]
{“messages”: [[SystemMessage, HumanMessage]]}
on_chat_model_stream
[模型名称]
AIMessageChunk(content=”hello”)
on_chat_model_end
[模型名称]
{“messages”: [[SystemMessage, HumanMessage]]}
AIMessageChunk(content=”hello world”)
on_llm_start
[模型名称]
{‘input’: ‘hello’}
on_llm_stream
[模型名称]
‘Hello’
on_llm_end
[模型名称]
‘Hello human!’
on_chain_start
format_docs
on_chain_stream
format_docs
“hello world!, goodbye world!”
on_chain_end
format_docs
[Document(…)]
“hello world!, goodbye world!”
on_tool_start
some_tool
{“x”: 1, “y”: “2”}
on_tool_end
some_tool
{“x”: 1, “y”: “2”}
on_retriever_start
[检索器名称]
{“query”: “hello”}
on_retriever_end
[检索器名称]
{“query”: “hello”}
[Document(…), ..]
on_prompt_start
[模板名称]
{“question”: “hello”}
on_prompt_end
[模板名称]
{“question”: “hello”}
ChatPromptValue(messages: [SystemMessage, …])
除了标准事件外,用户还可以调度自定义事件(请参阅下面的示例)。
自定义事件将仅在 API 的 v2 版本中显示!
自定义事件具有以下格式
属性
类型
描述
name
str
用户定义的事件名称。
data
Any
与事件关联的数据。这可以是任何内容,但我们建议使其可 JSON 序列化。
以下是与上面显示的标准事件关联的声明
format_docs:
def format_docs(docs: List[Document]) -> str: '''Format the docs.''' return ", ".join([doc.page_content for doc in docs]) format_docs = RunnableLambda(format_docs)
some_tool:
@tool def some_tool(x: int, y: str) -> dict: '''Some_tool.''' return {"x": x, "y": y}
prompt:
template = ChatPromptTemplate.from_messages( [("system", "You are Cat Agent 007"), ("human", "{question}")] ).with_config({"run_name": "my_template", "tags": ["my_template"]})
示例
from langchain_core.runnables import RunnableLambda async def reverse(s: str) -> str: return s[::-1] chain = RunnableLambda(func=reverse) events = [ event async for event in chain.astream_events("hello", version="v2") ] # will produce the following events (run_id, and parent_ids # has been omitted for brevity): [ { "data": {"input": "hello"}, "event": "on_chain_start", "metadata": {}, "name": "reverse", "tags": [], }, { "data": {"chunk": "olleh"}, "event": "on_chain_stream", "metadata": {}, "name": "reverse", "tags": [], }, { "data": {"output": "olleh"}, "event": "on_chain_end", "metadata": {}, "name": "reverse", "tags": [], }, ]
示例:调度自定义事件
from langchain_core.callbacks.manager import ( adispatch_custom_event, ) from langchain_core.runnables import RunnableLambda, RunnableConfig import asyncio async def slow_thing(some_input: str, config: RunnableConfig) -> str: """Do something that takes a long time.""" await asyncio.sleep(1) # Placeholder for some slow operation await adispatch_custom_event( "progress_event", {"message": "Finished step 1 of 3"}, config=config # Must be included for python < 3.10 ) await asyncio.sleep(1) # Placeholder for some slow operation await adispatch_custom_event( "progress_event", {"message": "Finished step 2 of 3"}, config=config # Must be included for python < 3.10 ) await asyncio.sleep(1) # Placeholder for some slow operation return "Done" slow_thing = RunnableLambda(slow_thing) async for event in slow_thing.astream_events("some_input", version="v2"): print(event)
- 参数
input (Any) – Runnable 的输入。
config (Optional[RunnableConfig]) – 用于 Runnable 的配置。
version (Literal['v1', 'v2']) – 要使用的模式版本,可以是 v2 或 v1。用户应使用 v2。v1 用于向后兼容,将在 0.4.0 中弃用。在 API 稳定之前,不会分配默认值。自定义事件将仅在 v2 中显示。
include_names (Optional[Sequence[str]]) – 仅包含来自具有匹配名称的 runnables 的事件。
include_types (Optional[Sequence[str]]) – 仅包含来自具有匹配类型的 runnables 的事件。
include_tags (Optional[Sequence[str]]) – 仅包含来自具有匹配标签的 runnables 的事件。
exclude_names (Optional[Sequence[str]]) – 排除来自具有匹配名称的 runnables 的事件。
exclude_types (Optional[Sequence[str]]) – 排除来自具有匹配类型的 runnables 的事件。
exclude_tags (Optional[Sequence[str]]) – 排除来自具有匹配标签的 runnables 的事件。
kwargs (Any) – 要传递给 Runnable 的其他关键字参数。这些参数将传递给 astream_log,因为此 astream_events 的实现构建在 astream_log 之上。
- 产生
StreamEvents 的异步流。
- Raises
NotImplementedError – 如果版本不是 v1 或 v2。
- 返回类型
AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]]
- async astream_log(input: Any, config: Optional[RunnableConfig] = None, *, diff: bool = True, with_streamed_output_list: bool = True, include_names: Optional[Sequence[str]] = None, include_types: Optional[Sequence[str]] = None, include_tags: Optional[Sequence[str]] = None, exclude_names: Optional[Sequence[str]] = None, exclude_types: Optional[Sequence[str]] = None, exclude_tags: Optional[Sequence[str]] = None, **kwargs: Any) Union[AsyncIterator[RunLogPatch], AsyncIterator[RunLog]] ¶
流式传输来自 Runnable 的所有输出,如回调系统报告的那样。这包括 LLM、Retriever、Tool 等的所有内部运行。
输出作为 Log 对象流式传输,其中包括 Jsonpatch ops 列表,这些操作描述了运行状态在每个步骤中如何更改,以及运行的最终状态。
可以按顺序应用 Jsonpatch ops 来构建状态。
- 参数
input (Any) – Runnable 的输入。
config (Optional[RunnableConfig]) – 用于 Runnable 的配置。
diff (bool) – 是否产生每个步骤之间的差异或当前状态。
with_streamed_output_list (bool) – 是否产生 streamed_output 列表。
include_names (Optional[Sequence[str]]) – 仅包含具有这些名称的日志。
include_types (Optional[Sequence[str]]) – 仅包含具有这些类型的日志。
include_tags (Optional[Sequence[str]]) – 仅包含具有这些标签的日志。
exclude_names (Optional[Sequence[str]]) – 排除具有这些名称的日志。
exclude_types (Optional[Sequence[str]]) – 排除具有这些类型的日志。
exclude_tags (Optional[Sequence[str]]) – 排除具有这些标签的日志。
kwargs (Any) – 要传递给 Runnable 的其他关键字参数。
- 产生
RunLogPatch 或 RunLog 对象。
- 返回类型
Union[AsyncIterator[RunLogPatch], AsyncIterator[RunLog]]
- async atransform(input: AsyncIterator[Input], config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) AsyncIterator[Output] ¶
atransform 的默认实现,它缓冲输入并调用 astream。如果子类可以在仍在生成输入时开始生成输出,则应重写此方法。
- 参数
input (AsyncIterator[Input]) – Runnable 的输入异步迭代器。
config (Optional[RunnableConfig]) – 用于 Runnable 的配置。默认为 None。
kwargs (Optional[Any]) – 要传递给 Runnable 的其他关键字参数。
- 产生
Runnable 的输出。
- 返回类型
AsyncIterator[Output]
- batch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output] ¶
默认实现使用线程池执行器并行运行 invoke。
batch 的默认实现非常适合 IO 绑定 runnable。
如果子类可以更有效地进行批量处理,则应覆盖此方法;例如,如果底层 Runnable 使用支持批量模式的 API。
- 参数
inputs (List[Input]) –
config (Optional[Union[RunnableConfig, List[RunnableConfig]]]) –
return_exceptions (bool) –
kwargs (Optional[Any]) –
- 返回类型
List[Output]
- batch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) Iterator[Tuple[int, Union[Output, Exception]]] ¶
在一系列输入上并行运行 invoke,并在完成时产生结果。
- 参数
inputs (Sequence[Input]) –
config (Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]) –
return_exceptions (bool) –
kwargs (Optional[Any]) –
- 返回类型
Iterator[Tuple[int, Union[Output, Exception]]]
- bind(**kwargs: Any) Runnable[Input, Output] ¶
将参数绑定到 Runnable,返回一个新的 Runnable。
当链中的 Runnable 需要一个参数,而该参数不在前一个 Runnable 的输出中或未包含在用户输入中时,此方法很有用。
- 参数
kwargs (Any) – 要绑定到 Runnable 的参数。
- 返回
绑定了参数的新 Runnable。
- 返回类型
Runnable[Input, Output]
示例
from langchain_community.chat_models import ChatOllama from langchain_core.output_parsers import StrOutputParser llm = ChatOllama(model='llama2') # Without bind. chain = ( llm | StrOutputParser() ) chain.invoke("Repeat quoted words exactly: 'One two three four five.'") # Output is 'One two three four five.' # With bind. chain = ( llm.bind(stop=["three"]) | StrOutputParser() ) chain.invoke("Repeat quoted words exactly: 'One two three four five.'") # Output is 'One two'
- bind_tools(tools: ~typing.Sequence[~typing.Union[~typing.Dict[str, ~typing.Any], ~typing.Type[~pydantic.main.BaseModel], ~typing.Callable, ~langchain_core.tools.BaseTool]], tool_arg: str = 'tools', conversion_fn: ~typing.Callable = <function convert_to_openai_tool>, **kwargs: ~typing.Any) Runnable[Union[PromptValue, str, Sequence[Union[BaseMessage, List[str], Tuple[str, str], str, Dict[str, Any]]], BaseMessage] [source]¶
将类似工具的对象绑定到此聊天模型。
假定模型与 OpenAI 工具调用 API 兼容。
- 参数
tools (Sequence[Union[Dict[str, Any], Type[BaseModel], Callable, BaseTool]]) – 要绑定到此聊天模型的工具定义列表。可以是字典、pydantic 模型、可调用对象或 BaseTool。Pydantic 模型、可调用对象和 BaseTool 将自动转换为其模式字典表示形式。
**kwargs (Any) – 要传递给
Runnable
构造函数的任何其他参数。tool_arg (str) –
conversion_fn (Callable) –
**kwargs –
- 返回类型
Runnable[Union[PromptValue, str, Sequence[Union[BaseMessage, List[str], Tuple[str, str], str, Dict[str, Any]]]], BaseMessage]
实验性功能:此方法旨在为未来提供支持。在类中调用: ``` class TooledChatNVIDIA(ChatNVIDIA, ToolsMixin)
pass
llm = TooledChatNVIDIA(model=”mixtral_8x7b”) tooled_llm = llm.bind_tools(tools) tooled_llm.invoke(“Hello world!!”) ```
``` from langchain_nvidia_ai_endpoints import ChatNVIDIA, ServerToolsMixin from langchain_core.pydantic_v1 import BaseModel, Field
# 请注意,此处的文档字符串至关重要,因为它们将与类名一起传递给模型。 class Multiply(BaseModel)
“将两个整数相乘。” a: int = Field(…, description=”第一个整数”) b: int = Field(…, description=”第二个整数”)
- class TooledChatNVIDIA(ServerToolsMixin, ChatNVIDIA)
pass
llm = TooledChatNVIDIA().mode(“openai”, model=”gpt-3.5-turbo-0125”) llm.bind_tools([Multiply]).invoke(“请为我相乘?”) llm.client.last_response.json() ```
有关更多文档,请参阅 langchain-mistralal/openai 的实现。
- config_schema(*, include: Optional[Sequence[str]] = None) Type[BaseModel] ¶
此 Runnable 接受的配置类型,指定为 pydantic 模型。
要将字段标记为可配置,请参阅 configurable_fields 和 configurable_alternatives 方法。
- 参数
include (Optional[Sequence[str]]) – 要包含在配置模式中的字段列表。
- 返回
可用于验证配置的 pydantic 模型。
- 返回类型
Type[BaseModel]
- get_graph(config: Optional[RunnableConfig] = None) Graph ¶
返回此 Runnable 的图形表示。
- 参数
config (Optional[RunnableConfig]) –
- 返回类型
- get_input_schema(config: Optional[RunnableConfig] = None) Type[BaseModel] ¶
获取可用于验证 Runnable 输入的 pydantic 模型。
利用 configurable_fields 和 configurable_alternatives 方法的 Runnables 将具有动态输入模式,该模式取决于调用 Runnable 时使用的配置。
此方法允许获取特定配置的输入模式。
- 参数
config (Optional[RunnableConfig]) – 生成模式时要使用的配置。
- 返回
可用于验证输入的 pydantic 模型。
- 返回类型
Type[BaseModel]
- get_name(suffix: Optional[str] = None, *, name: Optional[str] = None) str ¶
获取 Runnable 的名称。
- 参数
suffix (Optional[str]) –
name (Optional[str]) –
- 返回类型
str
- get_output_schema(config: Optional[RunnableConfig] = None) Type[BaseModel] ¶
获取可用于验证 Runnable 输出的 pydantic 模型。
利用 configurable_fields 和 configurable_alternatives 方法的 Runnables 将具有动态输出模式,该模式取决于调用 Runnable 时使用的配置。
此方法允许获取特定配置的输出模式。
- 参数
config (Optional[RunnableConfig]) – 生成模式时要使用的配置。
- 返回
可用于验证输出的 pydantic 模型。
- 返回类型
Type[BaseModel]
- get_prompts(config: Optional[RunnableConfig] = None) List[BasePromptTemplate] ¶
返回由此 Runnable 使用的提示列表。
- 参数
config (Optional[RunnableConfig]) –
- 返回类型
List[BasePromptTemplate]
- abstract invoke(input: Input, config: Optional[RunnableConfig] = None) Output ¶
将单个输入转换为输出。覆盖此方法以实现。
- 参数
input (Input) – Runnable 的输入。
config (Optional[RunnableConfig]) – 调用 Runnable 时要使用的配置。该配置支持标准键,如用于跟踪目的的“tags”、“metadata”,用于控制并行执行量的“max_concurrency”以及其他键。请参阅 RunnableConfig 以获取更多详细信息。
- 返回
Runnable 的输出。
- 返回类型
Output
- map() Runnable[List[Input], List[Output]] ¶
返回一个新的 Runnable,它通过对每个输入调用 invoke(),将输入列表映射到输出列表。
- 返回
一个新的 Runnable,它将输入列表映射到输出列表。
- 返回类型
Runnable[List[Input], List[Output]]
示例
from langchain_core.runnables import RunnableLambda def _lambda(x: int) -> int: return x + 1 runnable = RunnableLambda(_lambda) print(runnable.map().invoke([1, 2, 3])) # [2, 3, 4]
- pick(keys: Union[str, List[str]]) RunnableSerializable[Any, Any] ¶
从此 Runnable 的字典输出中选取键。
- 选择单个键
import json from langchain_core.runnables import RunnableLambda, RunnableMap as_str = RunnableLambda(str) as_json = RunnableLambda(json.loads) chain = RunnableMap(str=as_str, json=as_json) chain.invoke("[1, 2, 3]") # -> {"str": "[1, 2, 3]", "json": [1, 2, 3]} json_only_chain = chain.pick("json") json_only_chain.invoke("[1, 2, 3]") # -> [1, 2, 3]
- 选择键列表
from typing import Any import json from langchain_core.runnables import RunnableLambda, RunnableMap as_str = RunnableLambda(str) as_json = RunnableLambda(json.loads) def as_bytes(x: Any) -> bytes: return bytes(x, "utf-8") chain = RunnableMap( str=as_str, json=as_json, bytes=RunnableLambda(as_bytes) ) chain.invoke("[1, 2, 3]") # -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"} json_and_bytes_chain = chain.pick(["json", "bytes"]) json_and_bytes_chain.invoke("[1, 2, 3]") # -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
- 参数
keys (Union[str, List[str]]) –
- 返回类型
RunnableSerializable[Any, Any]
- pipe(*others: Union[Runnable[Any, Other], Callable[[Any], Other]], name: Optional[str] = None) RunnableSerializable[Input, Other] ¶
将此 Runnable 与类似 Runnable 的对象组合以创建 RunnableSequence。
等效于 RunnableSequence(self, *others) 或 self | others[0] | …
示例
from langchain_core.runnables import RunnableLambda def add_one(x: int) -> int: return x + 1 def mul_two(x: int) -> int: return x * 2 runnable_1 = RunnableLambda(add_one) runnable_2 = RunnableLambda(mul_two) sequence = runnable_1.pipe(runnable_2) # Or equivalently: # sequence = runnable_1 | runnable_2 # sequence = RunnableSequence(first=runnable_1, last=runnable_2) sequence.invoke(1) await sequence.ainvoke(1) # -> 4 sequence.batch([1, 2, 3]) await sequence.abatch([1, 2, 3]) # -> [4, 6, 8]
- 参数
others (Union[Runnable[Any, Other], Callable[[Any], Other]]) –
name (Optional[str]) –
- 返回类型
RunnableSerializable[Input, Other]
- stream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) Iterator[Output] ¶
流式传输的默认实现,它调用 invoke。如果子类支持流式输出,则应覆盖此方法。
- 参数
input (Input) – Runnable 的输入。
config (Optional[RunnableConfig]) – 用于 Runnable 的配置。默认为 None。
kwargs (Optional[Any]) – 要传递给 Runnable 的其他关键字参数。
- 产生
Runnable 的输出。
- 返回类型
Iterator[Output]
- transform(input: Iterator[Input], config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) Iterator[Output] ¶
转换的默认实现,它缓冲输入,然后调用 stream。如果子类可以在仍在生成输入时开始生成输出,则应覆盖此方法。
- 参数
input (Iterator[Input]) – Runnable 的输入迭代器。
config (Optional[RunnableConfig]) – 用于 Runnable 的配置。默认为 None。
kwargs (Optional[Any]) – 要传递给 Runnable 的其他关键字参数。
- 产生
Runnable 的输出。
- 返回类型
Iterator[Output]
- with_alisteners(*, on_start: Optional[AsyncListener] = None, on_end: Optional[AsyncListener] = None, on_error: Optional[AsyncListener] = None) Runnable[Input, Output] ¶
将异步生命周期监听器绑定到 Runnable,返回一个新的 Runnable。
on_start:在 Runnable 开始运行之前异步调用。 on_end:在 Runnable 完成运行后异步调用。 on_error:如果 Runnable 抛出错误,则异步调用。
Run 对象包含有关运行的信息,包括其 ID、类型、输入、输出、错误、开始时间、结束时间以及添加到运行的任何标签或元数据。
- 参数
on_start (Optional[AsyncListener]) – 在 Runnable 开始运行之前异步调用。默认为 None。
on_end (Optional[AsyncListener]) – 在 Runnable 完成运行后异步调用。默认为 None。
on_error (Optional[AsyncListener]) – 如果 Runnable 抛出错误,则异步调用。默认为 None。
- 返回
绑定侦听器的新 Runnable。
- 返回类型
Runnable[Input, Output]
示例
from langchain_core.runnables import RunnableLambda import time async def test_runnable(time_to_sleep : int): print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}") await asyncio.sleep(time_to_sleep) print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}") async def fn_start(run_obj : Runnable): print(f"on start callback starts at {format_t(time.time())} await asyncio.sleep(3) print(f"on start callback ends at {format_t(time.time())}") async def fn_end(run_obj : Runnable): print(f"on end callback starts at {format_t(time.time())} await asyncio.sleep(2) print(f"on end callback ends at {format_t(time.time())}") runnable = RunnableLambda(test_runnable).with_alisteners( on_start=fn_start, on_end=fn_end ) async def concurrent_runs(): await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3)) asyncio.run(concurrent_runs()) Result: on start callback starts at 2024-05-16T14:20:29.637053+00:00 on start callback starts at 2024-05-16T14:20:29.637150+00:00 on start callback ends at 2024-05-16T14:20:32.638305+00:00 on start callback ends at 2024-05-16T14:20:32.638383+00:00 Runnable[3s]: starts at 2024-05-16T14:20:32.638849+00:00 Runnable[5s]: starts at 2024-05-16T14:20:32.638999+00:00 Runnable[3s]: ends at 2024-05-16T14:20:35.640016+00:00 on end callback starts at 2024-05-16T14:20:35.640534+00:00 Runnable[5s]: ends at 2024-05-16T14:20:37.640169+00:00 on end callback starts at 2024-05-16T14:20:37.640574+00:00 on end callback ends at 2024-05-16T14:20:37.640654+00:00 on end callback ends at 2024-05-16T14:20:39.641751+00:00
- with_config(config: Optional[RunnableConfig] = None, **kwargs: Any) Runnable[Input, Output] ¶
将配置绑定到 Runnable,返回一个新的 Runnable。
- 参数
config (Optional[RunnableConfig]) – 要绑定到 Runnable 的配置。
kwargs (Any) – 要传递给 Runnable 的其他关键字参数。
- 返回
绑定配置的新 Runnable。
- 返回类型
Runnable[Input, Output]
- with_fallbacks(fallbacks: Sequence[Runnable[Input, Output]], *, exceptions_to_handle: Tuple[Type[BaseException], ...] = (<class 'Exception'>,), exception_key: Optional[str] = None) RunnableWithFallbacksT[Input, Output] ¶
向 Runnable 添加回退,返回一个新的 Runnable。
新的 Runnable 将尝试原始 Runnable,然后在失败时按顺序尝试每个回退。
- 参数
fallbacks (Sequence[Runnable[Input, Output]]) – 如果原始 Runnable 失败,则尝试运行的一系列 runnable。
exceptions_to_handle (Tuple[Type[BaseException], ...]) – 要处理的异常类型元组。默认为 (Exception,)。
exception_key (Optional[str]) – 如果指定了字符串,则处理的异常将作为输入的一部分传递给回退,键为指定的键。如果为 None,则异常将不会传递给回退。如果使用,则基本 Runnable 及其回退必须接受字典作为输入。默认为 None。
- 返回
一个新的 Runnable,它将尝试原始 Runnable,然后在失败时按顺序尝试每个回退。
- 返回类型
RunnableWithFallbacksT[Input, Output]
示例
from typing import Iterator from langchain_core.runnables import RunnableGenerator def _generate_immediate_error(input: Iterator) -> Iterator[str]: raise ValueError() yield "" def _generate(input: Iterator) -> Iterator[str]: yield from "foo bar" runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks( [RunnableGenerator(_generate)] ) print(''.join(runnable.stream({}))) #foo bar
- 参数
fallbacks (Sequence[Runnable[Input, Output]]) – 如果原始 Runnable 失败,则尝试运行的一系列 runnable。
exceptions_to_handle (Tuple[Type[BaseException], ...]) – 要处理的异常类型元组。
exception_key (Optional[str]) – 如果指定了字符串,则处理的异常将作为输入的一部分传递给回退,键为指定的键。如果为 None,则异常将不会传递给回退。如果使用,则基本 Runnable 及其回退必须接受字典作为输入。
- 返回
一个新的 Runnable,它将尝试原始 Runnable,然后在失败时按顺序尝试每个回退。
- 返回类型
RunnableWithFallbacksT[Input, Output]
- with_listeners(*, on_start: Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]] = None, on_end: Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]] = None, on_error: Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]] = None) Runnable[Input, Output] ¶
将生命周期监听器绑定到 Runnable,返回一个新的 Runnable。
on_start:在 Runnable 开始运行之前调用,使用 Run 对象。 on_end:在 Runnable 完成运行后调用,使用 Run 对象。 on_error:如果 Runnable 抛出错误,则调用,使用 Run 对象。
Run 对象包含有关运行的信息,包括其 ID、类型、输入、输出、错误、开始时间、结束时间以及添加到运行的任何标签或元数据。
- 参数
on_start (Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]]) – 在 Runnable 开始运行之前调用。默认为 None。
on_end (Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]]) – 在 Runnable 完成运行后调用。默认为 None。
on_error (可选[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]]) – 当 Runnable 抛出错误时调用。默认为 None。
- 返回
绑定侦听器的新 Runnable。
- 返回类型
Runnable[Input, Output]
示例
from langchain_core.runnables import RunnableLambda from langchain_core.tracers.schemas import Run import time def test_runnable(time_to_sleep : int): time.sleep(time_to_sleep) def fn_start(run_obj: Run): print("start_time:", run_obj.start_time) def fn_end(run_obj: Run): print("end_time:", run_obj.end_time) chain = RunnableLambda(test_runnable).with_listeners( on_start=fn_start, on_end=fn_end ) chain.invoke(2)
- with_retry(*, retry_if_exception_type: ~typing.Tuple[~typing.Type[BaseException], ...] = (<class 'Exception'>,), wait_exponential_jitter: bool = True, stop_after_attempt: int = 3) Runnable[Input, Output] ¶
创建一个新的 Runnable,它在出现异常时重试原始 Runnable。
- 参数
retry_if_exception_type (Tuple[Type[BaseException], ...]) – 用于重试的异常类型元组。默认为 (Exception,)。
wait_exponential_jitter (bool) – 是否在重试之间的等待时间中添加抖动。默认为 True。
stop_after_attempt (int) – 在放弃之前尝试的最大次数。默认为 3。
- 返回
一个新的 Runnable,它在发生异常时重试原始的 Runnable。
- 返回类型
Runnable[Input, Output]
示例
from langchain_core.runnables import RunnableLambda count = 0 def _lambda(x: int) -> None: global count count = count + 1 if x == 1: raise ValueError("x is 1") else: pass runnable = RunnableLambda(_lambda) try: runnable.with_retry( stop_after_attempt=2, retry_if_exception_type=(ValueError,), ).invoke(1) except ValueError: pass assert (count == 2)
- 参数
retry_if_exception_type (Tuple[Type[BaseException], ...]) – 用于重试的异常类型元组
wait_exponential_jitter (bool) – 是否在重试之间的等待时间中添加抖动
stop_after_attempt (int) – 在放弃之前尝试的最大次数
- 返回
一个新的 Runnable,它在发生异常时重试原始的 Runnable。
- 返回类型
Runnable[Input, Output]
- with_structured_output(schema: ~typing.Union[~typing.Dict, ~typing.Type[~pydantic.main.BaseModel]], *, include_raw: bool = False, tool_arg: str = 'tools', conversion_fn: ~typing.Callable = <function convert_to_openai_tool>, **kwargs: ~typing.Any) Runnable[Union[PromptValue, str, Sequence[Union[BaseMessage, List[str], Tuple[str, str], str, Dict[str, Any]]]], Union[Dict, BaseModel]] [source]¶
模型包装器,返回格式化为匹配给定模式的输出。
- 参数
schema (Union[Dict, Type[BaseModel]]) – 输出 schema,可以是 dict 或 Pydantic 类。如果是 Pydantic 类,则模型输出将是该类的对象。如果是 dict,则模型输出将是 dict。使用 Pydantic 类,返回的属性将被验证,而使用 dict 则不会。如果 method 是 “function_calling” 并且 schema 是 dict,则该 dict 必须符合 OpenAI 函数调用的规范。
include_raw (bool) – 如果为 False,则仅返回解析后的结构化输出。如果在模型输出解析期间发生错误,则会引发错误。如果为 True,则将返回原始模型响应 (BaseMessage) 和解析后的模型响应。如果在输出解析期间发生错误,它将被捕获并同样返回。最终输出始终是一个包含键 “raw”、“parsed” 和 “parsing_error” 的 dict。
tool_arg (str) –
conversion_fn (Callable) –
kwargs (Any) –
- 返回
- 如果 include_raw 为 True,则返回一个包含以下键的 dict
raw: BaseMessage parsed: Pydantic BaseModel 或 Dictionary parsing_error: Optional[BaseException]
如果 include_raw 为 False,则仅返回 BaseModel/Dictionary(取决于 schema 类型)。
- 返回类型
一个 Runnable,它接受任何 ChatModel 输入并返回作为输出
实验性功能:此方法旨在为未来提供支持。在类中调用: ``` class TooledChatNVIDIA(ChatNVIDIA, ToolsMixin)
pass
有关更多文档,请参阅 langchain-mistralal/openai 的实现。
- with_types(*, input_type: Optional[Type[Input]] = None, output_type: Optional[Type[Output]] = None) Runnable[Input, Output] ¶
将输入和输出类型绑定到 Runnable,返回一个新的 Runnable。
- 参数
input_type (可选[Type[Input]]) – 要绑定到 Runnable 的输入类型。默认为 None。
output_type (可选[Type[Output]]) – 要绑定到 Runnable 的输出类型。默认为 None。
- 返回
一个新的 Runnable,其类型已绑定。
- 返回类型
Runnable[Input, Output]