我需要一个滚动窗口(又名滑动窗口)可迭代的序列/迭代器/生成器。(默认的Python迭代可以被认为是一种特殊情况,其中窗口长度为1。)我目前正在使用以下代码。我怎样才能做得更优雅和/或更有效?

def rolling_window(seq, window_size):
    it = iter(seq)
    win = [it.next() for cnt in xrange(window_size)] # First window
    yield win
    for e in it: # Subsequent windows
        win[:-1] = win[1:]
        win[-1] = e
        yield win

if __name__=="__main__":
    for w in rolling_window(xrange(6), 3):
        print w

"""Example output:   
   [0, 1, 2]
   [1, 2, 3]
   [2, 3, 4]
   [3, 4, 5]
"""

对于window_size == 2的特定情况(即,在序列中迭代相邻的重叠对),请参见如何从列表中迭代重叠(当前,下一个)值对?


当前回答

这里是一个泛化,增加了对step, fillvalue参数的支持:

from collections import deque
from itertools import islice

def sliding_window(iterable, size=2, step=1, fillvalue=None):
    if size < 0 or step < 1:
        raise ValueError
    it = iter(iterable)
    q = deque(islice(it, size), maxlen=size)
    if not q:
        return  # empty iterable or size == 0
    q.extend(fillvalue for _ in range(size - len(q)))  # pad to size
    while True:
        yield iter(q)  # iter() to avoid accidental outside modifications
        try:
            q.append(next(it))
        except StopIteration: # Python 3.5 pep 479 support
            return
        q.extend(next(it, fillvalue) for _ in range(step - 1))

它每次产生块大小的项目,每次迭代滚动步骤位置,在必要时用fillvalue填充每个块。示例:size=4, step=3, fillvalue='*':

 [a b c d]e f g h i j k l m n o p q r s t u v w x y z
  a b c[d e f g]h i j k l m n o p q r s t u v w x y z
  a b c d e f[g h i j]k l m n o p q r s t u v w x y z
  a b c d e f g h i[j k l m]n o p q r s t u v w x y z
  a b c d e f g h i j k l[m n o p]q r s t u v w x y z
  a b c d e f g h i j k l m n o[p q r s]t u v w x y z
  a b c d e f g h i j k l m n o p q r[s t u v]w x y z
  a b c d e f g h i j k l m n o p q r s t u[v w x y]z
  a b c d e f g h i j k l m n o p q r s t u v w x[y z * *]

有关step参数的用例示例,请参见用python有效地处理大型.txt文件。

其他回答

我最终使用的解决方案(保持简单):

def sliding_window(items, size):
    return [items[start:end] for start, end
            in zip(range(0, len(items) - size + 1), range(size, len(items) + 1))]

不用说,项目序列需要是可切片的。使用索引并不理想,但考虑到其他选项,这似乎是最不坏的选择……这也可以很容易地更改为生成器:只需替换[…]和……

多个迭代器!

def window(seq, size, step=1):
    # initialize iterators
    iters = [iter(seq) for i in range(size)]
    # stagger iterators (without yielding)
    [next(iters[i]) for j in range(size) for i in range(-1, -j-1, -1)]
    while(True):
        yield [next(i) for i in iters]
        # next line does nothing for step = 1 (skips iterations for step > 1)
        [next(i) for i in iters for j in range(step-1)]

next(it)在序列结束时引发StopIteration,出于一些我无法理解的很酷的原因,yield语句在这里除外它,函数返回,忽略没有形成完整窗口的剩余值。

无论如何,这是目前为止行数最少的解决方案,它唯一的要求是seq实现__iter__或__getitem__,并且除了@dansalmo的解决方案之外,不依赖于itertools或集合:)

为了演示如何组合itertools recipe,我将使用consume recipe尽可能直接地将成对的recipe扩展回窗口recipe:

def consume(iterator, n):
    "Advance the iterator n-steps ahead. If n is none, consume entirely."
    # Use functions that consume iterators at C speed.
    if n is None:
        # feed the entire iterator into a zero-length deque
        collections.deque(iterator, maxlen=0)
    else:
        # advance to the empty slice starting at position n
        next(islice(iterator, n, n), None)

def window(iterable, n=2):
    "s -> (s0, ...,s(n-1)), (s1, ...,sn), (s2, ..., s(n+1)), ..."
    iters = tee(iterable, n)
    # Could use enumerate(islice(iters, 1, None), 1) to avoid consume(it, 0), but that's
    # slower for larger window sizes, while saving only small fixed "noop" cost
    for i, it in enumerate(iters):
        consume(it, i)
    return zip(*iters)

窗口的配方与成对的相同,它只是将第二个tee-ed迭代器上的单个元素“consume”替换为逐步增加n - 1个迭代器上的consume。使用consume而不是在islice中包装每个迭代器稍微快一些(对于足够大的可迭代对象),因为您只在消费阶段支付islice包装开销,而不是在提取每个窗口值的过程中(因此它以n为界,而不是iterable中项目的数量)。

在性能方面,与其他一些解决方案相比,这是相当不错的(并且比我测试的任何其他解决方案都要好)。在Python 3.5.0, Linux x86-64上测试,使用ipython %timeit magic。

Kindall是deque解决方案,通过使用islice而不是自制生成器表达式来调整性能/正确性,并测试产生的长度,因此当可迭代对象比窗口短时不会产生结果,以及通过位置传递deque的maxlen而不是通过关键字(对于较小的输入产生惊人的差异):

>>> %timeit -r5 deque(windowkindall(range(10), 3), 0)
100000 loops, best of 5: 1.87 μs per loop
>>> %timeit -r5 deque(windowkindall(range(1000), 3), 0)
10000 loops, best of 5: 72.6 μs per loop
>>> %timeit -r5 deque(windowkindall(range(1000), 30), 0)
1000 loops, best of 5: 71.6 μs per loop

与之前改编的kindall解决方案相同,但每个yield win都更改为yield tuple(win),因此存储来自生成器的结果,而不需要所有存储的结果真正成为最新结果的视图(在这种情况下,所有其他合理的解决方案都是安全的),并将tuple=tuple添加到函数定义中,以将tuple的使用从LEGB中的B移动到L:

>>> %timeit -r5 deque(windowkindalltupled(range(10), 3), 0)
100000 loops, best of 5: 3.05 μs per loop
>>> %timeit -r5 deque(windowkindalltupled(range(1000), 3), 0)
10000 loops, best of 5: 207 μs per loop
>>> %timeit -r5 deque(windowkindalltupled(range(1000), 30), 0)
1000 loops, best of 5: 348 μs per loop

如上所示的基于消费的解决方案:

>>> %timeit -r5 deque(windowconsume(range(10), 3), 0)
100000 loops, best of 5: 3.92 μs per loop
>>> %timeit -r5 deque(windowconsume(range(1000), 3), 0)
10000 loops, best of 5: 42.8 μs per loop
>>> %timeit -r5 deque(windowconsume(range(1000), 30), 0)
1000 loops, best of 5: 232 μs per loop

与consume相同,但是内联了consume的else case以避免函数调用,n是None测试以减少运行时间,特别是对于设置开销是工作中有意义的一部分的小输入:

>>> %timeit -r5 deque(windowinlineconsume(range(10), 3), 0)
100000 loops, best of 5: 3.57 μs per loop
>>> %timeit -r5 deque(windowinlineconsume(range(1000), 3), 0)
10000 loops, best of 5: 40.9 μs per loop
>>> %timeit -r5 deque(windowinlineconsume(range(1000), 30), 0)
1000 loops, best of 5: 211 μs per loop

(旁注:成对的一种变体,重复使用tee和默认参数2来创建嵌套的tee对象,因此任何给定的迭代器只前进一次,而不是独立地消耗越来越多的次数,类似于MrDrFenner的答案类似于非内联消耗,并且在所有测试中比内联消耗更慢,所以为了简洁起见,我省略了这些结果)。

As you can see, if you don't care about the possibility of the caller needing to store results, my optimized version of kindall's solution wins most of the time, except in the "large iterable, small window size case" (where inlined consume wins); it degrades quickly as the iterable size increases, while not degrading at all as the window size increases (every other solution degrades more slowly for iterable size increases, but also degrades for window size increases). It can even be adapted for the "need tuples" case by wrapping in map(tuple, ...), which runs ever so slightly slower than putting the tupling in the function, but it's trivial (takes 1-5% longer) and lets you keep the flexibility of running faster when you can tolerate repeatedly returning the same value.

如果需要安全的存储返回值,除了最小的输入大小外,内联消耗在所有输入大小上都占优势(非内联消耗略慢,但扩展相似)。基于deque & tuple的解决方案只对最小的输入获胜,因为更小的设置成本,收益也很小;当可迭代对象变长时,它会严重退化。

为了记录,我使用的kindall的解决方案的改编版本产生元组是:

def windowkindalltupled(iterable, n=2, tuple=tuple):
    it = iter(iterable)
    win = deque(islice(it, n), n)
    if len(win) < n:
        return
    append = win.append
    yield tuple(win)
    for e in it:
        append(e)
        yield tuple(win)

在函数定义行中删除缓存tuple,并在每个yield中使用tuple,以获得更快但不太安全的版本。

为什么不

def pairwise(iterable):
    "s -> (s0,s1), (s1,s2), (s2, s3), ..."
    a, b = tee(iterable)
    next(b, None)
    return zip(a, b)

它被记录在Python文档中。 您可以轻松地将其扩展到更宽的窗口。

toolz/cytoolz包有一个sliding_window函数。

>>> from cytoolz import sliding_window
>>> list(sliding_window(3, range(6))) # returns [(0, 1, 2), (1, 2, 3), (2, 3, 4), (3, 4, 5)]