langchain_nvidia_ai_endpoints.tools.ServerToolsMixin

注意

ServerToolsMixin 实现了标准的 Runnable 接口。 🏃

Runnable 接口 具有在 runnables 上可用的其他方法,例如 with_typeswith_retryassignbindget_graph 等。

class langchain_nvidia_ai_endpoints.tools.ServerToolsMixin[source]

属性

InputType

此 Runnable 接受的输入类型,指定为类型注解。

OutputType

此 Runnable 生成的输出类型,指定为类型注解。

config_specs

此 Runnable 的可配置字段列表。

input_schema

此 Runnable 接受的输入类型,指定为 pydantic 模型。

name

Runnable 的名称。

output_schema

此 Runnable 生成的输出类型,指定为 pydantic 模型。

方法

__init__()

abatch(inputs[, config, return_exceptions])

默认实现使用 asyncio.gather 并行运行 ainvoke。

abatch_as_completed(inputs[, config, ...])

在一系列输入上并行运行 ainvoke,并在完成时产生结果。

ainvoke(input[, config])

ainvoke 的默认实现,从线程调用 invoke。

as_tool([args_schema, name, description, ...])

assign(**kwargs)

将新字段分配给此 Runnable 的字典输出。

astream(input[, config])

astream 的默认实现,它调用 ainvoke。

astream_events(input[, config, ...])

astream_log(input[, config, diff, ...])

流式传输来自 Runnable 的所有输出,如报告给回调系统。

atransform(input[, config])

atransform 的默认实现,它缓冲输入并调用 astream。

batch(inputs[, config, return_exceptions])

默认实现使用线程池执行器并行运行 invoke。

batch_as_completed(inputs[, config, ...])

在一系列输入上并行运行 invoke,并在完成时产生结果。

bind(**kwargs)

将参数绑定到 Runnable,返回一个新的 Runnable。

bind_tools(tools[, tool_arg, conversion_fn])

将类似工具的对象绑定到此聊天模型。

config_schema(*[, include])

此 Runnable 接受的配置类型,指定为 pydantic 模型。

get_graph([config])

返回此 Runnable 的图形表示。

get_input_schema([config])

获取可用于验证 Runnable 输入的 pydantic 模型。

get_name([suffix, name])

获取 Runnable 的名称。

get_output_schema([config])

获取可用于验证 Runnable 输出的 pydantic 模型。

get_prompts([config])

返回由此 Runnable 使用的提示列表。

invoke(input[, config])

将单个输入转换为输出。

map()

返回一个新的 Runnable,它通过对每个输入调用 invoke(),将输入列表映射到输出列表。

pick(keys)

从此 Runnable 的字典输出中选取键。

pipe(*others[, name])

将此 Runnable 与类似 Runnable 的对象组合以创建 RunnableSequence。

stream(input[, config])

stream 的默认实现,它调用 invoke。

transform(input[, config])

transform 的默认实现,它缓冲输入,然后调用 stream。

with_alisteners(*[, on_start, on_end, on_error])

将异步生命周期监听器绑定到 Runnable,返回一个新的 Runnable。

with_config([config])

将配置绑定到 Runnable,返回一个新的 Runnable。

with_fallbacks(fallbacks, *[, ...])

向 Runnable 添加回退,返回一个新的 Runnable。

with_listeners(*[, on_start, on_end, on_error])

将生命周期监听器绑定到 Runnable,返回一个新的 Runnable。

with_retry(*[, retry_if_exception_type, ...])

创建一个新的 Runnable,它在出现异常时重试原始 Runnable。

with_structured_output(schema, *[, ...])

模型包装器,返回格式化为匹配给定模式的输出。

with_types(*[, input_type, output_type])

将输入和输出类型绑定到 Runnable,返回一个新的 Runnable。

__init__()
async abatch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output]

默认实现使用 asyncio.gather 并行运行 ainvoke。

batch 的默认实现非常适合 IO 绑定 runnable。

如果子类可以更有效地进行批量处理,则应覆盖此方法;例如,如果底层 Runnable 使用支持批量模式的 API。

参数
  • inputs (List[Input]) – Runnable 的输入列表。

  • config (Optional[Union[RunnableConfig, List[RunnableConfig]]]) – 调用 Runnable 时要使用的配置。 此配置支持标准键,如用于跟踪目的的“tags”、“metadata”,用于控制并行执行多少工作的“max_concurrency”以及其他键。 有关更多详细信息,请参阅 RunnableConfig。 默认为 None。

  • return_exceptions (bool) – 是否返回异常而不是引发异常。 默认为 False。

  • kwargs (Optional[Any]) – 要传递给 Runnable 的其他关键字参数。

返回

来自 Runnable 的输出列表。

返回类型

List[Output]

async abatch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) AsyncIterator[Tuple[int, Union[Output, Exception]]]

在一系列输入上并行运行 ainvoke,并在完成时产生结果。

参数
  • inputs (Sequence[Input]) – Runnable 的输入列表。

  • config (Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]) – 调用 Runnable 时要使用的配置。 此配置支持标准键,如用于跟踪目的的“tags”、“metadata”,用于控制并行执行多少工作的“max_concurrency”以及其他键。 有关更多详细信息,请参阅 RunnableConfig。 默认为 None。 默认为 None。

  • return_exceptions (bool) – 是否返回异常而不是引发异常。 默认为 False。

  • kwargs (Optional[Any]) – 要传递给 Runnable 的其他关键字参数。

产生

输入索引和 Runnable 输出的元组。

返回类型

AsyncIterator[Tuple[int, Union[Output, Exception]]]

async ainvoke(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Any) Output

ainvoke 的默认实现,从线程调用 invoke。

即使 Runnable 没有实现 invoke 的原生异步版本,默认实现也允许使用异步代码。

如果子类可以异步运行,则应覆盖此方法。

参数
  • input (Input) –

  • config (Optional[RunnableConfig]) –

  • kwargs (Any) –

返回类型

Output

as_tool(args_schema: Optional[Type[BaseModel]] = None, *, name: Optional[str] = None, description: Optional[str] = None, arg_types: Optional[Dict[str, Type]] = None) BaseTool

Beta

此 API 处于 Beta 阶段,将来可能会更改。

从 Runnable 创建 BaseTool。

as_tool 将从 Runnable 实例化具有名称、描述和 args_schema 的 BaseTool。 如果可能,模式会从 runnable.get_input_schema 推断。 或者(例如,如果 Runnable 接受字典作为输入,并且未键入特定字典键),可以使用 args_schema 直接指定模式。 您也可以传递 arg_types 以仅指定必需的参数及其类型。

参数
  • args_schema (Optional[Type[BaseModel]]) – 工具的模式。 默认为 None。

  • name (Optional[str]) – 工具的名称。 默认为 None。

  • description (Optional[str]) – 工具的描述。 默认为 None。

  • arg_types (Optional[Dict[str, Type]]) – 参数名称到类型的字典。 默认为 None。

返回

BaseTool 实例。

返回类型

BaseTool

类型化字典输入

from typing import List
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda

class Args(TypedDict):
    a: int
    b: List[int]

def f(x: Args) -> str:
    return str(x["a"] * max(x["b"]))

runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})

dict 输入,通过 args_schema 指定模式

from typing import Any, Dict, List
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.runnables import RunnableLambda

def f(x: Dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))

class FSchema(BaseModel):
    """Apply a function to an integer and list of integers."""

    a: int = Field(..., description="Integer")
    b: List[int] = Field(..., description="List of ints")

runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})

dict 输入,通过 arg_types 指定模式

from typing import Any, Dict, List
from langchain_core.runnables import RunnableLambda

def f(x: Dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))

runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": List[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})

字符串输入

from langchain_core.runnables import RunnableLambda

def f(x: str) -> str:
    return x + "a"

def g(x: str) -> str:
    return x + "z"

runnable = RunnableLambda(f) | g
as_tool = runnable.as_tool()
as_tool.invoke("b")

0.2.14 版本新增功能。

assign(**kwargs: Union[Runnable[Dict[str, Any], Any], Callable[[Dict[str, Any]], Any], Mapping[str, Union[Runnable]Dict[str, Any], Any], Callable[[Dict[str, Any]], Any]]]]) RunnableSerializable[Any, Any]

向此 Runnable 的字典输出分配新字段。返回一个新的 Runnable。

from langchain_community.llms.fake import FakeStreamingListLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import SystemMessagePromptTemplate
from langchain_core.runnables import Runnable
from operator import itemgetter

prompt = (
    SystemMessagePromptTemplate.from_template("You are a nice assistant.")
    + "{question}"
)
llm = FakeStreamingListLLM(responses=["foo-lish"])

chain: Runnable = prompt | llm | {"str": StrOutputParser()}

chain_with_assign = chain.assign(hello=itemgetter("str") | llm)

print(chain_with_assign.input_schema.schema())
# {'title': 'PromptInput', 'type': 'object', 'properties':
{'question': {'title': 'Question', 'type': 'string'}}}
print(chain_with_assign.output_schema.schema()) #
{'title': 'RunnableSequenceOutput', 'type': 'object', 'properties':
{'str': {'title': 'Str',
'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
参数

kwargs (Union[Runnable[Dict[str, Any], Any], Callable[[Dict[str, Any]], Any], Mapping[str, Union[Runnable[Dict[str, Any], Any], Callable[[Dict[str, Any]], Any]]]]) –

返回类型

RunnableSerializable[Any, Any]

async astream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) AsyncIterator[Output]

astream 的默认实现,它调用 ainvoke。如果子类支持流式输出,则应重写此方法。

参数
  • input (Input) – Runnable 的输入。

  • config (Optional[RunnableConfig]) – 用于 Runnable 的配置。默认为 None。

  • kwargs (Optional[Any]) – 要传递给 Runnable 的其他关键字参数。

产生

Runnable 的输出。

返回类型

AsyncIterator[Output]

astream_events(input: Any, config: Optional[RunnableConfig] = None, *, version: Literal['v1', 'v2'], include_names: Optional[Sequence[str]] = None, include_types: Optional[Sequence[str]] = None, include_tags: Optional[Sequence[str]] = None, exclude_names: Optional[Sequence[str]] = None, exclude_types: Optional[Sequence[str]] = None, exclude_tags: Optional[Sequence[str]] = None, **kwargs: Any) AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]]

Beta

此 API 处于 Beta 阶段,将来可能会更改。

生成事件流。

用于创建一个迭代器,遍历 StreamEvents,这些事件提供有关 Runnable 进度的实时信息,包括来自中间结果的 StreamEvents。

StreamEvent 是一个字典,具有以下模式

  • event: str - 事件名称的格式为

    格式:on_[runnable_type]_(start|stream|end)。

  • name: str - 生成事件的 Runnable 的名称。

  • run_id: str - 随机生成的 ID,与给定 Runnable 执行的

    事件相关联。作为父 Runnable 执行的一部分而调用的子 Runnable 将被分配其自己的唯一 ID。

  • parent_ids: List[str] - 生成事件的父 runnables 的 ID。

    根 Runnable 将有一个空列表。父 ID 的顺序是从根到直接父级。仅适用于 API 的 v2 版本。API 的 v1 版本将返回一个空列表。

  • tags: Optional[List[str]] - 生成事件的 Runnable 的标签。

    事件。

  • metadata: Optional[Dict[str, Any]] - Runnable 的元数据

    生成了事件。

  • data: Dict[str, Any]

下面是一个表格,说明了各种链可能发出的一些事件。为了简洁起见,元数据字段已从表格中省略。链定义已包含在表格之后。

注意 此参考表适用于 V2 版本的模式。

事件

name

输入

输出

on_chat_model_start

[模型名称]

{“messages”: [[SystemMessage, HumanMessage]]}

on_chat_model_stream

[模型名称]

AIMessageChunk(content=”hello”)

on_chat_model_end

[模型名称]

{“messages”: [[SystemMessage, HumanMessage]]}

AIMessageChunk(content=”hello world”)

on_llm_start

[模型名称]

{‘input’: ‘hello’}

on_llm_stream

[模型名称]

‘Hello’

on_llm_end

[模型名称]

‘Hello human!’

on_chain_start

format_docs

on_chain_stream

format_docs

“hello world!, goodbye world!”

on_chain_end

format_docs

[Document(…)]

“hello world!, goodbye world!”

on_tool_start

some_tool

{“x”: 1, “y”: “2”}

on_tool_end

some_tool

{“x”: 1, “y”: “2”}

on_retriever_start

[检索器名称]

{“query”: “hello”}

on_retriever_end

[检索器名称]

{“query”: “hello”}

[Document(…), ..]

on_prompt_start

[模板名称]

{“question”: “hello”}

on_prompt_end

[模板名称]

{“question”: “hello”}

ChatPromptValue(messages: [SystemMessage, …])

除了标准事件外,用户还可以调度自定义事件(请参阅下面的示例)。

自定义事件将仅在 API 的 v2 版本中显示!

自定义事件具有以下格式

属性

类型

描述

name

str

用户定义的事件名称。

data

Any

与事件关联的数据。这可以是任何内容,但我们建议使其可 JSON 序列化。

以下是与上面显示的标准事件关联的声明

format_docs:

def format_docs(docs: List[Document]) -> str:
    '''Format the docs.'''
    return ", ".join([doc.page_content for doc in docs])

format_docs = RunnableLambda(format_docs)

some_tool:

@tool
def some_tool(x: int, y: str) -> dict:
    '''Some_tool.'''
    return {"x": x, "y": y}

prompt:

template = ChatPromptTemplate.from_messages(
    [("system", "You are Cat Agent 007"), ("human", "{question}")]
).with_config({"run_name": "my_template", "tags": ["my_template"]})

示例

from langchain_core.runnables import RunnableLambda

async def reverse(s: str) -> str:
    return s[::-1]

chain = RunnableLambda(func=reverse)

events = [
    event async for event in chain.astream_events("hello", version="v2")
]

# will produce the following events (run_id, and parent_ids
# has been omitted for brevity):
[
    {
        "data": {"input": "hello"},
        "event": "on_chain_start",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"chunk": "olleh"},
        "event": "on_chain_stream",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"output": "olleh"},
        "event": "on_chain_end",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
]

示例:调度自定义事件

from langchain_core.callbacks.manager import (
    adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio


async def slow_thing(some_input: str, config: RunnableConfig) -> str:
    """Do something that takes a long time."""
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 1 of 3"},
        config=config # Must be included for python < 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 2 of 3"},
        config=config # Must be included for python < 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    return "Done"

slow_thing = RunnableLambda(slow_thing)

async for event in slow_thing.astream_events("some_input", version="v2"):
    print(event)
参数
  • input (Any) – Runnable 的输入。

  • config (Optional[RunnableConfig]) – 用于 Runnable 的配置。

  • version (Literal['v1', 'v2']) – 要使用的模式版本,可以是 v2v1。用户应使用 v2v1 用于向后兼容,将在 0.4.0 中弃用。在 API 稳定之前,不会分配默认值。自定义事件将仅在 v2 中显示。

  • include_names (Optional[Sequence[str]]) – 仅包含来自具有匹配名称的 runnables 的事件。

  • include_types (Optional[Sequence[str]]) – 仅包含来自具有匹配类型的 runnables 的事件。

  • include_tags (Optional[Sequence[str]]) – 仅包含来自具有匹配标签的 runnables 的事件。

  • exclude_names (Optional[Sequence[str]]) – 排除来自具有匹配名称的 runnables 的事件。

  • exclude_types (Optional[Sequence[str]]) – 排除来自具有匹配类型的 runnables 的事件。

  • exclude_tags (Optional[Sequence[str]]) – 排除来自具有匹配标签的 runnables 的事件。

  • kwargs (Any) – 要传递给 Runnable 的其他关键字参数。这些参数将传递给 astream_log,因为此 astream_events 的实现构建在 astream_log 之上。

产生

StreamEvents 的异步流。

Raises

NotImplementedError – 如果版本不是 v1v2

返回类型

AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]]

async astream_log(input: Any, config: Optional[RunnableConfig] = None, *, diff: bool = True, with_streamed_output_list: bool = True, include_names: Optional[Sequence[str]] = None, include_types: Optional[Sequence[str]] = None, include_tags: Optional[Sequence[str]] = None, exclude_names: Optional[Sequence[str]] = None, exclude_types: Optional[Sequence[str]] = None, exclude_tags: Optional[Sequence[str]] = None, **kwargs: Any) Union[AsyncIterator[RunLogPatch], AsyncIterator[RunLog]]

流式传输来自 Runnable 的所有输出,如回调系统报告的那样。这包括 LLM、Retriever、Tool 等的所有内部运行。

输出作为 Log 对象流式传输,其中包括 Jsonpatch ops 列表,这些操作描述了运行状态在每个步骤中如何更改,以及运行的最终状态。

可以按顺序应用 Jsonpatch ops 来构建状态。

参数
  • input (Any) – Runnable 的输入。

  • config (Optional[RunnableConfig]) – 用于 Runnable 的配置。

  • diff (bool) – 是否产生每个步骤之间的差异或当前状态。

  • with_streamed_output_list (bool) – 是否产生 streamed_output 列表。

  • include_names (Optional[Sequence[str]]) – 仅包含具有这些名称的日志。

  • include_types (Optional[Sequence[str]]) – 仅包含具有这些类型的日志。

  • include_tags (Optional[Sequence[str]]) – 仅包含具有这些标签的日志。

  • exclude_names (Optional[Sequence[str]]) – 排除具有这些名称的日志。

  • exclude_types (Optional[Sequence[str]]) – 排除具有这些类型的日志。

  • exclude_tags (Optional[Sequence[str]]) – 排除具有这些标签的日志。

  • kwargs (Any) – 要传递给 Runnable 的其他关键字参数。

产生

RunLogPatch 或 RunLog 对象。

返回类型

Union[AsyncIterator[RunLogPatch], AsyncIterator[RunLog]]

async atransform(input: AsyncIterator[Input], config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) AsyncIterator[Output]

atransform 的默认实现,它缓冲输入并调用 astream。如果子类可以在仍在生成输入时开始生成输出,则应重写此方法。

参数
  • input (AsyncIterator[Input]) – Runnable 的输入异步迭代器。

  • config (Optional[RunnableConfig]) – 用于 Runnable 的配置。默认为 None。

  • kwargs (Optional[Any]) – 要传递给 Runnable 的其他关键字参数。

产生

Runnable 的输出。

返回类型

AsyncIterator[Output]

batch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output]

默认实现使用线程池执行器并行运行 invoke。

batch 的默认实现非常适合 IO 绑定 runnable。

如果子类可以更有效地进行批量处理,则应覆盖此方法;例如,如果底层 Runnable 使用支持批量模式的 API。

参数
  • inputs (List[Input]) –

  • config (Optional[Union[RunnableConfig, List[RunnableConfig]]]) –

  • return_exceptions (bool) –

  • kwargs (Optional[Any]) –

返回类型

List[Output]

batch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) Iterator[Tuple[int, Union[Output, Exception]]]

在一系列输入上并行运行 invoke,并在完成时产生结果。

参数
  • inputs (Sequence[Input]) –

  • config (Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]) –

  • return_exceptions (bool) –

  • kwargs (Optional[Any]) –

返回类型

Iterator[Tuple[int, Union[Output, Exception]]]

bind(**kwargs: Any) Runnable[Input, Output]

将参数绑定到 Runnable,返回一个新的 Runnable。

当链中的 Runnable 需要一个参数,而该参数不在前一个 Runnable 的输出中或未包含在用户输入中时,此方法很有用。

参数

kwargs (Any) – 要绑定到 Runnable 的参数。

返回

绑定了参数的新 Runnable。

返回类型

Runnable[Input, Output]

示例

from langchain_community.chat_models import ChatOllama
from langchain_core.output_parsers import StrOutputParser

llm = ChatOllama(model='llama2')

# Without bind.
chain = (
    llm
    | StrOutputParser()
)

chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two three four five.'

# With bind.
chain = (
    llm.bind(stop=["three"])
    | StrOutputParser()
)

chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two'
bind_tools(tools: ~typing.Sequence[~typing.Union[~typing.Dict[str, ~typing.Any], ~typing.Type[~pydantic.main.BaseModel], ~typing.Callable, ~langchain_core.tools.BaseTool]], tool_arg: str = 'tools', conversion_fn: ~typing.Callable = <function convert_to_openai_tool>, **kwargs: ~typing.Any) Runnable[Union[PromptValue, str, Sequence[Union[BaseMessage, List[str], Tuple[str, str], str, Dict[str, Any]]], BaseMessage][source]

将类似工具的对象绑定到此聊天模型。

假定模型与 OpenAI 工具调用 API 兼容。

参数
  • tools (Sequence[Union[Dict[str, Any], Type[BaseModel], Callable, BaseTool]]) – 要绑定到此聊天模型的工具定义列表。可以是字典、pydantic 模型、可调用对象或 BaseTool。Pydantic 模型、可调用对象和 BaseTool 将自动转换为其模式字典表示形式。

  • **kwargs (Any) – 要传递给 Runnable 构造函数的任何其他参数。

  • tool_arg (str) –

  • conversion_fn (Callable) –

  • **kwargs

返回类型

Runnable[Union[PromptValue, str, Sequence[Union[BaseMessage, List[str], Tuple[str, str], str, Dict[str, Any]]]], BaseMessage]

实验性功能:此方法旨在为未来提供支持。在类中调用: ``` class TooledChatNVIDIA(ChatNVIDIA, ToolsMixin)

pass

llm = TooledChatNVIDIA(model=”mixtral_8x7b”) tooled_llm = llm.bind_tools(tools) tooled_llm.invoke(“Hello world!!”) ```

``` from langchain_nvidia_ai_endpoints import ChatNVIDIA, ServerToolsMixin from langchain_core.pydantic_v1 import BaseModel, Field

# 请注意,此处的文档字符串至关重要,因为它们将与类名一起传递给模型。 class Multiply(BaseModel)

“将两个整数相乘。” a: int = Field(…, description=”第一个整数”) b: int = Field(…, description=”第二个整数”)

class TooledChatNVIDIA(ServerToolsMixin, ChatNVIDIA)

pass

llm = TooledChatNVIDIA().mode(“openai”, model=”gpt-3.5-turbo-0125”) llm.bind_tools([Multiply]).invoke(“请为我相乘?”) llm.client.last_response.json() ```

有关更多文档,请参阅 langchain-mistralal/openai 的实现。

config_schema(*, include: Optional[Sequence[str]] = None) Type[BaseModel]

此 Runnable 接受的配置类型,指定为 pydantic 模型。

要将字段标记为可配置,请参阅 configurable_fieldsconfigurable_alternatives 方法。

参数

include (Optional[Sequence[str]]) – 要包含在配置模式中的字段列表。

返回

可用于验证配置的 pydantic 模型。

返回类型

Type[BaseModel]

get_graph(config: Optional[RunnableConfig] = None) Graph

返回此 Runnable 的图形表示。

参数

config (Optional[RunnableConfig]) –

返回类型

Graph

get_input_schema(config: Optional[RunnableConfig] = None) Type[BaseModel]

获取可用于验证 Runnable 输入的 pydantic 模型。

利用 configurable_fields 和 configurable_alternatives 方法的 Runnables 将具有动态输入模式,该模式取决于调用 Runnable 时使用的配置。

此方法允许获取特定配置的输入模式。

参数

config (Optional[RunnableConfig]) – 生成模式时要使用的配置。

返回

可用于验证输入的 pydantic 模型。

返回类型

Type[BaseModel]

get_name(suffix: Optional[str] = None, *, name: Optional[str] = None) str

获取 Runnable 的名称。

参数
  • suffix (Optional[str]) –

  • name (Optional[str]) –

返回类型

str

get_output_schema(config: Optional[RunnableConfig] = None) Type[BaseModel]

获取可用于验证 Runnable 输出的 pydantic 模型。

利用 configurable_fields 和 configurable_alternatives 方法的 Runnables 将具有动态输出模式,该模式取决于调用 Runnable 时使用的配置。

此方法允许获取特定配置的输出模式。

参数

config (Optional[RunnableConfig]) – 生成模式时要使用的配置。

返回

可用于验证输出的 pydantic 模型。

返回类型

Type[BaseModel]

get_prompts(config: Optional[RunnableConfig] = None) List[BasePromptTemplate]

返回由此 Runnable 使用的提示列表。

参数

config (Optional[RunnableConfig]) –

返回类型

List[BasePromptTemplate]

abstract invoke(input: Input, config: Optional[RunnableConfig] = None) Output

将单个输入转换为输出。覆盖此方法以实现。

参数
  • input (Input) – Runnable 的输入。

  • config (Optional[RunnableConfig]) – 调用 Runnable 时要使用的配置。该配置支持标准键,如用于跟踪目的的“tags”、“metadata”,用于控制并行执行量的“max_concurrency”以及其他键。请参阅 RunnableConfig 以获取更多详细信息。

返回

Runnable 的输出。

返回类型

Output

map() Runnable[List[Input], List[Output]]

返回一个新的 Runnable,它通过对每个输入调用 invoke(),将输入列表映射到输出列表。

返回

一个新的 Runnable,它将输入列表映射到输出列表。

返回类型

Runnable[List[Input], List[Output]]

示例

from langchain_core.runnables import RunnableLambda

def _lambda(x: int) -> int:
    return x + 1

runnable = RunnableLambda(_lambda)
print(runnable.map().invoke([1, 2, 3])) # [2, 3, 4]
pick(keys: Union[str, List[str]]) RunnableSerializable[Any, Any]

从此 Runnable 的字典输出中选取键。

选择单个键
import json

from langchain_core.runnables import RunnableLambda, RunnableMap

as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
chain = RunnableMap(str=as_str, json=as_json)

chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3]}

json_only_chain = chain.pick("json")
json_only_chain.invoke("[1, 2, 3]")
# -> [1, 2, 3]
选择键列表
from typing import Any

import json

from langchain_core.runnables import RunnableLambda, RunnableMap

as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
def as_bytes(x: Any) -> bytes:
    return bytes(x, "utf-8")

chain = RunnableMap(
    str=as_str,
    json=as_json,
    bytes=RunnableLambda(as_bytes)
)

chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}

json_and_bytes_chain = chain.pick(["json", "bytes"])
json_and_bytes_chain.invoke("[1, 2, 3]")
# -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
参数

keys (Union[str, List[str]]) –

返回类型

RunnableSerializable[Any, Any]

pipe(*others: Union[Runnable[Any, Other], Callable[[Any], Other]], name: Optional[str] = None) RunnableSerializable[Input, Other]

将此 Runnable 与类似 Runnable 的对象组合以创建 RunnableSequence。

等效于 RunnableSequence(self, *others)self | others[0] | …

示例

from langchain_core.runnables import RunnableLambda

def add_one(x: int) -> int:
    return x + 1

def mul_two(x: int) -> int:
    return x * 2

runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1.pipe(runnable_2)
# Or equivalently:
# sequence = runnable_1 | runnable_2
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
# -> 4

sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
# -> [4, 6, 8]
参数
  • others (Union[Runnable[Any, Other], Callable[[Any], Other]]) –

  • name (Optional[str]) –

返回类型

RunnableSerializable[Input, Other]

stream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) Iterator[Output]

流式传输的默认实现,它调用 invoke。如果子类支持流式输出,则应覆盖此方法。

参数
  • input (Input) – Runnable 的输入。

  • config (Optional[RunnableConfig]) – 用于 Runnable 的配置。默认为 None。

  • kwargs (Optional[Any]) – 要传递给 Runnable 的其他关键字参数。

产生

Runnable 的输出。

返回类型

Iterator[Output]

transform(input: Iterator[Input], config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) Iterator[Output]

转换的默认实现,它缓冲输入,然后调用 stream。如果子类可以在仍在生成输入时开始生成输出,则应覆盖此方法。

参数
  • input (Iterator[Input]) – Runnable 的输入迭代器。

  • config (Optional[RunnableConfig]) – 用于 Runnable 的配置。默认为 None。

  • kwargs (Optional[Any]) – 要传递给 Runnable 的其他关键字参数。

产生

Runnable 的输出。

返回类型

Iterator[Output]

with_alisteners(*, on_start: Optional[AsyncListener] = None, on_end: Optional[AsyncListener] = None, on_error: Optional[AsyncListener] = None) Runnable[Input, Output]

将异步生命周期监听器绑定到 Runnable,返回一个新的 Runnable。

on_start:在 Runnable 开始运行之前异步调用。 on_end:在 Runnable 完成运行后异步调用。 on_error:如果 Runnable 抛出错误,则异步调用。

Run 对象包含有关运行的信息,包括其 ID、类型、输入、输出、错误、开始时间、结束时间以及添加到运行的任何标签或元数据。

参数
  • on_start (Optional[AsyncListener]) – 在 Runnable 开始运行之前异步调用。默认为 None。

  • on_end (Optional[AsyncListener]) – 在 Runnable 完成运行后异步调用。默认为 None。

  • on_error (Optional[AsyncListener]) – 如果 Runnable 抛出错误,则异步调用。默认为 None。

返回

绑定侦听器的新 Runnable。

返回类型

Runnable[Input, Output]

示例

from langchain_core.runnables import RunnableLambda
import time

async def test_runnable(time_to_sleep : int):
    print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
    await asyncio.sleep(time_to_sleep)
    print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")

async def fn_start(run_obj : Runnable):
    print(f"on start callback starts at {format_t(time.time())}
    await asyncio.sleep(3)
    print(f"on start callback ends at {format_t(time.time())}")

async def fn_end(run_obj : Runnable):
    print(f"on end callback starts at {format_t(time.time())}
    await asyncio.sleep(2)
    print(f"on end callback ends at {format_t(time.time())}")

runnable = RunnableLambda(test_runnable).with_alisteners(
    on_start=fn_start,
    on_end=fn_end
)
async def concurrent_runs():
    await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))

asyncio.run(concurrent_runs())
Result:
on start callback starts at 2024-05-16T14:20:29.637053+00:00
on start callback starts at 2024-05-16T14:20:29.637150+00:00
on start callback ends at 2024-05-16T14:20:32.638305+00:00
on start callback ends at 2024-05-16T14:20:32.638383+00:00
Runnable[3s]: starts at 2024-05-16T14:20:32.638849+00:00
Runnable[5s]: starts at 2024-05-16T14:20:32.638999+00:00
Runnable[3s]: ends at 2024-05-16T14:20:35.640016+00:00
on end callback starts at 2024-05-16T14:20:35.640534+00:00
Runnable[5s]: ends at 2024-05-16T14:20:37.640169+00:00
on end callback starts at 2024-05-16T14:20:37.640574+00:00
on end callback ends at 2024-05-16T14:20:37.640654+00:00
on end callback ends at 2024-05-16T14:20:39.641751+00:00
with_config(config: Optional[RunnableConfig] = None, **kwargs: Any) Runnable[Input, Output]

将配置绑定到 Runnable,返回一个新的 Runnable。

参数
  • config (Optional[RunnableConfig]) – 要绑定到 Runnable 的配置。

  • kwargs (Any) – 要传递给 Runnable 的其他关键字参数。

返回

绑定配置的新 Runnable。

返回类型

Runnable[Input, Output]

with_fallbacks(fallbacks: Sequence[Runnable[Input, Output]], *, exceptions_to_handle: Tuple[Type[BaseException], ...] = (<class 'Exception'>,), exception_key: Optional[str] = None) RunnableWithFallbacksT[Input, Output]

向 Runnable 添加回退,返回一个新的 Runnable。

新的 Runnable 将尝试原始 Runnable,然后在失败时按顺序尝试每个回退。

参数
  • fallbacks (Sequence[Runnable[Input, Output]]) – 如果原始 Runnable 失败,则尝试运行的一系列 runnable。

  • exceptions_to_handle (Tuple[Type[BaseException], ...]) – 要处理的异常类型元组。默认为 (Exception,)。

  • exception_key (Optional[str]) – 如果指定了字符串,则处理的异常将作为输入的一部分传递给回退,键为指定的键。如果为 None,则异常将不会传递给回退。如果使用,则基本 Runnable 及其回退必须接受字典作为输入。默认为 None。

返回

一个新的 Runnable,它将尝试原始 Runnable,然后在失败时按顺序尝试每个回退。

返回类型

RunnableWithFallbacksT[Input, Output]

示例

from typing import Iterator

from langchain_core.runnables import RunnableGenerator


def _generate_immediate_error(input: Iterator) -> Iterator[str]:
    raise ValueError()
    yield ""


def _generate(input: Iterator) -> Iterator[str]:
    yield from "foo bar"


runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
    [RunnableGenerator(_generate)]
    )
print(''.join(runnable.stream({}))) #foo bar
参数
  • fallbacks (Sequence[Runnable[Input, Output]]) – 如果原始 Runnable 失败,则尝试运行的一系列 runnable。

  • exceptions_to_handle (Tuple[Type[BaseException], ...]) – 要处理的异常类型元组。

  • exception_key (Optional[str]) – 如果指定了字符串,则处理的异常将作为输入的一部分传递给回退,键为指定的键。如果为 None,则异常将不会传递给回退。如果使用,则基本 Runnable 及其回退必须接受字典作为输入。

返回

一个新的 Runnable,它将尝试原始 Runnable,然后在失败时按顺序尝试每个回退。

返回类型

RunnableWithFallbacksT[Input, Output]

with_listeners(*, on_start: Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]] = None, on_end: Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]] = None, on_error: Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]] = None) Runnable[Input, Output]

将生命周期监听器绑定到 Runnable,返回一个新的 Runnable。

on_start:在 Runnable 开始运行之前调用,使用 Run 对象。 on_end:在 Runnable 完成运行后调用,使用 Run 对象。 on_error:如果 Runnable 抛出错误,则调用,使用 Run 对象。

Run 对象包含有关运行的信息,包括其 ID、类型、输入、输出、错误、开始时间、结束时间以及添加到运行的任何标签或元数据。

参数
  • on_start (Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]]) – 在 Runnable 开始运行之前调用。默认为 None。

  • on_end (Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]]) – 在 Runnable 完成运行后调用。默认为 None。

  • on_error (可选[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]]) – 当 Runnable 抛出错误时调用。默认为 None。

返回

绑定侦听器的新 Runnable。

返回类型

Runnable[Input, Output]

示例

from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run

import time

def test_runnable(time_to_sleep : int):
    time.sleep(time_to_sleep)

def fn_start(run_obj: Run):
    print("start_time:", run_obj.start_time)

def fn_end(run_obj: Run):
    print("end_time:", run_obj.end_time)

chain = RunnableLambda(test_runnable).with_listeners(
    on_start=fn_start,
    on_end=fn_end
)
chain.invoke(2)
with_retry(*, retry_if_exception_type: ~typing.Tuple[~typing.Type[BaseException], ...] = (<class 'Exception'>,), wait_exponential_jitter: bool = True, stop_after_attempt: int = 3) Runnable[Input, Output]

创建一个新的 Runnable,它在出现异常时重试原始 Runnable。

参数
  • retry_if_exception_type (Tuple[Type[BaseException], ...]) – 用于重试的异常类型元组。默认为 (Exception,)。

  • wait_exponential_jitter (bool) – 是否在重试之间的等待时间中添加抖动。默认为 True。

  • stop_after_attempt (int) – 在放弃之前尝试的最大次数。默认为 3。

返回

一个新的 Runnable,它在发生异常时重试原始的 Runnable。

返回类型

Runnable[Input, Output]

示例

from langchain_core.runnables import RunnableLambda

count = 0


def _lambda(x: int) -> None:
    global count
    count = count + 1
    if x == 1:
        raise ValueError("x is 1")
    else:
         pass


runnable = RunnableLambda(_lambda)
try:
    runnable.with_retry(
        stop_after_attempt=2,
        retry_if_exception_type=(ValueError,),
    ).invoke(1)
except ValueError:
    pass

assert (count == 2)
参数
  • retry_if_exception_type (Tuple[Type[BaseException], ...]) – 用于重试的异常类型元组

  • wait_exponential_jitter (bool) – 是否在重试之间的等待时间中添加抖动

  • stop_after_attempt (int) – 在放弃之前尝试的最大次数

返回

一个新的 Runnable,它在发生异常时重试原始的 Runnable。

返回类型

Runnable[Input, Output]

with_structured_output(schema: ~typing.Union[~typing.Dict, ~typing.Type[~pydantic.main.BaseModel]], *, include_raw: bool = False, tool_arg: str = 'tools', conversion_fn: ~typing.Callable = <function convert_to_openai_tool>, **kwargs: ~typing.Any) Runnable[Union[PromptValue, str, Sequence[Union[BaseMessage, List[str], Tuple[str, str], str, Dict[str, Any]]]], Union[Dict, BaseModel]][source]

模型包装器,返回格式化为匹配给定模式的输出。

参数
  • schema (Union[Dict, Type[BaseModel]]) – 输出 schema,可以是 dict 或 Pydantic 类。如果是 Pydantic 类,则模型输出将是该类的对象。如果是 dict,则模型输出将是 dict。使用 Pydantic 类,返回的属性将被验证,而使用 dict 则不会。如果 method 是 “function_calling” 并且 schema 是 dict,则该 dict 必须符合 OpenAI 函数调用的规范。

  • include_raw (bool) – 如果为 False,则仅返回解析后的结构化输出。如果在模型输出解析期间发生错误,则会引发错误。如果为 True,则将返回原始模型响应 (BaseMessage) 和解析后的模型响应。如果在输出解析期间发生错误,它将被捕获并同样返回。最终输出始终是一个包含键 “raw”、“parsed” 和 “parsing_error” 的 dict。

  • tool_arg (str) –

  • conversion_fn (Callable) –

  • kwargs (Any) –

返回

如果 include_raw 为 True,则返回一个包含以下键的 dict

raw: BaseMessage parsed: Pydantic BaseModel 或 Dictionary parsing_error: Optional[BaseException]

如果 include_raw 为 False,则仅返回 BaseModel/Dictionary(取决于 schema 类型)。

返回类型

一个 Runnable,它接受任何 ChatModel 输入并返回作为输出

实验性功能:此方法旨在为未来提供支持。在类中调用: ``` class TooledChatNVIDIA(ChatNVIDIA, ToolsMixin)

pass

```

有关更多文档,请参阅 langchain-mistralal/openai 的实现。

with_types(*, input_type: Optional[Type[Input]] = None, output_type: Optional[Type[Output]] = None) Runnable[Input, Output]

将输入和输出类型绑定到 Runnable,返回一个新的 Runnable。

参数
  • input_type (可选[Type[Input]]) – 要绑定到 Runnable 的输入类型。默认为 None。

  • output_type (可选[Type[Output]]) – 要绑定到 Runnable 的输出类型。默认为 None。

返回

一个新的 Runnable,其类型已绑定。

返回类型

Runnable[Input, Output]