langchain.chains.combine_documents.map_reduce
.MapReduceDocumentsChain¶
注意
MapReduceDocumentsChain 实现了标准的 Runnable 接口
。🏃
Runnable 接口
具有在可运行对象上可用的其他方法,例如 with_types
, with_retry
, assign
, bind
, get_graph
, 以及更多。
- class langchain.chains.combine_documents.map_reduce.MapReduceDocumentsChain[源代码]¶
-
通过对文档进行映射链操作,然后组合结果来组合文档。
我们首先在每个文档上单独调用 llm_chain,传入 page_content 和任何其他 kwargs。这是 map 步骤。
然后,我们在 reduce 步骤中处理 map 步骤的结果。这很可能是一个 ReduceDocumentsChain。
示例
from langchain.chains import ( StuffDocumentsChain, LLMChain, ReduceDocumentsChain, MapReduceDocumentsChain, ) from langchain_core.prompts import PromptTemplate from langchain_community.llms import OpenAI # This controls how each document will be formatted. Specifically, # it will be passed to `format_document` - see that function for more # details. document_prompt = PromptTemplate( input_variables=["page_content"], template="{page_content}" ) document_variable_name = "context" llm = OpenAI() # The prompt here should take as an input variable the # `document_variable_name` prompt = PromptTemplate.from_template( "Summarize this content: {context}" ) llm_chain = LLMChain(llm=llm, prompt=prompt) # We now define how to combine these summaries reduce_prompt = PromptTemplate.from_template( "Combine these summaries: {context}" ) reduce_llm_chain = LLMChain(llm=llm, prompt=reduce_prompt) combine_documents_chain = StuffDocumentsChain( llm_chain=reduce_llm_chain, document_prompt=document_prompt, document_variable_name=document_variable_name ) reduce_documents_chain = ReduceDocumentsChain( combine_documents_chain=combine_documents_chain, ) chain = MapReduceDocumentsChain( llm_chain=llm_chain, reduce_documents_chain=reduce_documents_chain, ) # If we wanted to, we could also pass in collapse_documents_chain # which is specifically aimed at collapsing documents BEFORE # the final call. prompt = PromptTemplate.from_template( "Collapse this content: {context}" ) llm_chain = LLMChain(llm=llm, prompt=prompt) collapse_documents_chain = StuffDocumentsChain( llm_chain=llm_chain, document_prompt=document_prompt, document_variable_name=document_variable_name ) reduce_documents_chain = ReduceDocumentsChain( combine_documents_chain=combine_documents_chain, collapse_documents_chain=collapse_documents_chain, ) chain = MapReduceDocumentsChain( llm_chain=llm_chain, reduce_documents_chain=reduce_documents_chain, )
- param callback_manager: Optional[BaseCallbackManager] = None¶
[已弃用] 请使用 callbacks 代替。
- param callbacks: Callbacks = None¶
回调处理程序(或回调管理器)的可选列表。默认为 None。回调处理程序在链调用的整个生命周期中被调用,从 on_chain_start 开始,到 on_chain_end 或 on_chain_error 结束。每个自定义链可以选择性地调用其他回调方法,有关完整详细信息,请参阅回调文档。
- param document_variable_name: str [必需]¶
llm_chain 中用于放置文档的变量名。如果 llm_chain 中只有一个变量,则无需提供此项。
- param memory: Optional[BaseMemory] = None¶
可选的 memory 对象。默认为 None。Memory 是一个在每个链的开始和结束时调用的类。在开始时,memory 加载变量并在链中传递它们。在结束时,它保存任何返回的变量。有许多不同类型的 memory - 有关完整目录,请参阅 memory 文档。
- param metadata: Optional[Dict[str, Any]] = None¶
与链关联的可选元数据。默认为 None。此元数据将与对此链的每次调用关联,并作为参数传递给 callbacks 中定义的处理程序。您可以使用它们来识别链的特定实例及其用例,例如。
- param reduce_documents_chain: BaseCombineDocumentsChain [必需]¶
用于减少将 llm_chain 应用于每个文档的结果的链。这通常是 ReduceDocumentChain 或 StuffDocumentChain。
- param return_intermediate_steps: bool = False¶
在输出中返回 map 步骤的结果。
- param tags: Optional[List[str]] = None¶
与链关联的可选标签列表。默认为 None。这些标签将与对此链的每次调用关联,并作为参数传递给 callbacks 中定义的处理程序。您可以使用这些来识别链的特定实例及其用例,例如。
- param verbose: bool [可选]¶
是否在 verbose 模式下运行。在 verbose 模式下,一些中间日志将被打印到控制台。默认为全局 verbose 值,可通过 langchain.globals.get_verbose() 访问。
- __call__(inputs: Union[Dict[str, Any], Any], return_only_outputs: bool = False, callbacks: Optional[Union[List[BaseCallbackHandler], BaseCallbackManager]] = None, *, tags: Optional[List[str]] = None, metadata: Optional[Dict[str, Any]] = None, run_name: Optional[str] = None, include_run_info: bool = False) Dict[str, Any] ¶
Deprecated since version langchain==0.1.0: Use
invoke
instead.执行链。
- 参数
inputs (Union[Dict[str, Any], Any]) – 输入字典,或者如果链仅期望一个参数,则为单个输入。应包含 Chain.input_keys 中指定的所有输入,但链的 memory 将设置的输入除外。
return_only_outputs (bool) – 是否仅在响应中返回输出。如果为 True,则仅返回此链生成的新键。如果为 False,则将返回输入键和此链生成的新键。默认为 False。
callbacks (Optional[Union[List[BaseCallbackHandler], BaseCallbackManager]]) – 用于此链运行的回调。这些回调将添加到在构造期间传递给链的回调之外调用,但只有这些运行时回调才会传播到对其他对象的调用。
tags (Optional[List[str]]) – 要传递给所有回调的字符串标签列表。这些标签将添加到在构造期间传递给链的标签之外传递,但只有这些运行时标签才会传播到对其他对象的调用。
metadata (Optional[Dict[str, Any]]) – 与链关联的可选元数据。默认为 None
include_run_info (bool) – 是否在响应中包含运行信息。默认为 False。
run_name (Optional[str]) –
- 返回值
- 一个命名的输出字典。应包含中指定的所有输出
Chain.output_keys.
- 返回类型
Dict[str, Any]
- async abatch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output] ¶
默认实现使用 asyncio.gather 并行运行 ainvoke。
batch 的默认实现适用于 IO 绑定的可运行对象。
子类应该覆盖此方法,如果它们可以更有效地进行批处理;例如,如果底层 Runnable 使用支持批处理模式的 API。
- 参数
inputs (List[Input]) – Runnable 的输入列表。
config (Optional[Union[RunnableConfig, List[RunnableConfig]]]) – 调用 Runnable 时要使用的配置。该配置支持标准键,例如用于跟踪目的的 ‘tags’、‘metadata’,用于控制并行执行多少工作的 ‘max_concurrency’,以及其他键。有关更多详细信息,请参阅 RunnableConfig。默认为 None。
return_exceptions (bool) – 是否返回异常而不是引发异常。默认为 False。
kwargs (Optional[Any]) – 要传递给 Runnable 的其他关键字参数。
- 返回值
Runnable 的输出列表。
- 返回类型
List[Output]
- async abatch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) AsyncIterator[Tuple[int, Union[Output, Exception]]] ¶
在一个输入列表上并行运行 ainvoke,并在结果完成时生成结果。
- 参数
inputs (Sequence[Input]) – Runnable 的输入列表。
config (Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]) – 调用 Runnable 时要使用的配置。该配置支持标准键,例如用于跟踪目的的 ‘tags’、‘metadata’,用于控制并行执行多少工作的 ‘max_concurrency’,以及其他键。有关更多详细信息,请参阅 RunnableConfig。默认为 None。默认为 None。
return_exceptions (bool) – 是否返回异常而不是引发异常。默认为 False。
kwargs (Optional[Any]) – 要传递给 Runnable 的其他关键字参数。
- Yields
一个元组,包含输入的索引和来自 Runnable 的输出。
- 返回类型
AsyncIterator[Tuple[int, Union[Output, Exception]]]
- async acall(inputs: Union[Dict[str, Any], Any], return_only_outputs: bool = False, callbacks: Optional[Union[List[BaseCallbackHandler], BaseCallbackManager]] = None, *, tags: Optional[List[str]] = None, metadata: Optional[Dict[str, Any]] = None, run_name: Optional[str] = None, include_run_info: bool = False) Dict[str, Any] ¶
自 langchain==0.1.0 版本起已弃用: 请使用
ainvoke
代替。异步执行链。
- 参数
inputs (Union[Dict[str, Any], Any]) – 输入字典,或者如果链仅期望一个参数,则为单个输入。应包含 Chain.input_keys 中指定的所有输入,但链的 memory 将设置的输入除外。
return_only_outputs (bool) – 是否仅在响应中返回输出。如果为 True,则仅返回此链生成的新键。如果为 False,则将返回输入键和此链生成的新键。默认为 False。
callbacks (Optional[Union[List[BaseCallbackHandler], BaseCallbackManager]]) – 用于此链运行的回调。这些回调将添加到在构造期间传递给链的回调之外调用,但只有这些运行时回调才会传播到对其他对象的调用。
tags (Optional[List[str]]) – 要传递给所有回调的字符串标签列表。这些标签将添加到在构造期间传递给链的标签之外传递,但只有这些运行时标签才会传播到对其他对象的调用。
metadata (Optional[Dict[str, Any]]) – 与链关联的可选元数据。默认为 None
include_run_info (bool) – 是否在响应中包含运行信息。默认为 False。
run_name (Optional[str]) –
- 返回值
- 一个命名的输出字典。应包含中指定的所有输出
Chain.output_keys.
- 返回类型
Dict[str, Any]
- async acombine_docs(docs: List[Document], token_max: Optional[int] = None, callbacks: Optional[Union[List[BaseCallbackHandler], BaseCallbackManager]] = None, **kwargs: Any) Tuple[str, dict] [source]¶
以 Map Reduce 方式合并文档。
合并方式是首先将链映射到所有文档,然后归约结果。如果需要(如果文档很多),可以递归地进行归约。
- 参数
docs (List[Document]) –
token_max (Optional[int]) –
callbacks (Optional[Union[List[BaseCallbackHandler], BaseCallbackManager]]) –
kwargs (Any) –
- 返回类型
Tuple[str, dict]
- async ainvoke(input: Dict[str, Any], config: Optional[RunnableConfig] = None, **kwargs: Any) Dict[str, Any] ¶
`ainvoke` 的默认实现,从线程中调用 `invoke`。
即使 Runnable 没有实现 `invoke` 的原生异步版本,默认实现也允许使用异步代码。
如果子类可以异步运行,则应重写此方法。
- 参数
input (Dict[str, Any]) –
config (Optional[RunnableConfig]) –
kwargs (Any) –
- 返回类型
Dict[str, Any]
- apply(input_list: List[Dict[str, Any]], callbacks: Optional[Union[List[BaseCallbackHandler], BaseCallbackManager]] = None) List[Dict[str, str]] ¶
自 langchain==0.1.0 版本起已弃用: 请使用
batch
代替。对列表中的所有输入调用链。
- 参数
input_list (List[Dict[str, Any]]) –
callbacks (Optional[Union[List[BaseCallbackHandler], BaseCallbackManager]]) –
- 返回类型
List[Dict[str, str]]
- async aprep_inputs(inputs: Union[Dict[str, Any], Any]) Dict[str, str] ¶
准备链的输入,包括从内存中添加输入。
- 参数
inputs (Union[Dict[str, Any], Any]) – 原始输入的字典,或者如果链只接受一个参数,则为单个输入。应包含 Chain.input_keys 中指定的所有输入,但链的内存将设置的输入除外。
- 返回值
包含所有输入的字典,包括链的内存添加的输入。
- 返回类型
Dict[str, str]
- async aprep_outputs(inputs: Dict[str, str], outputs: Dict[str, str], return_only_outputs: bool = False) Dict[str, str] ¶
验证并准备链的输出,并将有关此运行的信息保存到内存中。
- 参数
inputs (Dict[str, str]) – 链输入的字典,包括链内存添加的任何输入。
outputs (Dict[str, str]) – 初始链输出的字典。
return_only_outputs (bool) – 是否仅返回链输出。如果为 False,输入也会添加到最终输出中。
- 返回值
最终链输出的字典。
- 返回类型
Dict[str, str]
- async arun(*args: Any, callbacks: Optional[Union[List[BaseCallbackHandler], BaseCallbackManager]] = None, tags: Optional[List[str]] = None, metadata: Optional[Dict[str, Any]] = None, **kwargs: Any) Any ¶
自 langchain==0.1.0 版本起已弃用: 请使用
ainvoke
代替。执行链的便捷方法。
此方法与 Chain.__call__ 之间的主要区别在于,此方法期望输入直接作为位置参数或关键字参数传入,而 Chain.__call__ 期望一个包含所有输入的单个输入字典
- 参数
*args (Any) – 如果链只接受一个输入,则可以作为唯一的位置参数传入。
callbacks (Optional[Union[List[BaseCallbackHandler], BaseCallbackManager]]) – 用于此链运行的回调。这些回调将添加到在构造期间传递给链的回调之外调用,但只有这些运行时回调才会传播到对其他对象的调用。
tags (Optional[List[str]]) – 要传递给所有回调的字符串标签列表。这些标签将添加到在构造期间传递给链的标签之外传递,但只有这些运行时标签才会传播到对其他对象的调用。
**kwargs (Any) – 如果链接受多个输入,则可以直接作为关键字参数传入。
metadata (Optional[Dict[str, Any]]) –
**kwargs –
- 返回值
链输出。
- 返回类型
Any
示例
# Suppose we have a single-input chain that takes a 'question' string: await chain.arun("What's the temperature in Boise, Idaho?") # -> "The temperature in Boise is..." # Suppose we have a multi-input chain that takes a 'question' string # and 'context' string: question = "What's the temperature in Boise, Idaho?" context = "Weather report for Boise, Idaho on 07/03/23..." await chain.arun(question=question, context=context) # -> "The temperature in Boise is..."
- as_tool(args_schema: Optional[Type[BaseModel]] = None, *, name: Optional[str] = None, description: Optional[str] = None, arg_types: Optional[Dict[str, Type]] = None) BaseTool ¶
Beta
此 API 处于 Beta 阶段,未来可能会发生变化。
从 Runnable 创建一个 BaseTool。
as_tool
将从 Runnable 实例化一个具有名称、描述和args_schema
的 BaseTool。如果可能,模式将从runnable.get_input_schema
推断。或者(例如,如果 Runnable 接受字典作为输入且未对特定字典键进行类型化),可以使用args_schema
直接指定模式。您还可以传递arg_types
以仅指定必需的参数及其类型。- 参数
args_schema (Optional[Type[BaseModel]]) – 工具的模式。默认为 None。
name (Optional[str]) – 工具的名称。默认为 None。
description (Optional[str]) – 工具的描述。默认为 None。
arg_types (Optional[Dict[str, Type]]) – 参数名称到类型的字典。默认为 None。
- 返回值
BaseTool 实例。
- 返回类型
Typed dict input
from typing import List from typing_extensions import TypedDict from langchain_core.runnables import RunnableLambda class Args(TypedDict): a: int b: List[int] def f(x: Args) -> str: return str(x["a"] * max(x["b"])) runnable = RunnableLambda(f) as_tool = runnable.as_tool() as_tool.invoke({"a": 3, "b": [1, 2]})
dict
input, specifying schema viaargs_schema
from typing import Any, Dict, List from langchain_core.pydantic_v1 import BaseModel, Field from langchain_core.runnables import RunnableLambda def f(x: Dict[str, Any]) -> str: return str(x["a"] * max(x["b"])) class FSchema(BaseModel): """Apply a function to an integer and list of integers.""" a: int = Field(..., description="Integer") b: List[int] = Field(..., description="List of ints") runnable = RunnableLambda(f) as_tool = runnable.as_tool(FSchema) as_tool.invoke({"a": 3, "b": [1, 2]})
dict
input, specifying schema viaarg_types
from typing import Any, Dict, List from langchain_core.runnables import RunnableLambda def f(x: Dict[str, Any]) -> str: return str(x["a"] * max(x["b"])) runnable = RunnableLambda(f) as_tool = runnable.as_tool(arg_types={"a": int, "b": List[int]}) as_tool.invoke({"a": 3, "b": [1, 2]})
String input
from langchain_core.runnables import RunnableLambda def f(x: str) -> str: return x + "a" def g(x: str) -> str: return x + "z" runnable = RunnableLambda(f) | g as_tool = runnable.as_tool() as_tool.invoke("b")
0.2.14 版本新增。
- async astream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) AsyncIterator[Output] ¶
`astream` 的默认实现,它调用 `ainvoke`。如果子类支持流式输出,则应重写此方法。
- 参数
input (Input) – Runnable 的输入。
config (Optional[RunnableConfig]) – 用于 Runnable 的配置。默认为 None。
kwargs (Optional[Any]) – 要传递给 Runnable 的其他关键字参数。
- Yields
Runnable 的输出。
- 返回类型
AsyncIterator[Output]
- astream_events(input: Any, config: Optional[RunnableConfig] = None, *, version: Literal['v1', 'v2'], include_names: Optional[Sequence[str]] = None, include_types: Optional[Sequence[str]] = None, include_tags: Optional[Sequence[str]] = None, exclude_names: Optional[Sequence[str]] = None, exclude_types: Optional[Sequence[str]] = None, exclude_tags: Optional[Sequence[str]] = None, **kwargs: Any) AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]] ¶
Beta
此 API 处于 Beta 阶段,未来可能会发生变化。
生成事件流。
用于创建 StreamEvent 的迭代器,该迭代器提供关于 Runnable 进度的实时信息,包括来自中间结果的 StreamEvent。
StreamEvent 是一个具有以下模式的字典
event
: str - 事件名称的格式为:on_[runnable_type]_(start|stream|end)。
name
: str - 生成事件的 Runnable 的名称。run_id
: str - 与发出事件的 Runnable 的给定执行关联的随机生成的 ID。作为父 Runnable 执行的一部分调用的子 Runnable 将被分配其自己的唯一 ID。的 Runnable,该 Runnable 发出了事件。作为父 Runnable 执行的一部分调用的子 Runnable 将被分配其自己的唯一 ID。
parent_ids
: List[str] - 生成事件的父 runnables 的 ID。根 Runnable 将具有一个空列表。父 ID 的顺序是从根到直接父级。仅适用于 API 的 v2 版本。API 的 v1 版本将返回一个空列表。generated the event. The root Runnable will have an empty list. The order of the parent IDs is from the root to the immediate parent. Only available for v2 version of the API. The v1 version of the API will return an empty list.
tags
: Optional[List[str]] - 生成事件的 Runnable 的标签。the event.
metadata
: Optional[Dict[str, Any]] - 生成事件的 Runnable 的元数据that generated the event.
data
: Dict[str, Any]
下表说明了各种链可能发出的一些事件。为了简洁起见,元数据字段已从表中省略。链定义已包含在表后。
注意 此参考表适用于 V2 版本的模式。
event
name
chunk
input
output
on_chat_model_start
[模型名称]
{“messages”: [[SystemMessage, HumanMessage]]}
on_chat_model_stream
[模型名称]
AIMessageChunk(content=”hello”)
on_chat_model_end
[模型名称]
{“messages”: [[SystemMessage, HumanMessage]]}
AIMessageChunk(content=”hello world”)
on_llm_start
[模型名称]
{‘input’: ‘hello’}
on_llm_stream
[模型名称]
‘Hello’
on_llm_end
[模型名称]
‘Hello human!’
on_chain_start
format_docs
on_chain_stream
format_docs
“hello world!, goodbye world!”
on_chain_end
format_docs
[Document(…)]
“hello world!, goodbye world!”
on_tool_start
some_tool
{“x”: 1, “y”: “2”}
on_tool_end
some_tool
{“x”: 1, “y”: “2”}
on_retriever_start
[检索器名称]
{“query”: “hello”}
on_retriever_end
[检索器名称]
{“query”: “hello”}
[Document(…), ..]
on_prompt_start
[模板名称]
{“question”: “hello”}
on_prompt_end
[模板名称]
{“question”: “hello”}
ChatPromptValue(messages: [SystemMessage, …])
除了标准事件之外,用户还可以分派自定义事件(请参见下面的示例)。
自定义事件将仅在 v2 版本的 API 中显示!
自定义事件具有以下格式
属性
类型
描述
name
str
用户定义的事件名称。
data
Any
与事件关联的数据。这可以是任何内容,但我们建议使其可 JSON 序列化。
以下是与上面显示的标准事件关联的声明
format_docs:
def format_docs(docs: List[Document]) -> str: '''Format the docs.''' return ", ".join([doc.page_content for doc in docs]) format_docs = RunnableLambda(format_docs)
some_tool:
@tool def some_tool(x: int, y: str) -> dict: '''Some_tool.''' return {"x": x, "y": y}
prompt:
template = ChatPromptTemplate.from_messages( [("system", "You are Cat Agent 007"), ("human", "{question}")] ).with_config({"run_name": "my_template", "tags": ["my_template"]})
示例
from langchain_core.runnables import RunnableLambda async def reverse(s: str) -> str: return s[::-1] chain = RunnableLambda(func=reverse) events = [ event async for event in chain.astream_events("hello", version="v2") ] # will produce the following events (run_id, and parent_ids # has been omitted for brevity): [ { "data": {"input": "hello"}, "event": "on_chain_start", "metadata": {}, "name": "reverse", "tags": [], }, { "data": {"chunk": "olleh"}, "event": "on_chain_stream", "metadata": {}, "name": "reverse", "tags": [], }, { "data": {"output": "olleh"}, "event": "on_chain_end", "metadata": {}, "name": "reverse", "tags": [], }, ]
示例:分派自定义事件
from langchain_core.callbacks.manager import ( adispatch_custom_event, ) from langchain_core.runnables import RunnableLambda, RunnableConfig import asyncio async def slow_thing(some_input: str, config: RunnableConfig) -> str: """Do something that takes a long time.""" await asyncio.sleep(1) # Placeholder for some slow operation await adispatch_custom_event( "progress_event", {"message": "Finished step 1 of 3"}, config=config # Must be included for python < 3.10 ) await asyncio.sleep(1) # Placeholder for some slow operation await adispatch_custom_event( "progress_event", {"message": "Finished step 2 of 3"}, config=config # Must be included for python < 3.10 ) await asyncio.sleep(1) # Placeholder for some slow operation return "Done" slow_thing = RunnableLambda(slow_thing) async for event in slow_thing.astream_events("some_input", version="v2"): print(event)
- 参数
input (Any) – Runnable 的输入。
config (Optional[RunnableConfig]) – 用于 Runnable 的配置。
version (Literal['v1', 'v2']) – 要使用的模式版本,可以是 v2 或 v1。用户应使用 v2。v1 用于向后兼容,将在 0.4.0 中弃用。在 API 稳定之前,不会分配默认值。自定义事件将仅在 v2 中显示。
include_names (Optional[Sequence[str]]) – 仅包含来自具有匹配名称的 runnables 的事件。
include_types (Optional[Sequence[str]]) – 仅包含来自具有匹配类型的 runnables 的事件。
include_tags (Optional[Sequence[str]]) – 仅包含来自具有匹配标签的 runnables 的事件。
exclude_names (Optional[Sequence[str]]) – 排除来自具有匹配名称的 runnables 的事件。
exclude_types (Optional[Sequence[str]]) – 排除来自具有匹配类型的 runnables 的事件。
exclude_tags (Optional[Sequence[str]]) – 排除来自具有匹配标签的 runnables 的事件。
kwargs (Any) – 要传递给 Runnable 的其他关键字参数。这些将传递给 astream_log,因为此 astream_events 的实现是基于 astream_log 构建的。
- Yields
StreamEvent 的异步流。
- Raises
NotImplementedError – 如果版本不是 v1 或 v2。
- 返回类型
AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]]
- batch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output] ¶
默认实现使用线程池执行器并行运行 invoke。
batch 的默认实现适用于 IO 绑定的可运行对象。
子类应该覆盖此方法,如果它们可以更有效地进行批处理;例如,如果底层 Runnable 使用支持批处理模式的 API。
- 参数
inputs (List[Input]) –
config (Optional[Union[RunnableConfig, List[RunnableConfig]]]) –
return_exceptions (bool) –
kwargs (Optional[Any]) –
- 返回类型
List[Output]
- batch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) Iterator[Tuple[int, Union[Output, Exception]]] ¶
并行运行列表中输入的 invoke,并在完成时产生结果。
- 参数
inputs (Sequence[Input]) –
config (Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]) –
return_exceptions (bool) –
kwargs (Optional[Any]) –
- 返回类型
Iterator[Tuple[int, Union[Output, Exception]]]
- combine_docs(docs: List[Document], token_max: Optional[int] = None, callbacks: Optional[Union[List[BaseCallbackHandler], BaseCallbackManager]] = None, **kwargs: Any) Tuple[str, dict] [source]¶
以 Map Reduce 方式合并文档。
合并方式是首先将链映射到所有文档,然后归约结果。如果需要(如果文档很多),可以递归地进行归约。
- 参数
docs (List[Document]) –
token_max (Optional[int]) –
callbacks (Optional[Union[List[BaseCallbackHandler], BaseCallbackManager]]) –
kwargs (Any) –
- 返回类型
Tuple[str, dict]
- configurable_alternatives(which: ConfigurableField, *, default_key: str = 'default', prefix_keys: bool = False, **kwargs: Union[Runnable[Input, Output], Callable[[], Runnable[Input, Output]]]) RunnableSerializable[Input, Output] ¶
配置可在运行时设置的 Runnable 的备选项。
- 参数
which (ConfigurableField) – 将用于选择备选项的 ConfigurableField 实例。
default_key (str) – 如果未选择备选项,则使用的默认键。默认为“default”。
prefix_keys (bool) – 是否用 ConfigurableField id 作为键的前缀。默认为 False。
**kwargs (Union[Runnable[Input, Output], Callable[[], Runnable[Input, Output]]]) – 键到 Runnable 实例或返回 Runnable 实例的可调用对象的字典。
- 返回值
配置了备选项的新 Runnable。
- 返回类型
RunnableSerializable[Input, Output]
from langchain_anthropic import ChatAnthropic from langchain_core.runnables.utils import ConfigurableField from langchain_openai import ChatOpenAI model = ChatAnthropic( model_name="claude-3-sonnet-20240229" ).configurable_alternatives( ConfigurableField(id="llm"), default_key="anthropic", openai=ChatOpenAI() ) # uses the default model ChatAnthropic print(model.invoke("which organization created you?").content) # uses ChatOpenAI print( model.with_config( configurable={"llm": "openai"} ).invoke("which organization created you?").content )
- configurable_fields(**kwargs: Union[ConfigurableField, ConfigurableFieldSingleOption, ConfigurableFieldMultiOption]) RunnableSerializable[Input, Output] ¶
在运行时配置特定的 Runnable 字段。
- 参数
**kwargs (Union[ConfigurableField, ConfigurableFieldSingleOption, ConfigurableFieldMultiOption]) – 要配置的 ConfigurableField 实例的字典。
- 返回值
配置了字段的新 Runnable。
- 返回类型
RunnableSerializable[Input, Output]
from langchain_core.runnables import ConfigurableField from langchain_openai import ChatOpenAI model = ChatOpenAI(max_tokens=20).configurable_fields( max_tokens=ConfigurableField( id="output_token_number", name="Max tokens in the output", description="The maximum number of tokens in the output", ) ) # max_tokens = 20 print( "max_tokens_20: ", model.invoke("tell me something about chess").content ) # max_tokens = 200 print("max_tokens_200: ", model.with_config( configurable={"output_token_number": 200} ).invoke("tell me something about chess").content )
- invoke(input: Dict[str, Any], config: Optional[RunnableConfig] = None, **kwargs: Any) Dict[str, Any] ¶
将单个输入转换为输出。覆盖以实现。
- 参数
input (Dict[str, Any]) – Runnable 的输入。
config (Optional[RunnableConfig]) – 调用 Runnable 时要使用的配置。该配置支持用于跟踪目的的标准键,如“tags”、“metadata”,用于控制并行执行工作量的“max_concurrency”以及其他键。请参阅 RunnableConfig 以获取更多详细信息。
kwargs (Any) –
- 返回值
Runnable 的输出。
- 返回类型
Dict[str, Any]
- prep_inputs(inputs: Union[Dict[str, Any], Any]) Dict[str, str] ¶
准备链的输入,包括从内存中添加输入。
- 参数
inputs (Union[Dict[str, Any], Any]) – 原始输入的字典,或者如果链只接受一个参数,则为单个输入。应包含 Chain.input_keys 中指定的所有输入,但链的内存将设置的输入除外。
- 返回值
包含所有输入的字典,包括链的内存添加的输入。
- 返回类型
Dict[str, str]
- prep_outputs(inputs: Dict[str, str], outputs: Dict[str, str], return_only_outputs: bool = False) Dict[str, str] ¶
验证并准备链的输出,并将有关此运行的信息保存到内存中。
- 参数
inputs (Dict[str, str]) – 链输入的字典,包括链内存添加的任何输入。
outputs (Dict[str, str]) – 初始链输出的字典。
return_only_outputs (bool) – 是否仅返回链输出。如果为 False,输入也会添加到最终输出中。
- 返回值
最终链输出的字典。
- 返回类型
Dict[str, str]
- prompt_length(docs: List[Document], **kwargs: Any) Optional[int] ¶
返回给定传入文档的提示长度。
调用者可以使用此方法来确定传入文档列表是否会超出某个提示长度。这在尝试确保提示的大小保持在某个上下文限制以下时非常有用。
- 参数
docs (List[Document]) – List[Document],用于计算总提示长度的文档列表。
kwargs (Any) –
- 返回值
如果该方法不依赖于提示长度,则返回 None,否则返回提示的 token 长度。
- 返回类型
Optional[int]
- run(*args: Any, callbacks: Optional[Union[List[BaseCallbackHandler], BaseCallbackManager]] = None, tags: Optional[List[str]] = None, metadata: Optional[Dict[str, Any]] = None, **kwargs: Any) Any ¶
Deprecated since version langchain==0.1.0: Use
invoke
instead.执行链的便捷方法。
此方法与 Chain.__call__ 之间的主要区别在于,此方法期望输入直接作为位置参数或关键字参数传入,而 Chain.__call__ 期望一个包含所有输入的单个输入字典
- 参数
*args (Any) – 如果链只接受一个输入,则可以作为唯一的位置参数传入。
callbacks (Optional[Union[List[BaseCallbackHandler], BaseCallbackManager]]) – 用于此链运行的回调。这些回调将添加到在构造期间传递给链的回调之外调用,但只有这些运行时回调才会传播到对其他对象的调用。
tags (Optional[List[str]]) – 要传递给所有回调的字符串标签列表。这些标签将添加到在构造期间传递给链的标签之外传递,但只有这些运行时标签才会传播到对其他对象的调用。
**kwargs (Any) – 如果链接受多个输入,则可以直接作为关键字参数传入。
metadata (Optional[Dict[str, Any]]) –
**kwargs –
- 返回值
链输出。
- 返回类型
Any
示例
# Suppose we have a single-input chain that takes a 'question' string: chain.run("What's the temperature in Boise, Idaho?") # -> "The temperature in Boise is..." # Suppose we have a multi-input chain that takes a 'question' string # and 'context' string: question = "What's the temperature in Boise, Idaho?" context = "Weather report for Boise, Idaho on 07/03/23..." chain.run(question=question, context=context) # -> "The temperature in Boise is..."
- save(file_path: Union[Path, str]) None ¶
保存链。
- 期望实现 Chain._chain_type 属性,并且内存为
空。
- 参数
file_path (Union[Path, str]) – 将链保存到的文件路径。
- 返回类型
None
示例
chain.save(file_path="path/chain.yaml")
- stream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) Iterator[Output] ¶
stream 的默认实现,它调用 invoke。如果子类支持流式输出,则应覆盖此方法。
- 参数
input (Input) – Runnable 的输入。
config (Optional[RunnableConfig]) – 用于 Runnable 的配置。默认为 None。
kwargs (Optional[Any]) – 要传递给 Runnable 的其他关键字参数。
- Yields
Runnable 的输出。
- 返回类型
Iterator[Output]
- to_json() Union[SerializedConstructor, SerializedNotImplemented] ¶
将 Runnable 序列化为 JSON。
- 返回值
Runnable 的 JSON 可序列化表示形式。
- 返回类型
- property collapse_document_chain: BaseCombineDocumentsChain¶
为向后兼容而保留。
- property combine_document_chain: BaseCombineDocumentsChain¶
为向后兼容而保留。