langchain_community.retrievers.databerry.DataberryRetriever

注意

DataberryRetriever 实现了标准的 Runnable 接口。 🏃

Runnable 接口 具有在可运行对象上可用的附加方法,例如 with_types, with_retry, assign, bind, get_graph, 等等。

class langchain_community.retrievers.databerry.DataberryRetriever[source]

基类: BaseRetriever

Databerry API 检索器。

param api_key: Optional[str] = None
param datastore_url: str [必填]
param metadata: Optional[Dict[str, Any]] = None

与检索器关联的可选元数据。默认为 None。此元数据将与对该检索器的每次调用关联,并作为参数传递给 callbacks 中定义的处理程序。您可以使用这些来识别检索器的特定实例及其用例。

param tags: Optional[List[str]] = None

与检索器关联的可选标签列表。默认为 None。这些标签将与对该检索器的每次调用关联,并作为参数传递给 callbacks 中定义的处理程序。您可以使用这些来识别检索器的特定实例及其用例。

param top_k: Optional[int] = None
async abatch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output]

默认实现使用 asyncio.gather 并行运行 ainvoke。

batch 的默认实现对于 IO 绑定的可运行对象效果良好。

如果子类可以更有效地进行批处理,则应覆盖此方法;例如,如果底层 Runnable 使用支持批处理模式的 API。

参数
  • inputs (List[Input]) – 可运行对象的一系列输入。

  • config (Optional[Union[RunnableConfig, List[RunnableConfig]]]) – 调用 Runnable 时使用的配置。该配置支持标准键,例如用于跟踪目的的“tags”、“metadata”,用于控制并行执行多少工作的“max_concurrency”以及其他键。有关更多详细信息,请参阅 RunnableConfig。默认为 None。

  • return_exceptions (bool) – 是否返回异常而不是引发异常。默认为 False。

  • kwargs (Optional[Any]) – 传递给 Runnable 的其他关键字参数。

返回值

来自 Runnable 的输出列表。

返回类型

List[Output]

async abatch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) AsyncIterator[Tuple[int, Union[Output, Exception]]]

并行运行一系列输入的 ainvoke,并在结果完成时生成结果。

参数
  • inputs (Sequence[Input]) – 可运行对象的一系列输入。

  • config (Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]) – 调用 Runnable 时使用的配置。该配置支持标准键,例如用于跟踪目的的“tags”、“metadata”,用于控制并行执行多少工作的“max_concurrency”以及其他键。有关更多详细信息,请参阅 RunnableConfig。默认为 None。默认为 None。

  • return_exceptions (bool) – 是否返回异常而不是引发异常。默认为 False。

  • kwargs (Optional[Any]) – 传递给 Runnable 的其他关键字参数。

Yields

输入索引和来自 Runnable 的输出的元组。

返回类型

AsyncIterator[Tuple[int, Union[Output, Exception]]]

async aget_relevant_documents(query: str, *, callbacks: Callbacks = None, tags: Optional[List[str]] = None, metadata: Optional[Dict[str, Any]] = None, run_name: Optional[str] = None, **kwargs: Any) List[Document]

Deprecated since version langchain-core==0.1.46: 请使用 ainvoke 代替。

异步获取与查询相关的文档。

用户应优先使用 .ainvoke.abatch 而不是直接使用 aget_relevant_documents

参数
  • query (str) – 用于查找相关文档的字符串。

  • callbacks (Callbacks) – 回调管理器或回调列表。

  • tags (Optional[List[str]]) – 与检索器关联的可选标签列表。这些标签将与对该检索器的每次调用关联,并作为参数传递给 callbacks 中定义的处理程序。默认为 None。

  • metadata (Optional[Dict[str, Any]]) – 与检索器关联的可选元数据。此元数据将与对该检索器的每次调用关联,并作为参数传递给 callbacks 中定义的处理程序。默认为 None。

  • run_name (Optional[str]) – 运行的可选名称。默认为 None。

  • kwargs (Any) – 要传递给检索器的其他参数。

返回值

相关文档列表。

返回类型

List[Document]

async ainvoke(input: str, config: Optional[RunnableConfig] = None, **kwargs: Any) List[Document]

异步调用检索器以获取相关文档。

异步检索器调用的主要入口点。

参数
  • input (str) – 查询字符串。

  • config (Optional[RunnableConfig]) – 检索器的配置。默认为 None。

  • kwargs (Any) – 要传递给检索器的其他参数。

返回值

相关文档列表。

返回类型

List[Document]

示例

await retriever.ainvoke("query")
as_tool(args_schema: Optional[Type[BaseModel]] = None, *, name: Optional[str] = None, description: Optional[str] = None, arg_types: Optional[Dict[str, Type]] = None) BaseTool

Beta

此 API 处于 Beta 阶段,将来可能会发生更改。

从 Runnable 创建 BaseTool。

as_tool 将从 Runnable 实例化一个具有名称、描述和 args_schema 的 BaseTool。 在可能的情况下,模式从 runnable.get_input_schema 推断。或者(例如,如果 Runnable 接受字典作为输入并且未键入特定的字典键),可以直接使用 args_schema 指定模式。您还可以传递 arg_types 以仅指定必需的参数及其类型。

参数
  • args_schema (Optional[Type[BaseModel]]) – 工具的模式。默认为 None。

  • name (Optional[str]) – 工具的名称。默认为 None。

  • description (Optional[str]) – 工具的描述。默认为 None。

  • arg_types (Optional[Dict[str, Type]]) – 参数名称到类型的字典。默认为 None。

返回值

BaseTool 实例。

返回类型

BaseTool

类型化字典输入

from typing import List
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda

class Args(TypedDict):
    a: int
    b: List[int]

def f(x: Args) -> str:
    return str(x["a"] * max(x["b"]))

runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})

dict 输入,通过 args_schema 指定模式

from typing import Any, Dict, List
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.runnables import RunnableLambda

def f(x: Dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))

class FSchema(BaseModel):
    """Apply a function to an integer and list of integers."""

    a: int = Field(..., description="Integer")
    b: List[int] = Field(..., description="List of ints")

runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})

dict 输入,通过 arg_types 指定模式

from typing import Any, Dict, List
from langchain_core.runnables import RunnableLambda

def f(x: Dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))

runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": List[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})

字符串输入

from langchain_core.runnables import RunnableLambda

def f(x: str) -> str:
    return x + "a"

def g(x: str) -> str:
    return x + "z"

runnable = RunnableLambda(f) | g
as_tool = runnable.as_tool()
as_tool.invoke("b")

0.2.14 版本新增。

async astream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) AsyncIterator[Output]

astream 的默认实现,它调用 ainvoke。 如果子类支持流式输出,则应覆盖此方法。

参数
  • input (Input) – Runnable 的输入。

  • config (Optional[RunnableConfig]) – 用于 Runnable 的配置。默认为 None。

  • kwargs (Optional[Any]) – 传递给 Runnable 的其他关键字参数。

Yields

Runnable 的输出。

返回类型

AsyncIterator[Output]

astream_events(input: Any, config: Optional[RunnableConfig] = None, *, version: Literal['v1', 'v2'], include_names: Optional[Sequence[str]] = None, include_types: Optional[Sequence[str]] = None, include_tags: Optional[Sequence[str]] = None, exclude_names: Optional[Sequence[str]] = None, exclude_types: Optional[Sequence[str]] = None, exclude_tags: Optional[Sequence[str]] = None, **kwargs: Any) AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]]

Beta

此 API 处于 Beta 阶段,将来可能会发生更改。

生成事件流。

用于创建一个 StreamEvents 的迭代器,该迭代器提供关于 Runnable 进度的实时信息,包括来自中间结果的 StreamEvents。

StreamEvent 是一个具有以下模式的字典

  • event: str - 事件名称的格式为:

    on_[runnable_type]_(start|stream|end)。

  • name: str - 生成事件的 Runnable 的名称。

  • run_id: str - 随机生成的 ID,与发出事件的 Runnable 的给定执行相关联。作为父 Runnable 执行一部分调用的子 Runnable 将被分配其自己唯一的 ID。

    the Runnable that emitted the event. A child Runnable that gets invoked as part of the execution of a parent Runnable is assigned its own unique ID.

  • parent_ids: List[str] - 生成事件的父 runnable 的 ID 列表。根 Runnable 将具有一个空列表。父 ID 的顺序是从根到直接父级。仅适用于 API 的 v2 版本。API 的 v1 版本将返回一个空列表。

    generated the event. The root Runnable will have an empty list. The order of the parent IDs is from the root to the immediate parent. Only available for v2 version of the API. The v1 version of the API will return an empty list.

  • tags: Optional[List[str]] - 生成事件的 Runnable 的标签。

    the event.

  • metadata: Optional[Dict[str, Any]] - 生成事件的 Runnable 的元数据。

    that generated the event.

  • data: Dict[str, Any]

下面是一个表格,说明了各种链可能发出的一些事件。为了简洁起见,表格中省略了元数据字段。链定义已包含在表格之后。

注意 此参考表适用于模式的 V2 版本。

事件

名称

输入

输出

on_chat_model_start

[模型名称]

{“messages”: [[SystemMessage, HumanMessage]]}

on_chat_model_stream

[模型名称]

AIMessageChunk(content=”hello”)

on_chat_model_end

[模型名称]

{“messages”: [[SystemMessage, HumanMessage]]}

AIMessageChunk(content=”hello world”)

on_llm_start

[模型名称]

{‘input’: ‘hello’}

on_llm_stream

[模型名称]

‘Hello’

on_llm_end

[模型名称]

‘Hello human!’

on_chain_start

format_docs

on_chain_stream

format_docs

“hello world!, goodbye world!”

on_chain_end

format_docs

[Document(…)]

“hello world!, goodbye world!”

on_tool_start

some_tool

{“x”: 1, “y”: “2”}

on_tool_end

some_tool

{“x”: 1, “y”: “2”}

on_retriever_start

[检索器名称]

{“query”: “hello”}

on_retriever_end

[检索器名称]

{“query”: “hello”}

[Document(…), ..]

on_prompt_start

[模板名称]

{“question”: “hello”}

on_prompt_end

[模板名称]

{“question”: “hello”}

ChatPromptValue(messages: [SystemMessage, …])

除了标准事件外,用户还可以分派自定义事件(请参阅下面的示例)。

自定义事件将仅在 API 的 v2 版本中显示!

自定义事件具有以下格式

属性

类型

描述

名称

str

用户为事件定义的名称。

data

Any

与事件关联的数据。这可以是任何内容,但我们建议使其 JSON 可序列化。

以下是与上面显示的标准事件相关的声明

format_docs:

def format_docs(docs: List[Document]) -> str:
    '''Format the docs.'''
    return ", ".join([doc.page_content for doc in docs])

format_docs = RunnableLambda(format_docs)

some_tool:

@tool
def some_tool(x: int, y: str) -> dict:
    '''Some_tool.'''
    return {"x": x, "y": y}

提示:

template = ChatPromptTemplate.from_messages(
    [("system", "You are Cat Agent 007"), ("human", "{question}")]
).with_config({"run_name": "my_template", "tags": ["my_template"]})

示例

from langchain_core.runnables import RunnableLambda

async def reverse(s: str) -> str:
    return s[::-1]

chain = RunnableLambda(func=reverse)

events = [
    event async for event in chain.astream_events("hello", version="v2")
]

# will produce the following events (run_id, and parent_ids
# has been omitted for brevity):
[
    {
        "data": {"input": "hello"},
        "event": "on_chain_start",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"chunk": "olleh"},
        "event": "on_chain_stream",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"output": "olleh"},
        "event": "on_chain_end",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
]

示例:分派自定义事件

from langchain_core.callbacks.manager import (
    adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio


async def slow_thing(some_input: str, config: RunnableConfig) -> str:
    """Do something that takes a long time."""
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 1 of 3"},
        config=config # Must be included for python < 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 2 of 3"},
        config=config # Must be included for python < 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    return "Done"

slow_thing = RunnableLambda(slow_thing)

async for event in slow_thing.astream_events("some_input", version="v2"):
    print(event)
参数
  • input (Any) – Runnable 的输入。

  • config (Optional[RunnableConfig]) – 用于 Runnable 的配置。

  • version (Literal['v1', 'v2']) – 要使用的模式版本,v2v1。用户应使用 v2v1 用于向后兼容,将在 0.4.0 中弃用。在 API 稳定之前,不会分配默认值。自定义事件将仅在 v2 中显示。

  • include_names (Optional[Sequence[str]]) – 仅包括来自具有匹配名称的 runnable 的事件。

  • include_types (Optional[Sequence[str]]) – 仅包括来自具有匹配类型的 runnable 的事件。

  • include_tags (Optional[Sequence[str]]) – 仅包括来自具有匹配标签的 runnable 的事件。

  • exclude_names (Optional[Sequence[str]]) – 排除来自具有匹配名称的 runnable 的事件。

  • exclude_types (Optional[Sequence[str]]) – 排除来自具有匹配类型的 runnable 的事件。

  • exclude_tags (Optional[Sequence[str]]) – 排除来自具有匹配标签的 runnable 的事件。

  • kwargs (Any) – 要传递给 Runnable 的其他关键字参数。这些参数将传递给 astream_log,因为 astream_events 的此实现构建在 astream_log 之上。

Yields

StreamEvents 的异步流。

Raises

NotImplementedError – 如果版本不是 v1v2

返回类型

AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]]

batch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output]

默认实现使用线程池执行器并行运行 invoke。

batch 的默认实现对于 IO 绑定的可运行对象效果良好。

如果子类可以更有效地进行批处理,则应覆盖此方法;例如,如果底层 Runnable 使用支持批处理模式的 API。

参数
  • inputs (List[Input]) –

  • config (Optional[Union[RunnableConfig, List[RunnableConfig]]]) –

  • return_exceptions (bool) –

  • kwargs (Optional[Any]) –

返回类型

List[Output]

batch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) Iterator[Tuple[int, Union[Output, Exception]]]

并行运行列表中输入的 invoke,并在完成时生成结果。

参数
  • inputs (Sequence[Input]) –

  • config (Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]) –

  • return_exceptions (bool) –

  • kwargs (Optional[Any]) –

返回类型

Iterator[Tuple[int, Union[Output, Exception]]]

configurable_alternatives(which: ConfigurableField, *, default_key: str = 'default', prefix_keys: bool = False, **kwargs: Union[Runnable[Input, Output], Callable[[], Runnable[Input, Output]]]) RunnableSerializable[Input, Output]

配置可在运行时设置的 Runnable 的备选项。

参数
  • which (ConfigurableField) – 将用于选择备选项的 ConfigurableField 实例。

  • default_key (str) – 如果未选择任何备选项,则使用的默认键。默认为“default”。

  • prefix_keys (bool) – 是否使用 ConfigurableField id 作为键的前缀。默认为 False。

  • **kwargs (Union[Runnable[Input, Output], Callable[[], Runnable[Input, Output]]]) – 键到 Runnable 实例或返回 Runnable 实例的可调用对象的字典。

返回值

配置了备选项的新 Runnable。

返回类型

RunnableSerializable[Input, Output]

from langchain_anthropic import ChatAnthropic
from langchain_core.runnables.utils import ConfigurableField
from langchain_openai import ChatOpenAI

model = ChatAnthropic(
    model_name="claude-3-sonnet-20240229"
).configurable_alternatives(
    ConfigurableField(id="llm"),
    default_key="anthropic",
    openai=ChatOpenAI()
)

# uses the default model ChatAnthropic
print(model.invoke("which organization created you?").content)

# uses ChatOpenAI
print(
    model.with_config(
        configurable={"llm": "openai"}
    ).invoke("which organization created you?").content
)
configurable_fields(**kwargs: Union[ConfigurableField, ConfigurableFieldSingleOption, ConfigurableFieldMultiOption]) RunnableSerializable[Input, Output]

在运行时配置特定的 Runnable 字段。

参数

**kwargs (Union[ConfigurableField, ConfigurableFieldSingleOption, ConfigurableFieldMultiOption]) – 要配置的 ConfigurableField 实例的字典。

返回值

配置了字段的新 Runnable。

返回类型

RunnableSerializable[Input, Output]

from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI

model = ChatOpenAI(max_tokens=20).configurable_fields(
    max_tokens=ConfigurableField(
        id="output_token_number",
        name="Max tokens in the output",
        description="The maximum number of tokens in the output",
    )
)

# max_tokens = 20
print(
    "max_tokens_20: ",
    model.invoke("tell me something about chess").content
)

# max_tokens = 200
print("max_tokens_200: ", model.with_config(
    configurable={"output_token_number": 200}
    ).invoke("tell me something about chess").content
)
get_relevant_documents(query: str, *, callbacks: Callbacks = None, tags: Optional[List[str]] = None, metadata: Optional[Dict[str, Any]] = None, run_name: Optional[str] = None, **kwargs: Any) List[Document]

Deprecated since version langchain-core==0.1.46: Deprecated: 自 langchain-core==0.1.46 版本起已弃用:请改用 invoke

检索与查询相关的文档。

用户应优先使用 .invoke.batch,而不是直接使用 get_relevant_documents

参数
  • query (str) – 用于查找相关文档的字符串。

  • callbacks (Callbacks) – 回调管理器或回调列表。默认为 None。

  • tags (Optional[List[str]]) – 与检索器关联的可选标签列表。这些标签将与对该检索器的每次调用关联,并作为参数传递给 callbacks 中定义的处理程序。默认为 None。

  • metadata (Optional[Dict[str, Any]]) – 与检索器关联的可选元数据。此元数据将与对该检索器的每次调用关联,并作为参数传递给 callbacks 中定义的处理程序。默认为 None。

  • run_name (Optional[str]) – 运行的可选名称。默认为 None。

  • kwargs (Any) – 要传递给检索器的其他参数。

返回值

相关文档列表。

返回类型

List[Document]

invoke(input: str, config: Optional[RunnableConfig] = None, **kwargs: Any) List[Document]

调用检索器以获取相关文档。

同步检索器调用的主要入口点。

参数
  • input (str) – 查询字符串。

  • config (Optional[RunnableConfig]) – 检索器的配置。默认为 None。

  • kwargs (Any) – 要传递给检索器的其他参数。

返回值

相关文档列表。

返回类型

List[Document]

示例

retriever.invoke("query")
stream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) Iterator[Output]

stream 的默认实现,它调用 invoke。如果子类支持流式输出,则应覆盖此方法。

参数
  • input (Input) – Runnable 的输入。

  • config (Optional[RunnableConfig]) – 用于 Runnable 的配置。默认为 None。

  • kwargs (Optional[Any]) – 传递给 Runnable 的其他关键字参数。

Yields

Runnable 的输出。

返回类型

Iterator[Output]

to_json() Union[SerializedConstructor, SerializedNotImplemented]

将 Runnable 序列化为 JSON。

返回值

Runnable 的 JSON 可序列化表示形式。

返回类型

Union[SerializedConstructor, SerializedNotImplemented]