langchain_community.chat_message_histories.kafka.KafkaChatMessageHistory

class langchain_community.chat_message_histories.kafka.KafkaChatMessageHistory(

在 Kafka 中存储的聊天历史消息。

设置

安装 confluent-kafka-python

pip install confluent_kafka
实例化
from langchain_community.chat_message_histories import KafkaChatMessageHistory

history = KafkaChatMessageHistory(
    session_id="your_session_id",
    bootstrap_servers="host:port",
)
添加和检索消息
# Add messages
history.add_messages([message1, message2, message3, ...])

# Retrieve messages
message_batch_0 = history.messages

# retrieve messages after message_batch_0
message_batch_1 = history.messages

# Reset to beginning and retrieve messages
messages_from_beginning = history.messages_from_beginning()

检索消息是状态的。内部,它使用 Kafka 消费者来读取。消费的偏移量被持久维护。

要检索消息,可以使用以下方法:- messages

继续从上一个消息消费聊天消息。

  • messages_from_beginning:

    将消费者重置到聊天历史的开始,并返回消息。可选参数:1. max_message_count:要返回的最大消息数。2. max_time_sec:等待消息的最大时间(秒)。

  • messages_from_latest:

    将消费者重置到聊天历史的末尾,并尝试消费消息。可选参数与上述相同。

  • messages_from_last_consumed:

    从最后消费的消息继续,类似于 messages。可选参数与上述相同。

max_message_countmax_time_sec 用于避免在检索消息时无限期阻塞

。因此,检索消息的方法可能不会返回所有消息。将 max_message_countmax_time_sec 更改为检索所有历史消息。

参数

属性

messages

检索会话的消息,从 Kafka 主题从最后消费的消息开始连续检索。

方法

__init__(session_id) [,

参数 session_id

单个聊天会话的 ID。它用作 Kafka 主题名称。

aadd_messages(messages)

异步添加消息列表。

aclear()

异步从存储中删除所有消息。

add_ai_message(message)

添加人工智能消息字符串到存储的便利方法。

add_message(message)

向存储中添加消息对象。

add_messages(messages[, flush_timeout_seconds])

通过向Kafka主题发送来添加消息到聊天历史。

add_user_message(message)

添加人类消息字符串到存储的便利方法。

aget_messages()

异步获取消息的版本。

clear()

通过删除Kafka主题来清除聊天历史。

close()

释放资源。

messages_from_beginning([max_message_count, ...])

从Kafka主题中检索从开始的消息。

messages_from_last_consumed([...])

从Kafka主题中检索从最后一个消耗的消息开始的消息。

messages_from_latest([max_message_count, ...])

重置到末尾偏移量。

__init__(session_id: str, bootstrap_servers: str, ttl_ms: int = 604800000, replication_factor: int = 1, partition: int = 3)[source]
参数
async aadd_messages(messages: Sequence[BaseMessage]) None

异步添加消息列表。

参数

messages (Sequence[BaseMessage]) – 要存储的 BaseMessage 对象序列。

返回类型

None

async aclear() None

异步从存储中删除所有消息。

返回类型

None

add_ai_message(message: Union[AIMessage, str]) None

添加人工智能消息字符串到存储的便利方法。

请注意,这是一个便捷方法。代码应优先选择批量添加 messages 界面,以减少底层持久化层的来回传递。

该方法可能在未来的版本中弃用。

参数

message (Union[AIMessage, str]) – 要添加的 AI 消息。

返回类型

None

add_message(message: BaseMessage) None

向存储中添加消息对象。

参数

message (BaseMessage) – 要存储的 BaseMessage 对象。

抛出

NotImplementedError – 如果子类没有实现高效的 add_messages 方法。

返回类型

None

add_messages(messages: Sequence[BaseMessage], flush_timeout_seconds: float = 5.0) None[source]

通过向Kafka主题发送来添加消息到聊天历史。

参数
  • messages (Sequence[BaseMessage]) –

  • flush_timeout_seconds (float) –

返回类型

None

add_user_message(message: Union[HumanMessage, str]) None

添加人类消息字符串到存储的便利方法。

请注意,这是一个便捷方法。代码应优先选择批量添加 messages 界面,以减少底层持久化层的来回传递。

该方法可能在未来的版本中弃用。

参数

message (Union[HumanMessage, str]) – 需要添加到存储中的人类消息。

返回类型

None

async aget_messages() List[BaseMessage]

异步获取消息的版本。

可以重写此方法以提供高效的异步实现。

通常,获取消息可能会涉及到底层持久层的IO操作。

返回类型

BaseMessage>

clear() None[source]

通过删除Kafka主题来清除聊天历史。

返回类型

None

close() None[source]

释放资源。目前没有要释放的资源。

返回类型

None

messages_from_beginning(max_message_count: Optional[int] = 5, max_time_sec: Optional[float] = 5.0) List[BaseMessage][source]

从 Kafka 主题从头开始检索消息。该方法将消费者重置到起始位置并消费消息。

参数

max_time_sec: 消费消息的时间限制(秒)。

返回值

消息列表。

参数
  • max_message_count (可选)[int]

  • max_time_sec (可选)[float]

返回类型

BaseMessage>

messages_from_last_consumed(max_message_count: Optional[int] = 5, max_time_sec: Optional[float] = 5.0) List[BaseMessage][source]

从上次消费的消息开始从 Kafka 主题检索消息。请注意,该方法是有状态的。它内部使用 Kafka 消费者来消费消息,并维持偏移量提交。

参数

max_time_sec: 消费消息的时间限制(秒)。

返回值

消息列表。

参数
  • max_message_count (可选)[int]

  • max_time_sec (可选)[float]

返回类型

BaseMessage>

messages_from_latest(max_message_count: Optional[int] = 5, max_time_sec: Optional[float] = 5.0) List[BaseMessage][来源]

重置到末尾偏移量。如果可用,尝试消费消息。

参数
  • max_message_count (可选[整数]) – 要消费的最大消息数。

  • max_time_sec (可选[浮点数]) – 消息消费的超时时间(秒)。

返回值

消息列表。

返回类型

BaseMessage>