langchain_core.utils.iter.Tee

class langchain_core.utils.iter.Tee(iterable: Iterator[T], n: int = 2, *, lock: Optional[ContextManager[Any]] = None)[源代码]

在迭代器iterable上创建n个独立的异步迭代器

这会将单个iterable分割成多个迭代器,每个迭代器按相同的顺序提供相同的项。所有子迭代器可以单独前进,但共享来自iterable的相同项 – 当最先进的迭代器检索一个项时,它会被缓冲,直到最不先进的迭代器也产生了它。一个tee是懒惰的,可以处理无限的iterable,只要所有迭代器都在前进。

async def derivative(sensor_data):
    previous, current = a.tee(sensor_data, n=2)
    await a.anext(previous)  # advance one iterator
    return a.map(operator.sub, previous, current)

itertools.tee()不同,tee()返回一个自定义类型,而不是一个tuple。与元组一样,它可以被索引、迭代和展开以获取子迭代器。此外,它的aclose()方法立即关闭所有子对象,并且可以在async with上下文中使用以实现相同的效果。

如果iterable是一个迭代器并且在其他地方被读取,则tee不会提供这些项。此外,tee必须在内部缓冲每个项,直到最后一个迭代器产生它;如果最先进和最不先进的迭代器在数据上差异很大,使用一个list会更有效(但不是懒惰的)。

如果底层迭代器是并发安全的(anext可以并发等待),则生成的迭代器也是并发安全的。否则,如果只有一个单独的“最先进的”迭代器,迭代器是安全的。为了强制使用anext的顺序性,请提供一个lock - 例如,一个在asyncio应用程序中的asyncio.Lock实例 - 并且访问会自动同步。

创建一个新的tee

参数
  • iterable (Iterator[T]) – 要分割的迭代器。

  • n (int) – 要创建的迭代器数量。默认为2。

  • (可选[上下文管理器[任何内容]]) – 用于同步对共享缓冲区访问的锁。默认为 None。

方法

__init__(iterable[, n, lock])

创建一个新的tee

关闭()

__init__(iterable: Iterator[T], n: int = 2, *, lock: Optional[ContextManager[Any]] = None)[源代码]

创建一个新的tee

参数
  • iterable (Iterator[T]) – 要分割的迭代器。

  • n (int) – 要创建的迭代器数量。默认为2。

  • (可选[上下文管理器[任何内容]]) – 用于同步对共享缓冲区访问的锁。默认为 None。

close() None[源代码]
返回类型

None