langchain_core.utils.aiter
.Tee¶
- class langchain_core.utils.aiter.Tee(iterable: AsyncIterator[T], n: int = 2, *, lock: Optional[AsyncContextManager[Any]] = None)[source]¶
在
iterable
上创建n
个独立的异步迭代器。这会将单个
iterable
分割成多个迭代器,每个迭代器按相同的顺序提供相同的项。所有子迭代器可以分别前进,但共享来自iterable
的相同项 – 当最先进的迭代器检索到一个项时,该项会在最不先进的迭代器也产生该项之前被缓冲。一个tee
的工作方式是懒惰的,并且可以处理无限iterable
,前提是所有迭代器都前进。async def derivative(sensor_data): previous, current = a.tee(sensor_data, n=2) await a.anext(previous) # advance one iterator return a.map(operator.sub, previous, current)
与
itertools.tee()
不同,tee()
返回一个自定义类型,而不是一个tuple
。像元组一样,它可以被索引、迭代和拆包以获取子迭代器。此外,它的aclose()
方法立即关闭所有子迭代器,并且可以在async with
上下文中使用,以产生相同的效果。如果
iterable
是一个迭代器并且在别处读取,tee
将 不会 提供这些项。此外,tee
必须在内部缓冲每个项,直到最后一个迭代器产生它;如果最先进的迭代器和最不先进的迭代器之间的数据量最多,那么使用list
更有效(但不是懒惰的)。如果底层迭代器是并发安全的(
anext
可以并发等待),则结果迭代器也是并发安全的。否则,如果只有一个单独的“最先进的”迭代器,迭代器才是安全的。要强制按顺序使用anext
,提供一个lock
- 例如,在一个asyncio
应用程序中的一个asyncio.Lock
实例 - 访问将自动同步。方法
__init__
(iterable[, n, lock])aclose
()异步关闭所有子迭代器。
- 参数
iterable (AsyncIterator[T]) –
n (int) –
lock (Optional[AsyncContextManager[Any]]) –