langchain.chains.combine_documents.map_reduce.MapReduceDocumentsChain

注意

MapReduceDocumentsChain 实现了标准的 Runnable 接口。🏃

Runnable 接口 具有在可运行对象上可用的其他方法,例如 with_types, with_retry, assign, bind, get_graph, 以及更多。

class langchain.chains.combine_documents.map_reduce.MapReduceDocumentsChain[源代码]

基类: BaseCombineDocumentsChain

通过对文档进行映射链操作,然后组合结果来组合文档。

我们首先在每个文档上单独调用 llm_chain,传入 page_content 和任何其他 kwargs。这是 map 步骤。

然后,我们在 reduce 步骤中处理 map 步骤的结果。这很可能是一个 ReduceDocumentsChain。

示例

from langchain.chains import (
    StuffDocumentsChain,
    LLMChain,
    ReduceDocumentsChain,
    MapReduceDocumentsChain,
)
from langchain_core.prompts import PromptTemplate
from langchain_community.llms import OpenAI

# This controls how each document will be formatted. Specifically,
# it will be passed to `format_document` - see that function for more
# details.
document_prompt = PromptTemplate(
    input_variables=["page_content"],
     template="{page_content}"
)
document_variable_name = "context"
llm = OpenAI()
# The prompt here should take as an input variable the
# `document_variable_name`
prompt = PromptTemplate.from_template(
    "Summarize this content: {context}"
)
llm_chain = LLMChain(llm=llm, prompt=prompt)
# We now define how to combine these summaries
reduce_prompt = PromptTemplate.from_template(
    "Combine these summaries: {context}"
)
reduce_llm_chain = LLMChain(llm=llm, prompt=reduce_prompt)
combine_documents_chain = StuffDocumentsChain(
    llm_chain=reduce_llm_chain,
    document_prompt=document_prompt,
    document_variable_name=document_variable_name
)
reduce_documents_chain = ReduceDocumentsChain(
    combine_documents_chain=combine_documents_chain,
)
chain = MapReduceDocumentsChain(
    llm_chain=llm_chain,
    reduce_documents_chain=reduce_documents_chain,
)
# If we wanted to, we could also pass in collapse_documents_chain
# which is specifically aimed at collapsing documents BEFORE
# the final call.
prompt = PromptTemplate.from_template(
    "Collapse this content: {context}"
)
llm_chain = LLMChain(llm=llm, prompt=prompt)
collapse_documents_chain = StuffDocumentsChain(
    llm_chain=llm_chain,
    document_prompt=document_prompt,
    document_variable_name=document_variable_name
)
reduce_documents_chain = ReduceDocumentsChain(
    combine_documents_chain=combine_documents_chain,
    collapse_documents_chain=collapse_documents_chain,
)
chain = MapReduceDocumentsChain(
    llm_chain=llm_chain,
    reduce_documents_chain=reduce_documents_chain,
)
param callback_manager: Optional[BaseCallbackManager] = None

[已弃用] 请使用 callbacks 代替。

param callbacks: Callbacks = None

回调处理程序(或回调管理器)的可选列表。默认为 None。回调处理程序在链调用的整个生命周期中被调用,从 on_chain_start 开始,到 on_chain_end 或 on_chain_error 结束。每个自定义链可以选择性地调用其他回调方法,有关完整详细信息,请参阅回调文档。

param document_variable_name: str [必需]

llm_chain 中用于放置文档的变量名。如果 llm_chain 中只有一个变量,则无需提供此项。

param llm_chain: LLMChain [必需]

要单独应用于每个文档的链。

param memory: Optional[BaseMemory] = None

可选的 memory 对象。默认为 None。Memory 是一个在每个链的开始和结束时调用的类。在开始时,memory 加载变量并在链中传递它们。在结束时,它保存任何返回的变量。有许多不同类型的 memory - 有关完整目录,请参阅 memory 文档。

param metadata: Optional[Dict[str, Any]] = None

与链关联的可选元数据。默认为 None。此元数据将与对此链的每次调用关联,并作为参数传递给 callbacks 中定义的处理程序。您可以使用它们来识别链的特定实例及其用例,例如。

param reduce_documents_chain: BaseCombineDocumentsChain [必需]

用于减少将 llm_chain 应用于每个文档的结果的链。这通常是 ReduceDocumentChain 或 StuffDocumentChain。

param return_intermediate_steps: bool = False

在输出中返回 map 步骤的结果。

param tags: Optional[List[str]] = None

与链关联的可选标签列表。默认为 None。这些标签将与对此链的每次调用关联,并作为参数传递给 callbacks 中定义的处理程序。您可以使用这些来识别链的特定实例及其用例,例如。

param verbose: bool [可选]

是否在 verbose 模式下运行。在 verbose 模式下,一些中间日志将被打印到控制台。默认为全局 verbose 值,可通过 langchain.globals.get_verbose() 访问。

__call__(inputs: Union[Dict[str, Any], Any], return_only_outputs: bool = False, callbacks: Optional[Union[List[BaseCallbackHandler], BaseCallbackManager]] = None, *, tags: Optional[List[str]] = None, metadata: Optional[Dict[str, Any]] = None, run_name: Optional[str] = None, include_run_info: bool = False) Dict[str, Any]

Deprecated since version langchain==0.1.0: Use invoke instead.

执行链。

参数
  • inputs (Union[Dict[str, Any], Any]) – 输入字典,或者如果链仅期望一个参数,则为单个输入。应包含 Chain.input_keys 中指定的所有输入,但链的 memory 将设置的输入除外。

  • return_only_outputs (bool) – 是否仅在响应中返回输出。如果为 True,则仅返回此链生成的新键。如果为 False,则将返回输入键和此链生成的新键。默认为 False。

  • callbacks (Optional[Union[List[BaseCallbackHandler], BaseCallbackManager]]) – 用于此链运行的回调。这些回调将添加到在构造期间传递给链的回调之外调用,但只有这些运行时回调才会传播到对其他对象的调用。

  • tags (Optional[List[str]]) – 要传递给所有回调的字符串标签列表。这些标签将添加到在构造期间传递给链的标签之外传递,但只有这些运行时标签才会传播到对其他对象的调用。

  • metadata (Optional[Dict[str, Any]]) – 与链关联的可选元数据。默认为 None

  • include_run_info (bool) – 是否在响应中包含运行信息。默认为 False。

  • run_name (Optional[str]) –

返回值

一个命名的输出字典。应包含中指定的所有输出

Chain.output_keys.

返回类型

Dict[str, Any]

async abatch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output]

默认实现使用 asyncio.gather 并行运行 ainvoke。

batch 的默认实现适用于 IO 绑定的可运行对象。

子类应该覆盖此方法,如果它们可以更有效地进行批处理;例如,如果底层 Runnable 使用支持批处理模式的 API。

参数
  • inputs (List[Input]) – Runnable 的输入列表。

  • config (Optional[Union[RunnableConfig, List[RunnableConfig]]]) – 调用 Runnable 时要使用的配置。该配置支持标准键,例如用于跟踪目的的 ‘tags’、‘metadata’,用于控制并行执行多少工作的 ‘max_concurrency’,以及其他键。有关更多详细信息,请参阅 RunnableConfig。默认为 None。

  • return_exceptions (bool) – 是否返回异常而不是引发异常。默认为 False。

  • kwargs (Optional[Any]) – 要传递给 Runnable 的其他关键字参数。

返回值

Runnable 的输出列表。

返回类型

List[Output]

async abatch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) AsyncIterator[Tuple[int, Union[Output, Exception]]]

在一个输入列表上并行运行 ainvoke,并在结果完成时生成结果。

参数
  • inputs (Sequence[Input]) – Runnable 的输入列表。

  • config (Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]) – 调用 Runnable 时要使用的配置。该配置支持标准键,例如用于跟踪目的的 ‘tags’、‘metadata’,用于控制并行执行多少工作的 ‘max_concurrency’,以及其他键。有关更多详细信息,请参阅 RunnableConfig。默认为 None。默认为 None。

  • return_exceptions (bool) – 是否返回异常而不是引发异常。默认为 False。

  • kwargs (Optional[Any]) – 要传递给 Runnable 的其他关键字参数。

Yields

一个元组,包含输入的索引和来自 Runnable 的输出。

返回类型

AsyncIterator[Tuple[int, Union[Output, Exception]]]

async acall(inputs: Union[Dict[str, Any], Any], return_only_outputs: bool = False, callbacks: Optional[Union[List[BaseCallbackHandler], BaseCallbackManager]] = None, *, tags: Optional[List[str]] = None, metadata: Optional[Dict[str, Any]] = None, run_name: Optional[str] = None, include_run_info: bool = False) Dict[str, Any]

自 langchain==0.1.0 版本起已弃用: 请使用 ainvoke 代替。

异步执行链。

参数
  • inputs (Union[Dict[str, Any], Any]) – 输入字典,或者如果链仅期望一个参数,则为单个输入。应包含 Chain.input_keys 中指定的所有输入,但链的 memory 将设置的输入除外。

  • return_only_outputs (bool) – 是否仅在响应中返回输出。如果为 True,则仅返回此链生成的新键。如果为 False,则将返回输入键和此链生成的新键。默认为 False。

  • callbacks (Optional[Union[List[BaseCallbackHandler], BaseCallbackManager]]) – 用于此链运行的回调。这些回调将添加到在构造期间传递给链的回调之外调用,但只有这些运行时回调才会传播到对其他对象的调用。

  • tags (Optional[List[str]]) – 要传递给所有回调的字符串标签列表。这些标签将添加到在构造期间传递给链的标签之外传递,但只有这些运行时标签才会传播到对其他对象的调用。

  • metadata (Optional[Dict[str, Any]]) – 与链关联的可选元数据。默认为 None

  • include_run_info (bool) – 是否在响应中包含运行信息。默认为 False。

  • run_name (Optional[str]) –

返回值

一个命名的输出字典。应包含中指定的所有输出

Chain.output_keys.

返回类型

Dict[str, Any]

async acombine_docs(docs: List[Document], token_max: Optional[int] = None, callbacks: Optional[Union[List[BaseCallbackHandler], BaseCallbackManager]] = None, **kwargs: Any) Tuple[str, dict][source]

以 Map Reduce 方式合并文档。

合并方式是首先将链映射到所有文档,然后归约结果。如果需要(如果文档很多),可以递归地进行归约。

参数
返回类型

Tuple[str, dict]

async ainvoke(input: Dict[str, Any], config: Optional[RunnableConfig] = None, **kwargs: Any) Dict[str, Any]

`ainvoke` 的默认实现,从线程中调用 `invoke`。

即使 Runnable 没有实现 `invoke` 的原生异步版本,默认实现也允许使用异步代码。

如果子类可以异步运行,则应重写此方法。

参数
  • input (Dict[str, Any]) –

  • config (Optional[RunnableConfig]) –

  • kwargs (Any) –

返回类型

Dict[str, Any]

apply(input_list: List[Dict[str, Any]], callbacks: Optional[Union[List[BaseCallbackHandler], BaseCallbackManager]] = None) List[Dict[str, str]]

自 langchain==0.1.0 版本起已弃用: 请使用 batch 代替。

对列表中的所有输入调用链。

参数
返回类型

List[Dict[str, str]]

async aprep_inputs(inputs: Union[Dict[str, Any], Any]) Dict[str, str]

准备链的输入,包括从内存中添加输入。

参数

inputs (Union[Dict[str, Any], Any]) – 原始输入的字典,或者如果链只接受一个参数,则为单个输入。应包含 Chain.input_keys 中指定的所有输入,但链的内存将设置的输入除外。

返回值

包含所有输入的字典,包括链的内存添加的输入。

返回类型

Dict[str, str]

async aprep_outputs(inputs: Dict[str, str], outputs: Dict[str, str], return_only_outputs: bool = False) Dict[str, str]

验证并准备链的输出,并将有关此运行的信息保存到内存中。

参数
  • inputs (Dict[str, str]) – 链输入的字典,包括链内存添加的任何输入。

  • outputs (Dict[str, str]) – 初始链输出的字典。

  • return_only_outputs (bool) – 是否仅返回链输出。如果为 False,输入也会添加到最终输出中。

返回值

最终链输出的字典。

返回类型

Dict[str, str]

async arun(*args: Any, callbacks: Optional[Union[List[BaseCallbackHandler], BaseCallbackManager]] = None, tags: Optional[List[str]] = None, metadata: Optional[Dict[str, Any]] = None, **kwargs: Any) Any

自 langchain==0.1.0 版本起已弃用: 请使用 ainvoke 代替。

执行链的便捷方法。

此方法与 Chain.__call__ 之间的主要区别在于,此方法期望输入直接作为位置参数或关键字参数传入,而 Chain.__call__ 期望一个包含所有输入的单个输入字典

参数
  • *args (Any) – 如果链只接受一个输入,则可以作为唯一的位置参数传入。

  • callbacks (Optional[Union[List[BaseCallbackHandler], BaseCallbackManager]]) – 用于此链运行的回调。这些回调将添加到在构造期间传递给链的回调之外调用,但只有这些运行时回调才会传播到对其他对象的调用。

  • tags (Optional[List[str]]) – 要传递给所有回调的字符串标签列表。这些标签将添加到在构造期间传递给链的标签之外传递,但只有这些运行时标签才会传播到对其他对象的调用。

  • **kwargs (Any) – 如果链接受多个输入,则可以直接作为关键字参数传入。

  • metadata (Optional[Dict[str, Any]]) –

  • **kwargs

返回值

链输出。

返回类型

Any

示例

# Suppose we have a single-input chain that takes a 'question' string:
await chain.arun("What's the temperature in Boise, Idaho?")
# -> "The temperature in Boise is..."

# Suppose we have a multi-input chain that takes a 'question' string
# and 'context' string:
question = "What's the temperature in Boise, Idaho?"
context = "Weather report for Boise, Idaho on 07/03/23..."
await chain.arun(question=question, context=context)
# -> "The temperature in Boise is..."
as_tool(args_schema: Optional[Type[BaseModel]] = None, *, name: Optional[str] = None, description: Optional[str] = None, arg_types: Optional[Dict[str, Type]] = None) BaseTool

Beta

此 API 处于 Beta 阶段,未来可能会发生变化。

从 Runnable 创建一个 BaseTool。

as_tool 将从 Runnable 实例化一个具有名称、描述和 args_schema 的 BaseTool。如果可能,模式将从 runnable.get_input_schema 推断。或者(例如,如果 Runnable 接受字典作为输入且未对特定字典键进行类型化),可以使用 args_schema 直接指定模式。您还可以传递 arg_types 以仅指定必需的参数及其类型。

参数
  • args_schema (Optional[Type[BaseModel]]) – 工具的模式。默认为 None。

  • name (Optional[str]) – 工具的名称。默认为 None。

  • description (Optional[str]) – 工具的描述。默认为 None。

  • arg_types (Optional[Dict[str, Type]]) – 参数名称到类型的字典。默认为 None。

返回值

BaseTool 实例。

返回类型

BaseTool

Typed dict input

from typing import List
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda

class Args(TypedDict):
    a: int
    b: List[int]

def f(x: Args) -> str:
    return str(x["a"] * max(x["b"]))

runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})

dict input, specifying schema via args_schema

from typing import Any, Dict, List
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.runnables import RunnableLambda

def f(x: Dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))

class FSchema(BaseModel):
    """Apply a function to an integer and list of integers."""

    a: int = Field(..., description="Integer")
    b: List[int] = Field(..., description="List of ints")

runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})

dict input, specifying schema via arg_types

from typing import Any, Dict, List
from langchain_core.runnables import RunnableLambda

def f(x: Dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))

runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": List[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})

String input

from langchain_core.runnables import RunnableLambda

def f(x: str) -> str:
    return x + "a"

def g(x: str) -> str:
    return x + "z"

runnable = RunnableLambda(f) | g
as_tool = runnable.as_tool()
as_tool.invoke("b")

0.2.14 版本新增。

async astream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) AsyncIterator[Output]

`astream` 的默认实现,它调用 `ainvoke`。如果子类支持流式输出,则应重写此方法。

参数
  • input (Input) – Runnable 的输入。

  • config (Optional[RunnableConfig]) – 用于 Runnable 的配置。默认为 None。

  • kwargs (Optional[Any]) – 要传递给 Runnable 的其他关键字参数。

Yields

Runnable 的输出。

返回类型

AsyncIterator[Output]

astream_events(input: Any, config: Optional[RunnableConfig] = None, *, version: Literal['v1', 'v2'], include_names: Optional[Sequence[str]] = None, include_types: Optional[Sequence[str]] = None, include_tags: Optional[Sequence[str]] = None, exclude_names: Optional[Sequence[str]] = None, exclude_types: Optional[Sequence[str]] = None, exclude_tags: Optional[Sequence[str]] = None, **kwargs: Any) AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]]

Beta

此 API 处于 Beta 阶段,未来可能会发生变化。

生成事件流。

用于创建 StreamEvent 的迭代器,该迭代器提供关于 Runnable 进度的实时信息,包括来自中间结果的 StreamEvent。

StreamEvent 是一个具有以下模式的字典

  • event: str - 事件名称的格式为:

    on_[runnable_type]_(start|stream|end)。

  • name: str - 生成事件的 Runnable 的名称。

  • run_id: str - 与发出事件的 Runnable 的给定执行关联的随机生成的 ID。作为父 Runnable 执行的一部分调用的子 Runnable 将被分配其自己的唯一 ID。

    的 Runnable,该 Runnable 发出了事件。作为父 Runnable 执行的一部分调用的子 Runnable 将被分配其自己的唯一 ID。

  • parent_ids: List[str] - 生成事件的父 runnables 的 ID。根 Runnable 将具有一个空列表。父 ID 的顺序是从根到直接父级。仅适用于 API 的 v2 版本。API 的 v1 版本将返回一个空列表。

    generated the event. The root Runnable will have an empty list. The order of the parent IDs is from the root to the immediate parent. Only available for v2 version of the API. The v1 version of the API will return an empty list.

  • tags: Optional[List[str]] - 生成事件的 Runnable 的标签。

    the event.

  • metadata: Optional[Dict[str, Any]] - 生成事件的 Runnable 的元数据

    that generated the event.

  • data: Dict[str, Any]

下表说明了各种链可能发出的一些事件。为了简洁起见,元数据字段已从表中省略。链定义已包含在表后。

注意 此参考表适用于 V2 版本的模式。

event

name

chunk

input

output

on_chat_model_start

[模型名称]

{“messages”: [[SystemMessage, HumanMessage]]}

on_chat_model_stream

[模型名称]

AIMessageChunk(content=”hello”)

on_chat_model_end

[模型名称]

{“messages”: [[SystemMessage, HumanMessage]]}

AIMessageChunk(content=”hello world”)

on_llm_start

[模型名称]

{‘input’: ‘hello’}

on_llm_stream

[模型名称]

‘Hello’

on_llm_end

[模型名称]

‘Hello human!’

on_chain_start

format_docs

on_chain_stream

format_docs

“hello world!, goodbye world!”

on_chain_end

format_docs

[Document(…)]

“hello world!, goodbye world!”

on_tool_start

some_tool

{“x”: 1, “y”: “2”}

on_tool_end

some_tool

{“x”: 1, “y”: “2”}

on_retriever_start

[检索器名称]

{“query”: “hello”}

on_retriever_end

[检索器名称]

{“query”: “hello”}

[Document(…), ..]

on_prompt_start

[模板名称]

{“question”: “hello”}

on_prompt_end

[模板名称]

{“question”: “hello”}

ChatPromptValue(messages: [SystemMessage, …])

除了标准事件之外,用户还可以分派自定义事件(请参见下面的示例)。

自定义事件将仅在 v2 版本的 API 中显示!

自定义事件具有以下格式

属性

类型

描述

name

str

用户定义的事件名称。

data

Any

与事件关联的数据。这可以是任何内容,但我们建议使其可 JSON 序列化。

以下是与上面显示的标准事件关联的声明

format_docs:

def format_docs(docs: List[Document]) -> str:
    '''Format the docs.'''
    return ", ".join([doc.page_content for doc in docs])

format_docs = RunnableLambda(format_docs)

some_tool:

@tool
def some_tool(x: int, y: str) -> dict:
    '''Some_tool.'''
    return {"x": x, "y": y}

prompt:

template = ChatPromptTemplate.from_messages(
    [("system", "You are Cat Agent 007"), ("human", "{question}")]
).with_config({"run_name": "my_template", "tags": ["my_template"]})

示例

from langchain_core.runnables import RunnableLambda

async def reverse(s: str) -> str:
    return s[::-1]

chain = RunnableLambda(func=reverse)

events = [
    event async for event in chain.astream_events("hello", version="v2")
]

# will produce the following events (run_id, and parent_ids
# has been omitted for brevity):
[
    {
        "data": {"input": "hello"},
        "event": "on_chain_start",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"chunk": "olleh"},
        "event": "on_chain_stream",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"output": "olleh"},
        "event": "on_chain_end",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
]

示例:分派自定义事件

from langchain_core.callbacks.manager import (
    adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio


async def slow_thing(some_input: str, config: RunnableConfig) -> str:
    """Do something that takes a long time."""
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 1 of 3"},
        config=config # Must be included for python < 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 2 of 3"},
        config=config # Must be included for python < 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    return "Done"

slow_thing = RunnableLambda(slow_thing)

async for event in slow_thing.astream_events("some_input", version="v2"):
    print(event)
参数
  • input (Any) – Runnable 的输入。

  • config (Optional[RunnableConfig]) – 用于 Runnable 的配置。

  • version (Literal['v1', 'v2']) – 要使用的模式版本,可以是 v2v1。用户应使用 v2v1 用于向后兼容,将在 0.4.0 中弃用。在 API 稳定之前,不会分配默认值。自定义事件将仅在 v2 中显示。

  • include_names (Optional[Sequence[str]]) – 仅包含来自具有匹配名称的 runnables 的事件。

  • include_types (Optional[Sequence[str]]) – 仅包含来自具有匹配类型的 runnables 的事件。

  • include_tags (Optional[Sequence[str]]) – 仅包含来自具有匹配标签的 runnables 的事件。

  • exclude_names (Optional[Sequence[str]]) – 排除来自具有匹配名称的 runnables 的事件。

  • exclude_types (Optional[Sequence[str]]) – 排除来自具有匹配类型的 runnables 的事件。

  • exclude_tags (Optional[Sequence[str]]) – 排除来自具有匹配标签的 runnables 的事件。

  • kwargs (Any) – 要传递给 Runnable 的其他关键字参数。这些将传递给 astream_log,因为此 astream_events 的实现是基于 astream_log 构建的。

Yields

StreamEvent 的异步流。

Raises

NotImplementedError – 如果版本不是 v1v2

返回类型

AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]]

batch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output]

默认实现使用线程池执行器并行运行 invoke。

batch 的默认实现适用于 IO 绑定的可运行对象。

子类应该覆盖此方法,如果它们可以更有效地进行批处理;例如,如果底层 Runnable 使用支持批处理模式的 API。

参数
  • inputs (List[Input]) –

  • config (Optional[Union[RunnableConfig, List[RunnableConfig]]]) –

  • return_exceptions (bool) –

  • kwargs (Optional[Any]) –

返回类型

List[Output]

batch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) Iterator[Tuple[int, Union[Output, Exception]]]

并行运行列表中输入的 invoke,并在完成时产生结果。

参数
  • inputs (Sequence[Input]) –

  • config (Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]) –

  • return_exceptions (bool) –

  • kwargs (Optional[Any]) –

返回类型

Iterator[Tuple[int, Union[Output, Exception]]]

combine_docs(docs: List[Document], token_max: Optional[int] = None, callbacks: Optional[Union[List[BaseCallbackHandler], BaseCallbackManager]] = None, **kwargs: Any) Tuple[str, dict][source]

以 Map Reduce 方式合并文档。

合并方式是首先将链映射到所有文档,然后归约结果。如果需要(如果文档很多),可以递归地进行归约。

参数
返回类型

Tuple[str, dict]

configurable_alternatives(which: ConfigurableField, *, default_key: str = 'default', prefix_keys: bool = False, **kwargs: Union[Runnable[Input, Output], Callable[[], Runnable[Input, Output]]]) RunnableSerializable[Input, Output]

配置可在运行时设置的 Runnable 的备选项。

参数
  • which (ConfigurableField) – 将用于选择备选项的 ConfigurableField 实例。

  • default_key (str) – 如果未选择备选项,则使用的默认键。默认为“default”。

  • prefix_keys (bool) – 是否用 ConfigurableField id 作为键的前缀。默认为 False。

  • **kwargs (Union[Runnable[Input, Output], Callable[[], Runnable[Input, Output]]]) – 键到 Runnable 实例或返回 Runnable 实例的可调用对象的字典。

返回值

配置了备选项的新 Runnable。

返回类型

RunnableSerializable[Input, Output]

from langchain_anthropic import ChatAnthropic
from langchain_core.runnables.utils import ConfigurableField
from langchain_openai import ChatOpenAI

model = ChatAnthropic(
    model_name="claude-3-sonnet-20240229"
).configurable_alternatives(
    ConfigurableField(id="llm"),
    default_key="anthropic",
    openai=ChatOpenAI()
)

# uses the default model ChatAnthropic
print(model.invoke("which organization created you?").content)

# uses ChatOpenAI
print(
    model.with_config(
        configurable={"llm": "openai"}
    ).invoke("which organization created you?").content
)
configurable_fields(**kwargs: Union[ConfigurableField, ConfigurableFieldSingleOption, ConfigurableFieldMultiOption]) RunnableSerializable[Input, Output]

在运行时配置特定的 Runnable 字段。

参数

**kwargs (Union[ConfigurableField, ConfigurableFieldSingleOption, ConfigurableFieldMultiOption]) – 要配置的 ConfigurableField 实例的字典。

返回值

配置了字段的新 Runnable。

返回类型

RunnableSerializable[Input, Output]

from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI

model = ChatOpenAI(max_tokens=20).configurable_fields(
    max_tokens=ConfigurableField(
        id="output_token_number",
        name="Max tokens in the output",
        description="The maximum number of tokens in the output",
    )
)

# max_tokens = 20
print(
    "max_tokens_20: ",
    model.invoke("tell me something about chess").content
)

# max_tokens = 200
print("max_tokens_200: ", model.with_config(
    configurable={"output_token_number": 200}
    ).invoke("tell me something about chess").content
)
invoke(input: Dict[str, Any], config: Optional[RunnableConfig] = None, **kwargs: Any) Dict[str, Any]

将单个输入转换为输出。覆盖以实现。

参数
  • input (Dict[str, Any]) – Runnable 的输入。

  • config (Optional[RunnableConfig]) – 调用 Runnable 时要使用的配置。该配置支持用于跟踪目的的标准键,如“tags”、“metadata”,用于控制并行执行工作量的“max_concurrency”以及其他键。请参阅 RunnableConfig 以获取更多详细信息。

  • kwargs (Any) –

返回值

Runnable 的输出。

返回类型

Dict[str, Any]

prep_inputs(inputs: Union[Dict[str, Any], Any]) Dict[str, str]

准备链的输入,包括从内存中添加输入。

参数

inputs (Union[Dict[str, Any], Any]) – 原始输入的字典,或者如果链只接受一个参数,则为单个输入。应包含 Chain.input_keys 中指定的所有输入,但链的内存将设置的输入除外。

返回值

包含所有输入的字典,包括链的内存添加的输入。

返回类型

Dict[str, str]

prep_outputs(inputs: Dict[str, str], outputs: Dict[str, str], return_only_outputs: bool = False) Dict[str, str]

验证并准备链的输出,并将有关此运行的信息保存到内存中。

参数
  • inputs (Dict[str, str]) – 链输入的字典,包括链内存添加的任何输入。

  • outputs (Dict[str, str]) – 初始链输出的字典。

  • return_only_outputs (bool) – 是否仅返回链输出。如果为 False,输入也会添加到最终输出中。

返回值

最终链输出的字典。

返回类型

Dict[str, str]

prompt_length(docs: List[Document], **kwargs: Any) Optional[int]

返回给定传入文档的提示长度。

调用者可以使用此方法来确定传入文档列表是否会超出某个提示长度。这在尝试确保提示的大小保持在某个上下文限制以下时非常有用。

参数
  • docs (List[Document]) – List[Document],用于计算总提示长度的文档列表。

  • kwargs (Any) –

返回值

如果该方法不依赖于提示长度,则返回 None,否则返回提示的 token 长度。

返回类型

Optional[int]

run(*args: Any, callbacks: Optional[Union[List[BaseCallbackHandler], BaseCallbackManager]] = None, tags: Optional[List[str]] = None, metadata: Optional[Dict[str, Any]] = None, **kwargs: Any) Any

Deprecated since version langchain==0.1.0: Use invoke instead.

执行链的便捷方法。

此方法与 Chain.__call__ 之间的主要区别在于,此方法期望输入直接作为位置参数或关键字参数传入,而 Chain.__call__ 期望一个包含所有输入的单个输入字典

参数
  • *args (Any) – 如果链只接受一个输入,则可以作为唯一的位置参数传入。

  • callbacks (Optional[Union[List[BaseCallbackHandler], BaseCallbackManager]]) – 用于此链运行的回调。这些回调将添加到在构造期间传递给链的回调之外调用,但只有这些运行时回调才会传播到对其他对象的调用。

  • tags (Optional[List[str]]) – 要传递给所有回调的字符串标签列表。这些标签将添加到在构造期间传递给链的标签之外传递,但只有这些运行时标签才会传播到对其他对象的调用。

  • **kwargs (Any) – 如果链接受多个输入,则可以直接作为关键字参数传入。

  • metadata (Optional[Dict[str, Any]]) –

  • **kwargs

返回值

链输出。

返回类型

Any

示例

# Suppose we have a single-input chain that takes a 'question' string:
chain.run("What's the temperature in Boise, Idaho?")
# -> "The temperature in Boise is..."

# Suppose we have a multi-input chain that takes a 'question' string
# and 'context' string:
question = "What's the temperature in Boise, Idaho?"
context = "Weather report for Boise, Idaho on 07/03/23..."
chain.run(question=question, context=context)
# -> "The temperature in Boise is..."
save(file_path: Union[Path, str]) None

保存链。

期望实现 Chain._chain_type 属性,并且内存为

空。

参数

file_path (Union[Path, str]) – 将链保存到的文件路径。

返回类型

None

示例

chain.save(file_path="path/chain.yaml")
stream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) Iterator[Output]

stream 的默认实现,它调用 invoke。如果子类支持流式输出,则应覆盖此方法。

参数
  • input (Input) – Runnable 的输入。

  • config (Optional[RunnableConfig]) – 用于 Runnable 的配置。默认为 None。

  • kwargs (Optional[Any]) – 要传递给 Runnable 的其他关键字参数。

Yields

Runnable 的输出。

返回类型

Iterator[Output]

to_json() Union[SerializedConstructor, SerializedNotImplemented]

将 Runnable 序列化为 JSON。

返回值

Runnable 的 JSON 可序列化表示形式。

返回类型

Union[SerializedConstructor, SerializedNotImplemented]

property collapse_document_chain: BaseCombineDocumentsChain

为向后兼容而保留。

property combine_document_chain: BaseCombineDocumentsChain

为向后兼容而保留。

使用 MapReduceDocumentsChain 的示例