langchain_community.retrievers.docarray.DocArrayRetriever

说明

DocArrayRetriever 实现了标准 Runnable 接口。🏃

Runnable 接口 具有一些额外的在运行函数中使用的方法,例如 with_typeswith_retryassignbindget_graph 以及更多。

class langchain_community.retrievers.docarray.DocArrayRetriever[源代码]

基类: BaseRetriever

DocArray 文档索引器

目前,它支持 5 个后端:InMemoryExactNNIndex、HnswDocumentIndex、QdrantDocumentIndex、ElasticDocIndex 和 WeaviateDocumentIndex。

参数
  • index – 以上提到的索引实例之一

  • embeddings – 表示文本为向量的嵌入模型

  • search_field – 用于在文档中搜索的字段。应为嵌入/向量/张量。

  • content_field – 表示您文档架构中主要内容的字段。将用作 page_content。其他所有内容都将放入 metadata

  • search_type – 要执行的搜索类型(相似度 / mmr)

  • filters – 对文档检索应用过滤器。

  • top_k – 返回的文档数

参数 content_field: str [必需]
参数 embeddings: Embeddings [必需]
参数 : filters: 可选[Any] = None
参数 : index: Any = None
参数 : metadata: 可选[Dict[str, Any]] = None

可选与检索器相关的元数据。默认为None。此元数据将与每次检索调用相关联,并作为参数传递给在 callbacks 中定义的处理程序。您可以将其用于识别具有特定用例的检索器的特定实例。

参数 : search_field: str [必需]
参数 : search_type: SearchType = SearchType.similarity
参数 : tags: 可选[List[str]] = None

可选与检索器相关的标签列表。默认为None。这些标签将与每次检索调用相关联,并作为参数传递给 callbacks 中定义的处理程序。您可以将其用于识别具有特定用例的检索器的特定实例。

参数 : top_k: int = 1
async abatch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output]

默认实现通过asyncio.gather并行运行ainvoke。

默认批处理的实现适用于I/O密集型运行时。

子类如果能够更高效地批量处理,应重写此方法;例如,如果底层的Runnable使用支持批量模式的API。

参数
  • inputs (列表[输入]) - Runnable的输入列表。

  • config (可选[并集[RunnableConfig, 列表[RunnableConfig]]]) - 调用Runnable时使用的配置。配置支持标准键,如‘tags’、‘metadata’(用于跟踪目的)、‘max_concurrency’(用于控制并行执行的工作量),以及其他键。请参阅RunnableConfig获取更多信息。默认为None。

  • return_exceptions (布尔值) - 是否返回异常而不仅仅是抛出它们。默认为False。

  • kwargs (可选[任何]) - 要传递给Runnable的额外关键字参数。

返回

从Runnable返回的输出列表。

返回类型

列表[输出]

async abatch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) AsyncIterator[Tuple[int, Union[Output, Exception]]]

在输入列表上并行运行runnable,在它们完成时生成结果。

参数
  • inputs (Sequence[Input]) – 对runnable的输入列表。

  • config (Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]) – 启动runnable时使用的配置。配置支持标准键,如用于跟踪的‘tags’、‘metadata’,用于控制并行工作量的‘max_concurrency’,以及其他键。请参阅RunnableConfig以获取更多详细信息。默认为None。

  • return_exceptions (布尔值) - 是否返回异常而不仅仅是抛出它们。默认为False。

  • kwargs (可选[任何]) - 要传递给Runnable的额外关键字参数。

生成结果

包含输入索引和从runnable得到的输出的元组。

返回类型

AsyncIterator[Tuple[int, Union[Output, Exception]]]

async aget_relevant_documents(query: str, *, callbacks: Callbacks = None, tags: Optional[List[str]] = None, metadata: Optional[Dict[str, Any]] = None, run_name: Optional[str] = None, **kwargs: Any) List[Document]

自langchain-core==0.1.46版本以来已弃用: 请使用 ainvoke 代替。

异步获取与查询相关的文档。

用户应优先使用 .ainvoke.abatch 而不是直接使用 aget_relevant_documents

参数
  • query (str) – 用于查找相关文档的字符串。

  • callbacks (Callbacks) – 回调管理器或回调列表。

  • tags (Optional[List[str]]) – 可选的与检索器关联的标签列表。这些标签将与对检索器的每次调用相关联,并作为参数传递到在 callbacks 中定义的处理程序。默认值为 None。

  • metadata (Optional[Dict[str, Any]]) – 可选的与检索器关联的元数据。此元数据将与对检索器的每次调用相关联,并作为参数传递到在 callbacks 中定义的处理程序。默认值为 None。

  • run_name (Optional[str]) – 可选的运行名称。默认值为 None。

  • kwargs (Any) – 要传递给检索器的附加参数。

返回

相关文档列表。

返回类型

List[Document]

async ainvoke(input: str, config: Optional[RunnableConfig], **kwargs: Any) List[Document]

异步调用检索器以获取相关文档。

异步检索器调用的主入口点。

参数
返回

相关文档列表。

返回类型

List[Document]

示例

await retriever.ainvoke("query")
as_tool(args_schema: Optional[Type[BaseModel]], *, name: Optional[str], description: Optional[str], arg_types: Optional[Dict[str, Type]]) BaseTool>

测试版

此 API 处于测试版,可能在未来发生变化。

从可运行对象创建 BaseTool。

as_tool 将使用名称、描述和从可运行的 args_schema 创建一个 `BaseTool`。尽可能从 runnable.get_input_schema 推断模式。或者,例如,如果可运行的对象接受字典作为输入且不太具体的字典键不是类型化的,则可以通过 args_schema 直接指定方案。您也可以传递 arg_types 来仅指定所需的参数及其类型。

参数
  • args_schema可选) - 工具的模式。默认为 None。

  • name可选) - 工具的名称。默认为 None。

  • description可选) - 工具的描述。默认为 None。

  • arg_types可选) - 参数名称到类型的字典。默认为 None。

返回

实例。

返回类型

带类型词典的输入

from typing import List
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda

class Args(TypedDict):
    a: int
    b: List[int]

def f(x: Args) -> str:
    return str(x["a"] * max(x["b"]))

runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})

dict 输入,通过 args_schema 指定模式

from typing import Any, Dict, List
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.runnables import RunnableLambda

def f(x: Dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))

class FSchema(BaseModel):
    """Apply a function to an integer and list of integers."""

    a: int = Field(..., description="Integer")
    b: List[int] = Field(..., description="List of ints")

runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})

dict 输入,通过 arg_types 指定模式

from typing import Any, Dict, List
from langchain_core.runnables import RunnableLambda

def f(x: Dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))

runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": List[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})

字符串输入

from langchain_core.runnables import RunnableLambda

def f(x: str) -> str:
    return x + "a"

def g(x: str) -> str:
    return x + "z"

runnable = RunnableLambda(f) | g
as_tool = runnable.as_tool()
as_tool.invoke("b")

自版本 0.2.14 以来引入。

async astream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) AsyncIterator[Output]

默认的 astream 实现,它调用 。子类应重写此方法,如果它们支持流式输出。

参数
  • input输入) - 用于可运行的输入。

  • config可选) - 用于可运行的配置。默认为 None。

  • kwargs (可选[任何]) - 要传递给Runnable的额外关键字参数。

生成结果

可运行的输出。

返回类型

AsyncIterator输出

astream_events(input: Any, config: Optional[RunnableConfig] = None, *, version: Literal['v1', 'v2'], include_names: Optional[Sequence[str]] = None, include_types: Optional[Sequence[str]] = None, include_tags: Optional[Sequence[str]] = None, exclude_names: Optional[Sequence[str]] = None, exclude_types: Optional[Sequence[str]] = None, exclude_tags: Optional[Sequence[str]] = None, **kwargs: Any) AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]]

测试版

此 API 处于测试版,可能在未来发生变化。

生成一个事件流。

用于创建流事件迭代器,这些事件提供有关可运行操作的实时信息,包括中间结果中的流事件。

流事件是一个结构如下字典

  • event字符串 - 事件名称遵循以下格式

    格式:on_[runnable_type]_(start|stream|end)。

  • name字符串 - 生成事件的可运行对象的名称。

  • run_id字符串 - 与给定执行有关随机生成的 ID。

    引发事件的Runnable。作为父Runnable执行的一部分被调用的子Runnable被分配一个独一无二的ID。

  • parent_ids: 列表[str] - 产生事件的父Runnable的ID。

    根Runnable将有一个空的列表。父ID的顺序是从根到直接父级。只在API的v2版本中可用。API的v1版本将返回一个空列表。

  • tags: Optional[List[str]] - 产生事件的Runnable的标签。

  • metadata: Optional[Dict[str, Any]] - 产生事件的Runnable的元数据。

  • data: Dict[str, Any]

以下表格说明了一些可能由各种链引发的某些事件。出于简洁考虑,表中省略了元数据字段。链定义在表格之后。

注意 此参考表针对的是方案的V2版本。

事件

名称

输入

输出

on_chat_model_start

[模型名称]

{“messages”: [[系统消息,人类消息]]}

on_chat_model_stream

[模型名称]

AIMessageChunk(content="hello")

on_chat_model_end

[模型名称]

{“messages”: [[系统消息,人类消息]]}

AIMessageChunk(content="hello world")

on_llm_start

[模型名称]

{‘input’: ‘hello’}

on_llm_stream

[模型名称]

‘Hello’

on_llm_end

[模型名称]

‘Hello human!’

on_chain_start

format_docs

on_chain_stream

format_docs

“hello world!, goodbye world!”

on_chain_end

format_docs

[Document(…)]

“hello world!, goodbye world!”

on_tool_start

某些工具

{“x”: 1, “y”: “2”}

on_tool_end

某些工具

{“x”: 1, “y”: “2”}

on_retriever_start

[检索器名称]

{“query”: “hello”}

on_retriever_end

[检索器名称]

{“query”: “hello”}

[Document(…), ..]

on_prompt_start

[模板名称]

{“question”: “hello”}

on_prompt_end

[模板名称]

{“question”: “hello”}

ChatPromptValue(messages: [SystemMessage, …])

除了标准事件之外,用户还可以发出自定义事件(见以下示例)。

自定义事件只在API的V2版本中公开!

自定义事件具有以下格式

属性

类型

描述

名称

str

为事件定义的用户名称。

数据

Any

与事件关联的数据。这可以是任何东西,尽管我们建议使其具有JSON序列化能力。

以下是与上述标准事件相关的声明

format_docs:

def format_docs(docs: List[Document]) -> str:
    '''Format the docs.'''
    return ", ".join([doc.page_content for doc in docs])

format_docs = RunnableLambda(format_docs)

某些工具:

@tool
def some_tool(x: int, y: str) -> dict:
    '''Some_tool.'''
    return {"x": x, "y": y}

提示:

template = ChatPromptTemplate.from_messages(
    [("system", "You are Cat Agent 007"), ("human", "{question}")]
).with_config({"run_name": "my_template", "tags": ["my_template"]})

示例

from langchain_core.runnables import RunnableLambda

async def reverse(s: str) -> str:
    return s[::-1]

chain = RunnableLambda(func=reverse)

events = [
    event async for event in chain.astream_events("hello", version="v2")
]

# will produce the following events (run_id, and parent_ids
# has been omitted for brevity):
[
    {
        "data": {"input": "hello"},
        "event": "on_chain_start",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"chunk": "olleh"},
        "event": "on_chain_stream",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"output": "olleh"},
        "event": "on_chain_end",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
]

示例:发出自定义事件

from langchain_core.callbacks.manager import (
    adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio


async def slow_thing(some_input: str, config: RunnableConfig) -> str:
    """Do something that takes a long time."""
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 1 of 3"},
        config=config # Must be included for python < 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 2 of 3"},
        config=config # Must be included for python < 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    return "Done"

slow_thing = RunnableLambda(slow_thing)

async for event in slow_thing.astream_events("some_input", version="v2"):
    print(event)
参数
  • input (Any) – Runnable的输入。

  • config (Optional[RunnableConfig]) – 使用Runnable的配置。

  • version (Literal['v1', 'v2']) – 要使用的方案版本,要么是v2要么是v1。用户应使用v2v1是向后兼容的,将在0.4.0中弃用。直到API稳定,将不分配默认值。自定义事件仅在v2中公开。

  • include_names (Optional[Sequence[str]]) – 只包括具有匹配名称的runnables的事件。

  • include_types (Optional[Sequence[str]]) – 只包括具有匹配类型的runnables的事件。

  • include_tags (Optional[Sequence[str]]) – 只包括具有匹配标签的runnables的事件。

  • exclude_names (Optional[Sequence[str]]) – 排除具有匹配名称的runnables的事件。

  • exclude_types (Optional[Sequence[str]]) – 排除具有匹配类型的runnables的事件。

  • exclude_tags (可选[序列[str]]) – 排除与匹配标签的runnable中的事件。

  • kwargs (任何) – 将传递给Runnable的额外关键字参数。这些将通过astream_log传递,因为astream_events的实现是建立在astream_log之上的。

生成结果

异步流中的StreamEvents。

抛出异常

NotImplementedError – 如果版本不是v1v2

返回类型

AsyncIterator[联合体[StandardStreamEvent, CustomStreamEvent]]

batch(inputs: List[Input], config: 可选[联合体[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: 可选[任何]) List[Output]

默认实现使用线程池执行器并行运行invoke。

默认批处理的实现适用于I/O密集型运行时。

子类如果能够更高效地批量处理,应重写此方法;例如,如果底层的Runnable使用支持批量模式的API。

参数
  • inputs (列表[Input]) –

  • config (可选[联合体[RunnableConfig, 列表[RunnableConfig]]]) –

  • return_exceptions (布尔值) –

  • kwargs (可选[任何]) –

返回类型

列表[输出]

batch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) Iterator[Tuple[int, Union[Output, Exception]]]

并行运行列表中的调用,按完成顺序输出结果。

参数
  • inputs (Sequence[Input]) –

  • config (Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]) –

  • return_exceptions (布尔值) –

  • kwargs (可选[任何]) –

返回类型

Iterator[Tuple[int, Union[Output, Exception]]]

configurable_alternatives(which: ConfigurableField, *, default_key: str = 'default', prefix_keys: bool = False, **kwargs: Union[Runnable[Input, Output], Callable[[], Runnable[Input, Output]]) RunnableSerializable[Input, Output]

配置可以在运行时设置的 Runnables 的备选方案。

参数
  • which (ConfigurableField) – 将用于选择备选方案的 ConfigurableField 实例。

  • default_key (str) – 如果未选择备选方案,将使用的默认键。默认为“default”。

  • prefix_keys (bool) – 是否使用 ConfigurableField id 作为键的前缀。默认为 False。

  • **kwargs (Union[Runnable[Input, Output], Callable[[], Runnable[Input, Output]]]) – 键到 Runnable 实例或返回 Runnable 实例的 callables 的字典。

返回

配置了备选方案的新 Runnable。

返回类型

RunnableSerializable[Input, Output]

from langchain_anthropic import ChatAnthropic
from langchain_core.runnables.utils import ConfigurableField
from langchain_openai import ChatOpenAI

model = ChatAnthropic(
    model_name="claude-3-sonnet-20240229"
).configurable_alternatives(
    ConfigurableField(id="llm"),
    default_key="anthropic",
    openai=ChatOpenAI()
)

# uses the default model ChatAnthropic
print(model.invoke("which organization created you?").content)

# uses ChatOpenAI
print(
    model.with_config(
        configurable={"llm": "openai"}
    ).invoke("which organization created you?").content
)
configurable_fields(**kwargs: Union[ConfigurableField, ConfigurableFieldSingleOption, ConfigurableFieldMultiOption]) RunnableSerializable[Input, Output]

在运行时配置特定的可运行字段。

参数

**kwargs (Union[ConfigurableField, ConfigurableFieldSingleOption, ConfigurableFieldMultiOption]) – 要配置的 ConfigurableField 实例的字典。

返回

配置字段后的新的可运行对象。

返回类型

RunnableSerializable[Input, Output]

from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI

model = ChatOpenAI(max_tokens=20).configurable_fields(
    max_tokens=ConfigurableField(
        id="output_token_number",
        name="Max tokens in the output",
        description="The maximum number of tokens in the output",
    )
)

# max_tokens = 20
print(
    "max_tokens_20: ",
    model.invoke("tell me something about chess").content
)

# max_tokens = 200
print("max_tokens_200: ", model.with_config(
    configurable={"output_token_number": 200}
    ).invoke("tell me something about chess").content
)
get_relevant_documents(query: str, *, callbacks: Callbacks = None, tags: Optional[List[str]] = None, metadata: Optional[Dict[str, Any]] = None, run_name: Optional[str] = None, **kwargs: Any) List[Document]

自langchain-core==0.1.46版本以来已弃用: 请使用invoke代替。

检索与查询相关的文档。

用户应优先使用.invoke.batch而不是直接使用get_relevant_documents

参数
  • query (str) – 用于查找相关文档的字符串。

  • 回调 (Callbacks) – 回调管理器或回调列表。默认值为None。

  • tags (Optional[List[str]]) – 可选的与检索器关联的标签列表。这些标签将与对检索器的每次调用相关联,并作为参数传递到在 callbacks 中定义的处理程序。默认值为 None。

  • metadata (Optional[Dict[str, Any]]) – 可选的与检索器关联的元数据。此元数据将与对检索器的每次调用相关联,并作为参数传递到在 callbacks 中定义的处理程序。默认值为 None。

  • run_name (Optional[str]) – 可选的运行名称。默认值为 None。

  • kwargs (Any) – 要传递给检索器的附加参数。

返回

相关文档列表。

返回类型

List[Document]

invoke(input: str, config: Optional[RunnableConfig] = None, **kwargs: Any) List[Document]

调用检索器以获取相关文档。

同步检索调用主入口。

参数
返回

相关文档列表。

返回类型

List[Document]

示例

retriever.invoke("query")
stream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) Iterator[Output]

stream的默认实现,它会调用invoke。如果子类支持流式输出,应该重写此方法。

参数
  • input输入) - 用于可运行的输入。

  • config可选) - 用于可运行的配置。默认为 None。

  • kwargs (可选[任何]) - 要传递给Runnable的额外关键字参数。

生成结果

可运行的输出。

返回类型

Iterator[Output]

to_json() Union[SerializedConstructor, SerializedNotImplemented]

将Runnable序列化为JSON。

返回

Runnable的可序列化表示。

返回类型

Union[SerializedConstructor, SerializedNotImplemented]

使用DocArrayRetriever的示例