Python中是否有SciPy函数或NumPy函数或模块来计算给定特定窗口的1D数组的运行平均值?


当前回答

如果你必须为非常小的数组(少于200个元素)重复这样做,我发现只用线性代数就能得到最快的结果。 最慢的部分是建立你的乘法矩阵y,你只需要做一次,但之后可能会更快。

import numpy as np
import random 

N = 100      # window size
size =200     # array length

x = np.random.random(size)
y = np.eye(size, dtype=float)

# prepare matrix
for i in range(size):
  y[i,i:i+N] = 1./N
  
# calculate running mean
z = np.inner(x,y.T)[N-1:]

其他回答

Python标准库解决方案

这个生成器函数接受一个可迭代对象和一个窗口大小为N的值,并生成窗口内当前值的平均值。它使用了deque,这是一种类似于列表的数据结构,但针对在两端进行快速修改(弹出、追加)进行了优化。

from collections import deque
from itertools import islice

def sliding_avg(iterable, N):        
    it = iter(iterable)
    window = deque(islice(it, N))        
    num_vals = len(window)

    if num_vals < N:
        msg = 'window size {} exceeds total number of values {}'
        raise ValueError(msg.format(N, num_vals))

    N = float(N) # force floating point division if using Python 2
    s = sum(window)
    
    while True:
        yield s/N
        try:
            nxt = next(it)
        except StopIteration:
            break
        s = s - window.popleft() + nxt
        window.append(nxt)
        

下面是函数的运行情况:

>>> values = range(100)
>>> N = 5
>>> window_avg = sliding_avg(values, N)
>>> 
>>> next(window_avg) # (0 + 1 + 2 + 3 + 4)/5
>>> 2.0
>>> next(window_avg) # (1 + 2 + 3 + 4 + 5)/5
>>> 3.0
>>> next(window_avg) # (2 + 3 + 4 + 5 + 6)/5
>>> 4.0

如果你选择自己生成,而不是使用现有的库,请注意浮点错误并尽量减少其影响:

class SumAccumulator:
    def __init__(self):
        self.values = [0]
        self.count = 0

    def add( self, val ):
        self.values.append( val )
        self.count = self.count + 1
        i = self.count
        while i & 0x01:
            i = i >> 1
            v0 = self.values.pop()
            v1 = self.values.pop()
            self.values.append( v0 + v1 )

    def get_total(self):
        return sum( reversed(self.values) )

    def get_size( self ):
        return self.count

如果所有的值都是大致相同的数量级,那么这将通过始终添加大致相似的数量级值来帮助保持精度。

我觉得使用瓶颈可以很好地解决这个问题

参见下面的基本示例:

import numpy as np
import bottleneck as bn

a = np.random.randint(4, 1000, size=100)
mm = bn.move_mean(a, window=5, min_count=1)

“mm”是“a”的移动平均值。 “窗口”是考虑移动均值的最大条目数。 "min_count"是考虑移动平均值的最小条目数(例如,对于前几个元素或如果数组有nan值)。

好在瓶颈有助于处理nan值,而且非常高效。

另一个解决方案是使用标准库和deque:

from collections import deque
import itertools

def moving_average(iterable, n=3):
    # http://en.wikipedia.org/wiki/Moving_average
    it = iter(iterable) 
    # create an iterable object from input argument
    d = deque(itertools.islice(it, n-1))  
    # create deque object by slicing iterable
    d.appendleft(0)
    s = sum(d)
    for elem in it:
        s += elem - d.popleft()
        d.append(elem)
        yield s / n

# example on how to use it
for i in  moving_average([40, 30, 50, 46, 39, 44]):
    print(i)

# 40.0
# 42.0
# 45.0
# 43.0

如果你必须为非常小的数组(少于200个元素)重复这样做,我发现只用线性代数就能得到最快的结果。 最慢的部分是建立你的乘法矩阵y,你只需要做一次,但之后可能会更快。

import numpy as np
import random 

N = 100      # window size
size =200     # array length

x = np.random.random(size)
y = np.eye(size, dtype=float)

# prepare matrix
for i in range(size):
  y[i,i:i+N] = 1./N
  
# calculate running mean
z = np.inner(x,y.T)[N-1:]