langchain_core.retrievers
.BaseRetriever¶
注意
BaseRetriever 实现了标准 Runnable 接口
. 🏃
Runnable 接口
包含了可用于可运行对象(Runnable)的额外方法,例如 with_types
,with_retry
,assign
,bind
,get_graph
等。
- class langchain_core.retrievers.BaseRetriever[source]¶
继承自:
RunnableSerializable
[str
,List
[Document
]],ABC
文档检索系统的抽象基类。
检索系统定义为可以接收字符串查询并从某个来源返回最“相关”的文档的工具。
用法
检索器遵循标准的 Runnable 接口,应通过 invoke,ainvoke,batch,abatch 等标准 Runnable 方法使用。
实现
在实现自定义检索器时,类应实现 _get_relevant_documents 方法以定义检索文档的逻辑。
可选地,可以通过重写 _aget_relevant_documents 方法提供异步本地实现。
示例:从文档列表中返回前 5 个文档的检索器
from langchain_core import Document, BaseRetriever from typing import List class SimpleRetriever(BaseRetriever): docs: List[Document] k: int = 5 def _get_relevant_documents(self, query: str) -> List[Document]: """Return the first k documents from the list of documents""" return self.docs[:self.k] async def _aget_relevant_documents(self, query: str) -> List[Document]: """(Optional) async native implementation.""" return self.docs[:self.k]
示例:基于 scikit-learn 向量化器的简单检索器
from sklearn.metrics.pairwise import cosine_similarity class TFIDFRetriever(BaseRetriever, BaseModel): vectorizer: Any docs: List[Document] tfidf_array: Any k: int = 4 class Config: arbitrary_types_allowed = True def _get_relevant_documents(self, query: str) -> List[Document]: # Ip -- (n_docs,x), Op -- (n_docs,n_Feats) query_vec = self.vectorizer.transform([query]) # Op -- (n_docs,1) -- Cosine Sim with each doc results = cosine_similarity(self.tfidf_array, query_vec).reshape((-1,)) return [self.docs[i] for i in results.argsort()[-self.k :][::-1]]
- param metadata: Optional[Dict[str, Any]] = None¶
与检索器相关的可选元数据。默认为 None。此元数据将与每次调用此检索器相关联,并作为参数传递给 callbacks 中定义的处理程序。您可以使用这些来标识一个特定检索器的用法。
- param tags: Optional[List[str]] = None¶
与检索器相关联的可选标签列表。默认为 None。这些标签将与每次调用此检索器相关联,并通过 callbacks 中定义的处理程序传递。您可以使用这些标签来识别检索器的特定实例及其使用情况。
- async abatch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output] ¶
默认实现通过 asyncio.gather 并行运行 ainvoke。
默认的批处理实现对受 IO 限制的运行程序工作很好。
子类如果可以更有效地批量操作,则应覆盖此方法;例如,如果底层的 Runnable 使用支持批量模式的 API。
- 参数
inputs (List[Input]) – 要传递给 Runnable 的输入列表。
config (Optional[Union[RunnableConfig, List[RunnableConfig]]]) – 调用 Runnable 时使用的配置。配置支持如 'tags'、'metadata'(用于跟踪)、'max_concurrency'(用于控制并行执行的工作量)等标准键。请参阅 RunnableConfig 获取更多详细信息。默认为 None。
return_exceptions (bool) – 是否返回异常而不是抛出异常。默认为 False。
kwargs (Optional[Any]) – 要传递给 Runnable 的其他关键字参数。
- 返回
从 Runnable 返回的输出列表。
- 返回类型
List[Output]
- async abatch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]], *, return_exceptions: bool = False, **kwargs: Optional[Any]) AsyncIterator[Tuple[int, Union[Output, Exception]]]¶
并行运行一个可调用的输入列表,在它们完成时产生结果。
- 参数
inputs (Sequence[Input]) – Runnable的输入列表。
config (Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]) – 调用Runnable时所使用的配置。该配置支持标准键,如‘tags’、‘metadata’用于跟踪目的,‘max_concurrency’用于控制并行工作的量,以及其他键。请参阅RunnableConfig以获取更多详细信息。默认值为None。
return_exceptions (bool) – 是否返回异常而不是抛出异常。默认为 False。
kwargs (Optional[Any]) – 要传递给 Runnable 的其他关键字参数。
- 产生
包含输入索引和Runnable输出或异常的元组。
- 返回类型
AsyncIterator[Tuple[int, Union[Output, Exception]]]
- async aget_relevant_documents(query: str, *, callbacks: Callbacks =None, tags: Optional[List[str]] =None, metadata: Optional[Dict[str, Any] =None, run_name: Optional[str] =None, kwargs) → List[Document][source]¶
自版本 langchain-core==0.1.46 弃用: 用
ainvoke
代替。异步获取与查询相关的文档。
用户应优先使用 .ainvoke 或 .abatch 而不是直接使用 aget_relevant_documents。
- 参数
query (str) – 要寻找相关文档的字符串。
callbacks (Callbacks) – 回调管理器或回调列表。
tags (可选[List[str]]) – 可选的与检索器相关联的标签列表。这些标签将与每个调用检索器的方法关联,并通过 callbacks 中定义的处理程序传递。默认为 None。
metadata (可选[Dict[str, Any]]) – 可选的与检索器相关联的元数据。这些元数据将与每个调用检索器的方法关联,并通过 callbacks 中定义的处理程序传递。默认为 None。
run_name (可选[str]) – 可选的运行名称。默认为 None。
kwargs (任意) – 传递给检索器的其他参数。
- 返回
相关文档列表。
- 返回类型
List[Document]
- async ainvoke(input: str, config: Optional[RunnableConfig] = None, **kwargs: Any) List[Document] [源代码]¶
异步调用检索器以获取相关文档。
异步检索器调用的主入口点。
- 参数
input (str) – 查询字符串。
config (Optional[RunnableConfig]) – 检索器的配置。默认值为 None。
kwargs (任意) – 传递给检索器的其他参数。
- 返回
相关文档列表。
- 返回类型
List[Document]
示例
await retriever.ainvoke("query")
- as_tool(args_schema: Optional[Type[BaseModel]] = None, *, name: Optional[str] = None, description: Optional[str] = None, arg_types: Optional[Dict[str, Type]] = None) BaseTool ¶
测试版
该API处于测试版,未来可能会发生变化。
从Runnable创建一个BaseTool。
as_tool
将从Runnable实例化一个包含名称、描述和args_schema
的BaseTool。尽可能的情况下,模式将从runnable.get_input_schema
推断。另请参见(例如,如果Runnable接收一个字典作为输入,并且具体的字典键没有类型),可以通过args_schema
直接指定模式。您还可以传递arg_types
仅指定所需的参数及其类型。- 参数
args_schema(《可选》)(《类型》<《BaseModel》>)- 工具的模式。默认为None。
name(《可选》)(《str》)- 工具的名称。默认为None。
description(《可选》)(《str》)- 工具的描述。默认为None。
arg_types(《可选》)(《字典》、(《str》、(《类型》)》))- 参数名称和类型的字典。默认为None。
- 返回
BaseTool实例。
- 返回类型
类型字典输入
from typing import List from typing_extensions import TypedDict from langchain_core.runnables import RunnableLambda class Args(TypedDict): a: int b: List[int] def f(x: Args) -> str: return str(x["a"] * max(x["b"])) runnable = RunnableLambda(f) as_tool = runnable.as_tool() as_tool.invoke({"a": 3, "b": [1, 2]})
dict
输入,通过args_schema
指定模式from typing import Any, Dict, List from langchain_core.pydantic_v1 import BaseModel, Field from langchain_core.runnables import RunnableLambda def f(x: Dict[str, Any]) -> str: return str(x["a"] * max(x["b"])) class FSchema(BaseModel): """Apply a function to an integer and list of integers.""" a: int = Field(..., description="Integer") b: List[int] = Field(..., description="List of ints") runnable = RunnableLambda(f) as_tool = runnable.as_tool(FSchema) as_tool.invoke({"a": 3, "b": [1, 2]})
dict
输入,通过arg_types
指定模式from typing import Any, Dict, List from langchain_core.runnables import RunnableLambda def f(x: Dict[str, Any]) -> str: return str(x["a"] * max(x["b"])) runnable = RunnableLambda(f) as_tool = runnable.as_tool(arg_types={"a": int, "b": List[int]}) as_tool.invoke({"a": 3, "b": [1, 2]})
字符串输入
from langchain_core.runnables import RunnableLambda def f(x: str) -> str: return x + "a" def g(x: str) -> str: return x + "z" runnable = RunnableLambda(f) | g as_tool = runnable.as_tool() as_tool.invoke("b")
新增于版本0.2.14。
- async astream(input: Input, config: Optional[RunnableConfig], **kwargs: Optional[Any]) AsyncIterator[Output]
默认的astream实现,调用ainvoke。子类应该覆盖此方法以支持流式输出。
- 参数
input(《输入》)- Runnable的输入。
config(《可选》)(《RunnableConfig》))- 用于Runnable的配置。默认为None。
kwargs (Optional[Any]) – 要传递给 Runnable 的其他关键字参数。
- 产生
Runnable的输出。
- 返回类型
AsyncIterator[Output]
- astream_events(input: Any, config: Optional[RunnableConfig] = None, *, version: Literal['v1', 'v2'], include_names: Optional[Sequence[str]] = None, include_types: Optional[Sequence[str]] = None, include_tags: Optional[Sequence[str]] = None, exclude_names: Optional[Sequence[str]] = None, exclude_types: Optional[Sequence[str]] = None, exclude_tags: Optional[Sequence[str]] = None, **kwargs: Any) AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]] ¶
测试版
该API处于测试版,未来可能会发生变化。
生成事件流。
用于创建StreamEvents的迭代器,它可以提供有关Runnable进度的实时信息,包括中间结果的StreamEvents。
StreamEvent是一个具有以下模式的字典
event
:《str》- 事件名称的格式为:on_[runnable_type]_(start|stream|end)。
name
:《str》- 生成事件的Runnable的名称。run_id
:《str》- 与给定执行相关的随机生成的ID发射事件的线程。作为父线程执行部分调用的子线程将被分配一个唯一的标识符。
parent_ids
:列表[str] - 生成事件的父线程的标识符。根线程将有一个空的列表。父ID的顺序是从根到直接父亲。仅适用于API的v2版本。API的v1版本将返回一个空列表。
tags
:可选[列表[str]] - 生成事件的线程上的标签。
metadata
:可选[Dict[str, Any]] - 生成事件的线程的元数据。
data
:Dict[str, Any]
以下是一个表格,说明各种链可能发射的一些事件。为了简洁,省略了元数据字段。链定义在表格之后。
请注意 此参考表是针对V2版本的模式。
事件
名称
块
输入
输出
on_chat_model_start
[模型名称]
{“messages”: [[系统消息,人类消息]]}
on_chat_model_stream
[模型名称]
AIMessageChunk(内容="hello")
on_chat_model_end
[模型名称]
{“messages”: [[系统消息,人类消息]]}
AIMessageChunk(内容="hello world")
on_llm_start
[模型名称]
{‘输入’:‘hello’}
on_llm_stream
[模型名称]
‘Hello’
on_llm_end
[模型名称]
‘Hello human!’
on_chain_start
格式文档
on_chain_stream
格式文档
“hello world!, goodbye world!”
on_chain_end
格式文档
[Document(…)]
“hello world!, goodbye world!”
on_tool_start
某些工具
{“x”:1,“y”:”2”}
on_tool_end
某些工具
{“x”:1,“y”:”2”}
on_retriever_start
[检索器名称]
{“query”:”hello”}
on_retriever_end
[检索器名称]
{“query”:”hello”}
[Document(…), ..]
on_prompt_start
[模板名称]
{“问题”:”hello”}
on_prompt_end
[模板名称]
{“问题”:”hello”}
ChatPromptValue(messages:[SystemMessage, …])
除了标准事件外,用户还可以调度自定义事件(请参见下面的示例)。
自定义事件仅在API的v2版本中体现!
自定义事件具有以下格式
属性
类型
描述
名称
str
用户定义的事件名称。
数据
任何
与事件关联的数据。这可能是一切,但我们建议使其可序列化为JSON。
以下是与上述标准事件相关联的声明
格式文档:
def format_docs(docs: List[Document]) -> str: '''Format the docs.''' return ", ".join([doc.page_content for doc in docs]) format_docs = RunnableLambda(format_docs)
某些工具:
@tool def some_tool(x: int, y: str) -> dict: '''Some_tool.''' return {"x": x, "y": y}
提示:
template = ChatPromptTemplate.from_messages( [("system", "You are Cat Agent 007"), ("human", "{question}")] ).with_config({"run_name": "my_template", "tags": ["my_template"]})
示例
from langchain_core.runnables import RunnableLambda async def reverse(s: str) -> str: return s[::-1] chain = RunnableLambda(func=reverse) events = [ event async for event in chain.astream_events("hello", version="v2") ] # will produce the following events (run_id, and parent_ids # has been omitted for brevity): [ { "data": {"input": "hello"}, "event": "on_chain_start", "metadata": {}, "name": "reverse", "tags": [], }, { "data": {"chunk": "olleh"}, "event": "on_chain_stream", "metadata": {}, "name": "reverse", "tags": [], }, { "data": {"output": "olleh"}, "event": "on_chain_end", "metadata": {}, "name": "reverse", "tags": [], }, ]
示例:调度自定义事件
from langchain_core.callbacks.manager import ( adispatch_custom_event, ) from langchain_core.runnables import RunnableLambda, RunnableConfig import asyncio async def slow_thing(some_input: str, config: RunnableConfig) -> str: """Do something that takes a long time.""" await asyncio.sleep(1) # Placeholder for some slow operation await adispatch_custom_event( "progress_event", {"message": "Finished step 1 of 3"}, config=config # Must be included for python < 3.10 ) await asyncio.sleep(1) # Placeholder for some slow operation await adispatch_custom_event( "progress_event", {"message": "Finished step 2 of 3"}, config=config # Must be included for python < 3.10 ) await asyncio.sleep(1) # Placeholder for some slow operation return "Done" slow_thing = RunnableLambda(slow_thing) async for event in slow_thing.astream_events("some_input", version="v2"): print(event)
- 参数
输入(任何) - Runnable的输入。
配置(可选[RunnableConfig]) - 要为Runnable使用的配置。
版本(文字['v1','v2']) - 要使用的模式版本,要么是v2,要么是v1。用户应使用v2。v1是为了向后兼容,将在0.4.0中弃用。在API稳定之前不会分配默认值。自定义事件仅在v2中体现。
包括名称(可选[序列[str]]) - 仅包括具有匹配名称的runnables的事件。
包括类型(可选[序列[str]]) - 仅包括具有匹配类型的runnables的事件。
包括标签(可选[序列[str]]) - 仅包括具有匹配标签的runnables的事件。
排除名称(可选[序列[str]]) - 排除与匹配名称的runnables的事件。
排除类型(可选[序列[str]]) - 排除与匹配类型的runnables的事件。
exclude_tags (可选[Sequence[str]]) – 从具有匹配标签的可执行程序中排除事件。
kwargs (任何类型) – 传递给可执行程序的额外关键字参数。这些将会传递给 astream_log,因为 astream_events 的实现建立在 astream_log 之上。
- 产生
异步流式传递 StreamEvents。
- 抛出
NotImplementedError – 如果版本不是 v1 或 v2。
- 返回类型
AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]]
- batch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output]
默认实现使用线程池执行器并行运行 invoke。
默认的批处理实现对受 IO 限制的运行程序工作很好。
子类如果可以更有效地批量操作,则应覆盖此方法;例如,如果底层的 Runnable 使用支持批量模式的 API。
- 参数
inputs (List[Input]) –
config (可选[Union[RunnableConfig, List[RunnableConfig]]]) –
return_exceptions (bool) –
kwargs (可选[Any]) –
- 返回类型
List[Output]
- batch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) Iterator[Tuple[int, Union[Output, Exception]]] ¶
并行运行 invoke on 输入列表,以完成顺序返回结果。
- 参数
inputs (Sequence[Input]) –
config (Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]) –
return_exceptions (bool) –
kwargs (可选[Any]) –
- 返回类型
Iterator[Tuple[int, Union[Output, Exception]]]
配置可在运行时设置的 Runnables 的替代方案。
- 参数
**which**(《em class="sig-param">ConfigurableField》) – 用于选择替代方案的 ConfigurableField 实例。
**default_key**(《em class="sig-param">str》) – 如果未选择替代方案,则使用默认键。默认为“default”。
**prefix_keys**(《em class="sig-param">bool》) – 是否将键的前缀与 ConfigurableField id 相结合。默认为 False。
****kwargs**(《em class="sig-param">Union[Runnable[Input,Output],Callable[[],Runnable[Input,Output]]) – 键与 Runnable 实例或返回 Runnable 实例的可调用函数的字典。
- 返回
配置好替代方案的新 Runnable。
- 返回类型
《em class="sig-return-typehint">
from langchain_anthropic import ChatAnthropic from langchain_core.runnables.utils import ConfigurableField from langchain_openai import ChatOpenAI model = ChatAnthropic( model_name="claude-3-sonnet-20240229" ).configurable_alternatives( ConfigurableField(id="llm"), default_key="anthropic", openai=ChatOpenAI() ) # uses the default model ChatAnthropic print(model.invoke("which organization created you?").content) # uses ChatOpenAI print( model.with_config( configurable={"llm": "openai"} ).invoke("which organization created you?").content )
- configurable_fields(**kwargs: Union[ConfigurableField, ConfigurableFieldSingleOption, ConfigurableFieldMultiOption]) RunnableSerializable[Input, Output]¶
在运行时配置特定的Runnable字段。
- 参数
**kwargs (Union[ConfigurableField, ConfigurableFieldSingleOption, ConfigurableFieldMultiOption]) – 要配置的 ConfigurableField 实例的字典。
- 返回
具有配置字段的新的 Runnable 对象。
- 返回类型
《em class="sig-return-typehint">
from langchain_core.runnables import ConfigurableField from langchain_openai import ChatOpenAI model = ChatOpenAI(max_tokens=20).configurable_fields( max_tokens=ConfigurableField( id="output_token_number", name="Max tokens in the output", description="The maximum number of tokens in the output", ) ) # max_tokens = 20 print( "max_tokens_20: ", model.invoke("tell me something about chess").content ) # max_tokens = 200 print("max_tokens_200: ", model.with_config( configurable={"output_token_number": 200} ).invoke("tell me something about chess").content )
- get_relevant_documents(query: str, *, callbacks: Callbacks = None, tags: Optional[List[str]] = None, metadata: Optional[Dict[str, Any]] = None, run_name: Optional[str] = None, **kwargs: Any) List[Document] [source]¶
自langchain-core==0.1.46版本以来已弃用: 请使用
invoke
代替。检索与查询相关的文档。
用户应优先使用 .invoke 或 .batch 而不是直接使用 get_relevant_documents。
- 参数
query (str) – 要寻找相关文档的字符串。
callbacks (回调) - 回调管理器或回调列表。默认值为 None。
tags (可选[List[str]]) – 可选的与检索器相关联的标签列表。这些标签将与每个调用检索器的方法关联,并通过 callbacks 中定义的处理程序传递。默认为 None。
metadata (可选[Dict[str, Any]]) – 可选的与检索器相关联的元数据。这些元数据将与每个调用检索器的方法关联,并通过 callbacks 中定义的处理程序传递。默认为 None。
run_name (可选[str]) – 可选的运行名称。默认为 None。
kwargs (任意) – 传递给检索器的其他参数。
- 返回
相关文档列表。
- 返回类型
List[Document]
- invoke(input: str, config: Optional[RunnableConfig] = None, **kwargs: Any) List[Document] [source]¶
调用检索器以获取相关文档。
同步检索器调用的主入口点。
- 参数
input (str) – 查询字符串。
config (Optional[RunnableConfig]) – 检索器的配置。默认值为 None。
kwargs (任意) – 传递给检索器的其他参数。
- 返回
相关文档列表。
- 返回类型
List[Document]
示例
retriever.invoke("query")
- stream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) Iterator[Output]¶
stream的默认实现,它调用invoke。如果子类支持流式输出,则应重写此方法。
- 参数
input(《输入》)- Runnable的输入。
config(《可选》)(《RunnableConfig》))- 用于Runnable的配置。默认为None。
kwargs (Optional[Any]) – 要传递给 Runnable 的其他关键字参数。
- 产生
Runnable的输出。
- 返回类型
Iterator[Output]
- to_json() Union[SerializedConstructor, SerializedNotImplemented]¶
将Runnable序列化为JSON。
- 返回
Runnable的JSON序列化表示。
- 返回类型