langchain_core.output_parsers.transform.BaseCumulativeTransformOutputParser

注意:

BaseCumulativeTransformOutputParser实现了标准的Runnable Interface。🏃

Runnable接口增加了runnables特有的方法,例如with_typeswith_retryassignbindget_graph等。

class langchain_core.output_parsers.transform.BaseCumulativeTransformOutputParser[source]

继承自:BaseTransformOutputParser[T]

BaseCumulativeTransformOutputParser是一个输出解析器的基类,它可以处理流式输入。

参数 diff : bool = False

在流模式下,是否生成上一次和当前解析输出的差异,或者只生成当前解析输出。

async abatch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output]

默认实现通过使用asyncio.gather并行地运行ainvoke。

默认批处理实现适用于IO密集型可运行程序。

子类应在其能更有效地批处理的情况下重写此方法;例如,如果底层可运行程序使用的API支持批处理模式。

参数
  • inputs (列表[输入]) – 要传递给可运行程序的输入列表。

  • config (可选[联合[可运行程序配置, 列表[可运行程序配置]]) – 当调用可运行程序时使用的配置。配置支持标准关键字,如'tags'、'metadata',用于跟踪目的,'max_concurrency'用于控制并行进行的作业量,以及其他关键字。请参阅可运行程序配置以获取更多详细信息。默认为None。

  • return_exceptions (布尔值) – 是否返回异常而不是引发异常。默认为False。

  • kwargs (可选[任何]) – 要传递给可运行程序的其他关键字参数。

返回

可运行程序返回的输出列表。

返回类型

列表[输出]

async abatch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) AsyncIterator[Tuple[int, Union[Output, Exception]]]

并行在一个输入列表上运行 runinvoke,随着每个任务完成时产出结果。

参数
  • inputs (Sequence[Input]) – Runnable 输入的列表。

  • config (Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]) – 调用 Runnable时使用的配置。该配置支持用于跟踪的标准键,如 'tags'、'metadata',用于控制并行工作的 'max_concurrency' 等其他键。请参阅 RunnableConfig 以获取更多详细信息。默认值是 None。

  • return_exceptions (布尔值) – 是否返回异常而不是引发异常。默认为False。

  • kwargs (可选[任何]) – 要传递给可运行程序的其他关键字参数。

产出

输入索引和 Runnable产出的元组。

返回类型

AsyncIterator[Tuple[int, Union[Output, Exception]]]

async ainvoke(input: Union[str, BaseMessage], config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) T

ainvoke 的默认实现,从线程中调用 invoke。

默认实现允许即使 Runnable 没有实现 invoke 的原生异步版本,也使用异步代码。

如果子类可以异步运行,则应重写此方法。

参数
返回类型

T

async aparse(text: str) T

异步解析一个字符串模型输出到某种结构。

参数

text (str) – 语言模型的字符串输出。

返回

结构化输出。

返回类型

T

async aparse_result(result: List[Generation], *, partial: bool = False) T

异步解析一系列候选模型生成的列表到特定的格式。

返回值仅从结果中的第一个生成器进行解析,

假设它是概率最高的生成器。

参数
  • result (列表[Generation]) - 要解析的生成器列表。假设这些生成器是单一模型输入的不同候选输出。

  • partial () - 是否将输出解析为部分结果。对于可以解析部分结果的解析器很有用。默认为False。

返回

结构化输出。

返回类型

T

as_tool(args_schema: Optional[类型[BaseModel]] = None, *, name: Optional[str] = None, description: Optional[str] = None, arg_types: Optional[字典[str, 类型]] = None) BaseTool

测试版

此API处于测试版,可能在未来更改。

从可运行对象创建BaseTool。

as_tool将从可运行对象中创建一个具有名称、描述和args_schema的BaseTool。在可能的情况下,模式从runnable.get_input_schema推断出来。另请选择(例如,如果可运行对象接受字典作为输入且特定的字典键未被类型化),可以使用args_schema直接指定模式。您还可以通过传递arg_types只指定必需的参数和它们的类型。

参数
  • args_schema (可选[类型[BaseModel]]) – 工具的模式。默认值是None。

  • name (可选[str]) – 工具的名称。默认值是None。

  • description (可选[str]) – 工具的描述。默认值是None。

  • arg_types (可选[字典[str, 类型]]) – 参数名称到类型的字典。默认值是None。

返回

一个BaseTool实例。

返回类型

BaseTool

类型化字典输入

from typing import List
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda

class Args(TypedDict):
    a: int
    b: List[int]

def f(x: Args) -> str:
    return str(x["a"] * max(x["b"]))

runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})

dict 输入,通过 args_schema 指定模式。

from typing import Any, Dict, List
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.runnables import RunnableLambda

def f(x: Dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))

class FSchema(BaseModel):
    """Apply a function to an integer and list of integers."""

    a: int = Field(..., description="Integer")
    b: List[int] = Field(..., description="List of ints")

runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})

dict 输入,通过 arg_types 指定模式。

from typing import Any, Dict, List
from langchain_core.runnables import RunnableLambda

def f(x: Dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))

runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": List[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})

字符串输入

from langchain_core.runnables import RunnableLambda

def f(x: str) -> str:
    return x + "a"

def g(x: str) -> str:
    return x + "z"

runnable = RunnableLambda(f) | g
as_tool = runnable.as_tool()
as_tool.invoke("b")

自版本 0.2.14 开始支持。

asyncastream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) AsyncIterator[Output]

astream 的默认实现,它调用 ainvoke。子类如果支持流式输出,则应重写此方法。

参数
  • input (Input) – Runnable 的输入。

  • config (Optional[RunnableConfig]) – 要用于 Runnable 的配置。默认值为 None。

  • kwargs (可选[任何]) – 要传递给可运行程序的其他关键字参数。

产出

Runnable 的输出。

返回类型

AsyncIterator[Output]

astream_events(input: Any, config: Optional[RunnableConfig] = None, *, version: Literal['v1', 'v2'], include_names: Optional[Sequence[str]] = None, include_types: Optional[Sequence[str]] = None, include_tags: Optional[Sequence[str]] = None, exclude_names: Optional[Sequence[str]] = None, exclude_types: Optional[Sequence[str]] = None, exclude_tags: Optional[Sequence[str]] = None, **kwargs: Any) AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]]

测试版

此API处于测试版,可能在未来更改。

生成事件流。

用于创建对 StreamEvents 的迭代器,这些事件提供有关 Runnable 进度的实时信息,包括来自中间结果的事件流。

StreamEvent 是一个具有以下模式的字典

  • event: str - 事件名称的格式如下:

    格式:on_[runnable_type]_(start|stream|end)。

  • name: str - 生成事件的 Runnable 的名称。

  • run_id: str - 与 Runnable 的给定执行相关联的随机生成的 ID。

    作为父 Runnable 执行一部分调用的子 Runnable 将赋予其唯一的 ID。

  • parent_ids: List[str] - 生成事件的父 Runnable 的 ID。

    根 Runnable 将具有空列表。父 ID 的顺序是从根到直接父级。仅适用于 v2 API 版本。v1 API 版本将返回空列表。

  • tags: Optional[List[str]] - 生成事件的 Runnable 的标签。

  • metadata: Optional[Dict[str, Any]] - 生成事件的 Runnable 的元数据。

  • data: Dict[str, Any]

以下表格显示了一些可能由不同链条发出的事件。为了简洁,表中省略了元数据字段。在表之后包括链定义。

注意 此参考表适用于方案的V2版本。

事件

名称

输入

输出

on_chat_model_start

【模型名称】

{“messages”: [[SystemMessage, HumanMessage]]}

on_chat_model_stream

【模型名称】

AIMessageChunk(content="hello")

on_chat_model_end

【模型名称】

{“messages”: [[SystemMessage, HumanMessage]]}

AIMessageChunk(content="hello world")

on_llm_start

【模型名称】

{‘input’: ‘hello’}

on_llm_stream

【模型名称】

‘Hello’

on_llm_end

【模型名称】

‘Hello human!’

on_chain_start

format_docs

on_chain_stream

format_docs

“hello world!, goodbye world!”

on_chain_end

format_docs

[Document(…)]

“hello world!, goodbye world!”

on_tool_start

some_tool

{“x”: 1, “y”: “2”}

on_tool_end

some_tool

{“x”: 1, “y”: “2”}

on_retriever_start

[retriever name]

{“query”: “hello”}

on_retriever_end

[retriever name]

{“query”: “hello”}

[Document(…), ..]

on_prompt_start

[template_name]

{“question”: “hello”}

on_prompt_end

[template_name]

{“question”: “hello”}

ChatPromptValue(messages: [SystemMessage, …])

除了标准事件之外,用户还可以派发自定义事件(如下例所示)。

自定义事件仅在API的V2版本中显示!

自定义事件的格式如下

属性

类型

描述

名称

str

为事件定义的用户名称。

数据

任何

与事件关联的数据。这可以是任何东西,但我们建议使其具有JSON可序列化的特性。

以下是与上述标准事件相关联的声明

format_docs:

def format_docs(docs: List[Document]) -> str:
    '''Format the docs.'''
    return ", ".join([doc.page_content for doc in docs])

format_docs = RunnableLambda(format_docs)

some_tool:

@tool
def some_tool(x: int, y: str) -> dict:
    '''Some_tool.'''
    return {"x": x, "y": y}

提示:

template = ChatPromptTemplate.from_messages(
    [("system", "You are Cat Agent 007"), ("human", "{question}")]
).with_config({"run_name": "my_template", "tags": ["my_template"]})

示例

from langchain_core.runnables import RunnableLambda

async def reverse(s: str) -> str:
    return s[::-1]

chain = RunnableLambda(func=reverse)

events = [
    event async for event in chain.astream_events("hello", version="v2")
]

# will produce the following events (run_id, and parent_ids
# has been omitted for brevity):
[
    {
        "data": {"input": "hello"},
        "event": "on_chain_start",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"chunk": "olleh"},
        "event": "on_chain_stream",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"output": "olleh"},
        "event": "on_chain_end",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
]

示例:派发自定义事件

from langchain_core.callbacks.manager import (
    adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio


async def slow_thing(some_input: str, config: RunnableConfig) -> str:
    """Do something that takes a long time."""
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 1 of 3"},
        config=config # Must be included for python < 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 2 of 3"},
        config=config # Must be included for python < 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    return "Done"

slow_thing = RunnableLambda(slow_thing)

async for event in slow_thing.astream_events("some_input", version="v2"):
    print(event)
参数
  • input (任何) – Runnable的输入。

  • config (可选[RunnableConfig]) – 为Runnable使用的配置。

  • version (文字['v1', 'v2']) – 要使用的方案版本,可以是V2V1。用户应使用V2V1用于向后兼容,将在0.4.0中弃用。直到API稳定,将不会分配默认值。自定义事件仅在V2中显示。

  • include_names (可选[序列[str]]) – 仅包括匹配名称的Runnable的事件。

  • include_types (可选[序列[str]]) – 仅包括匹配类型的Runnable的事件。

  • include_tags (可选[序列[str]]) – 仅包括匹配标签的Runnable的事件。

  • exclude_names (可选[序列[str]]) – 排除匹配名称的Runnable的事件。

  • exclude_types (可选[序列[str]]) – 排除匹配类型的Runnable的事件。

  • exclude_tags (可选[序列[str]]) – 排除匹配标签的Runnable的事件。

  • kwargs (任何) – 指定要传递给Runnable的额外关键字参数。这些将通过astream_log传递,因为此astream_events实现是在astream_log的基础上构建的。

产出

异步流中的StreamEvents。

引发

NotImplementedError – 如果版本不是V1V2

返回类型

AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]]

batch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output]

默认实现通过线程池执行器并行运行。

默认批处理实现适用于IO密集型可运行程序。

子类应在其能更有效地批处理的情况下重写此方法;例如,如果底层可运行程序使用的API支持批处理模式。

参数
  • inputs (列表[输入]) –

  • config (可选[并集[运行时配置, 列表[运行时配置]]) –

  • return_exceptions (布尔值) –

  • kwargs (Optional[Any]) –

返回类型

列表[输出]

batch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]], *, return_exceptions: bool = False, **kwargs: Optional[Any]) Iterator[Tuple[int, Union[Output, Exception]]]

在输入列表上并行运行调用,并按完成顺序输出结果。

参数
  • inputs (Sequence[Input]) –

  • config (Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]) –

  • return_exceptions (布尔值) –

  • kwargs (Optional[Any]) –

返回类型

Iterator[Tuple[int, Union[Output, Exception]]]

configurable_alternatives(which: ConfigurableField, *, default_key: str = 'default', prefix_keys: bool = False, **kwargs: Union[Runnable[Input, Output], Callable[[], Runnable[Input, Output]]]) RunnableSerializable[Input, Output]

在运行时可以配置的 Runnables 替代选项。

参数
  • which (ConfigurableField) – 将用于选择替代选项的 ConfigurableField 实例。

  • default_key (str) – 在未选择替代选项时使用的默认键。默认为“default”。

  • prefix_keys (bool) – 是否在键前添加 ConfigurableField id 作为前缀。默认为 False。

  • **kwargs (Union[Runnable[Input, Output], Callable[[], Runnable[Input, Output]]]) – 键到 Runnable 实例或返回 Runnable 实例的可调用对象的字典。

返回

配置替代选项后的新 Runnable。

返回类型

RunnableSerializable[Input, Output]

from langchain_anthropic import ChatAnthropic
from langchain_core.runnables.utils import ConfigurableField
from langchain_openai import ChatOpenAI

model = ChatAnthropic(
    model_name="claude-3-sonnet-20240229"
).configurable_alternatives(
    ConfigurableField(id="llm"),
    default_key="anthropic",
    openai=ChatOpenAI()
)

# uses the default model ChatAnthropic
print(model.invoke("which organization created you?").content)

# uses ChatOpenAI
print(
    model.with_config(
        configurable={"llm": "openai"}
    ).invoke("which organization created you?").content
)
configurable_fields(**kwargs: Union[ConfigurableField, ConfigurableFieldSingleOption, ConfigurableFieldMultiOption]) RunnableSerializable[Input, Output]

在运行时配置特定的可运行字段。

参数

**kwargs (Union[ConfigurableField, ConfigurableFieldSingleOption, ConfigurableFieldMultiOption]) – 用于配置的 ConfigurableField 实例的字典。

返回

配置字段的新的可运行实例。

返回类型

RunnableSerializable[Input, Output]

from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI

model = ChatOpenAI(max_tokens=20).configurable_fields(
    max_tokens=ConfigurableField(
        id="output_token_number",
        name="Max tokens in the output",
        description="The maximum number of tokens in the output",
    )
)

# max_tokens = 20
print(
    "max_tokens_20: ",
    model.invoke("tell me something about chess").content
)

# max_tokens = 200
print("max_tokens_200: ", model.with_config(
    configurable={"output_token_number": 200}
    ).invoke("tell me something about chess").content
)
get_format_instructions() str

LLM 输出应该如何格式的说明。

返回类型

str

invoke(input: Union[str, BaseMessage], config: Optional[RunnableConfig] = None) T

将单个输入转换为输出。重写以实现。

参数
  • input (Union[str, BaseMessage]) – 可运行实例的输入。

  • config (可选[RunnableConfig]) – 当调用Runnable时使用的配置。该配置支持标准键,如用于追踪的“tags”和“metadata”,以及用于控制并行工作的“max_concurrency”等键。请参阅RunnableConfig获取更多详细信息。

返回

Runnable 的输出。

返回类型

T

abstract parse(text: str) T

将单个字符串模型输出解析为某种结构。

参数

text (str) – 语言模型的字符串输出。

返回

结构化输出。

返回类型

T

parse_result(result: List[Generation], *, partial: bool = False) T

将候选模型生成列表解析为特定格式。

返回值仅从结果中的第一个生成器进行解析,

假设它是概率最高的生成器。

参数
  • result (列表[Generation]) - 要解析的生成器列表。假设这些生成器是单一模型输入的不同候选输出。

  • partial () - 是否将输出解析为部分结果。对于可以解析部分结果的解析器很有用。默认为False。

返回

结构化输出。

返回类型

T

parse_with_prompt(completion: str, prompt: PromptValue) Any

使用输入提示作为上下文解析LLM调用的输出。

事件中通常提供提示,以供OutputParser重试或以某种方式修复输出,其需要从提示中获取信息以这样做。

参数
  • completion (str) – 语言模型字符串输出。

  • prompt (PromptValue) – 输入PromptValue。

返回

结构化输出。

返回类型

任何

stream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) Iterator[Output]

stream的默认实现,调用invoke。子类应重写此方法以支持流式输出。

参数
  • input (Input) – Runnable 的输入。

  • config (Optional[RunnableConfig]) – 要用于 Runnable 的配置。默认值为 None。

  • kwargs (可选[任何]) – 要传递给可运行程序的其他关键字参数。

产出

Runnable 的输出。

返回类型

Iterator[Output]

to_json() Union[SerializedConstructor, SerializedNotImplemented]

将Runnable序列化为JSON。

返回

Runnable的JSON序列化表示。

返回类型

Union[SerializedConstructor, SerializedNotImplemented]