我需要一个滚动窗口(又名滑动窗口)可迭代的序列/迭代器/生成器。(默认的Python迭代可以被认为是一种特殊情况,其中窗口长度为1。)我目前正在使用以下代码。我怎样才能做得更优雅和/或更有效?
def rolling_window(seq, window_size):
it = iter(seq)
win = [it.next() for cnt in xrange(window_size)] # First window
yield win
for e in it: # Subsequent windows
win[:-1] = win[1:]
win[-1] = e
yield win
if __name__=="__main__":
for w in rolling_window(xrange(6), 3):
print w
"""Example output:
[0, 1, 2]
[1, 2, 3]
[2, 3, 4]
[3, 4, 5]
"""
对于window_size == 2的特定情况(即,在序列中迭代相邻的重叠对),请参见如何从列表中迭代重叠(当前,下一个)值对?
只是一个简短的贡献。
由于当前的python文档在itertool示例中没有“window”(即,在http://docs.python.org/library/itertools.html的底部),这里有一个基于
石斑鱼的代码,这是给出的例子之一:
import itertools as it
def window(iterable, size):
shiftedStarts = [it.islice(iterable, s, None) for s in xrange(size)]
return it.izip(*shiftedStarts)
基本上,我们创建了一系列切片迭代器,每个迭代器的起点都在前面一个位置。然后,我们把它们拉在一起。注意,这个函数返回一个生成器(它本身不是直接的生成器)。
就像上面的appendingelement和advingiterator版本一样,性能(即,哪个是最好的)随列表大小和窗口大小而变化。我喜欢这个,因为它是一个两行代码(它也可以是一行代码,但我更喜欢命名概念)。
事实证明上面的代码是错误的。如果传递给iterable的参数是一个序列则有效,但如果它是一个迭代器则无效。如果它是一个迭代器,那么在islice调用之间共享相同的迭代器(但不是tee - d),这将严重破坏事情。
下面是一些固定的代码:
import itertools as it
def window(iterable, size):
itrs = it.tee(iterable, size)
shiftedStarts = [it.islice(anItr, s, None) for s, anItr in enumerate(itrs)]
return it.izip(*shiftedStarts)
另外,书里还有一个版本。这个版本不是复制一个迭代器,然后多次向前复制,而是在开始位置向前移动时成对复制每个迭代器。因此,迭代器t既提供了起点为t的“完整”迭代器,也提供了创建迭代器t + 1的基础:
import itertools as it
def window4(iterable, size):
complete_itr, incomplete_itr = it.tee(iterable, 2)
iters = [complete_itr]
for i in xrange(1, size):
incomplete_itr.next()
complete_itr, incomplete_itr = it.tee(incomplete_itr, 2)
iters.append(complete_itr)
return it.izip(*iters)
我的两个版本的窗口实现
from typing import Sized, Iterable
def window(seq: Sized, n: int, strid: int = 1, drop_last: bool = False):
for i in range(0, len(seq), strid):
res = seq[i:i + n]
if drop_last and len(res) < n:
break
yield res
def window2(seq: Iterable, n: int, strid: int = 1, drop_last: bool = False):
it = iter(seq)
result = []
step = 0
for i, ele in enumerate(it):
result.append(ele)
result = result[-n:]
if len(result) == n:
if step % strid == 0:
yield result
step += 1
if not drop_last:
yield result
我喜欢t ():
from itertools import tee, izip
def window(iterable, size):
iters = tee(iterable, size)
for i in xrange(1, size):
for each in iters[i:]:
next(each, None)
return izip(*iters)
for each in window(xrange(6), 3):
print list(each)
给:
[0, 1, 2]
[1, 2, 3]
[2, 3, 4]
[3, 4, 5]