langchain_community.chat_message_histories.kafka
.KafkaChatMessageHistory¶
- class langchain_community.chat_message_histories.kafka.KafkaChatMessageHistory(
在 Kafka 中存储的聊天历史消息。
- 设置
安装
confluent-kafka-python
。pip install confluent_kafka
- 实例化
from langchain_community.chat_message_histories import KafkaChatMessageHistory history = KafkaChatMessageHistory( session_id="your_session_id", bootstrap_servers="host:port", )
- 添加和检索消息
# Add messages history.add_messages([message1, message2, message3, ...]) # Retrieve messages message_batch_0 = history.messages # retrieve messages after message_batch_0 message_batch_1 = history.messages # Reset to beginning and retrieve messages messages_from_beginning = history.messages_from_beginning()
检索消息是状态的。内部,它使用 Kafka 消费者来读取。消费的偏移量被持久维护。
要检索消息,可以使用以下方法:- messages
继续从上一个消息消费聊天消息。
- messages_from_beginning:
将消费者重置到聊天历史的开始,并返回消息。可选参数:1. max_message_count:要返回的最大消息数。2. max_time_sec:等待消息的最大时间(秒)。
- messages_from_latest:
将消费者重置到聊天历史的末尾,并尝试消费消息。可选参数与上述相同。
- messages_from_last_consumed:
从最后消费的消息继续,类似于 messages。可选参数与上述相同。
- max_message_count 和 max_time_sec 用于避免在检索消息时无限期阻塞
。因此,检索消息的方法可能不会返回所有消息。将 max_message_count 和 max_time_sec 更改为检索所有历史消息。
- 参数
session_id (str) – 单个聊天会话的 ID。它用作 Kafka 主题名称。
bootstrap_servers (str) – 以逗号分隔的主机/端口对,用于连接到 Kafka 集群 https://kafka.apache.org/documentation.html#adminclientconfigs_bootstrap.servers
ttl_ms (int) – 条目自动过期的时间(毫秒)。默认 7 天。-1 为无过期。它对应于 https://kafka.apache.org/documentation.html#topicconfigs_retention.ms
replication_factor (int) – 主题的复制因子。默认 1。
partition (int) – 主题的分区数。默认 3。
属性
messages
检索会话的消息,从 Kafka 主题从最后消费的消息开始连续检索。
方法
__init__
(session_id) [,- 参数 session_id
单个聊天会话的 ID。它用作 Kafka 主题名称。
aadd_messages
(messages)异步添加消息列表。
aclear
()异步从存储中删除所有消息。
add_ai_message
(message)添加人工智能消息字符串到存储的便利方法。
add_message
(message)向存储中添加消息对象。
add_messages
(messages[, flush_timeout_seconds])通过向Kafka主题发送来添加消息到聊天历史。
add_user_message
(message)添加人类消息字符串到存储的便利方法。
异步获取消息的版本。
clear
()通过删除Kafka主题来清除聊天历史。
close
()释放资源。
messages_from_beginning
([max_message_count, ...])从Kafka主题中检索从开始的消息。
messages_from_last_consumed
([...])从Kafka主题中检索从最后一个消耗的消息开始的消息。
messages_from_latest
([max_message_count, ...])重置到末尾偏移量。
- __init__(session_id: str, bootstrap_servers: str, ttl_ms: int = 604800000, replication_factor: int = 1, partition: int = 3)[source]¶
- 参数
session_id (str) – 单个聊天会话的 ID。它用作 Kafka 主题名称。
bootstrap_servers (str) – 以逗号分隔的主机/端口对,用于连接到 Kafka 集群 https://kafka.apache.org/documentation.html#adminclientconfigs_bootstrap.servers
ttl_ms (int) – 条目自动过期的时间(毫秒)。默认 7 天。-1 为无过期。它对应于 https://kafka.apache.org/documentation.html#topicconfigs_retention.ms
replication_factor (int) – 主题的复制因子。默认 1。
partition (int) – 主题的分区数。默认 3。
- async aadd_messages(messages: Sequence[BaseMessage]) None ¶
异步添加消息列表。
- 参数
messages (Sequence[BaseMessage]) – 要存储的 BaseMessage 对象序列。
- 返回类型
None
- async aclear() None ¶
异步从存储中删除所有消息。
- 返回类型
None
- add_ai_message(message: Union[AIMessage, str]) None ¶
添加人工智能消息字符串到存储的便利方法。
请注意,这是一个便捷方法。代码应优先选择批量添加 messages 界面,以减少底层持久化层的来回传递。
该方法可能在未来的版本中弃用。
- 参数
message (Union[AIMessage, str]) – 要添加的 AI 消息。
- 返回类型
None
- add_message(message: BaseMessage) None ¶
向存储中添加消息对象。
- 参数
message (BaseMessage) – 要存储的 BaseMessage 对象。
- 抛出
NotImplementedError – 如果子类没有实现高效的 add_messages 方法。
- 返回类型
None
- add_messages(messages: Sequence[BaseMessage], flush_timeout_seconds: float = 5.0) None [source]¶
通过向Kafka主题发送来添加消息到聊天历史。
- 参数
messages (Sequence[BaseMessage]) –
flush_timeout_seconds (float) –
- 返回类型
None
- add_user_message(message: Union[HumanMessage, str]) None ¶
添加人类消息字符串到存储的便利方法。
请注意,这是一个便捷方法。代码应优先选择批量添加 messages 界面,以减少底层持久化层的来回传递。
该方法可能在未来的版本中弃用。
- 参数
message (Union[HumanMessage, str]) – 需要添加到存储中的人类消息。
- 返回类型
None
- async aget_messages() List[BaseMessage] ¶
异步获取消息的版本。
可以重写此方法以提供高效的异步实现。
通常,获取消息可能会涉及到底层持久层的IO操作。
- 返回类型
- messages_from_beginning(max_message_count: Optional[int] = 5, max_time_sec: Optional[float] = 5.0) List[BaseMessage] [source]¶
从 Kafka 主题从头开始检索消息。该方法将消费者重置到起始位置并消费消息。
- 参数
max_time_sec: 消费消息的时间限制(秒)。
- 返回值
消息列表。
- 参数
max_message_count (可选)[int] –
max_time_sec (可选)[float] –
- 返回类型
- messages_from_last_consumed(max_message_count: Optional[int] = 5, max_time_sec: Optional[float] = 5.0) List[BaseMessage] [source]¶
从上次消费的消息开始从 Kafka 主题检索消息。请注意,该方法是有状态的。它内部使用 Kafka 消费者来消费消息,并维持偏移量提交。
- 参数
max_time_sec: 消费消息的时间限制(秒)。
- 返回值
消息列表。
- 参数
max_message_count (可选)[int] –
max_time_sec (可选)[float] –
- 返回类型
- messages_from_latest(max_message_count: Optional[int] = 5, max_time_sec: Optional[float] = 5.0) List[BaseMessage] [来源]¶
重置到末尾偏移量。如果可用,尝试消费消息。
- 参数
max_message_count (可选[整数]) – 要消费的最大消息数。
max_time_sec (可选[浮点数]) – 消息消费的超时时间(秒)。
- 返回值
消息列表。
- 返回类型