langchain_community.retrievers.zep_cloud.ZepCloudRetriever

注意

gradedRetriever 实现 Runnable Interface。🏃

Runnable Interface 有一些额外的可在运行进程上使用的方法,例如 with_typeswith_retryassignbindget_graph 等。

classlangchain_community.retrievers.zep_cloud.ZepCloudRetriever[source]

基类: BaseRetriever

Ze p Cloud 内存存储检索器。

使用 Zep 搜索您用户的长久聊天历史。

Zep 提供了简单语义搜索和最大边际相关度 (MMR) 搜索结果重排序。

注意:使用此检索器时,您需要提供用户的 session_id

参数
  • api_key – 您的 Zep API 密钥

  • session_id – 识别您的用户或用户会话(必需)

  • top_k – 返回的文档数量(默认:3,可选)

  • search_type – 执行的搜索类型(相似度 / mmr)(默认:相似度,可选)

  • mmr_lambda – MMR 搜索的 Lambda 值。默认为 0.5(可选)

Ze p - 从聊天历史中召回、理解和提取数据。打造个人 AI 体验。 =========== Ze p 是 AI 助手应用的长期记忆服务。使用 Ze p,您可以向 AI 助手提供召回过去对话的能力,无论对话多么久远,同时减少虚构、延迟和成本。

参阅 Ze p Cloud 文档: https://help.getzep.com

paramapi_key: str [Required]

您的 Zep API 密钥。

参数 metadata: 可选[Dict[str, Any]] = None

与检索器关联的可选元数据。默认为 None。这些元数据将与对检索器的每次调用相关联,并作为参数传递给在 callbacks 中定义的处理程序。您可以使用这些信息识别具有特定用例的特定检索器的实例。

参数 mmr_lambda: 可选[float] = None

MMR 搜索的 Lambda 值。

参数 search_scope : SearchScope = 'messages'

搜索哪些文档。消息或摘要?

参数 search_type : SearchType = 'similarity'

执行搜索的类型(相似度/ mmr)

参数 session_id : str [必需]

Zep 会话 ID。

参数 tags : 可选[List[str]] = None

与检索器关联的可选标签列表。默认为 None。这些标签将与每次调用检索器的调用相关联,并作为参数传递给在 callbacks 中定义的处理程序。您可以使用这些信息识别具有特定用例的特定检索器的实例。

参数 top_k: Optional[int] = None

返回项目数量。

参数 zep_client: Zep [必需]

用于进行API请求的Zep客户端。

参数 zep_client_async: AsyncZep [必需]

用于进行API请求的异步Zep客户端。

async abatch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output]

默认实现使用asyncio.gather并行运行ainvoke。

批处理的默认实现适用于I/O密集型运行程序。

如果子类可以更有效地进行批处理,则应覆盖此方法;例如,如果底层Runnables使用支持批处理模式的API。

参数
  • inputs (列表[Input]) – Runnable的输入列表。

  • config (可选UnionRunnableConfigListRunnableConfig[]]) – 调用Runnable时使用的配置。配置支持标准键,如‘tags’、‘metadata’用于跟踪目的,‘max_concurrency’用于控制并行工作量的多少,以及其他键。请参阅RunnableConfig获取更多详细信息。默认为None。

  • return_exceptions (bool) – 是否返回异常而不是引发它们。默认为False。

  • kwargs (可选Any[]) – 传递给Runnable的额外关键字参数。

返回:

Runnable的输出列表。

返回类型

ListOutput

async abatch_as_completed(inputs: Sequence[Input], config: Optional[Union: RunnableConfig, Sequence[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) AsyncIterator[Tuple[int, Union: Output, Exception]]

并行在输入列表上运行ainvoke,按完成顺序产生结果。

参数
  • inputs (SequenceInput[]) – Runnable的输入列表。

  • config (可选UnionRunnableConfigSequenceRunnableConfig[]]) – 调用Runnable时使用的配置。配置支持标准键,如‘tags’、‘metadata’用于跟踪目的,‘max_concurrency’用于控制并行工作量的多少,以及其他键。请参阅RunnableConfig获取更多详细信息。默认为None。

  • return_exceptions (bool) – 是否返回异常而不是引发它们。默认为False。

  • kwargs (可选Any[]) – 传递给Runnable的额外关键字参数。

产生:

一个包含输入索引和Runnable输出值的元组。

返回类型

异步迭代器[元组[int, 联合[输出, 异常]]]

async aget_relevant_documents(query: str, *, callbacks: Callbacks = None, tags: Optional[List[str]] = None, metadata: Optional[Dict[str, Any]] = None, run_name: Optional[str] = None, **kwargs: Any) List[Document]

自 langchain-core==0.1.46 版本以来已弃用: 请使用 ainvoke 代替。

异步获取与查询相关的文档。

用户应优先使用 .ainvoke.abatch 而不是直接使用 aget_relevant_documents

参数
  • 查询 (str) – 要查找相关文档的字符串。

  • callbacks (Callbacks) – 回调管理器或回调列表。

  • tags (可选[列表[str]]) – 可选的与检索器关联的标签列表。这些标签将与每次对该检索器的调用相关联,并将作为参数传递给在 callbacks 中定义的处理程序。默认值为 None。

  • metadata (可选[字典[str, Any]]) – 可选的与检索器关联的元数据。这些元数据将与每次对该检索器的调用相关联,并将作为参数传递给在 callbacks 中定义的处理程序。默认值为 None。

  • run_name (可选[str]) – 可选的运行名称。默认值为 None。

  • kwargs (Any) – 传递给检索器的额外参数。

返回:

相关文档列表。

返回类型

Document 列表

async ainvoke(input: str, config: Optional[RunnableConfig] = None, **kwargs: Any) List[Document]

异步调用检索器以获取相关文档。

异步检索器调用的主要入口点。

参数
  • 输入 (字符串) – 查询字符串。

  • 配置 (可选[RunnableConfig]) – 检索器的配置。默认为 None。

  • kwargs (Any) – 传递给检索器的额外参数。

返回:

相关文档列表。

返回类型

列表[Document]

示例

await retriever.ainvoke("query")
as_tool(args_schema: Optional[Type[BaseModel]] = None, *, name: Optional[str] = None, description: Optional[str] = None, arg_types: Optional[Dict[str, Type]] = None) BaseTool

测试版

此 API 处于测试版,可能会在未来发生变化。

从可运行的程序创建 BaseTool。

as_tool 从 Runnable 实例化一个具有名称、描述和 args_schema 的 BaseTool。可能的情况下,模式从 runnable.get_input_schema 推断而来。或者(例如,如果 Runnable 接收一个字典作为输入,且特定的字典键没有类型),可以通过 args_schema 直接指定模式。您还可以传递 arg_types 以仅指定所需参数及其类型。

参数
  • args_schema可选)- 工具的模式。默认为 None。

  • name可选)- 工具的名称。默认为 None。

  • description可选)- 工具的描述。默认为 None。

  • arg_types可选)- 参数名称到类型的字典。默认为 None。

返回:

BaseTool 实例。

返回类型

BaseTool

类型化的字典输入

from typing import List
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda

class Args(TypedDict):
    a: int
    b: List[int]

def f(x: Args) -> str:
    return str(x["a"] * max(x["b"]))

runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})

dict 输入,通过 args_schema 指定模式

from typing import Any, Dict, List
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.runnables import RunnableLambda

def f(x: Dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))

class FSchema(BaseModel):
    """Apply a function to an integer and list of integers."""

    a: int = Field(..., description="Integer")
    b: List[int] = Field(..., description="List of ints")

runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})

dict 输入,通过 arg_types 指定模式

from typing import Any, Dict, List
from langchain_core.runnables import RunnableLambda

def f(x: Dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))

runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": List[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})

字符串输入

from langchain_core.runnables import RunnableLambda

def f(x: str) -> str:
    return x + "a"

def g(x: str) -> str:
    return x + "z"

runnable = RunnableLambda(f) | g
as_tool = runnable.as_tool()
as_tool.invoke("b")

新增于版本 0.2.14。

async astream(input: Input, config: Optional[RunnableConfig], **kwargs: Optional[Any]) AsyncIterator[Output]

astream 的默认实现,调用 ainvoke。子类应该覆盖此方法以支持流式输出。

参数
  • inputInput)- Runnable 的输入。

  • config可选)- 为 Runnable使用的配置。默认为 None。

  • kwargs (可选Any[]) – 传递给Runnable的额外关键字参数。

产生:

Runnable 的输出。

返回类型

AsyncIterator[Output]

astream_events(input: Any, config: Optional[RunnableConfig] = None, *, version: Literal['v1', 'v2'], include_names: Optional[Sequence[str]] = None, include_types: Optional[Sequence[str]] = None, include_tags: Optional[Sequence[str]] = None, exclude_names: Optional[Sequence[str]] = None, exclude_types: Optional[Sequence[str]] = None, exclude_tags: Optional[Sequence[str]] = None, **kwargs: Any) AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]]

测试版

此 API 处于测试版,可能会在未来发生变化。

生成事件流。

用于创建一个遍历提供实时运行过程信息的 StreamEvents 的迭代器,包括中间结果生成的 StreamEvents。

StreamEvent 是一个具有以下模式的字典

  • eventstr - 事件名称遵循以下格式:


  • namestr - 生成事件的 Runnable 的名称。

  • run_id字符串 - 与给定执行关联的随机生成的ID。

    emitting the event的Runnable相关联。作为父Runnable执行部分被调用执行的子Runnable会分配自己的唯一ID。

  • parent_ids字符串列表 - 生成事件的父Runnable的ID。根Runnable将有一个空列表。父ID的顺序是从根到直接父级。仅适用于API的v2版本。API v1版本将返回一个空列表。

    tags可选的字符串列表 - 发生事件的Runnable的标签。

  • metadata可选的字典 - 发生事件的Runnable的元数据。

    data字典

  • 以下是一个表格,说明了各种链可能发出的某些事件。出于简洁考虑,省略了元数据字段。链定义在表格之后。

    注意:此参考表是为V2版本的Schema。

  • 事件

名称

输入

输出

on_chat_model_start

【模型名称】

{"messages": [系统消息,人类消息]}

on_chat_model_stream

AIMessageChunk(content="hello")

on_chat_model_end

AIMessageChunk(content="hello world")

AIMessageChunk(content="hello")

on_llm_start

{‘input’: ‘hello’}

AIMessageChunk(content="hello")

on_chat_model_end

on_llm_stream

‘Hello’

AIMessageChunk(content="hello")

on_llm_end

‘Hello human!’

AIMessageChunk(content="hello")

on_chain_start

format_docs

AIMessageChunk(content="hello")

on_chain_stream

“hello world!, goodbye world!”

on_chain_end

[文档(…)]

on_chain_end

on_tool_start

some_tool

on_chain_end

{“x”: 1, “y”: “2”}

on_tool_start

on_tool_end

on_retriever_start

[检索器名称]

{“query”: “hello”}

on_retriever_start

[检索器名称]

on_retriever_end

[Document(…), ..]

on_prompt_start

[模板名称]

[Document(…), ..]

on_prompt_start

{“question”: “hello”}

on_prompt_end

ChatPromptValue(messages: [系统消息, …])

除了标准事件外,用户还可以发布自定义事件(见以下示例)。

自定义事件将在API的$v2版本中显示出来!

ChatPromptValue(messages: [系统消息, …])

除了标准事件外,用户还可以发布自定义事件(见以下示例)。

自定义事件具有以下格式

属性

类型

描述

字符串

用于事件的用户定义名称。

数据

输出

与事件关联的数据。这可以是一切事物,尽管我们建议将其制作成可序列化为JSON的形式。

以下是与上述标准事件相关的声明

提示

示例

示例:发布自定义事件

input

on_chain_end:

def format_docs(docs: List[Document]) -> str:
    '''Format the docs.'''
    return ", ".join([doc.page_content for doc in docs])

format_docs = RunnableLambda(format_docs)

on_retriever_start:

@tool
def some_tool(x: int, y: str) -> dict:
    '''Some_tool.'''
    return {"x": x, "y": y}

(任何)– Runnable的输入。:

template = ChatPromptTemplate.from_messages(
    [("system", "You are Cat Agent 007"), ("human", "{question}")]
).with_config({"run_name": "my_template", "tags": ["my_template"]})

config

from langchain_core.runnables import RunnableLambda

async def reverse(s: str) -> str:
    return s[::-1]

chain = RunnableLambda(func=reverse)

events = [
    event async for event in chain.astream_events("hello", version="v2")
]

# will produce the following events (run_id, and parent_ids
# has been omitted for brevity):
[
    {
        "data": {"input": "hello"},
        "event": "on_chain_start",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"chunk": "olleh"},
        "event": "on_chain_stream",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"output": "olleh"},
        "event": "on_chain_end",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
]

(任何:可选RunnableConfig)– 要用于Runnable的配置。

from langchain_core.callbacks.manager import (
    adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio


async def slow_thing(some_input: str, config: RunnableConfig) -> str:
    """Do something that takes a long time."""
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 1 of 3"},
        config=config # Must be included for python < 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 2 of 3"},
        config=config # Must be included for python < 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    return "Done"

slow_thing = RunnableLambda(slow_thing)

async for event in slow_thing.astream_events("some_input", version="v2"):
    print(event)
参数
  • version

  • (字面量:’v1’、’v2’)– 要使用的Schema版本,可以是$v2或$v1。用户应使用$v2。$v1用于向后兼容,将在0.4.0中弃用。在API稳定之前将不会分配默认值。自定义事件只能在$v2中显示出来。

  • include_names

  • (任何:可选字符串序列)– 仅包含具有匹配名称的runnables事件。

  • include_types

  • (任何:可选字符串序列)– 仅包含具有匹配类型的runnables事件。

  • include_tags

  • exclude_types (可选[字符串序列]]) – 排除与匹配类型匹配的可执行任务的事件。

  • exclude_tags (可选[字符串序列]]) – 排除与匹配标签匹配的可执行任务的事件。

  • kwargs (任意类型) – 传递给 Runnable 的额外关键字参数。这些参数将传递给 astream_log,因为 astream_events 的实现建立在 astream_log 之上。

产生:

异步流程式的 StreamEvents。

引发

NotImplementedError – 如果版本不是 v1v2

返回类型

AsyncIterator[联合[StandardStreamEvent, CustomStreamEvent]]

batch(inputs: 输入列表[Input], config: 可选[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: 可选[任意类型]) 输出列表[Output]

默认实现使用线程池执行器并行运行调用。

批处理的默认实现适用于I/O密集型运行程序。

如果子类可以更有效地进行批处理,则应覆盖此方法;例如,如果底层Runnables使用支持批处理模式的API。

参数
  • inputs (输入列表[Input]) –

  • config (可选[Union[RunnableConfig, List[RunnableConfig]]]) –

  • return_exceptions (布尔值) –

  • kwargs (可选[任意类型]) –

返回类型

ListOutput

batch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) Iterator[Tuple[int, Union[Output, Exception]]]

并行在输入列表上运行调用,按完成顺序返回结果。

参数
  • inputs (Sequence[Input]) –

  • config (可选[Union[RunnableConfig, Sequence[RunnableConfig]]]) –

  • return_exceptions (布尔值) –

  • kwargs (可选[任意类型]) –

返回类型

迭代器[元组[int, Union[Output, Exception]]]

configurable_alternatives(which: ConfigurableField, *, default_key: str = 'default', prefix_keys: bool = False, **kwargs: Union[Runnable[Input, Output], Callable[[], Runnable[Input, Output]]]) RunnableSerializable[Input, Output]

配置可运行时设置的运行对象的替代方案。

参数
  • which (ConfigurableField) – 使用 ConfigurableField 实例来选择替代方案。

  • default_key (str) – 如果未选择替代方案,则使用默认键。默认为“default”。

  • prefix_keys (bool) – 是否使用 ConfigurableField ID 前缀键。默认为 False。

  • **kwargs (Union[Runnable[Input, Output], Callable[[], Runnable[Input, Output]]]) – 一个键到 Runnable 实例或返回 Runnable 实例的可调用对象的字典。

返回:

配置替代方案的新 Runnable。

返回类型

RunnableSerializable[输入, 输出]

from langchain_anthropic import ChatAnthropic
from langchain_core.runnables.utils import ConfigurableField
from langchain_openai import ChatOpenAI

model = ChatAnthropic(
    model_name="claude-3-sonnet-20240229"
).configurable_alternatives(
    ConfigurableField(id="llm"),
    default_key="anthropic",
    openai=ChatOpenAI()
)

# uses the default model ChatAnthropic
print(model.invoke("which organization created you?").content)

# uses ChatOpenAI
print(
    model.with_config(
        configurable={"llm": "openai"}
    ).invoke("which organization created you?").content
)
configurable_fields(**kwargs: Union[ConfigurableField, ConfigurableFieldSingleOption, ConfigurableFieldMultiOption]) RunnableSerializable[Input, Output]

在运行时配置特定的Runnable字段。

参数

**kwargs (Union[ConfigurableField, ConfigurableFieldSingleOption, ConfigurableFieldMultiOption]) — 要配置的ConfigurableField实例的字典。

返回:

配置字段后的新Runnable。

返回类型

RunnableSerializable[输入, 输出]

from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI

model = ChatOpenAI(max_tokens=20).configurable_fields(
    max_tokens=ConfigurableField(
        id="output_token_number",
        name="Max tokens in the output",
        description="The maximum number of tokens in the output",
    )
)

# max_tokens = 20
print(
    "max_tokens_20: ",
    model.invoke("tell me something about chess").content
)

# max_tokens = 200
print("max_tokens_200: ", model.with_config(
    configurable={"output_token_number": 200}
    ).invoke("tell me something about chess").content
)
get_relevant_documents(query: str, *, callbacks: Callbacks = None, tags: Optional[List[str]] = None, metadata: Optional[Dict[str, Any]] = None, run_name: Optional[str] = None, **kwargs: Any) List[Document]

自从 langchain-core==0.1.46 版本以来已弃用: 请使用 invoke 代替。

检索与查询相关的文档。

用户应优先使用 .invoke.batch 而不是直接使用 get_relevant_documents

参数
  • 查询 (str) – 要查找相关文档的字符串。

  • callbacks (回调函数)- 回调管理器或回调函数列表。默认值为 None。

  • tags (可选[列表[str]]) – 可选的与检索器关联的标签列表。这些标签将与每次对该检索器的调用相关联,并将作为参数传递给在 callbacks 中定义的处理程序。默认值为 None。

  • metadata (可选[字典[str, Any]]) – 可选的与检索器关联的元数据。这些元数据将与每次对该检索器的调用相关联,并将作为参数传递给在 callbacks 中定义的处理程序。默认值为 None。

  • run_name (可选[str]) – 可选的运行名称。默认值为 None。

  • kwargs (Any) – 传递给检索器的额外参数。

返回:

相关文档列表。

返回类型

Document 列表

invoke(input: str, config: Optional[RunnableConfig] = None, **kwargs: Any) List[Document]

调用检索器以获取相关文档。

同步检索器调用的主入口点。

参数
  • 输入 (字符串) – 查询字符串。

  • 配置 (可选[RunnableConfig]) – 检索器的配置。默认为 None。

  • kwargs (Any) – 传递给检索器的额外参数。

返回:

相关文档列表。

返回类型

列表[Document]

示例

retriever.invoke("query")
stream(输入: Input, 配置: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) Iterator[Output]

stream 的默认实现,调用 invoke。子类如果支持流式输出,应重写此方法。

参数
  • inputInput)- Runnable 的输入。

  • config可选)- 为 Runnable使用的配置。默认为 None。

  • kwargs (可选Any[]) – 传递给Runnable的额外关键字参数。

产生:

Runnable 的输出。

返回类型

输出

to_json() 联合体[SerializedConstructor, SerializedNotImplemented]

将 Runnable 序列化为 JSON。

返回:

Runnable 的可序列化表示。

返回类型

联合体[SerializedConstructor, SerializedNotImplemented]

使用 ZepCloudRetriever 的示例