langchain.agents.openai_assistant.base
.OpenAIAssistantRunnable¶
注意
OpenAIAssistantRunnable 实现了标准的 Runnable 接口
。 🏃
Runnable 接口
还有其他可在 runnable 上使用的方法,例如 with_types
、 with_retry
、 assign
、 bind
、 get_graph
等。
- class langchain.agents.openai_assistant.base.OpenAIAssistantRunnable[source]¶
基类:
RunnableSerializable
[Dict
,Union
[List
[OpenAIAssistantAction
],OpenAIAssistantFinish
,List
[ThreadMessage
],List
[RequiredActionFunctionToolCall
]]]运行 OpenAI Assistant。
- 使用 OpenAI 工具的示例
from langchain_experimental.openai_assistant import OpenAIAssistantRunnable interpreter_assistant = OpenAIAssistantRunnable.create_assistant( name="langchain assistant", instructions="You are a personal math tutor. Write and run code to answer math questions.", tools=[{"type": "code_interpreter"}], model="gpt-4-1106-preview" ) output = interpreter_assistant.invoke({"content": "What's 10 - 4 raised to the 2.7"})
- 使用自定义工具和 AgentExecutor 的示例
from langchain_experimental.openai_assistant import OpenAIAssistantRunnable from langchain.agents import AgentExecutor from langchain.tools import E2BDataAnalysisTool tools = [E2BDataAnalysisTool(api_key="...")] agent = OpenAIAssistantRunnable.create_assistant( name="langchain assistant e2b tool", instructions="You are a personal math tutor. Write and run code to answer math questions.", tools=tools, model="gpt-4-1106-preview", as_agent=True ) agent_executor = AgentExecutor(agent=agent, tools=tools) agent_executor.invoke({"content": "What's 10 - 4 raised to the 2.7"})
- 使用自定义工具和自定义执行的示例
from langchain_experimental.openai_assistant import OpenAIAssistantRunnable from langchain.agents import AgentExecutor from langchain_core.agents import AgentFinish from langchain.tools import E2BDataAnalysisTool tools = [E2BDataAnalysisTool(api_key="...")] agent = OpenAIAssistantRunnable.create_assistant( name="langchain assistant e2b tool", instructions="You are a personal math tutor. Write and run code to answer math questions.", tools=tools, model="gpt-4-1106-preview", as_agent=True ) def execute_agent(agent, tools, input): tool_map = {tool.name: tool for tool in tools} response = agent.invoke(input) while not isinstance(response, AgentFinish): tool_outputs = [] for action in response: tool_output = tool_map[action.tool].invoke(action.tool_input) tool_outputs.append({"output": tool_output, "tool_call_id": action.tool_call_id}) response = agent.invoke( { "tool_outputs": tool_outputs, "run_id": action.run_id, "thread_id": action.thread_id } ) return response response = execute_agent(agent, tools, {"content": "What's 10 - 4 raised to the 2.7"}) next_response = execute_agent(agent, tools, {"content": "now add 17.241", "thread_id": response.thread_id})
- param as_agent: bool = False¶
用作 LangChain agent,与 AgentExecutor 兼容。
- param assistant_id: str [必需]¶
OpenAI assistant ID。
- param async_client: Any = None¶
OpenAI 或 AzureOpenAI 异步客户端。
- param check_every_ms: float = 1000.0¶
以毫秒为单位检查运行进度的频率。
- param client: Any [可选]¶
OpenAI 或 AzureOpenAI 客户端。
- async abatch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output] ¶
默认实现使用 asyncio.gather 并行运行 ainvoke。
batch 的默认实现适用于 IO 绑定的 runnables。
如果子类可以更高效地进行批量处理,则应重写此方法;例如,如果底层 Runnable 使用支持批量模式的 API。
- 参数
inputs (List[Input]) – Runnable 的输入列表。
config (Optional[Union[RunnableConfig, List[RunnableConfig]]]) – 调用 Runnable 时使用的配置。该配置支持标准键,如用于跟踪目的的 ‘tags’、‘metadata’,用于控制并行执行工作量的 ‘max_concurrency’,以及其他键。有关更多详细信息,请参阅 RunnableConfig。默认为 None。
return_exceptions (bool) – 是否返回异常而不是引发异常。默认为 False。
kwargs (Optional[Any]) – 传递给 Runnable 的其他关键字参数。
- 返回值
Runnable 的输出列表。
- 返回类型
List[Output]
- async abatch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) AsyncIterator[Tuple[int, Union[Output, Exception]]] ¶
并行运行输入列表上的 ainvoke,并在完成时产生结果。
- 参数
inputs (Sequence[Input]) – Runnable 的输入列表。
config (Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]) – 调用 Runnable 时使用的配置。该配置支持标准键,如用于跟踪目的的 ‘tags’、‘metadata’,用于控制并行执行工作量的 ‘max_concurrency’,以及其他键。有关更多详细信息,请参阅 RunnableConfig。默认为 None。
return_exceptions (bool) – 是否返回异常而不是引发异常。默认为 False。
kwargs (Optional[Any]) – 传递给 Runnable 的其他关键字参数。
- 产生
输入索引和 Runnable 输出的元组。
- 返回类型
AsyncIterator[Tuple[int, Union[Output, Exception]]]
- async classmethod acreate_assistant(name: str, instructions: str, tools: Sequence[Union[BaseTool, dict]], model: str, *, async_client: Optional[Union[openai.AsyncOpenAI, openai.AsyncAzureOpenAI]] = None, **kwargs: Any) OpenAIAssistantRunnable [source]¶
异步创建 AsyncOpenAI Assistant 并实例化 Runnable。
- 参数
name (str) – Assistant 名称。
instructions (str) – Assistant 指令。
tools (Sequence[Union[BaseTool, dict]]) – Assistant 工具。可以以 OpenAI 格式或 BaseTools 形式传递。
model (str) – 要使用的 Assistant 模型。
async_client (Optional[Union[openai.AsyncOpenAI, openai.AsyncAzureOpenAI]]) – AsyncOpenAI 客户端。如果未指定,将创建默认的 async_client。
kwargs (Any) –
- 返回值
配置为使用创建的 assistant 运行的 AsyncOpenAIAssistantRunnable。
- 返回类型
- async ainvoke(input: dict, config: Optional[RunnableConfig] = None, **kwargs: Any) OutputType [source]¶
异步调用 assistant。
- 参数
input (dict) –
Runnable 输入字典,可以包含:content:启动新运行时用户的消息。thread_id:要使用的现有线程。run_id:要使用的现有运行。仅在提供时才应提供
初始调用后所需操作的工具输出。
file_ids:要包含在新运行中的文件 ID。用于检索。message_metadata:与新消息关联的元数据。thread_metadata:与新线程关联的元数据。仅在以下情况下相关
创建新线程时。
instructions:其他运行指令。model:覆盖此运行的 Assistant 模型。tools:覆盖此运行的 Assistant 工具。run_metadata:与新运行关联的元数据。
config (Optional[RunnableConfig]) – Runnable 配置。默认为 None。
kwargs (Any) – 其他参数。
- 返回值
- 如果 self.as_agent,将返回
Union[List[OpenAIAssistantAction], OpenAIAssistantFinish]。否则,将返回 OpenAI 类型 Union[List[ThreadMessage], List[RequiredActionFunctionToolCall]]。
- 返回类型
OutputType
- as_tool(args_schema: Optional[Type[BaseModel]] = None, *, name: Optional[str] = None, description: Optional[str] = None, arg_types: Optional[Dict[str, Type]] = None) BaseTool ¶
Beta
此 API 处于 beta 阶段,将来可能会发生更改。
从 Runnable 创建 BaseTool。
as_tool
将从 Runnable 实例化一个具有名称、描述和args_schema
的 BaseTool。在可能的情况下,架构是从runnable.get_input_schema
推断出来的。或者(例如,如果 Runnable 接受 dict 作为输入,并且未对特定的 dict 键进行类型化),可以使用args_schema
直接指定架构。您也可以传递arg_types
来仅指定必需的参数及其类型。- 参数
args_schema (Optional[Type[BaseModel]]) – 工具的架构。默认为 None。
name (Optional[str]) – 工具的名称。默认为 None。
description (Optional[str]) – 工具的描述。默认为 None。
arg_types (Optional[Dict[str, Type]]) – 参数名称到类型的字典。默认为 None。
- 返回值
BaseTool 实例。
- 返回类型
类型化字典输入
from typing import List from typing_extensions import TypedDict from langchain_core.runnables import RunnableLambda class Args(TypedDict): a: int b: List[int] def f(x: Args) -> str: return str(x["a"] * max(x["b"])) runnable = RunnableLambda(f) as_tool = runnable.as_tool() as_tool.invoke({"a": 3, "b": [1, 2]})
字典输入,通过 args_schema 指定架构
from typing import Any, Dict, List from langchain_core.pydantic_v1 import BaseModel, Field from langchain_core.runnables import RunnableLambda def f(x: Dict[str, Any]) -> str: return str(x["a"] * max(x["b"])) class FSchema(BaseModel): """Apply a function to an integer and list of integers.""" a: int = Field(..., description="Integer") b: List[int] = Field(..., description="List of ints") runnable = RunnableLambda(f) as_tool = runnable.as_tool(FSchema) as_tool.invoke({"a": 3, "b": [1, 2]})
字典输入,通过 arg_types 指定架构
from typing import Any, Dict, List from langchain_core.runnables import RunnableLambda def f(x: Dict[str, Any]) -> str: return str(x["a"] * max(x["b"])) runnable = RunnableLambda(f) as_tool = runnable.as_tool(arg_types={"a": int, "b": List[int]}) as_tool.invoke({"a": 3, "b": [1, 2]})
字符串输入
from langchain_core.runnables import RunnableLambda def f(x: str) -> str: return x + "a" def g(x: str) -> str: return x + "z" runnable = RunnableLambda(f) | g as_tool = runnable.as_tool() as_tool.invoke("b")
0.2.14 版本新增。
- async astream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) AsyncIterator[Output] ¶
astream 的默认实现,它调用 ainvoke。如果子类支持流式输出,则应重写此方法。
- 参数
input (Input) – Runnable 的输入。
config (Optional[RunnableConfig]) – 用于 Runnable 的配置。默认为 None。
kwargs (Optional[Any]) – 传递给 Runnable 的其他关键字参数。
- 产生
Runnable 的输出。
- 返回类型
AsyncIterator[Output]
- astream_events(input: Any, config: Optional[RunnableConfig] = None, *, version: Literal['v1', 'v2'], include_names: Optional[Sequence[str]] = None, include_types: Optional[Sequence[str]] = None, include_tags: Optional[Sequence[str]] = None, exclude_names: Optional[Sequence[str]] = None, exclude_types: Optional[Sequence[str]] = None, exclude_tags: Optional[Sequence[str]] = None, **kwargs: Any) AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]] ¶
Beta
此 API 处于 beta 阶段,将来可能会发生更改。
生成事件流。
使用此方法创建一个迭代器,用于遍历 StreamEvents,这些 StreamEvents 提供有关 Runnable 进度的实时信息,包括来自中间结果的 StreamEvents。
StreamEvent 是一个具有以下模式的字典
event
: str - 事件名称的格式为格式:on_[runnable_type]_(start|stream|end)。
name
: str - 生成事件的 Runnable 的名称。run_id
: str - 随机生成的 ID,与给定 Runnable 的执行关联,该 Runnable 发出事件。作为父 Runnable 执行的一部分被调用的子 Runnable 将被分配其自己唯一的 ID。Runnable 的执行,该 Runnable 发出事件。作为父 Runnable 执行的一部分被调用的子 Runnable 将被分配其自己唯一的 ID。
parent_ids
: List[str] - 生成事件的父 Runnables 的 ID。生成事件的父 Runnables 的 ID。根 Runnable 将具有一个空列表。父 ID 的顺序是从根到直接父级。仅适用于 API 的 v2 版本。API 的 v1 版本将返回一个空列表。
tags
: Optional[List[str]] - 生成事件的 Runnable 的标签。生成事件的 Runnable 的标签。
metadata
: Optional[Dict[str, Any]] - 生成事件的 Runnable 的元数据生成事件的 Runnable 的元数据。
data
: Dict[str, Any]
下面是一个表格,说明了各种链可能发出的一些事件。为了简洁起见,表格中省略了元数据字段。链定义已包含在表格之后。
注意 此参考表适用于 V2 版本的架构。
事件
名称
块
输入
输出
on_chat_model_start
[模型名称]
{“messages”: [[SystemMessage, HumanMessage]]}
on_chat_model_stream
[模型名称]
AIMessageChunk(content=”hello”)
on_chat_model_end
[模型名称]
{“messages”: [[SystemMessage, HumanMessage]]}
AIMessageChunk(content=”hello world”)
on_llm_start
[模型名称]
{‘input’: ‘hello’}
on_llm_stream
[模型名称]
‘Hello’
on_llm_end
[模型名称]
‘Hello human!’
on_chain_start
format_docs
on_chain_stream
format_docs
“hello world!, goodbye world!”
on_chain_end
format_docs
[Document(…)]
“hello world!, goodbye world!”
on_tool_start
some_tool
{“x”: 1, “y”: “2”}
on_tool_end
some_tool
{“x”: 1, “y”: “2”}
on_retriever_start
[检索器名称]
{“query”: “hello”}
on_retriever_end
[检索器名称]
{“query”: “hello”}
[Document(…), ..]
on_prompt_start
[模板名称]
{“question”: “hello”}
on_prompt_end
[模板名称]
{“question”: “hello”}
ChatPromptValue(messages: [SystemMessage, …])
除了标准事件之外,用户还可以调度自定义事件(请参阅下面的示例)。
自定义事件将仅在 API 的 v2 版本中显示!
自定义事件具有以下格式
属性
类型
描述
名称
str
用户为事件定义的名称。
数据
Any
与事件关联的数据。这可以是任何内容,但我们建议使其可 JSON 序列化。
以下是与上面显示的标准事件关联的声明
format_docs:
def format_docs(docs: List[Document]) -> str: '''Format the docs.''' return ", ".join([doc.page_content for doc in docs]) format_docs = RunnableLambda(format_docs)
some_tool:
@tool def some_tool(x: int, y: str) -> dict: '''Some_tool.''' return {"x": x, "y": y}
提示:
template = ChatPromptTemplate.from_messages( [("system", "You are Cat Agent 007"), ("human", "{question}")] ).with_config({"run_name": "my_template", "tags": ["my_template"]})
示例
from langchain_core.runnables import RunnableLambda async def reverse(s: str) -> str: return s[::-1] chain = RunnableLambda(func=reverse) events = [ event async for event in chain.astream_events("hello", version="v2") ] # will produce the following events (run_id, and parent_ids # has been omitted for brevity): [ { "data": {"input": "hello"}, "event": "on_chain_start", "metadata": {}, "name": "reverse", "tags": [], }, { "data": {"chunk": "olleh"}, "event": "on_chain_stream", "metadata": {}, "name": "reverse", "tags": [], }, { "data": {"output": "olleh"}, "event": "on_chain_end", "metadata": {}, "name": "reverse", "tags": [], }, ]
示例:调度自定义事件
from langchain_core.callbacks.manager import ( adispatch_custom_event, ) from langchain_core.runnables import RunnableLambda, RunnableConfig import asyncio async def slow_thing(some_input: str, config: RunnableConfig) -> str: """Do something that takes a long time.""" await asyncio.sleep(1) # Placeholder for some slow operation await adispatch_custom_event( "progress_event", {"message": "Finished step 1 of 3"}, config=config # Must be included for python < 3.10 ) await asyncio.sleep(1) # Placeholder for some slow operation await adispatch_custom_event( "progress_event", {"message": "Finished step 2 of 3"}, config=config # Must be included for python < 3.10 ) await asyncio.sleep(1) # Placeholder for some slow operation return "Done" slow_thing = RunnableLambda(slow_thing) async for event in slow_thing.astream_events("some_input", version="v2"): print(event)
- 参数
input (Any) – Runnable 的输入。
config (Optional[RunnableConfig]) – 用于 Runnable 的配置。
version (Literal['v1', 'v2']) – 要使用的架构版本,v2 或 v1。用户应使用 v2。v1 用于向后兼容,将在 0.4.0 中弃用。在 API 稳定之前,不会分配默认值。自定义事件将仅在 v2 中显示。
include_names (Optional[Sequence[str]]) – 仅包括来自具有匹配名称的 runnables 的事件。
include_types (Optional[Sequence[str]]) – 仅包括来自具有匹配类型的 runnables 的事件。
include_tags (Optional[Sequence[str]]) – 仅包括来自具有匹配标签的 runnables 的事件。
exclude_names (Optional[Sequence[str]]) – 排除来自具有匹配名称的 runnables 的事件。
exclude_types (Optional[Sequence[str]]) – 排除来自具有匹配类型的 runnables 的事件。
exclude_tags (Optional[Sequence[str]]) – 排除来自具有匹配标签的 runnables 的事件。
kwargs (Any) – 要传递给 Runnable 的其他关键字参数。这些将传递给 astream_log,因为此 astream_events 的实现构建在 astream_log 之上。
- 产生
StreamEvents 的异步流。
- 引发
NotImplementedError – 如果版本不是 v1 或 v2。
- 返回类型
AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]]
- batch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output] ¶
默认实现使用线程池执行器并行运行 invoke。
batch 的默认实现适用于 IO 绑定的 runnables。
如果子类可以更高效地进行批量处理,则应重写此方法;例如,如果底层 Runnable 使用支持批量模式的 API。
- 参数
inputs (List[Input]) –
config (Optional[Union[RunnableConfig, List[RunnableConfig]]]) –
return_exceptions (bool) –
kwargs (Optional[Any]) –
- 返回类型
List[Output]
- batch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) Iterator[Tuple[int, Union[Output, Exception]]] ¶
并行运行列表中输入的 invoke,并在完成时生成结果。
- 参数
inputs (Sequence[Input]) –
config (Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]) –
return_exceptions (bool) –
kwargs (Optional[Any]) –
- 返回类型
Iterator[Tuple[int, Union[Output, Exception]]]
- configurable_alternatives(which: ConfigurableField, *, default_key: str = 'default', prefix_keys: bool = False, **kwargs: Union[Runnable[Input, Output], Callable[[], Runnable[Input, Output]]]) RunnableSerializable[Input, Output] ¶
配置可在运行时设置的 Runnables 的备选项。
- 参数
which (ConfigurableField) – 将用于选择备选项的 ConfigurableField 实例。
default_key (str) – 如果未选择备选项,则使用的默认键。默认为“default”。
prefix_keys (bool) – 是否为键添加 ConfigurableField id 前缀。默认为 False。
**kwargs (Union[Runnable[Input, Output], Callable[[], Runnable[Input, Output]]]) – 键到 Runnable 实例或返回 Runnable 实例的可调用对象的字典。
- 返回值
配置了备选项的新 Runnable。
- 返回类型
RunnableSerializable[Input, Output]
from langchain_anthropic import ChatAnthropic from langchain_core.runnables.utils import ConfigurableField from langchain_openai import ChatOpenAI model = ChatAnthropic( model_name="claude-3-sonnet-20240229" ).configurable_alternatives( ConfigurableField(id="llm"), default_key="anthropic", openai=ChatOpenAI() ) # uses the default model ChatAnthropic print(model.invoke("which organization created you?").content) # uses ChatOpenAI print( model.with_config( configurable={"llm": "openai"} ).invoke("which organization created you?").content )
- configurable_fields(**kwargs: Union[ConfigurableField, ConfigurableFieldSingleOption, ConfigurableFieldMultiOption]) RunnableSerializable[Input, Output] ¶
在运行时配置特定的 Runnable 字段。
- 参数
**kwargs (Union[ConfigurableField, ConfigurableFieldSingleOption, ConfigurableFieldMultiOption]) – 要配置的 ConfigurableField 实例的字典。
- 返回值
配置了字段的新 Runnable。
- 返回类型
RunnableSerializable[Input, Output]
from langchain_core.runnables import ConfigurableField from langchain_openai import ChatOpenAI model = ChatOpenAI(max_tokens=20).configurable_fields( max_tokens=ConfigurableField( id="output_token_number", name="Max tokens in the output", description="The maximum number of tokens in the output", ) ) # max_tokens = 20 print( "max_tokens_20: ", model.invoke("tell me something about chess").content ) # max_tokens = 200 print("max_tokens_200: ", model.with_config( configurable={"output_token_number": 200} ).invoke("tell me something about chess").content )
- classmethod create_assistant(name: str, instructions: str, tools: Sequence[Union[BaseTool, dict]], model: str, *, client: Optional[Union[openai.OpenAI, openai.AzureOpenAI]] = None, **kwargs: Any) OpenAIAssistantRunnable [source]¶
创建一个 OpenAI Assistant 并实例化 Runnable。
- 参数
name (str) – Assistant 名称。
instructions (str) – Assistant 指令。
tools (Sequence[Union[BaseTool, dict]]) – Assistant 工具。可以以 OpenAI 格式或 BaseTools 形式传递。
model (str) – 要使用的 Assistant 模型。
client (Optional[Union[openai.OpenAI, openai.AzureOpenAI]]) – OpenAI 或 AzureOpenAI 客户端。如果未指定,将创建一个默认的 OpenAI 客户端。
kwargs (Any) – 其他参数。
- 返回值
OpenAIAssistantRunnable 配置为使用创建的 assistant 运行。
- 返回类型
- invoke(input: dict, config: Optional[RunnableConfig] = None) OutputType [source]¶
调用 assistant。
- 参数
input (dict) –
Runnable 输入字典,可以包含:content:启动新运行时用户的消息。thread_id:要使用的现有线程。run_id:要使用的现有运行。仅在提供时才应提供
初始调用后所需操作的工具输出。
file_ids:要包含在新运行中的文件 ID。用于检索。 message_metadata:要与新消息关联的元数据。 thread_metadata:要与新线程关联的元数据。仅在
创建新线程时相关。
instructions:其他运行指令。model:覆盖此运行的 Assistant 模型。tools:覆盖此运行的 Assistant 工具。run_metadata:与新运行关联的元数据。
config (Optional[RunnableConfig]) – Runnable 配置。默认为 None。
- 返回值
- 如果 self.as_agent,将返回
Union[List[OpenAIAssistantAction], OpenAIAssistantFinish]。否则,将返回 OpenAI 类型 Union[List[ThreadMessage], List[RequiredActionFunctionToolCall]]。
- 返回类型
OutputType
- stream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) Iterator[Output] ¶
stream 的默认实现,它调用 invoke。如果子类支持流式输出,则应覆盖此方法。
- 参数
input (Input) – Runnable 的输入。
config (Optional[RunnableConfig]) – 用于 Runnable 的配置。默认为 None。
kwargs (Optional[Any]) – 传递给 Runnable 的其他关键字参数。
- 产生
Runnable 的输出。
- 返回类型
Iterator[Output]
- to_json() Union[SerializedConstructor, SerializedNotImplemented] ¶
将 Runnable 序列化为 JSON。
- 返回值
Runnable 的 JSON 可序列化表示形式。
- 返回类型