langchain_core.runnables.history
.RunnableWithMessageHistory¶
注意
RunnableWithMessageHistory 实现了标准的 Runnable 接口
。🏃
Runnable 接口
具有额外的在可运行对象上可用的方法,例如 with_types
、with_retry
、assign
、bind
、get_graph
等。
- class langchain_core.runnables.history.RunnableWithMessageHistory[源代码]¶
-
管理另一个 Runnable 的聊天消息历史的 Runnable。
聊天消息历史是一系列表示对话的消息。
RunnableWithMessageHistory 对另一个 Runnable 进行封装并管理其聊天消息历史;它负责读取和更新聊天消息历史。
以下描述了封装的 Runnable 的输入和输出支持格式。
RunnableWithMessageHistory 必须始终与包含聊天消息历史工厂相应参数的配置一起调用。
默认情况下,Runnable 期望接受一个名为 session_id 的单个配置参数,该参数是一个字符串。此参数用于创建一个新实例或查找与给定 session_id 匹配的现有聊天消息历史。
在这种情况下,调用将如下所示
with_history.invoke(…, config={“configurable”: {“session_id”: “bar”}});例如,
{"configurable": {"session_id": "<SESSION_ID>"}}
。可以通过将
ConfigurableFieldSpec
对象列表传递给history_factory_config
参数来自定义配置(请参阅以下示例)。在以下示例中,我们将使用具有内存实现的聊天消息历史,以便于实验并查看结果。
对于生产用例,您将希望使用持续的聊天消息历史实现,例如
RedisChatMessageHistory
。- 参数
get_session_history – 返回一个新的 BaseChatMessageHistory 函数。此函数应接受一个名为 session_id 的单个位置参数(类型为字符串),并返回相应的聊天消息历史实例。
input_messages_key – 如果基础可运行对象接受字典作为输入,则必须指定。该键在输入字典中包含消息。
output_messages_key – 如果基Runnable返回字典作为输出,则必须指定。包含消息的输出字典中的键。
history_messages_key – 如果基Runnable接受字典作为输入并期望为历史消息指定单独的键,则必须指定。
history_factory_config – 配置应传递给聊天历史工厂的字段。有关更多详细信息,请参阅
ConfigurableFieldSpec
。
示例:具有内存实现的测试用例中的聊天消息历史。
from operator import itemgetter from typing import List from langchain_openai.chat_models import ChatOpenAI from langchain_core.chat_history import BaseChatMessageHistory from langchain_core.documents import Document from langchain_core.messages import BaseMessage, AIMessage from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder from langchain_core.pydantic_v1 import BaseModel, Field from langchain_core.runnables import ( RunnableLambda, ConfigurableFieldSpec, RunnablePassthrough, ) from langchain_core.runnables.history import RunnableWithMessageHistory class InMemoryHistory(BaseChatMessageHistory, BaseModel): """In memory implementation of chat message history.""" messages: List[BaseMessage] = Field(default_factory=list) def add_messages(self, messages: List[BaseMessage]) -> None: """Add a list of messages to the store""" self.messages.extend(messages) def clear(self) -> None: self.messages = [] # Here we use a global variable to store the chat message history. # This will make it easier to inspect it to see the underlying results. store = {} def get_by_session_id(session_id: str) -> BaseChatMessageHistory: if session_id not in store: store[session_id] = InMemoryHistory() return store[session_id] history = get_by_session_id("1") history.add_message(AIMessage(content="hello")) print(store) # noqa: T201
示例中包装的Runnable接受字典输入
from typing import Optional from langchain_community.chat_models import ChatAnthropic from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder from langchain_core.runnables.history import RunnableWithMessageHistory prompt = ChatPromptTemplate.from_messages([ ("system", "You're an assistant who's good at {ability}"), MessagesPlaceholder(variable_name="history"), ("human", "{question}"), ]) chain = prompt | ChatAnthropic(model="claude-2") chain_with_history = RunnableWithMessageHistory( chain, # Uses the get_by_session_id function defined in the example # above. get_by_session_id, input_messages_key="question", history_messages_key="history", ) print(chain_with_history.invoke( # noqa: T201 {"ability": "math", "question": "What does cosine mean?"}, config={"configurable": {"session_id": "foo"}} )) # Uses the store defined in the example above. print(store) # noqa: T201 print(chain_with_history.invoke( # noqa: T201 {"ability": "math", "question": "What's its inverse"}, config={"configurable": {"session_id": "foo"}} )) print(store) # noqa: T201
示例中,session factory需要两个键,user_id和conversation id)
store = {} def get_session_history( user_id: str, conversation_id: str ) -> BaseChatMessageHistory: if (user_id, conversation_id) not in store: store[(user_id, conversation_id)] = InMemoryHistory() return store[(user_id, conversation_id)] prompt = ChatPromptTemplate.from_messages([ ("system", "You're an assistant who's good at {ability}"), MessagesPlaceholder(variable_name="history"), ("human", "{question}"), ]) chain = prompt | ChatAnthropic(model="claude-2") with_message_history = RunnableWithMessageHistory( chain, get_session_history=get_session_history, input_messages_key="question", history_messages_key="history", history_factory_config=[ ConfigurableFieldSpec( id="user_id", annotation=str, name="User ID", description="Unique identifier for the user.", default="", is_shared=True, ), ConfigurableFieldSpec( id="conversation_id", annotation=str, name="Conversation ID", description="Unique identifier for the conversation.", default="", is_shared=True, ), ], ) with_message_history.invoke( {"ability": "math", "question": "What does cosine mean?"}, config={"configurable": {"user_id": "123", "conversation_id": "1"}} )
初始化RunnableWithMessageHistory。
- 参数
runnable –
要包装的基Runnable。必须输入以下之一:1. 基消息序列 2. 包含所有消息的键的字典 3. 包含关键历史消息的当前输入字符串/消息的字典以及
如果输入键指向字符串,则将其视为历史中的HumanMessage。
必须返回输出之一:1. 可视为AIMessage的字符串 2. 基消息或基消息的序列 3. 包含基消息或基消息序列的键的字典
get_session_history –
返回新BaseChatMessageHistory的函数。此函数应接受单个位置参数session_id(字符串类型)并返回相应的聊天消息历史实例。……
python
def get_session_history(
session_id:str,
***
,user_id:Optional[str]=None
) -> BaseChatMessageHistory
…
或者,它应接受与session_history_config_specs键匹配的关键字参数,并返回相应的聊天消息历史实例。
def get_session_history( *, user_id: str, thread_id: str, ) -> BaseChatMessageHistory: ...
input_messages_key – 如果基Runnable接受字典作为输入,则必须指定。默认为None。
output_messages_key – 如果基Runnable返回字典作为输出,则必须指定。默认为None。
history_messages_key – 如果基Runnable接受字典作为输入并期望为历史消息指定单独的键,则必须指定。
history_factory_config – 配置应传递给聊天历史工厂的字段。有关更多详细信息,请参阅
ConfigurableFieldSpec
。指定这些允许您将多个配置键传递给get_session_history工厂。**kwargs – 传递给父类
RunnableBindingBase
init的任意附加kwargs。
- param config: RunnableConfig [Optional]¶
绑定到底层Runnable的配置。
- param config_factories: List[Callable[[RunnableConfig], RunnableConfig]] [Optional]¶
将配置工厂绑定到底层Runnable。
- param custom_input_type: Optional[Any] = None¶
使用自定义类型覆盖底层Runnable的输入类型。
类型可以是pydantic模型,或类型注解(例如,List[str])。
- param custom_output_type: Optional[Any] = None¶
使用自定义类型覆盖底层Runnable的输出类型。
类型可以是pydantic模型,或类型注解(例如,List[str])。
- param get_session_history: GetSessionHistoryCallable [Required]¶
- param history_factory_config: Sequence[ConfigurableFieldSpec] [Required]¶
- param history_messages_key: Optional[str] = None¶
- 参数 :input_messages_key:Optional[str]= None¶
- 参数 kwargs:Mapping[str, Any][Optional]¶
在运行时传递给底层Runnable的kwargs。
例如,当调用Runnable绑定时,底层Runnable将以相同的输入被调用,但附带这些额外的kwargs。
- 参数 output_messages_key:Optional[str]= None¶
- async abatch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output] ¶
默认实现使用asyncio.gather并行运行run Nähe。
默认的批处理实现适用于IO密集型Runnable。
如果子类可以更有效地批量处理,则应重写此方法;例如,如果底层Runnable使用支持批处理模式的API。
- 参数
输入 (列表]输入[) – Runnable 的输入列表。
config (可选]联合]RunnableConfig,列表]RunnableConfig]) – 调用 Runnable 时使用的配置。配置支持标准键,如用于追踪的 ‘tags’、"metadata",用于控制并行工作量的‘max_concurrency’,以及其他键。请参阅 RunnableConfig 了解更多详细信息。默认值为 None。
return_exceptions (布尔值) – 是否返回异常而不是抛出它们。默认值为 False。
kwargs (可选]任何[) – 传递给 Runnable 的额外关键字参数。
- 返回
Runnable 的输出列表。
- 返回类型
列表[输出]
- async abatch_as_completed(inputs: 序列] [输入], config: 可选[联合] [RunnableConfig, 序列] [RunnableConfig]] = None, *, return_exceptions: 布尔值 = False, **kwargs: 可选[任何]) 异步迭代器][元组[整数, 联合] [输出, 异常]]¶
并行在输入列表上运行 run,并在完成时产出结果。
- 参数
inputs (序列]输入[) – Runnable 的输入列表。
config (可选[联合[RunnableConfig,Sequence[RunnableConfig]]]) – 在调用可运行对象时使用的配置。配置支持标准键,如用于跟踪的“tags”、“metadata”,用于控制并行执行工作量的“max_concurrency”,以及其他键。请参阅RunnableConfig获取更多详情。默认为None。
return_exceptions (布尔值) – 是否返回异常而不是抛出它们。默认值为 False。
kwargs (可选]任何[) – 传递给 Runnable 的额外关键字参数。
- 生产
输入和输出的索引的可运行对象元组。
- 返回类型
AsyncIterator[Tuple[int, 联合[Output, 异常]]]
- async ainvoke(input: Input, config: Optional[RunnableConfig], **kwargs: Optional[Any]) Output¶
ainvoke的默认实现,从线程调用invoke。
默认实现允许即使在可运行对象未实现invoke的本地异步版本的情况下也能使用异步代码。
如果子类可以异步运行,应覆盖此方法。
- 参数
input (Input) –
config (可选[RunnableConfig]) –
kwargs (可选[Any]) –
- 返回类型
输出
- as_tool(args_schema: Optional[Type[BaseModel]] = None, *, name: Optional[str] = None, description: Optional[str] = None, arg_types: Optional[Dict[str, Type]] = None) BaseTool ¶
测试版
此API处于测试版,未来可能会有所变动。
从可运行的实体创建BaseTool。
as_tool
将实例化一个带有名称、描述和args_schema
的 BaseTool。在可能的情况下,从runnable.get_input_schema
推断模式。作为替代(例如,如果可运行实体接受一个字典作为输入,且特定的字典键没有类型),可以直接使用args_schema
指定模式。您也可以通过传递arg_types
只指定所需参数及其类型。- 参数
args_schema (Optional[Type[BaseModel]]) – 工具的模式。默认为 None。
name (Optional[str]) – 工具的名称。默认为 None。
description (Optional[str]) – 工具的描述。默认为 None。
arg_types (Optional[Dict[str, Type]]) – 参数名称到类型的字典。默认为 None。
- 返回
BaseTool 实例。
- 返回类型
类型化字典输入
from typing import List from typing_extensions import TypedDict from langchain_core.runnables import RunnableLambda class Args(TypedDict): a: int b: List[int] def f(x: Args) -> str: return str(x["a"] * max(x["b"])) runnable = RunnableLambda(f) as_tool = runnable.as_tool() as_tool.invoke({"a": 3, "b": [1, 2]})
dict
输入,通过args_schema
指定模式from typing import Any, Dict, List from langchain_core.pydantic_v1 import BaseModel, Field from langchain_core.runnables import RunnableLambda def f(x: Dict[str, Any]) -> str: return str(x["a"] * max(x["b"])) class FSchema(BaseModel): """Apply a function to an integer and list of integers.""" a: int = Field(..., description="Integer") b: List[int] = Field(..., description="List of ints") runnable = RunnableLambda(f) as_tool = runnable.as_tool(FSchema) as_tool.invoke({"a": 3, "b": [1, 2]})
dict
输入,通过arg_types
指定模式from typing import Any, Dict, List from langchain_core.runnables import RunnableLambda def f(x: Dict[str, Any]) -> str: return str(x["a"] * max(x["b"])) runnable = RunnableLambda(f) as_tool = runnable.as_tool(arg_types={"a": int, "b": List[int]}) as_tool.invoke({"a": 3, "b": [1, 2]})
字符串输入
from langchain_core.runnables import RunnableLambda def f(x: str) -> str: return x + "a" def g(x: str) -> str: return x + "z" runnable = RunnableLambda(f) | g as_tool = runnable.as_tool() as_tool.invoke("b")
0.2.14 版本中新加入。
- async astream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) AsyncIterator[Output] ¶
astream 的默认实现,它调用 ainvoke。子类应覆盖此方法以支持流式输出。
- 参数
input (Input)- Runnable 的输入。
config (Optional[RunnableConfig])- 用于 Runnable 的配置。默认为 None。
kwargs (可选]任何[) – 传递给 Runnable 的额外关键字参数。
- 生产
Runnable 的输出。
- 返回类型
AsyncIterator[Output]
- async astream_events(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]] ¶
测试版
此API处于测试版,未来可能会有所变动。
生成事件流。
用于迭代提供关于 Runnable 进度的实时信息的 StreamEvents,包括中间结果的 StreamEvents。
StreamEvent 是一个包含以下模式的字典
event
: str - 事件名称为格式:on_[runnable_type]_(start|stream|end)。
name
: str - 生成事件的 Runnable 的名称。run_id
: str - 与给定执行相关的随机生成的 ID。触发事件的Runnable。作为父Runnable执行部分而得到调用的子Runnable将被分配一个唯一的ID。
parent_ids
: 字符串列表 - 生成事件的父Runnable的ID。根Runnable将有一个空列表。父ID的顺序是从根到直接父级的。仅在API的v2版本中有效。API的v1版本将返回一个空列表。
tags
: 可选的字符串列表 - 生成事件的Runnable的标签。
metadata
: 可选的字典[str, 任何]类型 - 生成事件的Runnable的元数据。
data
: 字典[str, 任何]类型
以下是一个表格,说明了各种链可能会发出的一些事件。为了简洁,省略了元数据字段。链定义在表格之后。
注意 此参考表针对的是模式版本2。
事件
名称
块
输入
输出
on_chat_model_start
[模型名称]
{“messages”: [[系统消息,人类消息]]}
on_chat_model_stream
[模型名称]
AIMessageChunk(content="hello")
on_chat_model_end
[模型名称]
{“messages”: [[系统消息,人类消息]]}
AIMessageChunk(content="hello world")
on_llm_start
[模型名称]
{‘input': ‘hello’}
on_llm_stream
[模型名称]
‘Hello’
on_llm_end
[模型名称]
‘Hello human!’
on_chain_start
format_docs
on_chain_stream
format_docs
“hello world!, goodbye world!”
on_chain_end
format_docs
[文档(...)]
“hello world!, goodbye world!”
on_tool_start
some_tool
{“x": 1, “y": “2”}
on_tool_end
some_tool
{“x": 1, “y": “2”}
on_retriever_start
[检索器名称]
{“query": “hello”}
on_retriever_end
[检索器名称]
{“query": “hello”}
[Document(...), ..]
on_prompt_start
[模板名称]
{“question": “hello”}
on_prompt_end
[模板名称]
{“question": “hello”}
ChatPromptValue(messages: [SystemMessage, …])
除了标准事件外,用户还可以分发自定义事件(见以下示例)。
自定义事件仅在API的“I”版本中可见!
自定义事件有以下格式
属性
类型
描述
名称
str
事件的用户定义名称。
数据
任何类型
与事件关联的数据。这可以是一切,尽管我们建议将其设计为JSON可序列化。
以下是与上述标准事件相关的声明
format_docs:
def format_docs(docs: List[Document]) -> str: '''Format the docs.''' return ", ".join([doc.page_content for doc in docs]) format_docs = RunnableLambda(format_docs)
some_tool:
@tool def some_tool(x: int, y: str) -> dict: '''Some_tool.''' return {"x": x, "y": y}
提示:
template = ChatPromptTemplate.from_messages( [("system", "You are Cat Agent 007"), ("human", "{question}")] ).with_config({"run_name": "my_template", "tags": ["my_template"]})
示例
from langchain_core.runnables import RunnableLambda async def reverse(s: str) -> str: return s[::-1] chain = RunnableLambda(func=reverse) events = [ event async for event in chain.astream_events("hello", version="v2") ] # will produce the following events (run_id, and parent_ids # has been omitted for brevity): [ { "data": {"input": "hello"}, "event": "on_chain_start", "metadata": {}, "name": "reverse", "tags": [], }, { "data": {"chunk": "olleh"}, "event": "on_chain_stream", "metadata": {}, "name": "reverse", "tags": [], }, { "data": {"output": "olleh"}, "event": "on_chain_end", "metadata": {}, "name": "reverse", "tags": [], }, ]
示例:分发自定义事件
from langchain_core.callbacks.manager import ( adispatch_custom_event, ) from langchain_core.runnables import RunnableLambda, RunnableConfig import asyncio async def slow_thing(some_input: str, config: RunnableConfig) -> str: """Do something that takes a long time.""" await asyncio.sleep(1) # Placeholder for some slow operation await adispatch_custom_event( "progress_event", {"message": "Finished step 1 of 3"}, config=config # Must be included for python < 3.10 ) await asyncio.sleep(1) # Placeholder for some slow operation await adispatch_custom_event( "progress_event", {"message": "Finished step 2 of 3"}, config=config # Must be included for python < 3.10 ) await asyncio.sleep(1) # Placeholder for some slow operation return "Done" slow_thing = RunnableLambda(slow_thing) async for event in slow_thing.astream_events("some_input", version="v2"): print(event)
- 参数
input (Input)- Runnable 的输入。
config (可选LangChainCoreRunnableConfig]) – 使用Runnable的配置。
版本 – 要使用的模式版本,可以是“v2”或“v1”。用户应使用“v2”。“v1”用于向后兼容,并在0.4.0中弃用。在API稳定之前不会分配默认值。自定义事件仅在“v2”中可见。
include_names – 只包括具有匹配名称的Runnable的事件。
include_types – 只包括类型匹配的Runnable的事件。
include_tags – 只包括具有匹配标签的Runnable的事件。
exclude_names – 排除具有匹配名称的Runnable的事件。
exclude_types – 排除具有匹配类型的Runnable的事件。
exclude_tags – 排除具有匹配标签的Runnable的事件。
kwargs (可选Any]) – 传递给Runnable的额外关键字参数。这些将通过astream_log传递,因为此astream_events实现是建立在其上的。
- 生产
异步流式传输的流事件。
- 引发
NotImplementedError – 如果版本不是 v1 或 v2。
- 返回类型
- batch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output]¶
默认实现使用线程池执行器并行运行 invoke。
默认的批处理实现适用于IO密集型Runnable。
如果子类可以更有效地批量处理,则应重写此方法;例如,如果底层Runnable使用支持批处理模式的API。
- 参数
inputs (List[Input]) –
config (Optional[Union[RunnableConfig, List[RunnableConfig]]]) –
return_exceptions (bool) –
kwargs (可选[Any]) –
- 返回类型
列表[输出]
- batch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) Iterator[Tuple[int, Union[Output, Exception]]]¶
并行在输入列表上运行调用, käytettävissä结果完成后。
- 参数
inputs (Sequence[Input]) –
config (Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]) –
return_exceptions (bool) –
kwargs (可选[Any]) –
- 返回类型
Iterator[Tuple[int, Union[Output, Exception]]]
- configurable_alternatives(which: ConfigurableField, *, default_key: str = 'default', prefix_keys: bool = False, **kwargs: Union[Runnable[Input, Output], Callable[[], Runnable[Input, Output]]]) RunnableSerializable[Input, Output] ¶
配置可以在运行时设置的 Runnable 的替代方案。
- 参数
which (ConfigurableField) – 将用于选择替代方案的 ConfigurableField 实例。
default_key (str) – 如果没有选择替代方案,将使用默认关键字。默认为“default”。
prefix_keys (bool) – 是否在键前添加 ConfigurableField id。默认为 False。
**kwargs (Union[Runnable[Input, Output], Callable[[], Runnable[Input, Output]]]) – 键到 Runnable 实例或返回 Runnable 实例的可调用对象的字典。
- 返回
配置了替代方案的新 Runnable。
- 返回类型
RunnableSerializable[Input, Output]
from langchain_anthropic import ChatAnthropic from langchain_core.runnables.utils import ConfigurableField from langchain_openai import ChatOpenAI model = ChatAnthropic( model_name="claude-3-sonnet-20240229" ).configurable_alternatives( ConfigurableField(id="llm"), default_key="anthropic", openai=ChatOpenAI() ) # uses the default model ChatAnthropic print(model.invoke("which organization created you?").content) # uses ChatOpenAI print( model.with_config( configurable={"llm": "openai"} ).invoke("which organization created you?").content )
- configurable_fields(**kwargs: Union[ConfigurableField, ConfigurableFieldSingleOption, ConfigurableFieldMultiOption]) RunnableSerializable[Input, Output]¶
在运行时配置特定的Runnable字段。
- 参数
**kwargs (Union[ConfigurableField, ConfigurableFieldSingleOption, ConfigurableFieldMultiOption]) - 要配置的ConfigurableField实例的字典。
- 返回
配置字段后的新Runnable。
- 返回类型
RunnableSerializable[Input, Output]
from langchain_core.runnables import ConfigurableField from langchain_openai import ChatOpenAI model = ChatOpenAI(max_tokens=20).configurable_fields( max_tokens=ConfigurableField( id="output_token_number", name="Max tokens in the output", description="The maximum number of tokens in the output", ) ) # max_tokens = 20 print( "max_tokens_20: ", model.invoke("tell me something about chess").content ) # max_tokens = 200 print("max_tokens_200: ", model.with_config( configurable={"output_token_number": 200} ).invoke("tell me something about chess").content )
- invoke(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) Output¶
将单个输入转换为输出。重写以实现。
- 参数
input (Input)- Runnable 的输入。
config (Optional[RunnableConfig]) - 调用Runnable时要使用的配置。该配置支持标准的键,如‘tags’、‘metadata’用于跟踪目的、‘max_concurrency’用于控制并行工作数量,以及其他键。请参阅RunnableConfig获取更多详细信息。
kwargs (可选[Any]) –
- 返回
Runnable 的输出。
- 返回类型
输出
- stream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) Iterator[Output] ¶
stream的默认实现,调用invoke。如果子类支持流式输出,应重写此方法。
- 参数
input (Input)- Runnable 的输入。
config (Optional[RunnableConfig])- 用于 Runnable 的配置。默认为 None。
kwargs (可选]任何[) – 传递给 Runnable 的额外关键字参数。
- 生产
Runnable 的输出。
- 返回类型
Iterator[Output]
- to_json() Union[SerializedConstructor, SerializedNotImplemented] ¶
将Runnable序列化为JSON。
- 返回
Runnable的JSON可序列化表示。
- 返回类型