langchain_core.utils.iter
.Tee¶
- class langchain_core.utils.iter.Tee(iterable: Iterator[T], n: int = 2, *, lock: Optional[ContextManager[Any]] = None)[源代码]¶
在迭代器
iterable
上创建n
个独立的异步迭代器这会将单个
iterable
分割成多个迭代器,每个迭代器按相同的顺序提供相同的项。所有子迭代器可以单独前进,但共享来自iterable
的相同项 – 当最先进的迭代器检索一个项时,它会被缓冲,直到最不先进的迭代器也产生了它。一个tee
是懒惰的,可以处理无限的iterable
,只要所有迭代器都在前进。async def derivative(sensor_data): previous, current = a.tee(sensor_data, n=2) await a.anext(previous) # advance one iterator return a.map(operator.sub, previous, current)
与
itertools.tee()
不同,tee()
返回一个自定义类型,而不是一个tuple
。与元组一样,它可以被索引、迭代和展开以获取子迭代器。此外,它的aclose()
方法立即关闭所有子对象,并且可以在async with
上下文中使用以实现相同的效果。如果
iterable
是一个迭代器并且在其他地方被读取,则tee
将不会提供这些项。此外,tee
必须在内部缓冲每个项,直到最后一个迭代器产生它;如果最先进和最不先进的迭代器在数据上差异很大,使用一个list
会更有效(但不是懒惰的)。如果底层迭代器是并发安全的(
anext
可以并发等待),则生成的迭代器也是并发安全的。否则,如果只有一个单独的“最先进的”迭代器,迭代器是安全的。为了强制使用anext
的顺序性,请提供一个lock
- 例如,一个在asyncio
应用程序中的asyncio.Lock
实例 - 并且访问会自动同步。创建一个新的
tee
。- 参数
iterable (Iterator[T]) – 要分割的迭代器。
n (int) – 要创建的迭代器数量。默认为2。
锁 (可选[上下文管理器[任何内容]]) – 用于同步对共享缓冲区访问的锁。默认为 None。
方法
__init__
(iterable[, n, lock])创建一个新的
tee
。关闭
()