langchain_community.vectorstores.vectara
.VectaraRAG¶
注意
VectaraRAG 实现了标准的 Runnable Interface
。 🏃
Runnable Interface
还有其他可在 Runnables 上使用的方法,例如 with_types
、with_retry
、assign
、bind
、get_graph
等。
- class langchain_community.vectorstores.vectara.VectaraRAG(vectara: Vectara, config: VectaraQueryConfig, chat: bool = False)[source]¶
Vectara RAG 可运行对象。
- 参数
vectara (Vectara) – Vectara 对象
config (VectaraQueryConfig) – VectaraQueryConfig 对象
chat (bool) – bool, 默认为 False
属性
InputType
此 Runnable 接受的输入类型,指定为类型注解。
OutputType
此 Runnable 生成的输出类型,指定为类型注解。
config_specs
此 Runnable 的可配置字段列表。
input_schema
此 Runnable 接受的输入类型,指定为 pydantic 模型。
name
Runnable 的名称。
output_schema
此 Runnable 生成的输出类型,指定为 pydantic 模型。
方法
__init__
(vectara, config[, chat])abatch
(inputs[, config, return_exceptions])默认实现使用 asyncio.gather 并行运行 ainvoke。
abatch_as_completed
(inputs[, config, ...])在一系列输入上并行运行 ainvoke,并在完成时产生结果。
ainvoke
(input[, config])ainvoke 的默认实现,从线程调用 invoke。
as_tool
([args_schema, name, description, ...])assign
(**kwargs)为此 Runnable 的字典输出分配新字段。
astream
(input[, config])astream 的默认实现,它调用 ainvoke。
astream_events
(input[, config, ...])astream_log
(input[, config, diff, ...])从 Runnable 流式传输所有输出,如回调系统报告的那样。
atransform
(input[, config])atransform 的默认实现,它缓冲输入并调用 astream。
batch
(inputs[, config, return_exceptions])默认实现使用线程池执行器并行运行 invoke。
batch_as_completed
(inputs[, config, ...])在一系列输入上并行运行 invoke,并在完成时产生结果。
bind
(**kwargs)将参数绑定到 Runnable,返回一个新的 Runnable。
config_schema
(*[, include])此 Runnable 接受的配置类型,指定为 pydantic 模型。
get_graph
([config])返回此 Runnable 的图形表示。
get_input_schema
([config])获取可用于验证 Runnable 输入的 pydantic 模型。
get_name
([suffix, name])获取 Runnable 的名称。
get_output_schema
([config])获取可用于验证 Runnable 输出的 pydantic 模型。
get_prompts
([config])返回此 Runnable 使用的提示列表。
invoke
(input[, config])将单个输入转换为输出。
map
()返回一个新的 Runnable,它通过对每个输入调用 invoke(),将输入列表映射到输出列表。
pick
(keys)从此 Runnable 的字典输出中选择键。
pipe
(*others[, name])将此 Runnable 与类似 Runnable 的对象组合以创建 RunnableSequence。
stream
(input[, config])从 Vectara RAG 获取流式输出。
transform
(input[, config])transform 的默认实现,它缓冲输入,然后调用 stream。
with_alisteners
(*[, on_start, on_end, on_error])将异步生命周期监听器绑定到 Runnable,返回一个新的 Runnable。
with_config
([config])将配置绑定到 Runnable,返回一个新的 Runnable。
with_fallbacks
(fallbacks, *[, ...])向 Runnable 添加回退,返回一个新的 Runnable。
with_listeners
(*[, on_start, on_end, on_error])将生命周期监听器绑定到 Runnable,返回一个新的 Runnable。
with_retry
(*[, retry_if_exception_type, ...])创建一个新的 Runnable,它在异常时重试原始 Runnable。
with_types
(*[, input_type, output_type])将输入和输出类型绑定到 Runnable,返回一个新的 Runnable。
- __init__(vectara: Vectara, config: VectaraQueryConfig, chat: bool = False)[source]¶
- 参数
vectara (Vectara) –
config (VectaraQueryConfig) –
chat (bool) –
- async abatch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output] ¶
默认实现使用 asyncio.gather 并行运行 ainvoke。
batch 的默认实现非常适用于 IO 绑定的 runnables。
如果子类可以更有效地进行批量处理,则应覆盖此方法;例如,如果底层 Runnable 使用支持批量模式的 API。
- 参数
inputs (List[Input]) – Runnable 的输入列表。
config (Optional[Union[RunnableConfig, List[RunnableConfig]]]) – 调用 Runnable 时要使用的配置。该配置支持标准键,如用于跟踪目的的“tags”、“metadata”,用于控制并行执行多少工作的“max_concurrency”以及其他键。有关更多详细信息,请参阅 RunnableConfig。默认为 None。
return_exceptions (bool) – 是否返回异常而不是引发异常。默认为 False。
kwargs (Optional[Any]) – 要传递给 Runnable 的其他关键字参数。
- 返回
来自 Runnable 的输出列表。
- 返回类型
List[Output]
- async abatch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) AsyncIterator[Tuple[int, Union[Output, Exception]]] ¶
在一系列输入上并行运行 ainvoke,并在完成时产生结果。
- 参数
inputs (Sequence[Input]) – Runnable 的输入列表。
config (Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]) – 调用 Runnable 时要使用的配置。该配置支持标准键,如用于跟踪目的的“tags”、“metadata”,用于控制并行执行多少工作的“max_concurrency”以及其他键。有关更多详细信息,请参阅 RunnableConfig。默认为 None。 默认为 None。
return_exceptions (bool) – 是否返回异常而不是引发异常。默认为 False。
kwargs (Optional[Any]) – 要传递给 Runnable 的其他关键字参数。
- 产生
输入索引和来自 Runnable 的输出的元组。
- 返回类型
AsyncIterator[Tuple[int, Union[Output, Exception]]]
- async ainvoke(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Any) Output ¶
ainvoke 的默认实现,从线程调用 invoke。
即使 Runnable 没有实现 invoke 的原生异步版本,默认实现也允许使用异步代码。
如果子类可以异步运行,则应覆盖此方法。
- 参数
input (Input) –
config (Optional[RunnableConfig]) –
kwargs (Any) –
- 返回类型
输出
- as_tool(args_schema: Optional[Type[BaseModel]] = None, *, name: Optional[str] = None, description: Optional[str] = None, arg_types: Optional[Dict[str, Type]] = None) BaseTool ¶
Beta
此 API 处于 beta 阶段,将来可能会更改。
从 Runnable 创建一个 BaseTool。
as_tool
将从 Runnable 实例化一个具有名称、描述和args_schema
的 BaseTool。在可能的情况下,模式是从runnable.get_input_schema
推断出来的。或者(例如,如果 Runnable 接受 dict 作为输入,并且未键入特定的 dict 键),可以使用args_schema
直接指定模式。您也可以传递arg_types
以仅指定必需的参数及其类型。- 参数
args_schema (Optional[Type[BaseModel]]) – 工具的模式。默认为 None。
name (Optional[str]) – 工具的名称。默认为 None。
description (Optional[str]) – 工具的描述。默认为 None。
arg_types (可选[Dict[str, Type]]) – 参数名称到类型的字典。默认为 None。
- 返回
一个 BaseTool 实例。
- 返回类型
类型字典输入
from typing import List from typing_extensions import TypedDict from langchain_core.runnables import RunnableLambda class Args(TypedDict): a: int b: List[int] def f(x: Args) -> str: return str(x["a"] * max(x["b"])) runnable = RunnableLambda(f) as_tool = runnable.as_tool() as_tool.invoke({"a": 3, "b": [1, 2]})
dict
输入,通过args_schema
指定模式from typing import Any, Dict, List from langchain_core.pydantic_v1 import BaseModel, Field from langchain_core.runnables import RunnableLambda def f(x: Dict[str, Any]) -> str: return str(x["a"] * max(x["b"])) class FSchema(BaseModel): """Apply a function to an integer and list of integers.""" a: int = Field(..., description="Integer") b: List[int] = Field(..., description="List of ints") runnable = RunnableLambda(f) as_tool = runnable.as_tool(FSchema) as_tool.invoke({"a": 3, "b": [1, 2]})
dict
输入,通过arg_types
指定模式from typing import Any, Dict, List from langchain_core.runnables import RunnableLambda def f(x: Dict[str, Any]) -> str: return str(x["a"] * max(x["b"])) runnable = RunnableLambda(f) as_tool = runnable.as_tool(arg_types={"a": int, "b": List[int]}) as_tool.invoke({"a": 3, "b": [1, 2]})
字符串输入
from langchain_core.runnables import RunnableLambda def f(x: str) -> str: return x + "a" def g(x: str) -> str: return x + "z" runnable = RunnableLambda(f) | g as_tool = runnable.as_tool() as_tool.invoke("b")
版本 0.2.14 新增。
- assign(**kwargs: Union[Runnable[Dict[str, Any], Any], Callable[[Dict[str, Any]], Any], Mapping[str, Union[Runnable[Dict[str, Any], Any], Callable[[Dict[str, Any]], Any]]]]) RunnableSerializable[Any, Any] ¶
将新字段分配给此 Runnable 的字典输出。返回一个新的 Runnable。
from langchain_community.llms.fake import FakeStreamingListLLM from langchain_core.output_parsers import StrOutputParser from langchain_core.prompts import SystemMessagePromptTemplate from langchain_core.runnables import Runnable from operator import itemgetter prompt = ( SystemMessagePromptTemplate.from_template("You are a nice assistant.") + "{question}" ) llm = FakeStreamingListLLM(responses=["foo-lish"]) chain: Runnable = prompt | llm | {"str": StrOutputParser()} chain_with_assign = chain.assign(hello=itemgetter("str") | llm) print(chain_with_assign.input_schema.schema()) # {'title': 'PromptInput', 'type': 'object', 'properties': {'question': {'title': 'Question', 'type': 'string'}}} print(chain_with_assign.output_schema.schema()) # {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties': {'str': {'title': 'Str', 'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
- 参数
kwargs (Union[Runnable[Dict[str, Any], Any], Callable[[Dict[str, Any]], Any], Mapping[str, Union[Runnable[Dict[str, Any], Any], Callable[[Dict[str, Any]], Any]]]]) –
- 返回类型
RunnableSerializable[Any, Any]
- async astream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) AsyncIterator">[Output] ¶
astream 的默认实现,它调用 ainvoke。如果子类支持流式输出,则应重写此方法。
- 参数
input (Input) – Runnable 的输入。
config (Optional[RunnableConfig]) – 用于 Runnable 的配置。默认为 None。
kwargs (Optional[Any]) – 要传递给 Runnable 的其他关键字参数。
- 产生
Runnable 的输出。
- 返回类型
AsyncIterator[Output]
- astream_events(input: Any, config: Optional[RunnableConfig] = None, *, version: Literal['v1', 'v2'], include_names: Optional[Sequence[str]] = None, include_types: Optional[Sequence[str]] = None, include_tags: Optional[Sequence[str]] = None, exclude_names: Optional[Sequence[str]] = None, exclude_types: Optional[Sequence[str]] = None, exclude_tags: Optional[Sequence[str]] = None, **kwargs: Any) AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]] ¶
Beta
此 API 处于 beta 阶段,将来可能会更改。
生成事件流。
用于创建一个迭代器,遍历 StreamEvents,这些事件提供关于 Runnable 进度的实时信息,包括来自中间结果的 StreamEvents。
StreamEvent 是一个具有以下模式的字典
event
: str - 事件名称的格式为:format: on_[runnable_type]_(start|stream|end)。
name
: str - 生成事件的 Runnable 的名称。run_id
: str - 随机生成的 ID,与给定 Runnable 执行相关联,该 Runnable 发出事件。作为父 Runnable 执行一部分调用的子 Runnable 会被分配自己的唯一 ID。the Runnable that emitted the event. A child Runnable that gets invoked as part of the execution of a parent Runnable is assigned its own unique ID.
parent_ids
: List[str] - 生成事件的父 runnables 的 ID 列表。根 Runnable 将具有一个空列表。父 ID 的顺序是从根到直接父级。仅适用于 API 的 v2 版本。API 的 v1 版本将返回一个空列表。generated the event. The root Runnable will have an empty list. The order of the parent IDs is from the root to the immediate parent. Only available for v2 version of the API. The v1 version of the API will return an empty list.
tags
: Optional[List[str]] - 生成事件的 Runnable 的标签。the event.
metadata
: Optional[Dict[str, Any]] - 生成事件的 Runnable 的元数据。that generated the event.
data
: Dict[str, Any]
下面是一个表格,说明了各种链可能发出的一些事件。为了简洁起见,元数据字段已从表格中省略。链定义已包含在表格之后。
注意 此参考表适用于模式的 V2 版本。
事件
name
块
输入
输出
on_chat_model_start
[模型名称]
{"messages": [[SystemMessage, HumanMessage]]}
on_chat_model_stream
[模型名称]
AIMessageChunk(content=”hello”)
on_chat_model_end
[模型名称]
{"messages": [[SystemMessage, HumanMessage]]}
AIMessageChunk(content=”hello world”)
on_llm_start
[模型名称]
{‘input’: ‘hello’}
on_llm_stream
[模型名称]
‘Hello’
on_llm_end
[模型名称]
‘Hello human!’
on_chain_start
format_docs
on_chain_stream
format_docs
“hello world!, goodbye world!”
on_chain_end
format_docs
[Document(…)]
“hello world!, goodbye world!”
on_tool_start
some_tool
{"x": 1, “y”: “2”}
on_tool_end
some_tool
{"x": 1, “y”: “2”}
on_retriever_start
[检索器名称]
{"query": “hello”}
on_retriever_end
[检索器名称]
{"query": “hello”}
[Document(…), ..]
on_prompt_start
[模板名称]
{"question": “hello”}
on_prompt_end
[模板名称]
{"question": “hello”}
ChatPromptValue(messages: [SystemMessage, …])
除了标准事件之外,用户还可以分派自定义事件(见下例)。
自定义事件将仅在 API 的 v2 版本中显示!
自定义事件具有以下格式
属性
类型
描述
name
str
用户定义的事件名称。
data
Any
与事件关联的数据。这可以是任何内容,但我们建议使其 JSON 可序列化。
以下是与上面显示的标准事件相关的声明
format_docs:
def format_docs(docs: List[Document]) -> str: '''Format the docs.''' return ", ".join([doc.page_content for doc in docs]) format_docs = RunnableLambda(format_docs)
some_tool:
@tool def some_tool(x: int, y: str) -> dict: '''Some_tool.''' return {"x": x, "y": y}
prompt:
template = ChatPromptTemplate.from_messages( [("system", "You are Cat Agent 007"), ("human", "{question}")] ).with_config({"run_name": "my_template", "tags": ["my_template"]})
示例
from langchain_core.runnables import RunnableLambda async def reverse(s: str) -> str: return s[::-1] chain = RunnableLambda(func=reverse) events = [ event async for event in chain.astream_events("hello", version="v2") ] # will produce the following events (run_id, and parent_ids # has been omitted for brevity): [ { "data": {"input": "hello"}, "event": "on_chain_start", "metadata": {}, "name": "reverse", "tags": [], }, { "data": {"chunk": "olleh"}, "event": "on_chain_stream", "metadata": {}, "name": "reverse", "tags": [], }, { "data": {"output": "olleh"}, "event": "on_chain_end", "metadata": {}, "name": "reverse", "tags": [], }, ]
示例:分派自定义事件
from langchain_core.callbacks.manager import ( adispatch_custom_event, ) from langchain_core.runnables import RunnableLambda, RunnableConfig import asyncio async def slow_thing(some_input: str, config: RunnableConfig) -> str: """Do something that takes a long time.""" await asyncio.sleep(1) # Placeholder for some slow operation await adispatch_custom_event( "progress_event", {"message": "Finished step 1 of 3"}, config=config # Must be included for python < 3.10 ) await asyncio.sleep(1) # Placeholder for some slow operation await adispatch_custom_event( "progress_event", {"message": "Finished step 2 of 3"}, config=config # Must be included for python < 3.10 ) await asyncio.sleep(1) # Placeholder for some slow operation return "Done" slow_thing = RunnableLambda(slow_thing) async for event in slow_thing.astream_events("some_input", version="v2"): print(event)
- 参数
input (Any) – Runnable 的输入。
config (Optional[RunnableConfig]) – 用于 Runnable 的配置。
version (Literal['v1', 'v2']) – 要使用的模式版本,可以是 v2 或 v1。用户应使用 v2。v1 用于向后兼容,将在 0.4.0 版本中弃用。在 API 稳定之前,不会分配默认值。自定义事件将仅在 v2 中显示。
include_names (Optional[Sequence[str]]) – 仅包括来自具有匹配名称的 runnables 的事件。
include_types (Optional[Sequence[str]]) – 仅包括来自具有匹配类型的 runnables 的事件。
include_tags (Optional[Sequence[str]]) – 仅包括来自具有匹配标签的 runnables 的事件。
exclude_names (Optional[Sequence[str]]) – 排除来自具有匹配名称的 runnables 的事件。
exclude_types (Optional[Sequence[str]]) – 排除来自具有匹配类型的 runnables 的事件。
exclude_tags (Optional[Sequence[str]]) – 排除来自具有匹配标签的 runnables 的事件。
kwargs (Any) – 要传递给 Runnable 的其他关键字参数。这些参数将传递给 astream_log,因为此 astream_events 的实现是基于 astream_log 构建的。
- 产生
StreamEvents 的异步流。
- Raises
NotImplementedError – 如果版本不是 v1 或 v2,则引发此错误。
- 返回类型
AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]]
- async astream_log(input: Any, config: Optional[RunnableConfig] = None, *, diff: bool = True, with_streamed_output_list: bool = True, include_names: Optional[Sequence[str]] = None, include_types: Optional[Sequence[str]] = None, include_tags: Optional[Sequence[str]] = None, exclude_names: Optional[Sequence[str]] = None, exclude_types: Optional[Sequence[str]] = None, exclude_tags: Optional[Sequence[str]] = None, **kwargs: Any) Union[AsyncIterator[RunLogPatch], AsyncIterator[RunLog]] ¶
流式传输来自 Runnable 的所有输出,如回调系统报告的那样。这包括 LLM、Retriever、Tool 等的所有内部运行。
输出以 Log 对象的形式流式传输,其中包括 Jsonpatch ops 列表,这些操作描述了运行状态在每个步骤中如何更改,以及运行的最终状态。
Jsonpatch ops 可以按顺序应用以构造状态。
- 参数
input (Any) – Runnable 的输入。
config (Optional[RunnableConfig]) – 用于 Runnable 的配置。
diff (bool) – 是否产生每个步骤之间的差异或当前状态。
with_streamed_output_list (bool) – 是否产生 streamed_output 列表。
include_names (Optional[Sequence[str]]) – 仅包括具有这些名称的日志。
include_types (Optional[Sequence[str]]) – 仅包括具有这些类型的日志。
include_tags (Optional[Sequence[str]]) – 仅包括具有这些标签的日志。
exclude_names (Optional[Sequence[str]]) – 排除具有这些名称的日志。
exclude_types (Optional[Sequence[str]]) – 排除具有这些类型的日志。
exclude_tags (Optional[Sequence[str]]) – 排除具有这些标签的日志。
kwargs (Any) – 要传递给 Runnable 的其他关键字参数。
- 产生
一个 RunLogPatch 或 RunLog 对象。
- 返回类型
Union[AsyncIterator[RunLogPatch], AsyncIterator[RunLog]]
- async atransform(input: AsyncIterator[Input], config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) AsyncIterator[Output] ¶
atransform 的默认实现,它缓冲输入并调用 astream。如果子类可以在输入仍在生成时开始生成输出,则应重写此方法。
- 参数
input (AsyncIterator[Input]) – Runnable 的输入异步迭代器。
config (Optional[RunnableConfig]) – 用于 Runnable 的配置。默认为 None。
kwargs (Optional[Any]) – 要传递给 Runnable 的其他关键字参数。
- 产生
Runnable 的输出。
- 返回类型
AsyncIterator[Output]
- batch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output] ¶
默认实现使用线程池执行器并行运行 invoke。
batch 的默认实现非常适用于 IO 绑定的 runnables。
如果子类可以更有效地进行批量处理,则应覆盖此方法;例如,如果底层 Runnable 使用支持批量模式的 API。
- 参数
inputs (List[Input]) –
config (Optional[Union[RunnableConfig, List[RunnableConfig]]]) –
return_exceptions (bool) –
kwargs (Optional[Any]) –
- 返回类型
List[Output]
- batch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) Iterator[Tuple[int, Union[Output, Exception]]] ¶
在一系列输入上并行运行 invoke,并在完成时产生结果。
- 参数
inputs (Sequence[Input]) –
config (Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]) –
return_exceptions (bool) –
kwargs (Optional[Any]) –
- 返回类型
Iterator[Tuple[int, Union[Output, Exception]]]
- bind(**kwargs: Any) Runnable[Input, Output] ¶
将参数绑定到 Runnable,返回一个新的 Runnable。
当链中的 Runnable 需要一个参数,而该参数不在前一个 Runnable 的输出中,也不包含在用户输入中时,此方法很有用。
- 参数
kwargs (Any) – 要绑定到 Runnable 的参数。
- 返回
一个新的 Runnable,参数已绑定。
- 返回类型
Runnable[Input, Output]
示例
from langchain_community.chat_models import ChatOllama from langchain_core.output_parsers import StrOutputParser llm = ChatOllama(model='llama2') # Without bind. chain = ( llm | StrOutputParser() ) chain.invoke("Repeat quoted words exactly: 'One two three four five.'") # Output is 'One two three four five.' # With bind. chain = ( llm.bind(stop=["three"]) | StrOutputParser() ) chain.invoke("Repeat quoted words exactly: 'One two three four five.'") # Output is 'One two'
- config_schema(*, include: Optional[Sequence[str]] = None) Type:[BaseModel] ¶
此 Runnable 接受的配置类型,指定为 pydantic 模型。
要将字段标记为可配置,请参阅 configurable_fields 和 configurable_alternatives 方法。
- 参数
include (Optional[Sequence[str]]) – 要包含在配置模式中的字段列表。
- 返回
一个 pydantic 模型,可用于验证配置。
- 返回类型
Type[BaseModel]
- get_graph(config: Optional:[RunnableConfig] = None) Graph ¶
返回此 Runnable 的图形表示。
- 参数
config (Optional[RunnableConfig]) –
- 返回类型
- get_input_schema(config: Optional:[RunnableConfig] = None) Type:[BaseModel] ¶
获取可用于验证 Runnable 输入的 pydantic 模型。
利用 configurable_fields 和 configurable_alternatives 方法的 Runnables 将具有动态输入模式,该模式取决于调用 Runnable 时使用的配置。
此方法允许获取特定配置的输入模式。
- 参数
config (Optional[RunnableConfig]) – 生成模式时要使用的配置。
- 返回
一个 pydantic 模型,可用于验证输入。
- 返回类型
Type[BaseModel]
- get_name(suffix: Optional:[str] = None, *, name: Optional:[str] = None) str ¶
获取 Runnable 的名称。
- 参数
suffix (Optional[str]) –
name (Optional[str]) –
- 返回类型
str
- get_output_schema(config: Optional:[RunnableConfig] = None) Type:[BaseModel] ¶
获取可用于验证 Runnable 输出的 pydantic 模型。
利用 configurable_fields 和 configurable_alternatives 方法的 Runnables 将具有动态输出模式,该模式取决于调用 Runnable 时使用的配置。
此方法允许获取特定配置的输出模式。
- 参数
config (Optional[RunnableConfig]) – 生成模式时要使用的配置。
- 返回
一个 pydantic 模型,可用于验证输出。
- 返回类型
Type[BaseModel]
- get_prompts(config: Optional:[RunnableConfig] = None) List:[BasePromptTemplate] ¶
返回此 Runnable 使用的提示列表。
- 参数
config (Optional[RunnableConfig]) –
- 返回类型
List[BasePromptTemplate]
- invoke(input: str, config: Optional:[RunnableConfig] = None) dict [source]¶
将单个输入转换为输出。覆盖以实现。
- 参数
input (str) – Runnable 的输入。
config (Optional[RunnableConfig]) – 调用 Runnable 时要使用的配置。该配置支持标准键,如用于跟踪目的的 'tags'、'metadata',用于控制并行执行量的 'max_concurrency' 以及其他键。请参阅 RunnableConfig 以获取更多详细信息。
- 返回
Runnable 的输出。
- 返回类型
dict
- map() Runnable:[List:[Input], List:[Output]] ¶
返回一个新的 Runnable,它通过对每个输入调用 invoke(),将输入列表映射到输出列表。
- 返回
一个新的 Runnable,将输入列表映射到输出列表。
- 返回类型
Runnable[List[Input], List[Output]]
示例
from langchain_core.runnables import RunnableLambda def _lambda(x: int) -> int: return x + 1 runnable = RunnableLambda(_lambda) print(runnable.map().invoke([1, 2, 3])) # [2, 3, 4]
- pick(keys: Union:[str, List:[str]]) RunnableSerializable:[Any, Any] ¶
从此 Runnable 的字典输出中选择键。
- 选择单个键
import json from langchain_core.runnables import RunnableLambda, RunnableMap as_str = RunnableLambda(str) as_json = RunnableLambda(json.loads) chain = RunnableMap(str=as_str, json=as_json) chain.invoke("[1, 2, 3]") # -> {"str": "[1, 2, 3]", "json": [1, 2, 3]} json_only_chain = chain.pick("json") json_only_chain.invoke("[1, 2, 3]") # -> [1, 2, 3]
- 选择键列表
from typing import Any import json from langchain_core.runnables import RunnableLambda, RunnableMap as_str = RunnableLambda(str) as_json = RunnableLambda(json.loads) def as_bytes(x: Any) -> bytes: return bytes(x, "utf-8") chain = RunnableMap( str=as_str, json=as_json, bytes=RunnableLambda(as_bytes) ) chain.invoke("[1, 2, 3]") # -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"} json_and_bytes_chain = chain.pick(["json", "bytes"]) json_and_bytes_chain.invoke("[1, 2, 3]") # -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
- 参数
keys (Union[str, List[str]]) –
- 返回类型
RunnableSerializable[Any, Any]
- pipe(*others: Union:[Runnable:[Any, Other], Callable:[:[Any], Other]], name: Optional:[str] = None) RunnableSerializable:[Input, Other] ¶
将此 Runnable 与类似 Runnable 的对象组合以创建 RunnableSequence。
等效于 RunnableSequence(self, *others) 或 self | others[0] | …
示例
from langchain_core.runnables import RunnableLambda def add_one(x: int) -> int: return x + 1 def mul_two(x: int) -> int: return x * 2 runnable_1 = RunnableLambda(add_one) runnable_2 = RunnableLambda(mul_two) sequence = runnable_1.pipe(runnable_2) # Or equivalently: # sequence = runnable_1 | runnable_2 # sequence = RunnableSequence(first=runnable_1, last=runnable_2) sequence.invoke(1) await sequence.ainvoke(1) # -> 4 sequence.batch([1, 2, 3]) await sequence.abatch([1, 2, 3]) # -> [4, 6, 8]
- 参数
others (Union[Runnable[Any, Other], Callable[[Any], Other]]) –
name (Optional[str]) –
- 返回类型
RunnableSerializable[Input, Other]
- stream(input: str, config: Optional:[RunnableConfig] = None, **kwargs: Any) Iterator:[dict] [source]¶
从 Vectara RAG 获取流式输出。
- 参数
input (str) – 输入查询
config (Optional[RunnableConfig]) – RunnableConfig 对象
kwargs (Any) – 任何其他参数
- 返回
包含问题、答案和上下文的输出字典
- 返回类型
Iterator[dict]
- transform(input: Iterator:[Input], config: Optional:[RunnableConfig] = None, **kwargs: Optional:[Any]) Iterator:[Output] ¶
transform 的默认实现,它缓冲输入,然后调用 stream。如果子类可以在仍在生成输入时开始生成输出,则应覆盖此方法。
- 参数
input (Iterator[Input]) – Runnable 的输入迭代器。
config (Optional[RunnableConfig]) – 用于 Runnable 的配置。默认为 None。
kwargs (Optional[Any]) – 要传递给 Runnable 的其他关键字参数。
- 产生
Runnable 的输出。
- 返回类型
Iterator[Output]
- with_alisteners(*, on_start: Optional:[AsyncListener] = None, on_end: Optional:[AsyncListener] = None, on_error: Optional:[AsyncListener] = None) Runnable:[Input, Output] ¶
将异步生命周期监听器绑定到 Runnable,返回一个新的 Runnable。
on_start: 在 Runnable 开始运行之前异步调用。 on_end: 在 Runnable 完成运行后异步调用。 on_error: 如果 Runnable 抛出错误,则异步调用。
Run 对象包含有关运行的信息,包括其 ID、类型、输入、输出、错误、开始时间、结束时间以及添加到运行的任何标签或元数据。
- 参数
on_start (Optional[AsyncListener]) – 在 Runnable 开始运行之前异步调用。默认为 None。
on_end (Optional[AsyncListener]) – 在 Runnable 完成运行后异步调用。默认为 None。
on_error (Optional[AsyncListener]) – 如果 Runnable 抛出错误,则异步调用。默认为 None。
- 返回
一个新的 Runnable,已绑定监听器。
- 返回类型
Runnable[Input, Output]
示例
from langchain_core.runnables import RunnableLambda import time async def test_runnable(time_to_sleep : int): print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}") await asyncio.sleep(time_to_sleep) print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}") async def fn_start(run_obj : Runnable): print(f"on start callback starts at {format_t(time.time())} await asyncio.sleep(3) print(f"on start callback ends at {format_t(time.time())}") async def fn_end(run_obj : Runnable): print(f"on end callback starts at {format_t(time.time())} await asyncio.sleep(2) print(f"on end callback ends at {format_t(time.time())}") runnable = RunnableLambda(test_runnable).with_alisteners( on_start=fn_start, on_end=fn_end ) async def concurrent_runs(): await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3)) asyncio.run(concurrent_runs()) Result: on start callback starts at 2024-05-16T14:20:29.637053+00:00 on start callback starts at 2024-05-16T14:20:29.637150+00:00 on start callback ends at 2024-05-16T14:20:32.638305+00:00 on start callback ends at 2024-05-16T14:20:32.638383+00:00 Runnable[3s]: starts at 2024-05-16T14:20:32.638849+00:00 Runnable[5s]: starts at 2024-05-16T14:20:32.638999+00:00 Runnable[3s]: ends at 2024-05-16T14:20:35.640016+00:00 on end callback starts at 2024-05-16T14:20:35.640534+00:00 Runnable[5s]: ends at 2024-05-16T14:20:37.640169+00:00 on end callback starts at 2024-05-16T14:20:37.640574+00:00 on end callback ends at 2024-05-16T14:20:37.640654+00:00 on end callback ends at 2024-05-16T14:20:39.641751+00:00
- with_config(config: Optional:[RunnableConfig] = None, **kwargs: Any) Runnable:[Input, Output] ¶
将配置绑定到 Runnable,返回一个新的 Runnable。
- 参数
config (Optional[RunnableConfig]) – 要绑定到 Runnable 的配置。
kwargs (Any) – 要传递给 Runnable 的其他关键字参数。
- 返回
一个新的 Runnable,已绑定配置。
- 返回类型
Runnable[Input, Output]
- with_fallbacks(fallbacks: Sequence[Runnable[Input, Output]], *, exceptions_to_handle: Tuple[Type[BaseException], ...] = (<class 'Exception'>,), exception_key: Optional[str] = None) RunnableWithFallbacksT:[Input, Output] ¶
向 Runnable 添加回退,返回一个新的 Runnable。
新的 Runnable 将尝试原始 Runnable,然后在失败时按顺序尝试每个回退。
- 参数
fallbacks (Sequence[Runnable[Input, Output]]) – 如果原始 Runnable 失败,则尝试的一系列 runnables。
exceptions_to_handle (Tuple[Type[BaseException], ...]) – 要处理的异常类型元组。默认为 (Exception,)。
exception_key (Optional[str]) – 如果指定了字符串,则处理的异常将作为输入的一部分传递给回退,键为指定的键。如果为 None,则异常不会传递给回退。如果使用,则基本 Runnable 及其回退必须接受字典作为输入。默认为 None。
- 返回
一个新的 Runnable,它将尝试原始 Runnable,然后在失败时按顺序尝试每个回退。
- 返回类型
RunnableWithFallbacksT[Input, Output]
示例
from typing import Iterator from langchain_core.runnables import RunnableGenerator def _generate_immediate_error(input: Iterator) -> Iterator[str]: raise ValueError() yield "" def _generate(input: Iterator) -> Iterator[str]: yield from "foo bar" runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks( [RunnableGenerator(_generate)] ) print(''.join(runnable.stream({}))) #foo bar
- 参数
fallbacks (Sequence[Runnable[Input, Output]]) – 如果原始 Runnable 失败,则尝试的一系列 runnables。
exceptions_to_handle (Tuple[Type[BaseException], ...]) – 要处理的异常类型元组。
exception_key (Optional[str]) – 如果指定了字符串,则处理的异常将作为输入的一部分传递给回退,键为指定的键。如果为 None,则异常不会传递给回退。如果使用,则基本 Runnable 及其回退必须接受字典作为输入。
- 返回
一个新的 Runnable,它将尝试原始 Runnable,然后在失败时按顺序尝试每个回退。
- 返回类型
RunnableWithFallbacksT[Input, Output]
- with_listeners(*, on_start: Optional:[Union:[Callable:[:[Run], None], Callable:[:[Run, RunnableConfig], None]]] = None, on_end: Optional:[Union:[Callable:[:[Run], None], Callable:[:[Run, RunnableConfig], None]]] = None, on_error: Optional:[Union:[Callable:[:[Run], None], Callable:[:[Run, RunnableConfig], None]]] = None) Runnable:[Input, Output] ¶
将生命周期监听器绑定到 Runnable,返回一个新的 Runnable。
on_start: 在 Runnable 开始运行之前调用,使用 Run 对象。 on_end: 在 Runnable 完成运行后调用,使用 Run 对象。 on_error: 如果 Runnable 抛出错误,则调用,使用 Run 对象。
Run 对象包含有关运行的信息,包括其 ID、类型、输入、输出、错误、开始时间、结束时间以及添加到运行的任何标签或元数据。
- 参数
on_start (Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]]) – 在 Runnable 开始运行之前调用。默认为 None。
on_end (Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]]) – 在 Runnable 完成运行后调用。默认为 None。
on_error (Optional[Union[Callable[[Run], None], Callable[[Run, RunnableConfig], None]]]) – 如果 Runnable 抛出错误,则调用。默认为 None。
- 返回
一个新的 Runnable,已绑定监听器。
- 返回类型
Runnable[Input, Output]
示例
from langchain_core.runnables import RunnableLambda from langchain_core.tracers.schemas import Run import time def test_runnable(time_to_sleep : int): time.sleep(time_to_sleep) def fn_start(run_obj: Run): print("start_time:", run_obj.start_time) def fn_end(run_obj: Run): print("end_time:", run_obj.end_time) chain = RunnableLambda(test_runnable).with_listeners( on_start=fn_start, on_end=fn_end ) chain.invoke(2)
- with_retry(*, retry_if_exception_type: ~typing.Tuple[~typing.Type[BaseException], ...] = (<class 'Exception'>,), wait_exponential_jitter: bool = True, stop_after_attempt: int = 3) Runnable:[Input, Output] ¶
创建一个新的 Runnable,它在异常时重试原始 Runnable。
- 参数
retry_if_exception_type (Tuple[Type[BaseException], ...]) – 要重试的异常类型元组。默认为 (Exception,)。
wait_exponential_jitter (bool) – 是否在重试之间的等待时间中添加抖动。默认为 True。
stop_after_attempt (int) – 在放弃之前尝试的最大次数。默认为 3。
- 返回
一个新的 Runnable,在发生异常时重试原始的 Runnable。
- 返回类型
Runnable[Input, Output]
示例
from langchain_core.runnables import RunnableLambda count = 0 def _lambda(x: int) -> None: global count count = count + 1 if x == 1: raise ValueError("x is 1") else: pass runnable = RunnableLambda(_lambda) try: runnable.with_retry( stop_after_attempt=2, retry_if_exception_type=(ValueError,), ).invoke(1) except ValueError: pass assert (count == 2)
- 参数
retry_if_exception_type (Tuple[Type[BaseException], ...]) – 一个异常类型元组,指定在哪些异常类型上进行重试
wait_exponential_jitter (bool) – 是否在重试之间的等待时间中添加抖动
stop_after_attempt (int) – 在放弃之前尝试的最大次数
- 返回
一个新的 Runnable,在发生异常时重试原始的 Runnable。
- 返回类型
Runnable[Input, Output]
- with_types(*, input_type: Optional[Type[Input]] = None, output_type: Optional[Type[Output]] = None) Runnable[Input, Output] ¶
将输入和输出类型绑定到 Runnable,返回一个新的 Runnable。
- 参数
input_type (Optional[Type[Input]]) – 要绑定到 Runnable 的输入类型。默认为 None。
output_type (Optional[Type[Output]]) – 要绑定到 Runnable 的输出类型。默认为 None。
- 返回
一个新的 Runnable,类型已绑定。
- 返回类型
Runnable[Input, Output]