Python中是否有SciPy函数或NumPy函数或模块来计算给定特定窗口的1D数组的运行平均值?


当前回答

一个新的卷积配方被合并到Python 3.10中。

鉴于


import collections, operator

from itertools import chain, repeat


size = 3 + 1
kernel = [1/size] * size                                              

Code

def convolve(signal, kernel):
    # See:  https://betterexplained.com/articles/intuitive-convolution/
    # convolve(data, [0.25, 0.25, 0.25, 0.25]) --> Moving average (blur)
    # convolve(data, [1, -1]) --> 1st finite difference (1st derivative)
    # convolve(data, [1, -2, 1]) --> 2nd finite difference (2nd derivative)
    kernel = list(reversed(kernel))
    n = len(kernel)
    window = collections.deque([0] * n, maxlen=n)
    for x in chain(signal, repeat(0, n-1)):
        window.append(x)
        yield sum(map(operator.mul, kernel, window))

Demo

list(convolve(range(1, 6), kernel))
# [0.25, 0.75, 1.5, 2.5, 3.5, 3.0, 2.25, 1.25]

细节

卷积是一种可以应用于移动平均的一般数学运算。其思想是,给定一些数据,您将数据子集(窗口)作为“掩码”或“内核”在数据中滑动,在每个窗口上执行特定的数学操作。在移动平均的情况下,核是平均值:

现在可以通过more_itertools.convolve使用这个实现。 More_itertools是一个流行的第三方包;通过> PIP Install more_itertools安装。

其他回答

另一个解决方案是使用标准库和deque:

from collections import deque
import itertools

def moving_average(iterable, n=3):
    # http://en.wikipedia.org/wiki/Moving_average
    it = iter(iterable) 
    # create an iterable object from input argument
    d = deque(itertools.islice(it, n-1))  
    # create deque object by slicing iterable
    d.appendleft(0)
    s = sum(d)
    for elem in it:
        s += elem - d.popleft()
        d.append(elem)
        yield s / n

# example on how to use it
for i in  moving_average([40, 30, 50, 46, 39, 44]):
    print(i)

# 40.0
# 42.0
# 45.0
# 43.0

上述所有的解决方案都很差,因为它们缺乏

由于本机python而不是numpy向量化实现, 数值稳定性,由于numpy使用不当。cumsum或 由于O(len(x) * w)实现为卷积的速度。

鉴于

import numpy
m = 10000
x = numpy.random.rand(m)
w = 1000

注意x_[:w].sum()等于x[:w-1].sum()。因此,对于第一个平均值,numpy.cumsum(…)加上x[w] / w(通过x_[w+1] / w),并减去0(从x_[0] / w)。结果是x[0:w].mean()

通过cumsum,您将通过添加x[w+1] / w并减去x[0] / w来更新第二个平均值,从而得到x[1:w+1].mean()。

这将一直进行,直到到达x[-w:].mean()。

x_ = numpy.insert(x, 0, 0)
sliding_average = x_[:w].sum() / w + numpy.cumsum(x_[w:] - x_[:-w]) / w

这个解是向量化的,O(m),可读且数值稳定。

出于教学目的,让我再添加两个Numpy解决方案(比cumsum解决方案慢):

import numpy as np
from numpy.lib.stride_tricks import as_strided

def ra_strides(arr, window):
    ''' Running average using as_strided'''
    n = arr.shape[0] - window + 1
    arr_strided = as_strided(arr, shape=[n, window], strides=2*arr.strides)
    return arr_strided.mean(axis=1)

def ra_add(arr, window):
    ''' Running average using add.reduceat'''
    n = arr.shape[0] - window + 1
    indices = np.array([0, window]*n) + np.repeat(np.arange(n), 2)
    arr = np.append(arr, 0)
    return np.add.reduceat(arr, indices )[::2]/window

使用的函数:as_strided, add.reduceat

对于一个简短、快速的解决方案,在一个循环中完成所有事情,没有依赖关系,下面的代码工作得很好。

mylist = [1, 2, 3, 4, 5, 6, 7]
N = 3
cumsum, moving_aves = [0], []

for i, x in enumerate(mylist, 1):
    cumsum.append(cumsum[i-1] + x)
    if i>=N:
        moving_ave = (cumsum[i] - cumsum[i-N])/N
        #can do stuff with moving_ave here
        moving_aves.append(moving_ave)

我还没有检查这有多快,但你可以试试:

from collections import deque

cache = deque() # keep track of seen values
n = 10          # window size
A = xrange(100) # some dummy iterable
cum_sum = 0     # initialize cumulative sum

for t, val in enumerate(A, 1):
    cache.append(val)
    cum_sum += val
    if t < n:
        avg = cum_sum / float(t)
    else:                           # if window is saturated,
        cum_sum -= cache.popleft()  # subtract oldest value
        avg = cum_sum / float(n)