langchain_community.utilities.nvidia_riva.RivaASR

注意

RivaASR 实现了标准 Runnable 接口。🏃

Runnable 接口 额外提供了在可运行对象上的方法,例如 with_typeswith_retryassignbindget_graph 等更多方法。

classlangchain_community.utilities.nvidia_riva.RivaASR[源代码]

继承自: RivaAuthMixinRivaCommonConfigMixinRunnableSerializable[AudioStreamstr]

使用 NVIDIA Riva 进行语音识别的运行实例。

paramaudio_channel_count: int = 1

输入音频流中的音频通道数量。

参数 description: str = '用于将音频字节转换为字符串的可运行对象。这对于将音频流输入到链中以预处理并创建LLM提示非常有用。'
参数 enable_automatic_punctuation: bool = True

控制Riva是否尝试在转录的文本中纠正句子标点。

参数 encoding: RivaAudioEncoding = RivaAudioEncoding.LINEAR_PCM

音频流的编码。

参数 language_code: str = 'en-US'

目标语言的[BCP-47语言代码](https://www.rfc-editor.org/rfc/bcp/bcp47.txt)。

参数 profanity_filter: bool = True

控制Riva是否尝试过滤转录文本中的亵渎内容。

参数 sample_rate_hertz: int = 8000

音频流的采样率频率。

参数 ssl_cert: Optional[str] = None

读取Riva的公SSL密钥的全路径。

param url: Union[AnyHttpUrl, str] = AnyHttpUrl('https://127.0.0.1:50051', )

可找到Riva服务的完整URL。

async abatch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output]

默认实现使用(asyncio.gather)并行运行ainvoke。

默认批次实现对IO密集型runnables效果良好。

子类应重写此方法以提高批处理效率;例如,如果底层的Runnable使用支持批处理模式的API。

参数
  • inputs (List[Input]) – Runnable的输入列表。

  • config (可选[Union[RunnableConfig, List[RunnableConfig]]]) – 调用Runnable时使用的配置。配置支持用于跟踪的标准键(如'tags'、'metadata'),用于控制并行工作的'max_concurrency'等键。请参阅RunnableConfig获取更多详细信息。默认为None。

  • return_exceptions (bool) – 是否返回异常而不是引发它们。默认为False。

  • kwargs (可选[Any]) – 传递给Runnable的额外关键字参数。

返回

来自Runnable的输出列表。

返回类型

List[Output]

async abatch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig, Sequence[RunnableConfig]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) AsyncIterator[Tuple[int, Union[Output, Exception]]]

在一系列输入上并行运行ainvoke,按照完成顺序输出结果。

参数
  • inputs (Sequence[Input]) – Runnable的输入列表。

  • config (Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]) – 调用Runnable时使用的配置。配置支持用于跟踪的标准键,如‘tags’,‘metadata’,用于控制并行工作中有多少工作的‘max_concurrency’,以及其他键。请参阅RunnableConfig以获取更多详细信息。默认为None。默认为None。

  • return_exceptions (bool) – 是否返回异常而不是引发它们。默认为False。

  • kwargs (可选[Any]) – 传递给Runnable的额外关键字参数。

输出

包含输入索引和Runnable输出的一组。

返回类型

AsyncIterator[Tuple[int, Union[Output, Exception]]]

async ainvoke(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Any) Output

ainvoke的默认实现,从线程中调用invoke。

默认实现在Runnable没有实现原生async版本的invoke的情况下也可以使用async代码。

子类如果可以异步运行,应重写此方法。

参数
  • input (Input) –

  • config (Optional[RunnableConfig]) –

  • kwargs (Any) –

返回类型

输出

as_tool(args_schema: Optional[Type[BaseModel]], *, name: Optional[str] = None, description: Optional[str] = None, arg_types: Optional[Dict[str, Type]]) BaseTool

测试版

此API为测试版,未来可能发生变化。

从Runnable创建BaseTool。

as_tool将从Runnable实例化一个带有名称、描述和args_schema的BaseTool。在可能的情况下,模式是从runnable.get_input_schema推断出来的。作为替代(例如,如果Runnable接受一个字典作为输入且特定的字典键未进行类型化),可以通过args_schema直接指定模式。您还可以传递arg_types来仅指定所需的参数及其类型。

参数
  • args_schema (可选[类型[BaseModel]]) – 工具的模式。默认值为None。

  • name (可选[str]) – 工具的名称。默认值为None。

  • description (可选[str]) – 工具的描述。默认值为None。

  • arg_types (可选[Dict[str, 类型]]) – 参数名称到类型的字典。默认值为None。

返回

一个BaseTool实例。

返回类型

BaseTool

类型化字典输入

from typing import List
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda

class Args(TypedDict):
    a: int
    b: List[int]

def f(x: Args) -> str:
    return str(x["a"] * max(x["b"]))

runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})

dict输入,通过args_schema指定架构

from typing import Any, Dict, List
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.runnables import RunnableLambda

def f(x: Dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))

class FSchema(BaseModel):
    """Apply a function to an integer and list of integers."""

    a: int = Field(..., description="Integer")
    b: List[int] = Field(..., description="List of ints")

runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})

dict输入,通过arg_types指定架构

from typing import Any, Dict, List
from langchain_core.runnables import RunnableLambda

def f(x: Dict[str, Any]) -> str:
    return str(x["a"] * max(x["b"]))

runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": List[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})

字符串输入

from langchain_core.runnables import RunnableLambda

def f(x: str) -> str:
    return x + "a"

def g(x: str) -> str:
    return x + "z"

runnable = RunnableLambda(f) | g
as_tool = runnable.as_tool()
as_tool.invoke("b")

新版本0.2.14中引入。

async astream(input: Input, config: Optional[RunnableConfig], **kwargs: Optional[Any]) AsyncIterator[Output]

astream的默认实现,它调用ainvoke。子类应覆盖此方法以支持流式输出。

参数
  • input (Input) – Runnable的输入。

  • config (可选[RunnableConfig]) – 用于Runnable的配置。默认值为None。

  • kwargs (可选[Any]) – 传递给Runnable的额外关键字参数。

输出

Runnable的输出。

返回类型

AsyncIterator[Output]

astream_events(input: Any, config: Optional[RunnableConfig] = None, *, version: Literal['v1', 'v2'], include_names: Optional[Sequence[str]] = None, include_types: Optional[Sequence[str]] = None, include_tags: Optional[Sequence[str]] = None, exclude_names: Optional[Sequence[str]] = None, exclude_types: Optional[Sequence[str]] = None, exclude_tags: Optional[Sequence[str]] = None, **kwargs: Any) AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]]

测试版

此API为测试版,未来可能发生变化。

生成事件流。

用于创建StreamEvents的迭代器,它们提供关于Runnable进度的实时信息,包括中间结果的事件。

StreamEvent是一个具有以下架构的字典

  • event: str - 事件名称的格式如下:

    on_[runnable_type]_(start|stream|end)。

  • name: str - 生成事件的Runnable的名称。

  • run_id: str - 与给定的执行相关联的随机生成的ID

    触发事件的Runnable。作为父Runnable执行部分而调用的子Runnable被分配一个唯一的ID。

  • parent_ids: 字符串列表 - 生成事件的父Runnable的ID。

    根Runnable将有空的列表。父ID的顺序是从根到直接父级。仅适用于API的v2版本。API的v1版本将返回空列表。

  • tags: 可选的字符串列表 - 生成事件的Runnable的标签。

  • metadata: 可选的字符串到任意类型字典 - 生成事件的Runnable的元数据。

  • data: 字符串到任意类型字典

下面是一个表格,说明了各种链可能会发出的一些事件。由于简洁起见,省略了元数据字段。链定义在表格之后。

注意 此参考表适用于架构的V2版本。

事件

名称

输入

输出

on_chat_model_start

[模型名称]

{“messages”:[[系统消息,人工消息]]}

on_chat_model_stream

[模型名称]

AIMessageChunk(内容=”hello”)

on_chat_model_end

[模型名称]

{“messages”:[[系统消息,人工消息]]}

AIMessageChunk(内容=”hello world”)

on_llm_start

[模型名称]

{‘input’:’hello’}

on_llm_stream

[模型名称]

‘Hello’

on_llm_end

[模型名称]

‘Hello human!’

on_chain_start

格式化文档

on_chain_stream

格式化文档

“hello world,再见世界!”

on_chain_end

格式化文档

[文档(...)]

“hello world,再见世界!”

on_tool_start

某些工具

{“x”:1,“y”:”2”}

on_tool_end

某些工具

{“x”:1,“y”:”2”}

on_retriever_start

[检索器名称]

{“query”:”hello”}

on_retriever_end

[检索器名称]

{“query”:”hello”}

[Document(…),..]

on_prompt_start

[模板名称]

{“question”:”hello”}

on_prompt_end

[模板名称]

{“question”:”hello”}

ChatPromptValue(messages:[SystemMessage,…])

除了标准事件,用户还可以发送自定义事件(见下面的示例)。

自定义事件仅在API的v2版本中公开!

自定义事件的格式如下

属性

类型

描述

名称

字符串

事件的用户定义名称。

数据

任意类型

与事件关联的数据。这可以是任何东西,尽管我们建议使其可序列化为JSON。

以下是与上面显示的标准事件相关的声明

格式化文档:

def format_docs(docs: List[Document]) -> str:
    '''Format the docs.'''
    return ", ".join([doc.page_content for doc in docs])

format_docs = RunnableLambda(format_docs)

某些工具:

@tool
def some_tool(x: int, y: str) -> dict:
    '''Some_tool.'''
    return {"x": x, "y": y}

提示:

template = ChatPromptTemplate.from_messages(
    [("system", "You are Cat Agent 007"), ("human", "{question}")]
).with_config({"run_name": "my_template", "tags": ["my_template"]})

示例

from langchain_core.runnables import RunnableLambda

async def reverse(s: str) -> str:
    return s[::-1]

chain = RunnableLambda(func=reverse)

events = [
    event async for event in chain.astream_events("hello", version="v2")
]

# will produce the following events (run_id, and parent_ids
# has been omitted for brevity):
[
    {
        "data": {"input": "hello"},
        "event": "on_chain_start",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"chunk": "olleh"},
        "event": "on_chain_stream",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"output": "olleh"},
        "event": "on_chain_end",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
]

示例:发送自定义事件

from langchain_core.callbacks.manager import (
    adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio


async def slow_thing(some_input: str, config: RunnableConfig) -> str:
    """Do something that takes a long time."""
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 1 of 3"},
        config=config # Must be included for python < 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 2 of 3"},
        config=config # Must be included for python < 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    return "Done"

slow_thing = RunnableLambda(slow_thing)

async for event in slow_thing.astream_events("some_input", version="v2"):
    print(event)
参数
  • 输入任意类型)- Runnable的输入。

  • 配置可选的RunnableConfig列表)- 用于Runnable的配置。

  • 版本文字[‘v1’,‘v2’])- 要使用的架构版本,无论是v2还是v1。用户应使用v2v1用于向后兼容,并将从0.4.0版本中弃用。将在API稳定之前不指定默认值。自定义事件仅在v2中公开。

  • 包含名称可选的字符串列表)- 只包括具有匹配名称的runnables的事件。

  • 包含类型可选的字符串列表)- 只包括具有匹配类型的runnables的事件。

  • 包含标签可选的字符串列表)- 只包括具有匹配标签的runnables的事件。

  • 排除名称可选的字符串列表)- 排除具有匹配名称的runnables的事件。

  • 排除类型可选的字符串列表)- 排除具有匹配类型的runnables的事件。

  • exclude_tags (可选[序列[str]]) – 从带有匹配标签的可执行任务中排除事件。

  • kwargs (任何类型) – 要传递给 Runnable 的附加关键字参数。这些参数将传递到 astream_log,因为 astream_events 的此实现建立在 astream_log 之上。

输出

异步流式传输的 StreamEvents。

抛出

NotImplementedError – 如果版本不是 v1v2

返回类型

AsyncIterator[Union[StandardStreamEvent, CustomStreamEvent]]

batch(inputs: List[Input], config: Optional[Union[RunnableConfig, List[RunnableConfig]]]= None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) List[Output]

默认实现使用线程池执行器并行运行 invoke。

默认批次实现对IO密集型runnables效果良好。

子类应重写此方法以提高批处理效率;例如,如果底层的Runnable使用支持批处理模式的API。

参数
  • inputs (列表[Input]) –

  • config (可选[Union[RunnableConfig, List[RunnableConfig]]]) –

  • return_exceptions (布尔值) –

  • kwargs (可选[任何类型]) –

返回类型

List[Output]

batch_as_completed(inputs: Sequence[Input], config: Optional[Union[RunnableConfig,Sequence[RunnableConfig]]]] = None, *, return_exceptions: bool = False, **kwargs: Optional[Any]) Iterator[Tuple[int, Union[Output, Exception]]]

并行地在输入列表上执行调用,随结果完成而提供结果。

参数
  • inputs (Sequence[Input]) –

  • config (Optional[Union[RunnableConfig, Sequence[RunnableConfig]]]) –

  • return_exceptions (布尔值) –

  • kwargs (可选[任何类型]) –

返回类型

Iterator[Tuple[int, Union[Output, Exception]]]

configurable_alternatives(which: ConfigurableField, *, default_key: str = 'default', prefix_keys: bool = False, **kwargs: Union[Runnable[Input, Output], Callable[[], Runnable[Input, Output]]) RunnableSerializable[Input, Output]

在运行时可以设置的可运行程序的替代方案配置。

参数
  • which (ConfigurableField) – 用于选择替代方案将使用的 ConfigurableField 实例。

  • default_key (str) – 若未选择替代方案时使用的默认键。默认为“default”。

  • prefix_keys (bool) – 是否在键名前加上 ConfigurableField id。默认为 False。

  • **kwargs (Union[Runnable[Input, Output], Callable[[], Runnable[Input, Output]]]) – 键到可运行实例或返回可运行实例的可调用对象的字典。

返回

配置了替代方案的新可运行程序。

返回类型

运行可序列化程序[输入,输出]

from langchain_anthropic import ChatAnthropic
from langchain_core.runnables.utils import ConfigurableField
from langchain_openai import ChatOpenAI

model = ChatAnthropic(
    model_name="claude-3-sonnet-20240229"
).configurable_alternatives(
    ConfigurableField(id="llm"),
    default_key="anthropic",
    openai=ChatOpenAI()
)

# uses the default model ChatAnthropic
print(model.invoke("which organization created you?").content)

# uses ChatOpenAI
print(
    model.with_config(
        configurable={"llm": "openai"}
    ).invoke("which organization created you?").content
)
configurable_fields(**kwargs: Union[ConfigurableField, ConfigurableFieldSingleOption, ConfigurableFieldMultiOption]) RunnableSerializable[Input, Output]

在运行时配置特定的可运行的字段。

参数

**kwargs (Union[ConfigurableField, ConfigurableFieldSingleOption, ConfigurableFieldMultiOption]) – 要配置的 ConfigurableField 实例的字典。

返回

具有配置字段的新的可运行的实例。

返回类型

运行可序列化程序[输入,输出]

from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI

model = ChatOpenAI(max_tokens=20).configurable_fields(
    max_tokens=ConfigurableField(
        id="output_token_number",
        name="Max tokens in the output",
        description="The maximum number of tokens in the output",
    )
)

# max_tokens = 20
print(
    "max_tokens_20: ",
    model.invoke("tell me something about chess").content
)

# max_tokens = 200
print("max_tokens_200: ", model.with_config(
    configurable={"output_token_number": 200}
    ).invoke("tell me something about chess").content
)
invoke(input: AudioStream, _: Optional[RunnableConfig] = None) str[源代码]

使用 Riva 将音频字节转录成字符串。

参数
返回类型

字符串

stream(input: Input, config: Optional[RunnableConfig] = None, **kwargs: Optional[Any]) Iterator[Output]

stream 的默认实现,调用 invoke。子类应覆盖此方法以支持流式输出。

参数
  • input (Input) – Runnable的输入。

  • config (可选[RunnableConfig]) – 用于Runnable的配置。默认值为None。

  • kwargs (可选[Any]) – 传递给Runnable的额外关键字参数。

输出

Runnable的输出。

返回类型

Iterator[Output]

to_json() Union[SerializedConstructor, SerializedNotImplemented]

将 Runnable 序列化为 JSON。

返回

Runnable 的 JSON 序列化表示。

返回类型

Union[SerializedConstructor, SerializedNotImplemented]

property auth: riva.client.Auth

返回一个 riva 客户端认证对象。

property config: riva.client.StreamingRecognitionConfig

创建并返回 riva 配置对象。

RivaASR 的示例用法