langchain_elasticsearch.retrievers.ElasticsearchRetriever

注意

ElasticsearchRetriever 实现了标准的 Runnable 接口。 🏃

Runnable 接口 在 runnables 上还有其他可用的方法,例如 with_types, with_retry, assign, bind, get_graph, 等等。

class langchain_elasticsearch.retrievers.ElasticsearchRetriever[source]

基类: BaseRetriever

Elasticsearch 检索器

参数
  • es_client – Elasticsearch 客户端连接。或者,您可以使用带有参数的 from_es_params 方法来初始化客户端。

  • index_name – 要查询的索引的名称。也可以是名称列表。

  • body_func – 从搜索字符串创建 Elasticsearch DSL 查询主体的函数。返回的查询主体必须符合您通常在 POST 请求中发送到 _search 端点的内容。如果适用,它还包括参数,如 size 参数等。

  • content_field – 包含页面内容的文档字段名称。如果查询多个索引,请在此处指定字典 {index_name: field_name}。

  • document_mapper – 将 Elasticsearch 命中映射到 LangChain 文档的函数。

param body_func: Callable[[str], Dict] [必需]
param content_field: Optional[Union[str, Mapping[str, str]]] = None
param document_mapper: Optional[Callable[[Mapping], Document]] = None
param es_client: Elasticsearch [必需]
param index_name: Union[str, Sequence[str]] [必需]
param metadata: Optional[Dict[str, Any]] = None

与检索器关联的可选元数据。默认为 None。此元数据将与对该检索器的每次调用关联,并作为参数传递给 callbacks 中定义的处理程序。您可以使用这些元数据来识别检索器的特定实例及其用例。

param tags: Optional[List[str]] = None

与检索器关联的可选标签列表。默认为 None。这些标签将与对该检索器的每次调用关联,并作为参数传递给 callbacks 中定义的处理程序。您可以使用这些标签来识别检索器的特定实例及其用例。

async abatch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output]

默认实现使用 asyncio.gather 并行运行 ainvoke。

批处理的默认实现对于 IO 密集型 runnables 工作良好。

如果子类可以更有效地进行批处理,则应覆盖此方法;例如,如果底层的 Runnable 使用支持批处理模式的 API。

参数
  • inputs (List[Input]) – Runnable 的输入列表。

  • config (Optional[Union[RunnableConfig, List[RunnableConfig]]]) – 调用 Runnable 时要使用的配置。该配置支持标准键,如用于跟踪目的的“tags”、“metadata”,用于控制并行执行多少工作的“max_concurrency”以及其他键。有关更多详细信息,请参阅 RunnableConfig。默认为 None。

  • return_exceptions (bool) – 是否返回异常而不是引发异常。默认为 False。

  • kwargs (Optional[Any]) – 传递给 Runnable 的其他关键字参数。

返回

来自 Runnable 的输出列表。

返回类型

List[Output]

async abatch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) AsyncIterator[Tuple[int, Union[Output, Exception]]]

在输入列表上并行运行 ainvoke,并在结果完成时产生结果。

参数
  • inputs (Sequence[Input]) – Runnable 的输入列表。

  • config (Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]) – 调用 Runnable 时要使用的配置。该配置支持标准键,如用于跟踪目的的“tags”、“metadata”,用于控制并行执行多少工作的“max_concurrency”以及其他键。有关更多详细信息,请参阅 RunnableConfig。默认为 None。默认为 None。

  • return_exceptions (bool) – 是否返回异常而不是引发异常。默认为 False。

  • kwargs (Optional[Any]) – 传递给 Runnable 的其他关键字参数。

产生

输入的索引和来自 Runnable 的输出的元组。

返回类型

AsyncIterator[Tuple[int, Union[Output, Exception]]]

async aget_relevant_documents(query: str, *, callbacks: Callbacks = None, tags: Optional[List[str]] = None, metadata: Optional[Dict[str, Any]] = None, run_name: Optional[str] = None, **kwargs: Any) List[Document]

Deprecated since version langchain-core==0.1.46: 自 langchain-core==0.1.46 版本起已弃用:请改用 ainvoke

异步获取与查询相关的文档。

用户应倾向于使用 .ainvoke.abatch 而不是直接使用 aget_relevant_documents

参数
  • query (str) – 用于查找相关文档的字符串。

  • callbacks (Callbacks) – 回调管理器或回调列表。

  • tags (Optional[List[str]]) – 与检索器关联的可选标签列表。这些标签将与对该检索器的每次调用关联,并作为参数传递给 callbacks 中定义的处理程序。默认为 None。

  • metadata (Optional[Dict[str, Any]]) – 与检索器关联的可选元数据。此元数据将与对该检索器的每次调用关联,并作为参数传递给 callbacks 中定义的处理程序。默认为 None。

  • run_name (Optional[str]) – 运行的可选名称。默认为 None。

  • kwargs (Any) – 传递给检索器的其他参数。

返回

相关文档列表。

返回类型

List[Document]

async ainvoke(input: str, config: Optional[RunnableConfig] = None, **kwargs: Any) List[Document]

异步调用检索器以获取相关文档。

异步检索器调用的主要入口点。

参数
  • input (str) – 查询字符串。

  • config (Optional[RunnableConfig]) – 检索器的配置。默认为 None。

  • kwargs (Any) – 传递给检索器的其他参数。

返回

相关文档列表。

返回类型

List[Document]

示例

await retriever.ainvoke("query")
as_tool(args_schema: Optional[Type[BaseModel]] = None, *, name: Optional[str] = None, description: Optional[str] = None, arg_types: Optional[Dict[str, Type]] = None) BaseTool

Beta

此 API 处于 Beta 阶段,将来可能会发生变化。

从 Runnable 创建 BaseTool。

as_tool 将从 Runnable 实例化一个具有名称、描述和 args_schema 的 BaseTool。 在可能的情况下,模式从 runnable.get_input_schema 推断。 或者(例如,如果 Runnable 接受 dict 作为输入且未键入特定的 dict 键),则可以直接使用 args_schema 指定模式。 您还可以传递 arg_types 以仅指定必需的参数及其类型。

参数
  • args_schema (Optional[Type[BaseModel]]) – 工具的模式。默认为 None。

  • name (Optional[str]) – 工具的名称。默认为 None。

  • description (Optional[str]) – 工具的描述。默认为 None。

  • arg_types (Optional[Dict[str, Type]]) – 参数名称到类型的字典。默认为 None。

返回

一个 BaseTool 实例。

返回类型

BaseTool

类型化字典输入

from typing import List
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda

class Args(TypedDict):
    a: int
    b: List[int]

def f(x: Args) -> str:
    return str(x["a"] * max(x["b"]))

runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})

dict 输入,通过 args_schema 指定模式

from typing import Any, Dict, List
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.runnables import RunnableLambda

def f(x: Dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))

class FSchema(BaseModel):
    """Apply a function to an integer and list of integers."""

    a: int = Field(..., description="Integer")
    b: List[int] = Field(..., description="List of ints")

runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})

dict 输入,通过 arg_types 指定模式

from typing import Any, Dict, List
from langchain_core.runnables import RunnableLambda

def f(x: Dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))

runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": List[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})

字符串输入

from langchain_core.runnables import RunnableLambda

def f(x: str) -> str:
    return x + "a"

def g(x: str) -> str:
    return x + "z"

runnable = RunnableLambda(f) | g
as_tool = runnable.as_tool()
as_tool.invoke("b")

0.2.14 版本新增。

async astream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) AsyncIterator[Output]

astream 的默认实现,它调用 ainvoke。如果子类支持流式输出,则应重写此方法。

参数
  • input (Input) – Runnable 的输入。

  • config (Optional[RunnableConfig]) – 用于 Runnable 的配置。默认为 None。

  • kwargs (Optional[Any]) – 传递给 Runnable 的其他关键字参数。

产生

Runnable 的输出。

返回类型

AsyncIterator[Output]

astream_events(input: Any, config: Optional[RunnableConfig] = None, *, version: Literal['v1', 'v2'], include_names: Optional[Sequence[str]] = None, include_types: Optional[Sequence[str]] = None, include_tags: Optional[Sequence[str]] = None, exclude_names: Optional[Sequence[str]] = None, exclude_types: Optional[Sequence[str]] = None, exclude_tags: Optional[Sequence[str]] = None, **kwargs: Any) AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]]

Beta

此 API 处于 Beta 阶段,将来可能会发生变化。

生成事件流。

用于创建一个迭代器,遍历 StreamEvents,这些事件提供关于 Runnable 进度的实时信息,包括来自中间结果的 StreamEvents。

StreamEvent 是一个字典,具有以下模式

  • event: str - 事件名称的格式为:

    格式:on_[runnable_type]_(start|stream|end)。

  • name: str - 生成事件的 Runnable 的名称。

  • run_id: str - 与给定 Runnable 执行关联的随机生成的 ID,该 Runnable 发出事件。作为父 Runnable 执行一部分被调用的子 Runnable 将被分配其自己的唯一 ID。

    Runnable的执行,该Runnable发出了事件。作为父Runnable执行一部分被调用的子Runnable将被分配其自己的唯一ID。

  • parent_ids: List[str] - 生成事件的父 runnables 的 ID。根 Runnable 将具有一个空列表。父 ID 的顺序是从根到直接父级。仅适用于 API 的 v2 版本。API 的 v1 版本将返回一个空列表。

    generated the event. The root Runnable will have an empty list. The order of the parent IDs is from the root to the immediate parent. Only available for v2 version of the API. The v1 version of the API will return an empty list.

  • tags: Optional[List[str]] - 生成事件的 Runnable 的标签。

    the event.

  • metadata: Optional[Dict[str, Any]] - 生成事件的 Runnable 的元数据。

    that generated the event.

  • data: Dict[str, Any]

下面是一个表格,说明了各种链可能发出的一些事件。为了简洁起见,表格中省略了元数据字段。链的定义已包含在表格之后。

注意 此参考表适用于 V2 版本的模式。

event

name

chunk

input

output

on_chat_model_start

[模型名称]

{“messages”: [[SystemMessage, HumanMessage]]}

on_chat_model_stream

[模型名称]

AIMessageChunk(content=”hello”)

on_chat_model_end

[模型名称]

{“messages”: [[SystemMessage, HumanMessage]]}

AIMessageChunk(content=”hello world”)

on_llm_start

[模型名称]

{‘input’: ‘hello’}

on_llm_stream

[模型名称]

‘Hello’

on_llm_end

[模型名称]

‘Hello human!’

on_chain_start

format_docs

on_chain_stream

format_docs

“hello world!, goodbye world!”

on_chain_end

format_docs

[Document(…)]

“hello world!, goodbye world!”

on_tool_start

some_tool

{“x”: 1, “y”: “2”}

on_tool_end

some_tool

{“x”: 1, “y”: “2”}

on_retriever_start

[检索器名称]

{“query”: “hello”}

on_retriever_end

[检索器名称]

{“query”: “hello”}

[Document(…), ..]

on_prompt_start

[模板名称]

{“question”: “hello”}

on_prompt_end

[模板名称]

{“question”: “hello”}

ChatPromptValue(messages: [SystemMessage, …])

除了标准事件之外,用户还可以分派自定义事件(请参阅下面的示例)。

自定义事件将仅在 API 的 v2 版本中显示!

自定义事件具有以下格式

属性

类型

描述

name

str

用户定义的事件名称。

data

Any

与事件关联的数据。这可以是任何内容,但我们建议使其可 JSON 序列化。

以下是与上面显示的标准事件关联的声明

format_docs:

def format_docs(docs: List[Document]) -> str:
    '''Format the docs.'''
    return ", ".join([doc.page_content for doc in docs])

format_docs = RunnableLambda(format_docs)

some_tool:

@tool
def some_tool(x: int, y: str) -> dict:
    '''Some_tool.'''
    return {"x": x, "y": y}

prompt:

template = ChatPromptTemplate.from_messages(
    [("system", "You are Cat Agent 007"), ("human", "{question}")]
).with_config({"run_name": "my_template", "tags": ["my_template"]})

示例

from langchain_core.runnables import RunnableLambda

async def reverse(s: str) -> str:
    return s[::-1]

chain = RunnableLambda(func=reverse)

events = [
    event async for event in chain.astream_events("hello", version="v2")
]

# will produce the following events (run_id, and parent_ids
# has been omitted for brevity):
[
    {
        "data": {"input": "hello"},
        "event": "on_chain_start",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"chunk": "olleh"},
        "event": "on_chain_stream",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"output": "olleh"},
        "event": "on_chain_end",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
]

示例:分派自定义事件

from langchain_core.callbacks.manager import (
    adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio


async def slow_thing(some_input: str, config: RunnableConfig) -> str:
    """Do something that takes a long time."""
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 1 of 3"},
        config=config # Must be included for python < 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 2 of 3"},
        config=config # Must be included for python < 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    return "Done"

slow_thing = RunnableLambda(slow_thing)

async for event in slow_thing.astream_events("some_input", version="v2"):
    print(event)
参数
  • input (Any) – Runnable 的输入。

  • config (Optional[RunnableConfig]) – 用于 Runnable 的配置。

  • version (Literal['v1', 'v2']) – 要使用的模式版本,可以是 v2v1。用户应使用 v2v1 用于向后兼容,将在 0.4.0 中弃用。在 API 稳定之前,不会分配默认值。自定义事件将仅在 v2 中显示。

  • include_names (Optional[Sequence[str]]) – 仅包含来自具有匹配名称的 runnables 的事件。

  • include_types (Optional[Sequence[str]]) – 仅包含来自具有匹配类型的 runnables 的事件。

  • include_tags (Optional[Sequence[str]]) – 仅包含来自具有匹配标签的 runnables 的事件。

  • exclude_names (Optional[Sequence[str]]) – 排除来自具有匹配名称的 runnables 的事件。

  • exclude_types (Optional[Sequence[str]]) – 排除来自具有匹配类型的 runnables 的事件。

  • exclude_tags (Optional[Sequence[str]]) – 排除来自具有匹配标签的 runnables 的事件。

  • kwargs (Any) – 要传递给 Runnable 的其他关键字参数。这些参数将传递给 astream_log,因为此 astream_events 的实现是构建在 astream_log 之上的。

产生

StreamEvents 的异步流。

Raises

NotImplementedError – 如果版本不是 v1v2,则引发此错误。

返回类型

AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]]

batch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output]

batch 的默认实现使用线程池执行器并行运行 invoke。

批处理的默认实现对于 IO 密集型 runnables 工作良好。

如果子类可以更有效地进行批处理,则应覆盖此方法;例如,如果底层的 Runnable 使用支持批处理模式的 API。

参数
  • inputs (List[Input]) –

  • config (Optional[Union[RunnableConfig, List[RunnableConfig]]]) –

  • return_exceptions (bool) –

  • kwargs (Optional[Any]) –

返回类型

List[Output]

batch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) Iterator[Tuple[int, Union[Output, Exception]]]

并行运行 invoke 在输入列表上,并在完成时产生结果。

参数
  • inputs (Sequence[Input]) –

  • config (Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]) –

  • return_exceptions (bool) –

  • kwargs (Optional[Any]) –

返回类型

Iterator[Tuple[int, Union[Output, Exception]]]

configurable_alternatives(which: ConfigurableField, *, default_key: str = 'default', prefix_keys: bool = False, **kwargs: Union[Runnable[Input, Output], Callable[[], Runnable[Input, Output]]]) RunnableSerializable[Input, Output]

配置可在运行时设置的 Runnables 的备选项。

参数
  • which (ConfigurableField) – 将用于选择备选项的 ConfigurableField 实例。

  • default_key (str) – 如果未选择备选项,则使用的默认键。默认为 “default”。

  • prefix_keys (bool) – 是否使用 ConfigurableField id 作为键的前缀。默认为 False。

  • **kwargs (Union[Runnable[Input, Output], Callable[[], Runnable[Input, Output]]]) – 键到 Runnable 实例或返回 Runnable 实例的可调用对象的字典。

返回

配置了备选项的新 Runnable。

返回类型

RunnableSerializable[Input, Output]

from langchain_anthropic import ChatAnthropic
from langchain_core.runnables.utils import ConfigurableField
from langchain_openai import ChatOpenAI

model = ChatAnthropic(
    model_name="claude-3-sonnet-20240229"
).configurable_alternatives(
    ConfigurableField(id="llm"),
    default_key="anthropic",
    openai=ChatOpenAI()
)

# uses the default model ChatAnthropic
print(model.invoke("which organization created you?").content)

# uses ChatOpenAI
print(
    model.with_config(
        configurable={"llm": "openai"}
    ).invoke("which organization created you?").content
)
configurable_fields(**kwargs: Union[ConfigurableField, ConfigurableFieldSingleOption, ConfigurableFieldMultiOption]) RunnableSerializable[Input, Output]

配置运行时特定的 Runnable 字段。

参数

**kwargs (Union[ConfigurableField, ConfigurableFieldSingleOption, ConfigurableFieldMultiOption]) – 要配置的 ConfigurableField 实例的字典。

返回

配置了字段的新 Runnable。

返回类型

RunnableSerializable[Input, Output]

from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI

model = ChatOpenAI(max_tokens=20).configurable_fields(
    max_tokens=ConfigurableField(
        id="output_token_number",
        name="Max tokens in the output",
        description="The maximum number of tokens in the output",
    )
)

# max_tokens = 20
print(
    "max_tokens_20: ",
    model.invoke("tell me something about chess").content
)

# max_tokens = 200
print("max_tokens_200: ", model.with_config(
    configurable={"output_token_number": 200}
    ).invoke("tell me something about chess").content
)
static from_es_params(index_name: Union[str, Sequence[str]], body_func: Callable[[str], Dict], content_field: Optional[Union[str, Mapping[str, str]]] = None, document_mapper: Optional[Callable[[Mapping], Document]] = None, url: Optional[str] = None, cloud_id: Optional[str] = None, api_key: Optional[str] = None, username: Optional[str] = None, password: Optional[str] = None, params: Optional[Dict[str, Any]] = None) ElasticsearchRetriever[source]
参数
  • index_name (Union[str, Sequence[str]]) –

  • body_func (Callable[[str], Dict]) –

  • content_field (Optional[Union[str, Mapping[str, str]]]) –

  • document_mapper (Optional[Callable[[Mapping], Document]]) –

  • url (Optional[str]) –

  • cloud_id (Optional[str]) –

  • api_key (Optional[str]) –

  • username (可选[str]) –

  • password (可选[str]) –

  • params (可选[Dict[str, Any]]) –

返回类型

ElasticsearchRetriever

get_relevant_documents(query: str, *, callbacks: Callbacks = None, tags: Optional[List[str]] = None, metadata: Optional[Dict[str, Any]] = None, run_name: Optional[str] = None, **kwargs: Any) List[Document]

Deprecated since version langchain-core==0.1.46: 自 langchain-core==0.1.46 版本起已弃用:请使用 invoke 代替。

检索与查询相关的文档。

用户应优先使用 .invoke.batch,而不是直接使用 get_relevant_documents

参数
  • query (str) – 用于查找相关文档的字符串。

  • callbacks (Callbacks) – 回调管理器或回调列表。默认为 None。

  • tags (Optional[List[str]]) – 与检索器关联的可选标签列表。这些标签将与对该检索器的每次调用关联,并作为参数传递给 callbacks 中定义的处理程序。默认为 None。

  • metadata (Optional[Dict[str, Any]]) – 与检索器关联的可选元数据。此元数据将与对该检索器的每次调用关联,并作为参数传递给 callbacks 中定义的处理程序。默认为 None。

  • run_name (Optional[str]) – 运行的可选名称。默认为 None。

  • kwargs (Any) – 传递给检索器的其他参数。

返回

相关文档列表。

返回类型

List[Document]

invoke(input: str, config: Optional[RunnableConfig] = None, **kwargs: Any) List[Document]

调用检索器以获取相关文档。

同步检索器调用的主要入口点。

参数
  • input (str) – 查询字符串。

  • config (Optional[RunnableConfig]) – 检索器的配置。默认为 None。

  • kwargs (Any) – 传递给检索器的其他参数。

返回

相关文档列表。

返回类型

List[Document]

示例

retriever.invoke("query")
stream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) Iterator[Output]

stream 的默认实现,它会调用 invoke。如果子类支持流式输出,则应重写此方法。

参数
  • input (Input) – Runnable 的输入。

  • config (Optional[RunnableConfig]) – 用于 Runnable 的配置。默认为 None。

  • kwargs (Optional[Any]) – 传递给 Runnable 的其他关键字参数。

产生

Runnable 的输出。

返回类型

Iterator[Output]

to_json() Union[SerializedConstructor, SerializedNotImplemented]

将 Runnable 序列化为 JSON。

返回

Runnable 的 JSON 可序列化表示形式。

返回类型

Union[SerializedConstructor, SerializedNotImplemented]