langchain_core.output_parsers.openai_functions.PydanticOutputFunctionsParser

注意

PydanticOutputFunctionsParser 实现了标准的 Runnable 接口。 🏃

Runnable 接口 具有在 runnables 上可用的附加方法,例如 with_types, with_retry, assign, bind, get_graph, 以及更多。

class langchain_core.output_parsers.openai_functions.PydanticOutputFunctionsParser[source]

基类: OutputFunctionsParser

将输出解析为 pydantic 对象。

此解析器用于解析 ChatModel 的输出,该 ChatModel 使用 OpenAI 函数格式来调用函数。

解析器提取函数调用调用,并将它们与提供的 pydantic 模式匹配。

如果函数调用与提供的模式不匹配,将引发异常。

示例

… code-block:: python

message = AIMessage(

content=”这是一个测试消息”, additional_kwargs={

“function_call”: {

“name”: “cookie”, “arguments”: json.dumps({“name”: “value”, “age”: 10}),

}

},

) chat_generation = ChatGeneration(message=message)

class Cookie(BaseModel)

name: str age: int

class Dog(BaseModel)

species: str

# 完整输出解析器 = PydanticOutputFunctionsParser(

pydantic_schema={“cookie”: Cookie, “dog”: Dog}

) result = parser.parse_result([chat_generation])

param args_only: bool = True

是否仅返回函数调用的参数。

param pydantic_schema: Union[Type[BaseModel], Dict[str, Type[BaseModel]]] [必需]

用于解析输出的 pydantic 模式。

如果提供了多个模式,则将使用函数名称来确定要使用的模式。

async abatch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output]

默认实现使用 asyncio.gather 并行运行 ainvoke。

batch 的默认实现对于 IO 绑定的 runnables 效果良好。

如果子类可以更有效地进行批量处理,则应覆盖此方法;例如,如果底层 Runnable 使用支持批量模式的 API。

参数
  • inputs (List[Input]) – Runnable 的输入列表。

  • config (Optional[Union[RunnableConfig, List[RunnableConfig]]]) – 调用 Runnable 时使用的配置。该配置支持标准键,如 ‘tags’、‘metadata’ 用于跟踪目的,‘max_concurrency’ 用于控制并行执行的工作量,以及其他键。有关更多详细信息,请参阅 RunnableConfig。默认为 None。

  • return_exceptions (bool) – 是否返回异常而不是引发异常。默认为 False。

  • kwargs (Optional[Any]) – 传递给 Runnable 的其他关键字参数。

返回

来自 Runnable 的输出列表。

返回类型

List[Output]

async abatch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) AsyncIterator[Tuple[int, Union[Output, Exception]]]

并行运行列表中输入的 ainvoke,并在完成时产生结果。

参数
  • inputs (Sequence[Input]) – Runnable 的输入列表。

  • config (Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]) – 调用 Runnable 时使用的配置。该配置支持标准键,如 ‘tags’、‘metadata’ 用于跟踪目的,‘max_concurrency’ 用于控制并行执行的工作量,以及其他键。有关更多详细信息,请参阅 RunnableConfig。默认为 None。默认为 None。

  • return_exceptions (bool) – 是否返回异常而不是引发异常。默认为 False。

  • kwargs (Optional[Any]) – 传递给 Runnable 的其他关键字参数。

产生

输入索引和来自 Runnable 的输出的元组。

返回类型

AsyncIterator[Tuple[int, Union[Output, Exception]]]

async ainvoke(input: Union[str, BaseMessage], config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) T

ainvoke 的默认实现,从线程调用 invoke。

即使 Runnable 没有实现 invoke 的原生异步版本,默认实现也允许使用异步代码。

如果子类可以异步运行,则应覆盖此方法。

参数
返回类型

T

async aparse_result(result: List[Generation], *, partial: bool = False) T

异步解析候选模型 Generations 列表为特定格式。

参数
  • result (List[Generation]) – 要解析的 Generations 列表。 Generations 假定为单个模型输入的不同候选输出。

  • partial (bool) – 是否将输出解析为部分结果。 这对于可以解析部分结果的解析器很有用。 默认为 False。

返回

结构化输出。

返回类型

T

as_tool(args_schema: Optional[Type[BaseModel]] = None, *, name: Optional[str] = None, description: Optional[str] = None, arg_types: Optional[Dict[str, Type]] = None) BaseTool

Beta

此 API 处于 beta 阶段,将来可能会发生更改。

从 Runnable 创建 BaseTool。

as_tool 将从 Runnable 实例化一个带有名称、描述和 args_schema 的 BaseTool。如果可能,模式会从 runnable.get_input_schema 推断。或者(例如,如果 Runnable 接受 dict 作为输入,并且未键入特定的 dict 键),则可以直接使用 args_schema 指定模式。您也可以传递 arg_types 以仅指定必需的参数及其类型。

参数
  • args_schema (Optional[Type[BaseModel]]) – 工具的模式。默认为 None。

  • name (Optional[str]) – 工具的名称。默认为 None。

  • description (Optional[str]) – 工具的描述。默认为 None。

  • arg_types (Optional[Dict[str, Type]]) – 参数名称到类型的字典。默认为 None。

返回

BaseTool 实例。

返回类型

BaseTool

类型化字典输入

from typing import List
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda

class Args(TypedDict):
    a: int
    b: List[int]

def f(x: Args) -> str:
    return str(x["a"] * max(x["b"]))

runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})

dict 输入,通过 args_schema 指定模式

from typing import Any, Dict, List
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.runnables import RunnableLambda

def f(x: Dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))

class FSchema(BaseModel):
    """Apply a function to an integer and list of integers."""

    a: int = Field(..., description="Integer")
    b: List[int] = Field(..., description="List of ints")

runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})

dict 输入,通过 arg_types 指定模式

from typing import Any, Dict, List
from langchain_core.runnables import RunnableLambda

def f(x: Dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))

runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": List[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})

字符串输入

from langchain_core.runnables import RunnableLambda

def f(x: str) -> str:
    return x + "a"

def g(x: str) -> str:
    return x + "z"

runnable = RunnableLambda(f) | g
as_tool = runnable.as_tool()
as_tool.invoke("b")

0.2.14 版本新增。

async astream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) AsyncIterator[Output]

astream 的默认实现,它调用 ainvoke。如果子类支持流式输出,则应覆盖此方法。

参数
  • input (Input) – Runnable 的输入。

  • config (Optional[RunnableConfig]) – Runnable 的配置。默认为 None。

  • kwargs (Optional[Any]) – 传递给 Runnable 的其他关键字参数。

产生

Runnable 的输出。

返回类型

AsyncIterator[Output]

astream_events(input: Any, config: Optional[RunnableConfig] = None, *, version: Literal['v1', 'v2'], include_names: Optional[Sequence[str]] = None, include_types: Optional[Sequence[str]] = None, include_tags: Optional[Sequence[str]] = None, exclude_names: Optional[Sequence[str]] = None, exclude_types: Optional[Sequence[str]] = None, exclude_tags: Optional[Sequence[str]] = None, **kwargs: Any) AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]]

Beta

此 API 处于 beta 阶段,将来可能会发生更改。

生成事件流。

用于创建 StreamEvents 的迭代器,该迭代器提供关于 Runnable 进度的实时信息,包括来自中间结果的 StreamEvents。

StreamEvent 是一个字典,具有以下模式

  • event: str - 事件名称的格式为:

    格式:on_[runnable_type]_(start|stream|end)。

  • name: str - 生成事件的 Runnable 的名称。

  • run_id: str - randomly generated ID associated with the given execution of

    Runnable,该 ID 与给定的 Runnable 执行实例相关联,由系统随机生成。

  • the Runnable that emitted the event. A child Runnable that gets invoked as part of the execution of a parent Runnable is assigned its own unique ID.

    Runnable 发出的事件。作为父级 Runnable 执行的一部分而被调用的子级 Runnable 将被分配其自己的唯一 ID。

  • parent_ids: List[str] - The IDs of the parent runnables that

    parent_ids: List[str] - 生成事件的父级 Runnables 的 ID 列表。

  • generated the event. The root Runnable will have an empty list. The order of the parent IDs is from the root to the immediate parent. Only available for v2 version of the API. The v1 version of the API will return an empty list.

    生成事件。根级 Runnable 将拥有一个空列表。父级 ID 的顺序是从根级到直接父级。仅适用于 API 的 v2 版本。API 的 v1 版本将返回一个空列表。

  • tags: Optional[List[str]] - The tags of the Runnable that generated

tags: Optional[List[str]] - 生成事件的 Runnable 的标签。

the event.

事件。

metadata: Optional[Dict[str, Any]] - The metadata of the Runnable

metadata: Optional[Dict[str, Any]] - 生成事件的 Runnable 的元数据。

that generated the event.

生成事件。

data: Dict[str, Any]

data: Dict[str, Any]

Below is a table that illustrates some evens that might be emitted by various chains. Metadata fields have been omitted from the table for brevity. Chain definitions have been included after the table.

下面是一个表格,展示了各种链可能发出的一些事件。为了简洁起见,表格中省略了元数据字段。链的定义已包含在表格之后。

data: Dict[str, Any]

ATTENTION This reference table is for the V2 version of the schema.

注意 此参考表适用于 schema 的 V2 版本。

data: Dict[str, Any]

Below is a table that illustrates some evens that might be emitted by various chains. Metadata fields have been omitted from the table for brevity. Chain definitions have been included after the table.

event

事件

data: Dict[str, Any]

name

名称

data: Dict[str, Any]

chunk

data: Dict[str, Any]

input

输入

output

输出

output

on_chat_model_start

on_chat_model_start

output

[model name]

on_chat_model_start

[模型名称]

{“messages”: [[SystemMessage, HumanMessage]]}

{“messages”: [[SystemMessage, HumanMessage]]}

on_chat_model_stream

{“messages”: [[SystemMessage, HumanMessage]]}

{“messages”: [[SystemMessage, HumanMessage]]}

on_chat_model_stream

AIMessageChunk(content=”hello”)

AIMessageChunk(content=”hello”)

on_chat_model_end

AIMessageChunk(content=”hello”)

AIMessageChunk(content=”hello”)

on_chat_model_end

AIMessageChunk(content=”hello world”)

AIMessageChunk(content=”hello world”)

on_llm_start

on_llm_start

AIMessageChunk(content=”hello world”)

on_llm_start

{‘input’: ‘hello’}

{‘input’: ‘hello’}

on_llm_stream

on_llm_stream

‘Hello’

‘Hello’

on_llm_end

metadata: Optional[Dict[str, Any]] - The metadata of the Runnable

on_llm_end

‘Hello human!’

‘Hello human!’

on_chain_start

on_chain_start

format_docs

output:

def format_docs(docs: List[Document]) -> str:
    '''Format the docs.'''
    return ", ".join([doc.page_content for doc in docs])

format_docs = RunnableLambda(format_docs)

{“messages”: [[SystemMessage, HumanMessage]]}:

@tool
def some_tool(x: int, y: str) -> dict:
    '''Some_tool.'''
    return {"x": x, "y": y}

format_docs:

template = ChatPromptTemplate.from_messages(
    [("system", "You are Cat Agent 007"), ("human", "{question}")]
).with_config({"run_name": "my_template", "tags": ["my_template"]})

示例

from langchain_core.runnables import RunnableLambda

async def reverse(s: str) -> str:
    return s[::-1]

chain = RunnableLambda(func=reverse)

events = [
    event async for event in chain.astream_events("hello", version="v2")
]

# will produce the following events (run_id, and parent_ids
# has been omitted for brevity):
[
    {
        "data": {"input": "hello"},
        "event": "on_chain_start",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"chunk": "olleh"},
        "event": "on_chain_stream",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"output": "olleh"},
        "event": "on_chain_end",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
]

on_chain_stream

from langchain_core.callbacks.manager import (
    adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio


async def slow_thing(some_input: str, config: RunnableConfig) -> str:
    """Do something that takes a long time."""
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 1 of 3"},
        config=config # Must be included for python < 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 2 of 3"},
        config=config # Must be included for python < 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    return "Done"

slow_thing = RunnableLambda(slow_thing)

async for event in slow_thing.astream_events("some_input", version="v2"):
    print(event)
参数
  • on_chain_stream

  • “hello world!, goodbye world!”

  • “hello world!, goodbye world!”

  • on_chain_end

  • on_chain_end

  • [Document(…)]

  • [Document(…)]

  • on_tool_start

  • on_tool_start

  • some_tool

产生

some_tool

{“x”: 1, “y”: “2”}

{“x”: 1, “y”: “2”}

返回类型

on_tool_end

on_tool_end

on_retriever_start

batch 的默认实现对于 IO 绑定的 runnables 效果良好。

如果子类可以更有效地进行批量处理,则应覆盖此方法;例如,如果底层 Runnable 使用支持批量模式的 API。

参数
  • on_retriever_start

  • [retriever name]

  • [retriever 名称]

  • kwargs (Optional[Any]) –

返回类型

List[Output]

{“query”: “hello”}

{“query”: “hello”}

参数
  • on_retriever_end

  • on_retriever_end

  • [retriever 名称]

  • kwargs (Optional[Any]) –

返回类型

[Document(…), ..]

[Document(…), ..]

on_prompt_start

参数
  • on_prompt_start

  • [template_name]

  • [template_name]

  • {“question”: “hello”}

返回

{“question”: “hello”}

返回类型

on_prompt_end

from langchain_anthropic import ChatAnthropic
from langchain_core.runnables.utils import ConfigurableField
from langchain_openai import ChatOpenAI

model = ChatAnthropic(
    model_name="claude-3-sonnet-20240229"
).configurable_alternatives(
    ConfigurableField(id="llm"),
    default_key="anthropic",
    openai=ChatOpenAI()
)

# uses the default model ChatAnthropic
print(model.invoke("which organization created you?").content)

# uses ChatOpenAI
print(
    model.with_config(
        configurable={"llm": "openai"}
    ).invoke("which organization created you?").content
)
on_prompt_end

ChatPromptValue(messages: [SystemMessage, …])

参数

ChatPromptValue(messages: [SystemMessage, …])

返回

In addition to the standard events, users can also dispatch custom events (see example below).

返回类型

on_prompt_end

from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI

model = ChatOpenAI(max_tokens=20).configurable_fields(
    max_tokens=ConfigurableField(
        id="output_token_number",
        name="Max tokens in the output",
        description="The maximum number of tokens in the output",
    )
)

# max_tokens = 20
print(
    "max_tokens_20: ",
    model.invoke("tell me something about chess").content
)

# max_tokens = 200
print("max_tokens_200: ", model.with_config(
    configurable={"output_token_number": 200}
    ).invoke("tell me something about chess").content
)
除了标准事件之外,用户还可以分派自定义事件(请参阅下面的示例)。

Custom events will be only be surfaced with in the v2 version of the API!

参数
  • 自定义事件将仅在 API 的 v2 版本中显示!

  • A custom event has following format

返回

Runnable 的输出。

返回类型

T

自定义事件具有以下格式

Attribute

参数
  • 属性

  • Type

返回

类型

返回类型

on_chain_start

Description

描述

参数
  • input (Input) – Runnable 的输入。

  • config (Optional[RunnableConfig]) – Runnable 的配置。默认为 None。

  • kwargs (Optional[Any]) – 传递给 Runnable 的其他关键字参数。

产生

Runnable 的输出。

返回类型

str

str

A user defined name for the event.

返回

用户为事件定义的名称。

返回类型

data

data