langchain.retrievers.parent_document_retriever.ParentDocumentRetriever
Note
ParentDocumentRetriever implements the standard Runnable Interface. 🏃
The Runnable Interface has additional methods that are available on runnables, such as with_types, with_retry, assign, bind, get_graph, and more.
- class langchain.retrievers.parent_document_retriever.ParentDocumentRetriever[source]¶
Bases:
MultiVectorRetriever
Retrieve small chunks then retrieve their parent documents.
When splitting documents for retrieval, there are often conflicting desires:
- You may want to have small documents, so that their embeddings can most
accurately reflect their meaning. If too long, then the embeddings can lose meaning.
- You want to have long enough documents that the context of each chunk is
retained.
The ParentDocumentRetriever strikes that balance by splitting and storing small chunks of data. During retrieval, it first fetches the small chunks but then looks up the parent ids for those chunks and returns those larger documents.
Note that “parent document” refers to the document that a small chunk originated from. This can either be the whole raw document OR a larger chunk.
Examples
    from langchain.retrievers import ParentDocumentRetriever
    from langchain.storage import InMemoryStore
    from langchain_chroma import Chroma
    from langchain_community.embeddings import OpenAIEmbeddings
    from langchain_text_splitters import RecursiveCharacterTextSplitter

    # This text splitter is used to create the parent documents
    parent_splitter = RecursiveCharacterTextSplitter(chunk_size=2000, add_start_index=True)
    # This text splitter is used to create the child documents
    # It should create documents smaller than the parent
    child_splitter = RecursiveCharacterTextSplitter(chunk_size=400, add_start_index=True)
    # The vectorstore to use to index the child chunks
    vectorstore = Chroma(embedding_function=OpenAIEmbeddings())
    # The storage layer for the parent documents
    store = InMemoryStore()

    # Initialize the retriever
    retriever = ParentDocumentRetriever(
        vectorstore=vectorstore,
        docstore=store,
        child_splitter=child_splitter,
        parent_splitter=parent_splitter,
    )
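The retrieval flow described above (index small chunks, then return the parents they came from) can be illustrated without any external services. The following is a minimal standard-library sketch, not the library's implementation: `ToyParentRetriever`, `split_text`, and `overlap_score` are hypothetical names, and a simple word-overlap score stands in for embedding similarity.

```python
import uuid
from typing import Dict, List, Tuple


def split_text(text: str, chunk_size: int) -> List[str]:
    """Split text into consecutive pieces of at most chunk_size characters."""
    return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]


def overlap_score(query: str, chunk: str) -> int:
    """Toy relevance score: number of distinct query words found in the chunk."""
    chunk_words = set(chunk.lower().split())
    return sum(1 for word in set(query.lower().split()) if word in chunk_words)


class ToyParentRetriever:
    """Hypothetical stand-in: search child chunks, return their parent documents."""

    def __init__(self, child_chunk_size: int = 40) -> None:
        self.child_chunk_size = child_chunk_size
        self.docstore: Dict[str, str] = {}         # parent id -> parent text
        self.children: List[Tuple[str, str]] = []  # (child text, parent id)

    def add_documents(self, documents: List[str]) -> None:
        for text in documents:
            doc_id = str(uuid.uuid4())
            self.docstore[doc_id] = text
            # Each child remembers its parent id (the role played by id_key).
            for chunk in split_text(text, self.child_chunk_size):
                self.children.append((chunk, doc_id))

    def invoke(self, query: str, k: int = 4) -> List[str]:
        # Rank the small chunks, then map the top k back to unique parents.
        ranked = sorted(
            self.children, key=lambda c: overlap_score(query, c[0]), reverse=True
        )
        parent_ids: List[str] = []
        for _, doc_id in ranked[:k]:
            if doc_id not in parent_ids:
                parent_ids.append(doc_id)
        return [self.docstore[doc_id] for doc_id in parent_ids]


doc_a = "Cats sleep for most of the day and hunt at night."
doc_b = "Rust is a systems programming language focused on memory safety."
toy = ToyParentRetriever(child_chunk_size=40)
toy.add_documents([doc_a, doc_b])
best = toy.invoke("rust language", k=1)
```

Here the match happens on a 40-character chunk, but `best` holds the full parent text, which is the trade-off the class is designed to provide.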
- param byte_store: Optional[ByteStore] = None¶
The lower-level backing storage layer for the parent documents
- param child_metadata_fields: Optional[Sequence[str]] = None¶
Metadata fields to leave in child documents. If None, leave all parent document metadata.
- param child_splitter: TextSplitter [Required]¶
The text splitter to use to create child documents.
- param id_key: str = 'doc_id'¶
- param metadata: Optional[Dict] = None¶
Optional metadata associated with the retriever. Defaults to None. This metadata will be associated with each call to this retriever, and passed as arguments to the handlers defined in callbacks. You can use it to, e.g., identify a specific instance of a retriever with its use case.
- param parent_splitter: Optional[TextSplitter] = None¶
The text splitter to use to create parent documents. If None, then the parent documents will be the raw documents passed in.
- param search_kwargs: dict [Optional]¶
Keyword arguments to pass to the search function.
- param search_type: SearchType = SearchType.similarity¶
Type of search to perform (similarity / mmr)
- param tags: Optional[List[str]] = None¶
Optional list of tags associated with the retriever. Defaults to None. These tags will be associated with each call to this retriever, and passed as arguments to the handlers defined in callbacks. You can use these to, e.g., identify a specific instance of a retriever with its use case.
- param vectorstore: VectorStore [Required]¶
The underlying vectorstore to use to store small chunks and their embedding vectors
- async aadd_documents(documents: List[Document], ids: Optional[List[str]] = None, add_to_docstore: bool = True, **kwargs: Any) None [source]¶
- Parameters
documents (List[Document]) –
ids (Optional[List[str]]) –
add_to_docstore (bool) –
kwargs (Any) –
- Return type
None
- async abatch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output] ¶
Default implementation runs ainvoke in parallel using asyncio.gather.
The default implementation of batch works well for IO bound runnables.
Subclasses should override this method if they can batch more efficiently; e.g., if the underlying Runnable uses an API which supports a batch mode.
- Parameters
inputs (List[Input]) – A list of inputs to the Runnable.
config (Optional[Union[RunnableConfig, List[RunnableConfig]]]) – A config to use when invoking the Runnable. The config supports standard keys like ‘tags’, ‘metadata’ for tracing purposes, ‘max_concurrency’ for controlling how much work to do in parallel, and other keys. Please refer to the RunnableConfig for more details. Defaults to None.
return_exceptions (bool) – Whether to return exceptions instead of raising them. Defaults to False.
kwargs (Optional[Any]) – Additional keyword arguments to pass to the Runnable.
- Returns
A list of outputs from the Runnable.
- Return type
List[Output]
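The behaviour described for abatch (run the async call for every input concurrently, optionally cap concurrency, optionally return exceptions instead of raising) can be sketched with plain asyncio. This is an illustrative sketch under stated assumptions, not the library's code: `gather_batch` and `fake_retrieve` are hypothetical names, and `max_concurrency` is modelled as a direct argument rather than a config key.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Optional


async def gather_batch(
    ainvoke: Callable[[Any], Awaitable[Any]],
    inputs: List[Any],
    max_concurrency: Optional[int] = None,
    return_exceptions: bool = False,
) -> List[Any]:
    """Run ainvoke over all inputs with asyncio.gather, preserving input order."""
    semaphore = asyncio.Semaphore(max_concurrency) if max_concurrency else None

    async def run_one(item: Any) -> Any:
        if semaphore is None:
            return await ainvoke(item)
        async with semaphore:  # at most max_concurrency calls in flight
            return await ainvoke(item)

    return await asyncio.gather(
        *(run_one(item) for item in inputs), return_exceptions=return_exceptions
    )


async def fake_retrieve(query: str) -> List[str]:
    """Hypothetical async retriever call used only for this sketch."""
    await asyncio.sleep(0)
    if not query:
        raise ValueError("empty query")
    return [f"doc about {query}"]


results = asyncio.run(gather_batch(fake_retrieve, ["a", "b"], max_concurrency=1))
mixed = asyncio.run(gather_batch(fake_retrieve, ["a", ""], return_exceptions=True))
```

With `return_exceptions=True` the failing input yields the exception object in its slot, so the other results are still available.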
- async abatch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) AsyncIterator[Tuple[int, Union[Output, Exception]]] ¶
Run ainvoke in parallel on a list of inputs, yielding results as they complete.
- Parameters
inputs (Sequence[Input]) – A list of inputs to the Runnable.
config (Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]) – A config to use when invoking the Runnable. The config supports standard keys like ‘tags’, ‘metadata’ for tracing purposes, ‘max_concurrency’ for controlling how much work to do in parallel, and other keys. Please refer to the RunnableConfig for more details. Defaults to None.
return_exceptions (bool) – Whether to return exceptions instead of raising them. Defaults to False.
kwargs (Optional[Any]) – Additional keyword arguments to pass to the Runnable.
- Yields
A tuple of the index of the input and the output from the Runnable.
- Return type
AsyncIterator[Tuple[int, Union[Output, Exception]]]
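Because results are yielded in completion order, each one is paired with the index of its input. A minimal asyncio sketch of that pattern follows; `as_completed_batch`, `slow_echo`, and `collect` are hypothetical helpers written for illustration, not part of the library.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, List, Sequence, Tuple


async def as_completed_batch(
    ainvoke: Callable[[Any], Awaitable[Any]],
    inputs: Sequence[Any],
) -> AsyncIterator[Tuple[int, Any]]:
    """Yield (input index, output) pairs as each call finishes."""

    async def run_one(index: int, item: Any) -> Tuple[int, Any]:
        return index, await ainvoke(item)

    tasks = [
        asyncio.ensure_future(run_one(index, item))
        for index, item in enumerate(inputs)
    ]
    for finished in asyncio.as_completed(tasks):
        yield await finished


async def slow_echo(delay: float) -> float:
    """Hypothetical workload: sleep for delay seconds, then return it."""
    await asyncio.sleep(delay)
    return delay


async def collect(delays: Sequence[float]) -> List[Tuple[int, Any]]:
    return [pair async for pair in as_completed_batch(slow_echo, delays)]


order = asyncio.run(collect([0.2, 0.0]))
```

The second input finishes first, so it is yielded first; the index lets the caller match each output to its input.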
- add_documents(documents: List[Document], ids: Optional[List[str]] = None, add_to_docstore: bool = True, **kwargs: Any) None [source]¶
Adds documents to the docstore and vectorstores.
- Parameters
documents (List[Document]) – List of documents to add
ids (Optional[List[str]]) – Optional list of ids for documents. If provided should be the same length as the list of documents. Can be provided if parent documents are already in the document store and you don’t want to re-add to the docstore. If not provided, random UUIDs will be used as ids.
add_to_docstore (bool) – Boolean of whether to add documents to docstore. This can be false if and only if ids are provided. You may want to set this to False if the documents are already in the docstore and you don’t want to re-add them.
kwargs (Any) –
- Return type
None
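The rule stated for ids and add_to_docstore (ids may be omitted only when the documents are also written to the docstore, and a supplied list must match the documents in length) can be expressed as a small check. This is a sketch of the documented contract only; `resolve_parent_ids` is a hypothetical helper and the error messages are illustrative.

```python
import uuid
from typing import List, Optional


def resolve_parent_ids(
    num_documents: int,
    ids: Optional[List[str]] = None,
    add_to_docstore: bool = True,
) -> List[str]:
    """Return one parent id per document, following the documented rules."""
    if ids is None:
        if not add_to_docstore:
            # Without ids there is no way to refer to already-stored parents.
            raise ValueError("ids must be provided when add_to_docstore is False")
        return [str(uuid.uuid4()) for _ in range(num_documents)]
    if len(ids) != num_documents:
        raise ValueError("ids must have the same length as documents")
    return list(ids)


generated = resolve_parent_ids(3)
reused = resolve_parent_ids(2, ids=["a", "b"], add_to_docstore=False)
```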
- async aget_relevant_documents(query: str, *, callbacks: Callbacks = None, tags: Optional[List[str]] = None, metadata: Optional[Dict[str, Any]] = None, run_name: Optional[str] = None, **kwargs: Any) List[Document] ¶
Deprecated since version langchain-core==0.1.46: Use ainvoke instead.
Asynchronously get documents relevant to a query.
Users should favor using .ainvoke or .abatch rather than aget_relevant_documents directly.
- Parameters
query (str) – string to find relevant documents for.
callbacks (Callbacks) – Callback manager or list of callbacks.
tags (Optional[List[str]]) – Optional list of tags associated with the retriever. These tags will be associated with each call to this retriever, and passed as arguments to the handlers defined in callbacks. Defaults to None.
metadata (Optional[Dict[str, Any]]) – Optional metadata associated with the retriever. This metadata will be associated with each call to this retriever, and passed as arguments to the handlers defined in callbacks. Defaults to None.
run_name (Optional[str]) – Optional name for the run. Defaults to None.
kwargs (Any) – Additional arguments to pass to the retriever.
- Returns
List of relevant documents.
- Return type
List[Document]
- async ainvoke(input: str, config: Optional[RunnableConfig] = None, **kwargs: Any) List[Document] ¶
Asynchronously invoke the retriever to get relevant documents.
Main entry point for asynchronous retriever invocations.
- Parameters
input (str) – The query string.
config (Optional[RunnableConfig]) – Configuration for the retriever. Defaults to None.
kwargs (Any) – Additional arguments to pass to the retriever.
- Returns
List of relevant documents.
- Return type
List[Document]
Examples
await retriever.ainvoke("query")
- as_tool(args_schema: Optional[Type[BaseModel]] = None, *, name: Optional[str] = None, description: Optional[str] = None, arg_types: Optional[Dict[str, Type]] = None) BaseTool ¶
Beta
This API is in beta and may change in the future.
Create a BaseTool from a Runnable.
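as_tool will instantiate a BaseTool with a name, description, and args_schema from a Runnable. Where possible, schemas are inferred from runnable.get_input_schema. Alternatively (e.g., if the Runnable takes a dict as input and the specific dict keys are not typed), the schema can be specified directly with args_schema. You can also pass arg_types to specify just the required arguments and their types.
- Parameters
args_schema (Optional[Type[BaseModel]]) – The schema for the tool. Defaults to None.
name (Optional[str]) – The name of the tool. Defaults to None.
description (Optional[str]) – The description of the tool. Defaults to None.
arg_types (Optional[Dict[str, Type]]) – A dictionary of argument names to types. Defaults to None.
- Returns
A BaseTool instance.
- Return type
BaseTool
Typed dict input: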
    from typing import List
    from typing_extensions import TypedDict
    from langchain_core.runnables import RunnableLambda

    class Args(TypedDict):
        a: int
        b: List[int]

    def f(x: Args) -> str:
        return str(x["a"] * max(x["b"]))

    runnable = RunnableLambda(f)
    as_tool = runnable.as_tool()
    as_tool.invoke({"a": 3, "b": [1, 2]})
dict input, specifying schema via args_schema:
    from typing import Any, Dict, List
    from langchain_core.pydantic_v1 import BaseModel, Field
    from langchain_core.runnables import RunnableLambda

    def f(x: Dict[str, Any]) -> str:
        return str(x["a"] * max(x["b"]))

    class FSchema(BaseModel):
        """Apply a function to an integer and list of integers."""

        a: int = Field(..., description="Integer")
        b: List[int] = Field(..., description="List of ints")

    runnable = RunnableLambda(f)
    as_tool = runnable.as_tool(FSchema)
    as_tool.invoke({"a": 3, "b": [1, 2]})
dict input, specifying schema via arg_types:
    from typing import Any, Dict, List
    from langchain_core.runnables import RunnableLambda

    def f(x: Dict[str, Any]) -> str:
        return str(x["a"] * max(x["b"]))

    runnable = RunnableLambda(f)
    as_tool = runnable.as_tool(arg_types={"a": int, "b": List[int]})
    as_tool.invoke({"a": 3, "b": [1, 2]})
String input:
    from langchain_core.runnables import RunnableLambda

    def f(x: str) -> str:
        return x + "a"

    def g(x: str) -> str:
        return x + "z"

    runnable = RunnableLambda(f) | g
    as_tool = runnable.as_tool()
    as_tool.invoke("b")
New in version 0.2.14.
- async astream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) AsyncIterator[Output] ¶
Default implementation of astream, which calls ainvoke. Subclasses should override this method if they support streaming output.
- Parameters
input (Input) – The input to the Runnable.
config (Optional[RunnableConfig]) – The config to use for the Runnable. Defaults to None.
kwargs (Optional[Any]) – Additional keyword arguments to pass to the Runnable.
- Yields
The output of the Runnable.
- Return type
AsyncIterator[Output]
- astream_events(input: Any, config: Optional[RunnableConfig] = None, *, version: Literal['v1', 'v2'], include_names: Optional[Sequence[str]] = None, include_types: Optional[Sequence[str]] = None, include_tags: Optional[Sequence[str]] = None, exclude_names: Optional[Sequence[str]] = None, exclude_types: Optional[Sequence[str]] = None, exclude_tags: Optional[Sequence[str]] = None, **kwargs: Any) AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]] ¶
Beta
This API is in beta and may change in the future.
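Generate a stream of events.
Use to create an iterator over StreamEvents that provide real-time information about the progress of the Runnable, including StreamEvents from intermediate results.
A StreamEvent is a dictionary with the following schema:
- event: str - Event names are of the format: on_[runnable_type]_(start|stream|end).
- name: str - The name of the Runnable that generated the event.
- run_id: str - Randomly generated ID associated with the given execution of the Runnable that emitted the event. A child Runnable that gets invoked as part of the execution of a parent Runnable is assigned its own unique ID.
- parent_ids: List[str] - The IDs of the parent runnables that generated the event. The root Runnable will have an empty list. The order of the parent IDs is from the root to the immediate parent. Only available for the v2 version of the API. The v1 version of the API will return an empty list.
- tags: Optional[List[str]] - The tags of the Runnable that generated the event.
- metadata: Optional[Dict[str, Any]] - The metadata of the Runnable that generated the event.
- data: Dict[str, Any]
Below is a table that illustrates some events that might be emitted by various chains. Metadata fields have been omitted from the table for brevity. Chain definitions are included after the table.
ATTENTION: This reference table is for the V2 version of the schema.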
| event | name | chunk | input | output |
|---|---|---|---|---|
| on_chat_model_start | [model name] | | {"messages": [[SystemMessage, HumanMessage]]} | |
| on_chat_model_stream | [model name] | AIMessageChunk(content="hello") | | |
| on_chat_model_end | [model name] | | {"messages": [[SystemMessage, HumanMessage]]} | AIMessageChunk(content="hello world") |
| on_llm_start | [model name] | | {'input': 'hello'} | |
| on_llm_stream | [model name] | 'Hello' | | |
| on_llm_end | [model name] | | | 'Hello human!' |
| on_chain_start | format_docs | | | |
| on_chain_stream | format_docs | "hello world!, goodbye world!" | | |
| on_chain_end | format_docs | | [Document(…)] | "hello world!, goodbye world!" |
| on_tool_start | some_tool | | {"x": 1, "y": "2"} | |
| on_tool_end | some_tool | | | {"x": 1, "y": "2"} |
| on_retriever_start | [retriever name] | | {"query": "hello"} | |
| on_retriever_end | [retriever name] | | {"query": "hello"} | [Document(…), ..] |
| on_prompt_start | [template_name] | | {"question": "hello"} | |
| on_prompt_end | [template_name] | | {"question": "hello"} | ChatPromptValue(messages: [SystemMessage, …]) |
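In addition to the standard events, users can also dispatch custom events (see the example below).
Custom events are only surfaced in the v2 version of the API!
A custom event has the following format:

| Attribute | Type | Description |
|---|---|---|
| name | str | A user-defined name for the event. |
| data | Any | The data associated with the event. This can be anything, though we suggest making it JSON serializable. |

Here are the declarations associated with the standard events shown above: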
format_docs:
    from typing import List
    from langchain_core.documents import Document
    from langchain_core.runnables import RunnableLambda

    def format_docs(docs: List[Document]) -> str:
        '''Format the docs.'''
        return ", ".join([doc.page_content for doc in docs])

    format_docs = RunnableLambda(format_docs)
some_tool:
    from langchain_core.tools import tool

    @tool
    def some_tool(x: int, y: str) -> dict:
        '''Some_tool.'''
        return {"x": x, "y": y}
prompt:
    from langchain_core.prompts import ChatPromptTemplate

    template = ChatPromptTemplate.from_messages(
        [("system", "You are Cat Agent 007"), ("human", "{question}")]
    ).with_config({"run_name": "my_template", "tags": ["my_template"]})
Example:
    from langchain_core.runnables import RunnableLambda

    async def reverse(s: str) -> str:
        return s[::-1]

    chain = RunnableLambda(func=reverse)

    events = [
        event async for event in chain.astream_events("hello", version="v2")
    ]

    # will produce the following events (run_id and parent_ids
    # have been omitted for brevity):
    [
        {
            "data": {"input": "hello"},
            "event": "on_chain_start",
            "metadata": {},
            "name": "reverse",
            "tags": [],
        },
        {
            "data": {"chunk": "olleh"},
            "event": "on_chain_stream",
            "metadata": {},
            "name": "reverse",
            "tags": [],
        },
        {
            "data": {"output": "olleh"},
            "event": "on_chain_end",
            "metadata": {},
            "name": "reverse",
            "tags": [],
        },
    ]
Example: Dispatch Custom Event
    from langchain_core.callbacks.manager import (
        adispatch_custom_event,
    )
    from langchain_core.runnables import RunnableLambda, RunnableConfig
    import asyncio

    async def slow_thing(some_input: str, config: RunnableConfig) -> str:
        """Do something that takes a long time."""
        await asyncio.sleep(1)  # Placeholder for some slow operation
        await adispatch_custom_event(
            "progress_event",
            {"message": "Finished step 1 of 3"},
            config=config  # Must be included for python < 3.10
        )
        await asyncio.sleep(1)  # Placeholder for some slow operation
        await adispatch_custom_event(
            "progress_event",
            {"message": "Finished step 2 of 3"},
            config=config  # Must be included for python < 3.10
        )
        await asyncio.sleep(1)  # Placeholder for some slow operation
        return "Done"

    slow_thing = RunnableLambda(slow_thing)

    async for event in slow_thing.astream_events("some_input", version="v2"):
        print(event)
- Parameters
input (Any) – The input to the Runnable.
config (Optional[RunnableConfig]) – The config to use for the Runnable.
version (Literal['v1', 'v2']) – The version of the schema to use, either v2 or v1. Users should use v2. v1 is for backwards compatibility and will be deprecated in 0.4.0. No default will be assigned until the API is stabilized. Custom events are only surfaced in v2.
include_names (Optional[Sequence[str]]) – Only include events from runnables with matching names.
include_types (Optional[Sequence[str]]) – Only include events from runnables with matching types.
include_tags (Optional[Sequence[str]]) – Only include events from runnables with matching tags.
exclude_names (Optional[Sequence[str]]) – Exclude events from runnables with matching names.
exclude_types (Optional[Sequence[str]]) – Exclude events from runnables with matching types.
exclude_tags (Optional[Sequence[str]]) – Exclude events from runnables with matching tags.
kwargs (Any) – Additional keyword arguments to pass to the Runnable. These will be passed to astream_log, as this implementation of astream_events is built on top of astream_log.
- Yields
An async stream of StreamEvents.
- Raises
NotImplementedError – If the version is not v1 or v2.
- Return type
AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]]
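To make the event naming scheme and the name filters more concrete, here is a small standard-library sketch that parses standard event names and filters already-collected event dictionaries by Runnable name. It is an assumption-laden illustration: `parse_event_name` and `filter_by_name` are hypothetical helpers, the filter is a simplification of whatever matching the library performs, and `my_retriever` is a made-up Runnable name.

```python
import re
from typing import Any, Dict, List, Optional, Sequence

# Standard event names follow on_[runnable_type]_(start|stream|end).
EVENT_NAME = re.compile(
    r"^on_(?P<runnable_type>[a-z_]+)_(?P<phase>start|stream|end)$"
)


def parse_event_name(event: str) -> Optional[Dict[str, str]]:
    """Split a standard event name into runnable type and phase, else None."""
    match = EVENT_NAME.match(event)
    return match.groupdict() if match else None


def filter_by_name(
    events: List[Dict[str, Any]],
    include_names: Optional[Sequence[str]] = None,
    exclude_names: Optional[Sequence[str]] = None,
) -> List[Dict[str, Any]]:
    """Keep events whose Runnable name passes the include and exclude lists."""
    kept = []
    for event in events:
        name = event["name"]
        if include_names is not None and name not in include_names:
            continue
        if exclude_names is not None and name in exclude_names:
            continue
        kept.append(event)
    return kept


sample_events = [
    {"event": "on_chain_start", "name": "reverse", "data": {"input": "hello"}},
    {"event": "on_chain_stream", "name": "reverse", "data": {"chunk": "olleh"}},
    {"event": "on_retriever_end", "name": "my_retriever", "data": {"output": []}},
]
only_reverse = filter_by_name(sample_events, include_names=["reverse"])
```

Custom event names such as `progress_event` do not follow the standard pattern, so `parse_event_name` returns None for them.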
- batch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output] ¶
Default implementation runs invoke in parallel using a thread pool executor.
The default implementation of batch works well for IO bound runnables.
Subclasses should override this method if they can batch more efficiently; e.g., if the underlying Runnable uses an API which supports a batch mode.
- Parameters
inputs (List[Input]) –
config (Optional[Union[RunnableConfig, List[RunnableConfig]]]) –
return_exceptions (bool) –
kwargs (Optional[Any]) –
- Return type
List[Output]
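The thread-pool approach mentioned for batch can be sketched with `concurrent.futures`. This is a simplified illustration rather than the library's implementation: `thread_batch` and `fake_retrieve` are hypothetical names, and `max_concurrency` is passed directly instead of through a config.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List, Optional


def thread_batch(
    invoke: Callable[[Any], Any],
    inputs: List[Any],
    max_concurrency: Optional[int] = None,
    return_exceptions: bool = False,
) -> List[Any]:
    """Run invoke over inputs in worker threads, preserving input order."""

    def run_one(item: Any) -> Any:
        try:
            return invoke(item)
        except Exception as exc:
            if return_exceptions:
                return exc  # hand the exception back in the result slot
            raise

    if not inputs:
        return []
    with ThreadPoolExecutor(max_workers=max_concurrency) as executor:
        # executor.map returns results in the same order as the inputs.
        return list(executor.map(run_one, inputs))


def fake_retrieve(query: str) -> List[str]:
    """Hypothetical blocking retriever call used only for this sketch."""
    if not query:
        raise ValueError("empty query")
    return [f"doc about {query}"]


outputs = thread_batch(fake_retrieve, ["a", "b"], max_concurrency=2)
tolerant = thread_batch(fake_retrieve, ["a", ""], return_exceptions=True)
```

Threads suit this case because retriever calls typically spend their time waiting on I/O, which matches the note that the default works well for IO-bound runnables.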
- batch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) Iterator[Tuple[int, Union[Output, Exception]]] ¶
Run invoke in parallel on a list of inputs, yielding results as they complete.
- Parameters
inputs (Sequence[Input]) –
config (Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]) –
return_exceptions (bool) –
kwargs (Optional[Any]) –
- Return type
Iterator[Tuple[int, Union[Output, Exception]]]
- configurable_alternatives(which: ConfigurableField, *, default_key: str = 'default', prefix_keys: bool = False, **kwargs: Union[Runnable[Input, Output], Callable[[], Runnable[Input, Output]]]) RunnableSerializable[Input, Output] ¶
Configure alternatives for Runnables that can be set at runtime.
- Parameters
which (ConfigurableField) – The ConfigurableField instance that will be used to select the alternative.
default_key (str) – The default key to use if no alternative is selected. Defaults to “default”.
prefix_keys (bool) – Whether to prefix the keys with the ConfigurableField id. Defaults to False.
**kwargs (Union[Runnable[Input, Output], Callable[[], Runnable[Input, Output]]]) – A dictionary of keys to Runnable instances or callables that return Runnable instances.
- Returns
A new Runnable with the alternatives configured.
- Return type
RunnableSerializable[Input, Output]
    from langchain_anthropic import ChatAnthropic
    from langchain_core.runnables.utils import ConfigurableField
    from langchain_openai import ChatOpenAI

    model = ChatAnthropic(
        model_name="claude-3-sonnet-20240229"
    ).configurable_alternatives(
        ConfigurableField(id="llm"),
        default_key="anthropic",
        openai=ChatOpenAI()
    )

    # uses the default model ChatAnthropic
    print(model.invoke("which organization created you?").content)

    # uses ChatOpenAI
    print(
        model.with_config(
            configurable={"llm": "openai"}
        ).invoke("which organization created you?").content
    )
- configurable_fields(**kwargs: Union[ConfigurableField, ConfigurableFieldSingleOption, ConfigurableFieldMultiOption]) RunnableSerializable[Input, Output] ¶
Configure particular Runnable fields at runtime.
- Parameters
**kwargs (Union[ConfigurableField, ConfigurableFieldSingleOption, ConfigurableFieldMultiOption]) – A dictionary of ConfigurableField instances to configure.
- Returns
A new Runnable with the fields configured.
- Return type
RunnableSerializable[Input, Output]
    from langchain_core.runnables import ConfigurableField
    from langchain_openai import ChatOpenAI

    model = ChatOpenAI(max_tokens=20).configurable_fields(
        max_tokens=ConfigurableField(
            id="output_token_number",
            name="Max tokens in the output",
            description="The maximum number of tokens in the output",
        )
    )

    # max_tokens = 20
    print(
        "max_tokens_20: ",
        model.invoke("tell me something about chess").content
    )

    # max_tokens = 200
    print("max_tokens_200: ", model.with_config(
        configurable={"output_token_number": 200}
        ).invoke("tell me something about chess").content
    )
- get_relevant_documents(query: str, *, callbacks: Callbacks = None, tags: Optional[List[str]] = None, metadata: Optional[Dict[str, Any]] = None, run_name: Optional[str] = None, **kwargs: Any) List[Document] ¶
Deprecated since version langchain-core==0.1.46: Use invoke instead.
Retrieve documents relevant to a query.
Users should favor using .invoke or .batch rather than get_relevant_documents directly.
- Parameters
query (str) – string to find relevant documents for.
callbacks (Callbacks) – Callback manager or list of callbacks. Defaults to None.
tags (Optional[List[str]]) – Optional list of tags associated with the retriever. These tags will be associated with each call to this retriever, and passed as arguments to the handlers defined in callbacks. Defaults to None.
metadata (Optional[Dict[str, Any]]) – Optional metadata associated with the retriever. This metadata will be associated with each call to this retriever, and passed as arguments to the handlers defined in callbacks. Defaults to None.
run_name (Optional[str]) – Optional name for the run. Defaults to None.
kwargs (Any) – Additional arguments to pass to the retriever.
- Returns
List of relevant documents.
- Return type
List[Document]
- invoke(input: str, config: Optional[RunnableConfig] = None, **kwargs: Any) List[Document] ¶
Invoke the retriever to get relevant documents.
Main entry point for synchronous retriever invocations.
- Parameters
input (str) – The query string.
config (Optional[RunnableConfig]) – Configuration for the retriever. Defaults to None.
kwargs (Any) – Additional arguments to pass to the retriever.
- Returns
List of relevant documents.
- Return type
List[Document]
Examples
retriever.invoke("query")
- stream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) Iterator[Output] ¶
Default implementation of stream, which calls invoke. Subclasses should override this method if they support streaming output.
- Parameters
input (Input) – The input to the Runnable.
config (Optional[RunnableConfig]) – The config to use for the Runnable. Defaults to None.
kwargs (Optional[Any]) – Additional keyword arguments to pass to the Runnable.
- Yields
The output of the Runnable.
- Return type
Iterator[Output]
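A default stream that simply calls invoke yields the whole result as one item instead of incremental chunks. The sketch below illustrates that behaviour with hypothetical stand-ins (`default_stream`, `fake_retrieve`); it is not the library's code.

```python
from typing import Any, Callable, Iterator, List


def default_stream(invoke: Callable[[Any], Any], value: Any) -> Iterator[Any]:
    """Yield the complete result of invoke as a single chunk."""
    yield invoke(value)


def fake_retrieve(query: str) -> List[str]:
    """Hypothetical retriever call used only for this sketch."""
    return [f"doc about {query}"]


chunks = list(default_stream(fake_retrieve, "cats"))
```

Iterating therefore produces exactly one item: the full list of documents.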
- to_json() Union[SerializedConstructor, SerializedNotImplemented] ¶
Serialize the Runnable to JSON.
- Returns
A JSON-serializable representation of the Runnable.
- Return type