langchain_core.indexing.in_memory
.InMemoryDocumentIndex¶
注意
内存文档索引(InMemoryDocumentIndex)实现了标准 Runnable Interface
。🏃
Runnable Interface
还具有额外的运行时方法,例如 with_types
、with_retry
、assign
、bind
、get_graph
以及更多。
- class langchain_core.indexing.in_memory.InMemoryDocumentIndex[source]¶
基类:
DocumentIndex
beta版本
自版本0.2.29引入。底层抽象可能会变更。
内存文档索引。
这是一个内存文档索引,将文档存储在字典中。
它提供了一个简单的搜索API,根据给定查询在文档中出现的次数返回文档。
自版本0.2.29起引入。
- 参数 metadata: Optional[Dict[str, Any]] = None¶
与检索器相关联的可选元数据,默认为 None。此元数据将与每个检索器调用相关联,并作为参数传递给在 callbacks 中定义的处理程序。您可以使用这些来识别检索器的特定实例及其用例。
- param tags: Optional[List[str]] = None¶
一个与检索器关联的可选列表标志。默认为None。这些标签将与对检索器的每次调用相关联,并作为参数传递到在callbacks中定义的处理程序。您可以使用这些标签来识别具有特定用例的检索器实例。
- param top_k : int = 4¶
- async abatch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output] ¶
默认实现使用异步并行运行的asyncio.gather执行runinvokes。
默认批次实现对于IO密集型runnables来说效果很好。
如果子类可以更有效地批处理,则应重写此方法;例如,如果底层Runnable使用支持批处理模式的API。
- 参数
inputs (List[Input]) – Runnable的输入列表。
config (Optional[Union[RunnableConfig, List[RunnableConfig]]]) – 在调用Runnable时使用的配置。配置支持标准键,如‘tags’、‘metadata’用于跟踪目的,‘max_concurrency’用于控制并行执行的工作量,以及其他键。请参阅RunnableConfig以获取更多详细信息。默认为None。
return_exceptions (bool) – 是否返回异常而不是抛出它们。默认为False。
kwargs (可选[任意类型]) – 将传递给Runnable的额外关键字参数。
- 返回
Runnable的输出列表。
- 返回类型
列表[输出]
- async abatch_as_completed(inputs: 序列[输入], config: 可选[联合[RunnableConfig, 序列[RunnableConfig]]]], *, return_exceptions: bool = False, **kwargs: 可选[任意类型]) 异步迭代器[元组[int, 联合[输出, Exception]]]¶
并行在输入列表上运行Runnable,并在结果完成时产生结果。
- 参数
inputs (序列[输入]) – Runnable的输入列表。
config (可选[联合[RunnableConfig, 序列[RunnableConfig]]) – 在调用Runnable时使用的配置。配置支持像‘tags’、‘metadata’这样的标准键,用于跟踪目的,以及控制并行工作量的‘max_concurrency’键和其他键。有关更多信息,请参阅RunnableConfig。默认为None。
return_exceptions (bool) – 是否返回异常而不是抛出它们。默认为False。
kwargs (可选[任意类型]) – 将传递给Runnable的额外关键字参数。
- 产生
一个由输入索引和Runnable的输出组成的元组。
- 返回类型
异步迭代器[元组[int, 联合[输出, Exception]]]
- async adelete(ids: Optional[List[str]] = None, **kwargs: Any) DeleteResponse ¶
根据ID或其他标准删除。异步版本。
在没有任何输入参数的情况下调用adelete应该引发ValueError!
- 参数
ids (Optional[List[str]]) – 要删除的ID列表。
kwargs (Any) – 其他关键字参数。这完全取决于实现。例如,可以包括一个删除整个索引的选项。
- 返回
一个包含成功删除的ID列表和未删除的ID列表的响应对象。
- 返回类型
- async aget(ids: Sequence[str], /, **kwargs: Any) List[Document] ¶
通过ID获取文档。
如果某些ID找不到或存在重复的ID,返回的文档数量可能少于请求的数量。
用户不应假设返回的文档的顺序与输入ID的顺序相同。相反,用户应依赖返回文档的ID字段。
如果对于某些ID找不到文档,此方法**不应**引发异常。
- 参数
ids (Sequence[str]) – 要获取的ID列表。
kwargs (Any) – 其他关键字参数。这些取决于实现。
- 返回
找到的文档列表。
- 返回类型
List[Document]
- 异步 aget_relevant_documents(query: str, *, callbacks: Callbacks = None, tags: Optional[List[str]] = None, metadata: Optional[Dict[str, Any]] = None, run_name: Optional[str] = None, **kwargs: Any) List[Document] ¶
自langchain-core==0.1.46版本开始弃用: 请使用
ainvoke
代替。异步获取与查询相关的文档。
用户应优先使用.ainvoke或.abatch,而不是直接使用aget_relevant_documents。
- 参数
query (str) – 需要查找相关文档的字符串。
callbacks (Callbacks) – 回调管理器或回调列表。
tags (可选[List[str]]) – 与检索器关联的可选标签列表。这些标签将与每次对检索器的调用相关联,并作为参数传递给在callbacks中定义的处理程序。默认为None。
metadata (可选[Dict[str, Any]]) – 与检索器关联的可选元数据。此元数据将与每次对检索器的调用相关联,并作为参数传递给在callbacks中定义的处理程序。默认为None。
run_name (可选[str]) – 运行的可选名称。默认为None。
kwargs (Any) – 传递给检索器的其他参数。
- 返回
相关文档的列表。
- 返回类型
List[Document]
- async ainvoke(input: str, config: Optional[RunnableConfig] = None, **kwargs: Any) List[Document]¶
异步调用检索器以获取相关文档。
异步检索调用的主要入口点。
- 参数
input (str) – 查询字符串。
config (Optional[RunnableConfig]) – 检索器的配置。默认为 None。
kwargs (Any) – 传递给检索器的其他参数。
- 返回
相关文档的列表。
- 返回类型
List[Document]
示例
await retriever.ainvoke("query")
- as_tool(args_schema: Optional[Type[BaseModel]] = None, *, name: Optional[str] = None, description: Optional[str] = None, arg_types: Optional[Dict[str, Type]] = None) BaseTool ¶
beta版本
此API处于测试阶段,未来可能会发生变化。
从可运行项创建BaseTool。
as_tool
将从Runnable中实例化一个具有名称、描述和args_schema
的BaseTool。在可能的情况下,架构将从runnable.get_input_schema
推断。或者(例如,如果Runnable接受字典作为输入且具体的字典键未类型化),可以通过args_schema
直接指定架构。您还可以传递arg_types
来仅指定所需的参数及其类型。- 参数
args_schema (可选) – 工具的架构。默认为None。
name (可选) – 工具的名称。默认为None。
description (可选) – 工具的描述。默认为None。
arg_types (可选) – 参数名称到类型的字典。默认为None。
- 返回
一个BaseTool实例。
- 返回类型
类型化的字典输入
from typing import List from typing_extensions import TypedDict from langchain_core.runnables import RunnableLambda class Args(TypedDict): a: int b: List[int] def f(x: Args) -> str: return str(x["a"] * max(x["b"])) runnable = RunnableLambda(f) as_tool = runnable.as_tool() as_tool.invoke({"a": 3, "b": [1, 2]})
使用
args_schema
指定架构的字典输入from typing import Any, Dict, List from langchain_core.pydantic_v1 import BaseModel, Field from langchain_core.runnables import RunnableLambda def f(x: Dict[str, Any]) -> str: return str(x["a"] * max(x["b"])) class FSchema(BaseModel): """Apply a function to an integer and list of integers.""" a: int = Field(..., description="Integer") b: List[int] = Field(..., description="List of ints") runnable = RunnableLambda(f) as_tool = runnable.as_tool(FSchema) as_tool.invoke({"a": 3, "b": [1, 2]})
使用
arg_types
指定架构的字典输入from typing import Any, Dict, List from langchain_core.runnables import RunnableLambda def f(x: Dict[str, Any]) -> str: return str(x["a"] * max(x["b"])) runnable = RunnableLambda(f) as_tool = runnable.as_tool(arg_types={"a": int, "b": List[int]}) as_tool.invoke({"a": 3, "b": [1, 2]})
字符串输入
from langchain_core.runnables import RunnableLambda def f(x: str) -> str: return x + "a" def g(x: str) -> str: return x + "z" runnable = RunnableLambda(f) | g as_tool = runnable.as_tool() as_tool.invoke("b")
自版本0.2.14起引入。
- async astream(input: Input, config: Optional[RunnableConfig], **kwargs: Optional[Any]) AsyncIterator[Output]¶
astream的默认实现,它调用ainvoke。子类应覆盖此方法以支持流输出。
- 参数
input – 可执行部分的输入。
config (可选) – 使用可执行部分时的配置。默认为None。
kwargs (可选[任意类型]) – 将传递给Runnable的额外关键字参数。
- 产生
可执行部分的输出。
- 返回类型
AsyncIterator[输出]
- astream_events(input: Any, config: Optional[RunnableConfig] = None, *, version: Literal['v1', 'v2'], include_names: Optional[Sequence[str]] = None, include_types: Optional[Sequence[str]] = None, include_tags: Optional[Sequence[str]] = None, exclude_names: Optional[Sequence[str]] = None, exclude_types: Optional[Sequence[str]] = None, exclude_tags: Optional[Sequence[str]] = None, **kwargs: Any) AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]] ¶
beta版本
此API处于测试阶段,未来可能会发生变化。
生成事件流。
用于创建StreamEvents的迭代器,这些事件提供了有关Runnable进度的实时信息,包括来自中间结果的事件。
StreamEvent是一个具有以下架构的字典
event
: str - 事件名称遵循以下格式:on_[runnable_type]_(start|stream|end)。
name
: str – 生成事件的Runnable的名称。run_id
: 字符串 - 与给定执行的 Runnable 关联的随机生成的 ID。触发事件的 Runnable 的子 Runnable 将获得其唯一的 ID。
parent_ids
: 字符串列表 - 生成事件的父 Runnable 的 ID。根 Runnable 将有一个空列表。父 ID 的顺序是从根到直接父级。对于 API 的 v2 版本,才提供此信息。v1 版本的 API 将返回空列表。
tags
: 可选[字符串列表] - 生成事件的 Runnable 的标签。
metadata
: 可选[字典[str, Any] - 生成事件的 Runnable 的元数据。
data
: 字典[str, Any]
以下是一个表格,展示了各种链可能发出的某些事件。为简洁起见,省略了元数据字段。链定义在表格之后。
注意 本参考表适用于模式的 V2 版本。
事件
名称
块
输入
输出
on_chat_model_start
[模型名称]
{“消息”: [[系统消息, 人类消息]]}
on_chat_model_stream
[模型名称]
A IMessageChunk(content="hello")
on_chat_model_end
[模型名称]
{“消息”: [[系统消息, 人类消息]]}
A IMessageChunk(content="hello world")
on_llm_start
[模型名称]
{‘input’: ‘hello’}
on_llm_stream
[模型名称]
‘Hello’
on_llm_end
[模型名称]
‘Hello human!’
on_chain_start
format_docs
on_chain_stream
format_docs
“hello world!, goodbye world!”
on_chain_end
format_docs
[Document(…)]
“hello world!, goodbye world!”
on_tool_start
some_tool
{“x”: 1, “y”: “2”}
on_tool_end
some_tool
{“x”: 1, “y”: “2”}
on_retriever_start
[检索器名称]
{“query”: “hello”}
on_retriever_end
[检索器名称]
{“query”: “hello”}
[Document(…), ..]
on_prompt_start
[模板名称]
{“问题”: “hello”}
on_prompt_end
[模板名称]
{“问题”: “hello”}
ChatPromptValue(messages: [SystemMessage, …])
除了标准事件外,用户还可以分发自定义事件(请参阅以下示例)。
自定义事件仅在 API 的 v2 版本中显示!
自定义事件具有以下格式
属性
类型
描述
名称
字符串
事件的用户定义名称。
数据
Any
与事件关联的数据。这可以是一切,但我们建议将其制成可序列化的 JSON。
以下是上述标准事件的关联声明
format_docs:
def format_docs(docs: List[Document]) -> str: '''Format the docs.''' return ", ".join([doc.page_content for doc in docs]) format_docs = RunnableLambda(format_docs)
some_tool:
@tool def some_tool(x: int, y: str) -> dict: '''Some_tool.''' return {"x": x, "y": y}
提示:
template = ChatPromptTemplate.from_messages( [("system", "You are Cat Agent 007"), ("human", "{question}")] ).with_config({"run_name": "my_template", "tags": ["my_template"]})
示例
from langchain_core.runnables import RunnableLambda async def reverse(s: str) -> str: return s[::-1] chain = RunnableLambda(func=reverse) events = [ event async for event in chain.astream_events("hello", version="v2") ] # will produce the following events (run_id, and parent_ids # has been omitted for brevity): [ { "data": {"input": "hello"}, "event": "on_chain_start", "metadata": {}, "name": "reverse", "tags": [], }, { "data": {"chunk": "olleh"}, "event": "on_chain_stream", "metadata": {}, "name": "reverse", "tags": [], }, { "data": {"output": "olleh"}, "event": "on_chain_end", "metadata": {}, "name": "reverse", "tags": [], }, ]
示例:分发自定义事件
from langchain_core.callbacks.manager import ( adispatch_custom_event, ) from langchain_core.runnables import RunnableLambda, RunnableConfig import asyncio async def slow_thing(some_input: str, config: RunnableConfig) -> str: """Do something that takes a long time.""" await asyncio.sleep(1) # Placeholder for some slow operation await adispatch_custom_event( "progress_event", {"message": "Finished step 1 of 3"}, config=config # Must be included for python < 3.10 ) await asyncio.sleep(1) # Placeholder for some slow operation await adispatch_custom_event( "progress_event", {"message": "Finished step 2 of 3"}, config=config # Must be included for python < 3.10 ) await asyncio.sleep(1) # Placeholder for some slow operation return "Done" slow_thing = RunnableLambda(slow_thing) async for event in slow_thing.astream_events("some_input", version="v2"): print(event)
- 参数
input (Any) – Runnable 的输入。
config (Optional[RunnableConfig]) – 为 Runnable 使用的配置。
version (文字['v1', 'v2']) – 要使用的模式版本,可以是 v2 或 v1。用户应使用 v2。 v1 用于向后兼容,将在 0.4.0 中弃用。在 API 稳定之前不会分配默认值。自定义事件仅在 v2 中显示。
include_names (Optional[序列[字符串]]) – 仅包括具有匹配名称的 runnables 的事件。
include_types (Optional[序列[字符串]]) – 仅包括具有匹配类型的 runnables 的事件。
include_tags (Optional[序列[字符串]]) – 仅包括具有匹配标签的 runnables 的事件。
exclude_names (可选[序列[字符串]]) – 从具有匹配名称的runnable中排除事件。
exclude_types (可选[序列[字符串]]) – 从具有匹配类型的runnable中排除事件。
exclude_tags (可选[序列[字符串]]) – 从具有匹配标签的runnable中排除事件。
kwargs (任何) – 传递给Runnable的额外关键字参数。这将传递给astream_log,因为astream_events的此实现建立在astream_log之上。
- 产生
异步流中的StreamEvent。
- 抛出
NotImplementedError – 如果版本不是v1或v2。
- 返回类型
AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]]
- async aupsert(items: Sequence[Document], /, **kwargs: Any) UpsertResponse ¶
向vectorstore添加或更新文档。异步版本的upsert。
upsert功能应利用项目的ID字段(如果提供)。如果未提供ID,upsert方法可以自由地为项目生成ID。
当指定ID且项目已存在于vectorstore中时,upsert方法应使用新数据更新项目。如果项目不存在,upsert方法应将项目添加到vectorstore中。
- 参数
items (序列[Document]) – 要添加到vectorstore中的文档序列。
**kwargs (任何) – 额外关键字参数。
- 返回
一个响应对象,其中包含成功添加或更新到vectorstore的ID列表以及未能添加或更新到vectorstore的ID列表。
- 返回类型
- batch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output] ¶
默认实现使用线程池执行器并行调用。
默认批次实现对于IO密集型runnables来说效果很好。
如果子类可以更有效地批处理,则应重写此方法;例如,如果底层Runnable使用支持批处理模式的API。
- 参数
inputs (列表[输入]) –
config (可选[联合[RunnableConfig, 列表[RunnableConfig]]]) –
return_exceptions (布尔型) –
kwargs (可选[任何类型]) –
- 返回类型
列表[输出]
- batch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) Iterator[Tuple[int, Union[Output, Exception]]] ¶
并行在输入列表上运行调用,按完成顺序产生结果。
- 参数
inputs (Sequence[Input]) –
config (Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]) –
return_exceptions (布尔型) –
kwargs (可选[任何类型]) –
- 返回类型
Iterator[Tuple[int, Union[Output, Exception]]]
- configurable_alternatives(which: ConfigurableField, *, default_key: str = 'default', prefix_keys: bool = False, **kwargs: Union[Runnable[Input, Output], Callable[[], Runnable[Input, Output]]) RunnableSerializable[Input, Output] ¶
配置在运行时可以设置的Runnables的备选方案。
- 参数
which (ConfigurableField) – 将用于选择备选方案的ConfigurableField实例。
default_key (str) – 如果未选择备选方案,则使用的默认键。默认为“默认”。
prefix_keys (bool) – 是否在键前加ConfigurableField id的前缀。默认为False。
**kwargs (Union[Runnable[Input, Output], Callable[[], Runnable[Input, Output]]]) – 键到Runnable实例或返回Runnable实例的Callable的字典。
- 返回
配置了备选方案的新Runnable。
- 返回类型
RunnableSerializable[Input, Output]
from langchain_anthropic import ChatAnthropic from langchain_core.runnables.utils import ConfigurableField from langchain_openai import ChatOpenAI model = ChatAnthropic( model_name="claude-3-sonnet-20240229" ).configurable_alternatives( ConfigurableField(id="llm"), default_key="anthropic", openai=ChatOpenAI() ) # uses the default model ChatAnthropic print(model.invoke("which organization created you?").content) # uses ChatOpenAI print( model.with_config( configurable={"llm": "openai"} ).invoke("which organization created you?").content )
- configurable_fields(**kwargs: Union[ConfigurableField, ConfigurableFieldSingleOption, ConfigurableFieldMultiOption]) RunnableSerializable[Input, Output]¶
在运行时设置特定的可运行字段。
- 参数
**kwargs (Union[ConfigurableField, ConfigurableFieldSingleOption, ConfigurableFieldMultiOption]) – 要配置的 ConfigurableField 实例的字典。
- 返回
配置字段后的新可运行对象。
- 返回类型
RunnableSerializable[Input, Output]
from langchain_core.runnables import ConfigurableField from langchain_openai import ChatOpenAI model = ChatOpenAI(max_tokens=20).configurable_fields( max_tokens=ConfigurableField( id="output_token_number", name="Max tokens in the output", description="The maximum number of tokens in the output", ) ) # max_tokens = 20 print( "max_tokens_20: ", model.invoke("tell me something about chess").content ) # max_tokens = 200 print("max_tokens_200: ", model.with_config( configurable={"output_token_number": 200} ).invoke("tell me something about chess").content )
- delete(ids: Optional[List[str]] = None, **kwargs: Any) DeleteResponse [source]¶
通过 ID 进行删除。
- 参数
ids (Optional[List[str]]) –
kwargs (Any) –
- 返回类型
- get(ids: Sequence[str], /, **kwargs: Any) List[Document] [source]¶
通过 ids 获取。
- 参数
ids (Sequence[str]) –
kwargs (Any) –
- 返回类型
List[Document]
- get_relevant_documents(query: str, *, callbacks: Callbacks = None, tags: Optional[List[str]] = None, metadata: Optional[Dict[str, Any]] = None, run_name: Optional[str] = None, **kwargs: Any) List[Document] ¶
自langchain-core==0.1.46版本以来已弃用: 请使用
invoke
代替。检索与查询相关的文档。
用户应优先使用 .invoke 或 .batch 而不是直接使用 get_relevant_documents。
- 参数
query (str) – 需要查找相关文档的字符串。
callbacks (Callbacks) – 回调管理器或回调列表。默认为 None。
tags (可选[List[str]]) – 与检索器关联的可选标签列表。这些标签将与每次对检索器的调用相关联,并作为参数传递给在callbacks中定义的处理程序。默认为None。
metadata (可选[Dict[str, Any]]) – 与检索器关联的可选元数据。此元数据将与每次对检索器的调用相关联,并作为参数传递给在callbacks中定义的处理程序。默认为None。
run_name (可选[str]) – 运行的可选名称。默认为None。
kwargs (Any) – 传递给检索器的其他参数。
- 返回
相关文档的列表。
- 返回类型
List[Document]
- invoke(input: str, config: Optional[RunnableConfig] = None, **kwargs: Any) List[Document] ¶
调用检索器以获取相关文档。
同步检索器调用的主要入口点。
- 参数
input (str) – 查询字符串。
config (Optional[RunnableConfig]) – 检索器的配置。默认为 None。
kwargs (Any) – 传递给检索器的其他参数。
- 返回
相关文档的列表。
- 返回类型
List[Document]
示例
retriever.invoke("query")
- stream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) Iterator[Output] ¶
stream的默认实现,调用invoke。如果子类支持流式输出,则应重写此方法。
- 参数
input – 可执行部分的输入。
config (可选) – 使用可执行部分时的配置。默认为None。
kwargs (可选[任意类型]) – 将传递给Runnable的额外关键字参数。
- 产生
可执行部分的输出。
- 返回类型
Output
- to_json() Union[SerializedConstructor, SerializedNotImplemented] ¶
将Runnable序列化为JSON。
- 返回
Runnable的JSON序列化表示。
- 返回类型
- upsert(items: Sequence[Document], /, **kwargs: Any) UpsertResponse [source]¶
将条目插入索引。
- 参数
**items** (Sequence[Document]) –
kwargs (Any) –
- 返回类型