langchain_core.output_parsers.openai_functions
.PydanticOutputFunctionsParser¶
注意
PydanticOutputFunctionsParser 实现了标准的 Runnable 接口
。 🏃
Runnable 接口
具有在 runnables 上可用的附加方法,例如 with_types
, with_retry
, assign
, bind
, get_graph
, 以及更多。
- class langchain_core.output_parsers.openai_functions.PydanticOutputFunctionsParser[source]¶
-
将输出解析为 pydantic 对象。
此解析器用于解析 ChatModel 的输出,该 ChatModel 使用 OpenAI 函数格式来调用函数。
解析器提取函数调用调用,并将它们与提供的 pydantic 模式匹配。
如果函数调用与提供的模式不匹配,将引发异常。
示例
… code-block:: python
- message = AIMessage(
content=”这是一个测试消息”, additional_kwargs={
- “function_call”: {
“name”: “cookie”, “arguments”: json.dumps({“name”: “value”, “age”: 10}),
}
},
) chat_generation = ChatGeneration(message=message)
- class Cookie(BaseModel)
name: str age: int
- class Dog(BaseModel)
species: str
# 完整输出解析器 = PydanticOutputFunctionsParser(
pydantic_schema={“cookie”: Cookie, “dog”: Dog}
) result = parser.parse_result([chat_generation])
- param args_only: bool = True¶
是否仅返回函数调用的参数。
- param pydantic_schema: Union[Type[BaseModel], Dict[str, Type[BaseModel]]] [必需]¶
用于解析输出的 pydantic 模式。
如果提供了多个模式,则将使用函数名称来确定要使用的模式。
- async abatch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output] ¶
默认实现使用 asyncio.gather 并行运行 ainvoke。
batch 的默认实现对于 IO 绑定的 runnables 效果良好。
如果子类可以更有效地进行批量处理,则应覆盖此方法;例如,如果底层 Runnable 使用支持批量模式的 API。
- 参数
inputs (List[Input]) – Runnable 的输入列表。
config (Optional[Union[RunnableConfig, List[RunnableConfig]]]) – 调用 Runnable 时使用的配置。该配置支持标准键,如 ‘tags’、‘metadata’ 用于跟踪目的,‘max_concurrency’ 用于控制并行执行的工作量,以及其他键。有关更多详细信息,请参阅 RunnableConfig。默认为 None。
return_exceptions (bool) – 是否返回异常而不是引发异常。默认为 False。
kwargs (Optional[Any]) – 传递给 Runnable 的其他关键字参数。
- 返回
来自 Runnable 的输出列表。
- 返回类型
List[Output]
- async abatch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) AsyncIterator[Tuple[int, Union[Output, Exception]]] ¶
并行运行列表中输入的 ainvoke,并在完成时产生结果。
- 参数
inputs (Sequence[Input]) – Runnable 的输入列表。
config (Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]) – 调用 Runnable 时使用的配置。该配置支持标准键,如 ‘tags’、‘metadata’ 用于跟踪目的,‘max_concurrency’ 用于控制并行执行的工作量,以及其他键。有关更多详细信息,请参阅 RunnableConfig。默认为 None。默认为 None。
return_exceptions (bool) – 是否返回异常而不是引发异常。默认为 False。
kwargs (Optional[Any]) – 传递给 Runnable 的其他关键字参数。
- 产生
输入索引和来自 Runnable 的输出的元组。
- 返回类型
AsyncIterator[Tuple[int, Union[Output, Exception]]]
- async ainvoke(input: Union[str, BaseMessage], config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) T ¶
ainvoke 的默认实现,从线程调用 invoke。
即使 Runnable 没有实现 invoke 的原生异步版本,默认实现也允许使用异步代码。
如果子类可以异步运行,则应覆盖此方法。
- 参数
input (Union[str, BaseMessage]) –
config (Optional[RunnableConfig]) –
kwargs (Optional[Any]) –
- 返回类型
T
- async aparse_result(result: List[Generation], *, partial: bool = False) T ¶
异步解析候选模型 Generations 列表为特定格式。
- 参数
result (List[Generation]) – 要解析的 Generations 列表。 Generations 假定为单个模型输入的不同候选输出。
partial (bool) – 是否将输出解析为部分结果。 这对于可以解析部分结果的解析器很有用。 默认为 False。
- 返回
结构化输出。
- 返回类型
T
- as_tool(args_schema: Optional[Type[BaseModel]] = None, *, name: Optional[str] = None, description: Optional[str] = None, arg_types: Optional[Dict[str, Type]] = None) BaseTool ¶
Beta
此 API 处于 beta 阶段,将来可能会发生更改。
从 Runnable 创建 BaseTool。
as_tool
将从 Runnable 实例化一个带有名称、描述和args_schema
的 BaseTool。如果可能,模式会从runnable.get_input_schema
推断。或者(例如,如果 Runnable 接受 dict 作为输入,并且未键入特定的 dict 键),则可以直接使用args_schema
指定模式。您也可以传递arg_types
以仅指定必需的参数及其类型。- 参数
args_schema (Optional[Type[BaseModel]]) – 工具的模式。默认为 None。
name (Optional[str]) – 工具的名称。默认为 None。
description (Optional[str]) – 工具的描述。默认为 None。
arg_types (Optional[Dict[str, Type]]) – 参数名称到类型的字典。默认为 None。
- 返回
BaseTool 实例。
- 返回类型
类型化字典输入
from typing import List from typing_extensions import TypedDict from langchain_core.runnables import RunnableLambda class Args(TypedDict): a: int b: List[int] def f(x: Args) -> str: return str(x["a"] * max(x["b"])) runnable = RunnableLambda(f) as_tool = runnable.as_tool() as_tool.invoke({"a": 3, "b": [1, 2]})
dict
输入,通过args_schema
指定模式from typing import Any, Dict, List from langchain_core.pydantic_v1 import BaseModel, Field from langchain_core.runnables import RunnableLambda def f(x: Dict[str, Any]) -> str: return str(x["a"] * max(x["b"])) class FSchema(BaseModel): """Apply a function to an integer and list of integers.""" a: int = Field(..., description="Integer") b: List[int] = Field(..., description="List of ints") runnable = RunnableLambda(f) as_tool = runnable.as_tool(FSchema) as_tool.invoke({"a": 3, "b": [1, 2]})
dict
输入,通过arg_types
指定模式from typing import Any, Dict, List from langchain_core.runnables import RunnableLambda def f(x: Dict[str, Any]) -> str: return str(x["a"] * max(x["b"])) runnable = RunnableLambda(f) as_tool = runnable.as_tool(arg_types={"a": int, "b": List[int]}) as_tool.invoke({"a": 3, "b": [1, 2]})
字符串输入
from langchain_core.runnables import RunnableLambda def f(x: str) -> str: return x + "a" def g(x: str) -> str: return x + "z" runnable = RunnableLambda(f) | g as_tool = runnable.as_tool() as_tool.invoke("b")
0.2.14 版本新增。
- async astream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) AsyncIterator[Output] ¶
astream 的默认实现,它调用 ainvoke。如果子类支持流式输出,则应覆盖此方法。
- 参数
input (Input) – Runnable 的输入。
config (Optional[RunnableConfig]) – Runnable 的配置。默认为 None。
kwargs (Optional[Any]) – 传递给 Runnable 的其他关键字参数。
- 产生
Runnable 的输出。
- 返回类型
AsyncIterator[Output]
- astream_events(input: Any, config: Optional[RunnableConfig] = None, *, version: Literal['v1', 'v2'], include_names: Optional[Sequence[str]] = None, include_types: Optional[Sequence[str]] = None, include_tags: Optional[Sequence[str]] = None, exclude_names: Optional[Sequence[str]] = None, exclude_types: Optional[Sequence[str]] = None, exclude_tags: Optional[Sequence[str]] = None, **kwargs: Any) AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]] ¶
Beta
此 API 处于 beta 阶段,将来可能会发生更改。
生成事件流。
用于创建 StreamEvents 的迭代器,该迭代器提供关于 Runnable 进度的实时信息,包括来自中间结果的 StreamEvents。
StreamEvent 是一个字典,具有以下模式
event
: str - 事件名称的格式为:格式:on_[runnable_type]_(start|stream|end)。
name
: str - 生成事件的 Runnable 的名称。run_id
: str - randomly generated ID associated with the given execution ofRunnable,该 ID 与给定的 Runnable 执行实例相关联,由系统随机生成。
- the Runnable that emitted the event. A child Runnable that gets invoked as part of the execution of a parent Runnable is assigned its own unique ID.
Runnable 发出的事件。作为父级 Runnable 执行的一部分而被调用的子级 Runnable 将被分配其自己的唯一 ID。
parent_ids
: List[str] - The IDs of the parent runnables thatparent_ids
: List[str] - 生成事件的父级 Runnables 的 ID 列表。
- generated the event. The root Runnable will have an empty list. The order of the parent IDs is from the root to the immediate parent. Only available for v2 version of the API. The v1 version of the API will return an empty list.
生成事件。根级 Runnable 将拥有一个空列表。父级 ID 的顺序是从根级到直接父级。仅适用于 API 的 v2 版本。API 的 v1 版本将返回一个空列表。
tags
: Optional[List[str]] - The tags of the Runnable that generated
tags
: Optional[List[str]] - 生成事件的 Runnable 的标签。the event.
事件。
metadata
: Optional[Dict[str, Any]] - The metadata of the Runnablemetadata
: Optional[Dict[str, Any]] - 生成事件的 Runnable 的元数据。that generated the event.
生成事件。
data
: Dict[str, Any]data
: Dict[str, Any]Below is a table that illustrates some evens that might be emitted by various chains. Metadata fields have been omitted from the table for brevity. Chain definitions have been included after the table.
下面是一个表格,展示了各种链可能发出的一些事件。为了简洁起见,表格中省略了元数据字段。链的定义已包含在表格之后。
data
: Dict[str, Any]ATTENTION This reference table is for the V2 version of the schema.
注意 此参考表适用于 schema 的 V2 版本。
data
: Dict[str, Any]Below is a table that illustrates some evens that might be emitted by various chains. Metadata fields have been omitted from the table for brevity. Chain definitions have been included after the table.
event
事件
data
: Dict[str, Any]name
名称
data
: Dict[str, Any]chunk
块
data
: Dict[str, Any]input
输入
output
输出
output
on_chat_model_start
on_chat_model_start
output
[model name]
on_chat_model_start
[模型名称]
{“messages”: [[SystemMessage, HumanMessage]]}
{“messages”: [[SystemMessage, HumanMessage]]}
on_chat_model_stream
{“messages”: [[SystemMessage, HumanMessage]]}
{“messages”: [[SystemMessage, HumanMessage]]}
on_chat_model_stream
AIMessageChunk(content=”hello”)
AIMessageChunk(content=”hello”)
on_chat_model_end
AIMessageChunk(content=”hello”)
AIMessageChunk(content=”hello”)
on_chat_model_end
AIMessageChunk(content=”hello world”)
AIMessageChunk(content=”hello world”)
on_llm_start
on_llm_start
AIMessageChunk(content=”hello world”)
on_llm_start
{‘input’: ‘hello’}
{‘input’: ‘hello’}
on_llm_stream
on_llm_stream
‘Hello’
‘Hello’
on_llm_end
metadata
: Optional[Dict[str, Any]] - The metadata of the Runnableon_llm_end
‘Hello human!’
‘Hello human!’
on_chain_start
on_chain_start
format_docs
output:
def format_docs(docs: List[Document]) -> str: '''Format the docs.''' return ", ".join([doc.page_content for doc in docs]) format_docs = RunnableLambda(format_docs)
{“messages”: [[SystemMessage, HumanMessage]]}:
@tool def some_tool(x: int, y: str) -> dict: '''Some_tool.''' return {"x": x, "y": y}
format_docs:
template = ChatPromptTemplate.from_messages( [("system", "You are Cat Agent 007"), ("human", "{question}")] ).with_config({"run_name": "my_template", "tags": ["my_template"]})
示例
from langchain_core.runnables import RunnableLambda async def reverse(s: str) -> str: return s[::-1] chain = RunnableLambda(func=reverse) events = [ event async for event in chain.astream_events("hello", version="v2") ] # will produce the following events (run_id, and parent_ids # has been omitted for brevity): [ { "data": {"input": "hello"}, "event": "on_chain_start", "metadata": {}, "name": "reverse", "tags": [], }, { "data": {"chunk": "olleh"}, "event": "on_chain_stream", "metadata": {}, "name": "reverse", "tags": [], }, { "data": {"output": "olleh"}, "event": "on_chain_end", "metadata": {}, "name": "reverse", "tags": [], }, ]
on_chain_stream
from langchain_core.callbacks.manager import ( adispatch_custom_event, ) from langchain_core.runnables import RunnableLambda, RunnableConfig import asyncio async def slow_thing(some_input: str, config: RunnableConfig) -> str: """Do something that takes a long time.""" await asyncio.sleep(1) # Placeholder for some slow operation await adispatch_custom_event( "progress_event", {"message": "Finished step 1 of 3"}, config=config # Must be included for python < 3.10 ) await asyncio.sleep(1) # Placeholder for some slow operation await adispatch_custom_event( "progress_event", {"message": "Finished step 2 of 3"}, config=config # Must be included for python < 3.10 ) await asyncio.sleep(1) # Placeholder for some slow operation return "Done" slow_thing = RunnableLambda(slow_thing) async for event in slow_thing.astream_events("some_input", version="v2"): print(event)
- 参数
on_chain_stream
“hello world!, goodbye world!”
“hello world!, goodbye world!”
on_chain_end
on_chain_end
[Document(…)]
[Document(…)]
on_tool_start
on_tool_start
some_tool
- 产生
some_tool
- {“x”: 1, “y”: “2”}
{“x”: 1, “y”: “2”}
- 返回类型
on_tool_end
- on_tool_end
on_retriever_start
batch 的默认实现对于 IO 绑定的 runnables 效果良好。
如果子类可以更有效地进行批量处理,则应覆盖此方法;例如,如果底层 Runnable 使用支持批量模式的 API。
- 参数
on_retriever_start
[retriever name]
[retriever 名称]
kwargs (Optional[Any]) –
- 返回类型
List[Output]
- {“query”: “hello”}
{“query”: “hello”}
- 参数
on_retriever_end
on_retriever_end
[retriever 名称]
kwargs (Optional[Any]) –
- 返回类型
[Document(…), ..]
- [Document(…), ..]
on_prompt_start
- 参数
on_prompt_start
[template_name]
[template_name]
{“question”: “hello”}
- 返回
{“question”: “hello”}
- 返回类型
on_prompt_end
from langchain_anthropic import ChatAnthropic from langchain_core.runnables.utils import ConfigurableField from langchain_openai import ChatOpenAI model = ChatAnthropic( model_name="claude-3-sonnet-20240229" ).configurable_alternatives( ConfigurableField(id="llm"), default_key="anthropic", openai=ChatOpenAI() ) # uses the default model ChatAnthropic print(model.invoke("which organization created you?").content) # uses ChatOpenAI print( model.with_config( configurable={"llm": "openai"} ).invoke("which organization created you?").content )
- on_prompt_end
ChatPromptValue(messages: [SystemMessage, …])
- 参数
ChatPromptValue(messages: [SystemMessage, …])
- 返回
In addition to the standard events, users can also dispatch custom events (see example below).
- 返回类型
on_prompt_end
from langchain_core.runnables import ConfigurableField from langchain_openai import ChatOpenAI model = ChatOpenAI(max_tokens=20).configurable_fields( max_tokens=ConfigurableField( id="output_token_number", name="Max tokens in the output", description="The maximum number of tokens in the output", ) ) # max_tokens = 20 print( "max_tokens_20: ", model.invoke("tell me something about chess").content ) # max_tokens = 200 print("max_tokens_200: ", model.with_config( configurable={"output_token_number": 200} ).invoke("tell me something about chess").content )
- 除了标准事件之外,用户还可以分派自定义事件(请参阅下面的示例)。
Custom events will be only be surfaced with in the v2 version of the API!
- 参数
自定义事件将仅在 API 的 v2 版本中显示!
A custom event has following format
- 返回
Runnable 的输出。
- 返回类型
T
- 自定义事件具有以下格式
Attribute
- 参数
属性
Type
- 返回
类型
- 返回类型
on_chain_start
- Description
描述
- 参数
input (Input) – Runnable 的输入。
config (Optional[RunnableConfig]) – Runnable 的配置。默认为 None。
kwargs (Optional[Any]) – 传递给 Runnable 的其他关键字参数。
- 产生
Runnable 的输出。
- 返回类型
str
- str
A user defined name for the event.
- 返回
用户为事件定义的名称。
- 返回类型
data