langchain_experimental.tot.prompts
.CheckerOutputParser¶
注意
CheckerOutputParser实现了标准的Runnable 接口
。🏃
Runnable 接口
提供了额外的在runnables中可用的方法,例如with_types
、with_retry
、assign
、bind
、get_graph
等更多方法。
- class langchain_experimental.tot.prompts.CheckerOutputParser[source]¶
基类:
BaseOutputParser
解析并检查语言模型的输出。
- async abatch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output] ¶
默认实现使用 asyncio.gather 并行运行 run。
默认的批处理实现对于IO密集型的runnables运行良好。
如果子类能够更有效地批量处理,则应重写此方法;例如,如果底层Runnable使用支持批处理模式的API。
- 参数
inputs (列表[输入]) – Runnable的输入列表。
config (可选[联合[RunnableConfig, 列表[RunnableConfig]]]) – 调用Runnable时要使用的配置。配置支持用于跟踪的标准键,如“标签”、“元数据”,用于控制并行工作的量“max_concurrency”,以及其他键。请参阅RunnableConfig获取更多详细信息。默认为None。
return_exceptions (布尔值) – 是否返回异常而不是抛出它们。默认为False。
kwargs (可选[任何]) – 传递给Runnable的额外关键字参数。
- 返回值
Runnable的输出列表。
- 返回类型
列表[输出]
- async abatch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) AsyncIterator[Tuple[int, Union[Output, Exception]]]¶
在输入列表上并行运行 ripping,在操作完成时产生结果。
- 参数
inputs (Sequence[Input]) – Runnable 的输入列表。
config (Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]) – 调用 Runnable 时使用的配置。配置支持标准的键,如‘tags’、‘metadata’用于跟踪目的,‘max_concurrency’用于控制并行执行的工作量,以及其他键。有关更多详细信息,请参阅 RunnableConfig。默认值为 None。
return_exceptions (布尔值) – 是否返回异常而不是抛出它们。默认为False。
kwargs (可选[任何]) – 传递给Runnable的额外关键字参数。
- 产生
一个包含输入索引和 Runnable 的输出的元组。
- 返回类型
AsyncIterator[Tuple[int, Union[Output, Exception]]]
- async ainvoke(input: Union[str, BaseMessage], config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) T
ainvoke 的默认实现,从线程中调用 invoke。
默认实现允许即使 Runnable 没有实现 invoke 的本地异步版本,也可以使用异步代码。
如果子类可以异步运行,则应重写此方法。
- 参数
input (Union[str, BaseMessage]) –
config (Optional[RunnableConfig]) –
kwargs (Optional[Any]) –
- 返回类型
T
- async aparse(text: str) T
异步将单个字符串模型输出解析为某种结构。
- 参数
text (str) – 语言模型的字符串输出。
- 返回值
结构化输出。
- 返回类型
T
- async aparse_result(result: List[Generation], *, partial: bool = False) T
异步将候选模型生成物列表解析为特定格式。
- 返回值仅从结果中的第一个生成物解析,该生成物
假定是最可能的生成物。
- 参数
结果 (列表[生成]) – 要解析的生成列表。假设这些生成是单个模型输入的不同候选输出。
部分 (布尔型) – 是否将输出解析为部分结果。对于可以解析部分结果的解析器很有用。默认为False。
- 返回值
结构化输出。
- 返回类型
T
- as_tool(args_schema: Optional[Type[BaseModel]], *\span>, name: Optional[str] = None, description: Optional[str] = None, arg_types: Optional[Dict[str, Type]]) BaseTool¶
beta版本
此API处于beta版本,未来可能会有变化。
从可运行对象创建BaseTool。
as_tool
将从可运行对象实例化一个包含名称、描述和args_schema
的BaseTool。在可能的情况下,架构从runnable.get_input_schema
推断出。或者(例如,如果可运行对象以字典作为输入,且特定字典键未指定类型),可以通过args_schema
直接指定架构。您还可以通过传递arg_types
来仅指定所需的参数及其类型。- 参数
args_schema (可选[Type[BaseModel]]) – 工具的架构。默认为None。
name (可选[字符串]) – 工具的名称。默认为None。
description (可选[字符串]) – 工具的描述。默认为None。
arg_types (可选[字典[字符串,Type]]) – 参数名称到类型的字典。默认为None。
- 返回值
一个BaseTool实例。
- 返回类型
类型化字典输入
from typing import List from typing_extensions import TypedDict from langchain_core.runnables import RunnableLambda class Args(TypedDict): a: int b: List[int] def f(x: Args) -> str: return str(x["a"] * max(x["b"])) runnable = RunnableLambda(f) as_tool = runnable.as_tool() as_tool.invoke({"a": 3, "b": [1, 2]})
dict
输入,通过args_schema
指定架构from typing import Any, Dict, List from langchain_core.pydantic_v1 import BaseModel, Field from langchain_core.runnables import RunnableLambda def f(x: Dict[str, Any]) -> str: return str(x["a"] * max(x["b"])) class FSchema(BaseModel): """Apply a function to an integer and list of integers.""" a: int = Field(..., description="Integer") b: List[int] = Field(..., description="List of ints") runnable = RunnableLambda(f) as_tool = runnable.as_tool(FSchema) as_tool.invoke({"a": 3, "b": [1, 2]})
dict
输入,通过arg_types
指定架构from typing import Any, Dict, List from langchain_core.runnables import RunnableLambda def f(x: Dict[str, Any]) -> str: return str(x["a"] * max(x["b"])) runnable = RunnableLambda(f) as_tool = runnable.as_tool(arg_types={"a": int, "b": List[int]}) as_tool.invoke({"a": 3, "b": [1, 2]})
字符串输入
from langchain_core.runnables import RunnableLambda def f(x: str) -> str: return x + "a" def g(x: str) -> str: return x + "z" runnable = RunnableLambda(f) | g as_tool = runnable.as_tool() as_tool.invoke("b")
自版本 0.2.14 新增。
- async astream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) AsyncIterator[Output]¶
astream 的默认实现,它调用 ainvoke。如果子类支持流式输出,则应该重写此方法。
- 参数
input (Input) – Runnable 的输入。
config (可选[]RunnableConfig[]) – 用于 Runnable 的配置。默认为 None。
kwargs (可选[任何]) – 传递给Runnable的额外关键字参数。
- 产生
Runnable 的输出。
- 返回类型
AsyncIterator[Output]
- astream_events(input: Any, config: Optional[RunnableConfig] = None, *, version: Literal['v1', 'v2'], include_names: Optional[Sequence[str]] = None, include_types: Optional[Sequence[str]] = None, include_tags: Optional[Sequence[str]] = None, exclude_names: Optional[Sequence[str]] = None, exclude_types: Optional[Sequence[str]] = None, exclude_tags: Optional[Sequence[str]] = None, **kwargs: Any) AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]] ¶
beta版本
此API处于beta版本,未来可能会有变化。
生成事件流。
用于创建一个对 StreamEvents 迭代器,它提供有关 Runnable 进展的实时信息,包括来自中间结果的 StreamEvents。
StreamEvent 是一个具有以下架构的字典
event
: str - 事件名为格式: on_[runnable_type]_(start|stream|end).
name
: str - 产生事件的 Runnable 的名称。run_id
: str - 与给定执行相关的随机生成的 ID,该 ID 与发出事件的 Runnable 链接。作为父级 Runnable 执行一部分的子级 Runnable 将分配其自己的唯一 ID。
parent_ids
: 列表[str] - 产生事件的父级 Runnable 的 ID。根级 Runnable 将有一个空列表。父级 ID 的顺序是从根到直接父级。仅适用于 API 的 v2 版本。API 的 v1 版本将返回一个空列表。
tags
: 可选[str] – 产生事件的 Runnable 的标签。标签。
metadata
: 可选[dict] – 产生事件的 Runnable 的元数据。元数据。
data
: 字典[str, Any]
以下是一个表格,说明了各种链可能发出的一些事件。为了简明,省略了元数据字段。链定义在表格之后。
注意:此参考表是为架构的 V2 版本。
事件
名称
块
输入
输出
on_chat_model_start
[
{“messages”: [[系统消息,人类消息]]}
on_chat_model_stream
[
AIMessageChunk(content="hello")
on_chat_model_end
[
{“messages”: [[系统消息,人类消息]]}
AIMessageChunk(content="hello world")
on_llm_start
[
{‘input’: ‘hello’}
on_llm_stream
[
‘Hello’
on_llm_end
[
‘Hello human!’
on_chain_start
format_docs
on_chain_stream
format_docs
“hello world!, goodbye world!”
on_chain_end
format_docs
[文档(…)]
“hello world!, goodbye world!”
on_tool_start
某些工具
{“x”: 1, “y”: “2”}
on_tool_end
某些工具
{“x”: 1, “y”: “2”}
on_retriever_start
[
{“query”: “hello”}
on_retriever_end
[
{“query”: “hello”}
[
on_prompt_start
[
{“question”: “hello”}
on_prompt_end
[
{“question”: “hello”}
ChatPromptValue(messages: [系统消息, …])
除了标准事件外,用户还可以调度自定义事件(如下所述示例)。
自定义事件仅在API的v2版本中可见!
自定义事件的格式如下
属性
类型
描述
名称
字符串
事件的用户定义名称。
数据
任何
与事件关联的数据。这可能是一切,但我们建议将其制作成可序列化的JSON。
以下是与上述标准事件相关的声明
format_docs:
def format_docs(docs: List[Document]) -> str: '''Format the docs.''' return ", ".join([doc.page_content for doc in docs]) format_docs = RunnableLambda(format_docs)
某些工具:
@tool def some_tool(x: int, y: str) -> dict: '''Some_tool.''' return {"x": x, "y": y}
提示:
template = ChatPromptTemplate.from_messages( [("system", "You are Cat Agent 007"), ("human", "{question}")] ).with_config({"run_name": "my_template", "tags": ["my_template"]})
示例
from langchain_core.runnables import RunnableLambda async def reverse(s: str) -> str: return s[::-1] chain = RunnableLambda(func=reverse) events = [ event async for event in chain.astream_events("hello", version="v2") ] # will produce the following events (run_id, and parent_ids # has been omitted for brevity): [ { "data": {"input": "hello"}, "event": "on_chain_start", "metadata": {}, "name": "reverse", "tags": [], }, { "data": {"chunk": "olleh"}, "event": "on_chain_stream", "metadata": {}, "name": "reverse", "tags": [], }, { "data": {"output": "olleh"}, "event": "on_chain_end", "metadata": {}, "name": "reverse", "tags": [], }, ]
示例:调度自定义事件
from langchain_core.callbacks.manager import ( adispatch_custom_event, ) from langchain_core.runnables import RunnableLambda, RunnableConfig import asyncio async def slow_thing(some_input: str, config: RunnableConfig) -> str: """Do something that takes a long time.""" await asyncio.sleep(1) # Placeholder for some slow operation await adispatch_custom_event( "progress_event", {"message": "Finished step 1 of 3"}, config=config # Must be included for python < 3.10 ) await asyncio.sleep(1) # Placeholder for some slow operation await adispatch_custom_event( "progress_event", {"message": "Finished step 2 of 3"}, config=config # Must be included for python < 3.10 ) await asyncio.sleep(1) # Placeholder for some slow operation return "Done" slow_thing = RunnableLambda(slow_thing) async for event in slow_thing.astream_events("some_input", version="v2"): print(event)
- 参数
input (任何) – Runnable的输入。
config (可选[RunnableConfig]) – 用于Runnable的配置。
version (文字['v1', 'v2']) – 要使用的模式版本,可以是v2或v1。用户应使用v2。 v1是为向后兼容而提供的,并在0.4.0中将被弃用。在没有API稳定之前不会分配默认值。自定义事件仅在v2中可见。
include_names (可选[序列[字符串]]) – 只包括具有匹配名称的runnables的事件。
include_types (可选[序列[字符串]]) – 只包括具有匹配类型的runnables的事件。
include_tags (可选[序列[字符串]]) – 只包括具有匹配标记的runnables的事件。
exclude_names (可选[序列[字符串]]) – 排除具有匹配名称的runnables的事件。
exclude_types (可选[序列[字符串]]) – 排除具有匹配类型的runnables的事件。
exclude_tags (可选[序列[字符串]]) – 排除具有匹配标记的runnables的事件。
kwargs (任何) – 传递给Runnable的额外关键字参数。这些将通过astream_log传递,因为astream_events的实现基于astream_log。
- 产生
异步流中的StreamEvents。
- 抛出
NotImplementedError – 如果版本不是v1或v2。
- 返回类型
AsyncIterator[联合[StandardStreamEvent, CustomStreamEvent]]
- batch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]], *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output] ¶
默认实现是在线程池执行器中并行调用。
默认的批处理实现对于IO密集型的runnables运行良好。
如果子类能够更有效地批量处理,则应重写此方法;例如,如果底层Runnable使用支持批处理模式的API。
- 参数
inputs (列表[输入]) –
config (可选[联合[RunnableConfig, 列表[RunnableConfig]]]) –
return_exceptions (布尔值) –
kwargs (Optional[Any]) –
- 返回类型
列表[输出]
- batch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) Iterator[Tuple[int, Union[Output, Exception]]] ¶
在输入列表上并行运行 invoke,并根据完成情况产生结果。
- 参数
inputs (Sequence[Input]) –
config (Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]) –
return_exceptions (布尔值) –
kwargs (Optional[Any]) –
- 返回类型
迭代器[元组[int, Union[Output, Exception]]]
- configurable_alternatives(which: ConfigurableField, *, default_key: str = 'default', prefix_keys: bool = False, **kwargs: Union[Runnable[Input, Output], Callable[[], Runnable[Input, Output]]]) RunnableSerializable[Input, Output] ¶
配置可在运行时设置的 Runnables 的替代选项。
- 参数
which (ConfigurableField) – 要用于选择替代方案的 ConfigurableField 实例。
default_key (str) – 如果没有选择替代方案,则使用默认键。默认为“default”。
prefix_keys (bool) – 是否用 ConfigurableField id 前缀键。默认为 False。
**kwargs (Union[Runnable[Input, Output], Callable[[], Runnable[Input, Output]]]) – Runnable 实例或返回 Runnable 实例的可调用对象的关键字字典。
- 返回值
配置了替代方案的新 Runnable。
- 返回类型
RunnableSerializable[Input, Output]
from langchain_anthropic import ChatAnthropic from langchain_core.runnables.utils import ConfigurableField from langchain_openai import ChatOpenAI model = ChatAnthropic( model_name="claude-3-sonnet-20240229" ).configurable_alternatives( ConfigurableField(id="llm"), default_key="anthropic", openai=ChatOpenAI() ) # uses the default model ChatAnthropic print(model.invoke("which organization created you?").content) # uses ChatOpenAI print( model.with_config( configurable={"llm": "openai"} ).invoke("which organization created you?").content )
- configurable_fields(**kwargs: Union[ConfigurableField, ConfigurableFieldSingleOption, ConfigurableFieldMultiOption]) RunnableSerializable[Input, Output]
在运行时配置特定的可运行字段。
- 参数
**kwargs (Union[ConfigurableField, ConfigurableFieldSingleOption, ConfigurableFieldMultiOption]) – 要配置的ConfigurableField实例的字典。
- 返回值
具有配置字段的新的可运行对象。
- 返回类型
RunnableSerializable[Input, Output]
from langchain_core.runnables import ConfigurableField from langchain_openai import ChatOpenAI model = ChatOpenAI(max_tokens=20).configurable_fields( max_tokens=ConfigurableField( id="output_token_number", name="Max tokens in the output", description="The maximum number of tokens in the output", ) ) # max_tokens = 20 print( "max_tokens_20: ", model.invoke("tell me something about chess").content ) # max_tokens = 200 print("max_tokens_200: ", model.with_config( configurable={"output_token_number": 200} ).invoke("tell me something about chess").content )
- get_format_instructions() str
指定LLM输出应该如何格式化的说明。
- 返回类型
字符串
- invoke(input: Union[str, BaseMessage], config: Optional[RunnableConfig] = None) T
将单个输入转换成输出。通过覆盖来实现。
- 参数
input (Union[str, BaseMessage]) – 可运行的输入。
config (可选[RunnableConfig]) – 调用Runnable时要使用的配置。该配置支持标准键如‘tags’、‘metadata’用于跟踪目的,以及‘max_concurrency’用于控制并行处理的工作量,以及其他键。请参阅RunnableConfig以获取更多详情。
- 返回值
Runnable 的输出。
- 返回类型
T
- parse(text: str) ThoughtValidity [source]¶
解析语言模型的输出。
- 参数
text (str) –
- 返回类型
- parse_result(result: List[Generation], *, partial: bool = False) T¶
将一系列候选模型生成器结果解析成特定格式。
- 返回值仅从结果中的第一个生成物解析,该生成物
假定是最可能的生成物。
- 参数
结果 (列表[生成]) – 要解析的生成列表。假设这些生成是单个模型输入的不同候选输出。
部分 (布尔型) – 是否将输出解析为部分结果。对于可以解析部分结果的解析器很有用。默认为False。
- 返回值
结构化输出。
- 返回类型
T
- parse_with_prompt(completion: str, prompt: PromptValue) Any¶
使用上下文输入提示解析LLM的输出。
此提示通常由OutputParser提供,以便重试或以某种方式修复输出,并需要从提示中获取信息以执行此操作。
- 参数
completion (str) – 语言模型的字符串输出。
prompt (PromptValue) – 输入PromptValue。
- 返回值
结构化输出。
- 返回类型
任何
- stream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) Iterator[Output]¶
stream 的默认实现,调用 invoke。子类应覆盖此方法以支持流式输出。
- 参数
input (Input) – Runnable 的输入。
config (可选[]RunnableConfig[]) – 用于 Runnable 的配置。默认为 None。
kwargs (可选[任何]) – 传递给Runnable的额外关键字参数。
- 产生
Runnable 的输出。
- 返回类型
Iterator[Output]
- to_json() Union[SerializedConstructor, SerializedNotImplemented] ¶
将可运行对象序列化为 JSON。
- 返回值
可序列化为 JSON 的可运行对象表示。
- 返回类型