langchain_core.utils.aiter.Tee

class langchain_core.utils.aiter.Tee(iterable: AsyncIterator[T], n: int = 2, *, lock: Optional[AsyncContextManager[Any]] = None)[source]

iterable 上创建 n 个独立的异步迭代器。

这会将单个 iterable 分割成多个迭代器,每个迭代器按相同的顺序提供相同的项。所有子迭代器可以分别前进,但共享来自 iterable 的相同项 – 当最先进的迭代器检索到一个项时,该项会在最不先进的迭代器也产生该项之前被缓冲。一个 tee 的工作方式是懒惰的,并且可以处理无限 iterable,前提是所有迭代器都前进。

async def derivative(sensor_data):
    previous, current = a.tee(sensor_data, n=2)
    await a.anext(previous)  # advance one iterator
    return a.map(operator.sub, previous, current)

itertools.tee() 不同,tee() 返回一个自定义类型,而不是一个 tuple。像元组一样,它可以被索引、迭代和拆包以获取子迭代器。此外,它的 aclose() 方法立即关闭所有子迭代器,并且可以在 async with 上下文中使用,以产生相同的效果。

如果 iterable 是一个迭代器并且在别处读取,tee不会 提供这些项。此外,tee 必须在内部缓冲每个项,直到最后一个迭代器产生它;如果最先进的迭代器和最不先进的迭代器之间的数据量最多,那么使用 list 更有效(但不是懒惰的)。

如果底层迭代器是并发安全的(anext 可以并发等待),则结果迭代器也是并发安全的。否则,如果只有一个单独的“最先进的”迭代器,迭代器才是安全的。要强制按顺序使用 anext,提供一个 lock - 例如,在一个 asyncio 应用程序中的一个 asyncio.Lock 实例 - 访问将自动同步。

方法

__init__(iterable[, n, lock])

aclose()

异步关闭所有子迭代器。

参数
  • iterable (AsyncIterator[T]) –

  • n (int) –

  • lock (Optional[AsyncContextManager[Any]]) –

__init__(iterable: AsyncIterator[T], n: int = 2, *, lock: Optional[AsyncContextManager[Any]] = None)[source]
参数
  • iterable (AsyncIterator[T]) –

  • n (int) –

  • lock (Optional[AsyncContextManager[Any]]) –

async aclose() None[source]

异步关闭所有子迭代器。

返回类型

None