Python中是否有SciPy函数或NumPy函数或模块来计算给定特定窗口的1D数组的运行平均值?


当前回答

你可以用以下方法计算运行平均值:

import numpy as np

def runningMean(x, N):
    y = np.zeros((len(x),))
    for ctr in range(len(x)):
         y[ctr] = np.sum(x[ctr:(ctr+N)])
    return y/N

但是速度很慢。

幸运的是,numpy包含一个卷积函数,我们可以用它来加快速度。运行均值相当于将x与一个长度为N的向量进行卷积,其中所有元素都等于1/N。卷积的numpy实现包括起始瞬态,所以你必须删除前N-1点:

def runningMeanFast(x, N):
    return np.convolve(x, np.ones((N,))/N)[(N-1):]

在我的机器上,快速版本要快20-30倍,这取决于输入向量的长度和平均窗口的大小。

请注意,卷积确实包括一个“相同”模式,它似乎应该解决开始的瞬态问题,但它在开始和结束之间分割。

其他回答

一个新的卷积配方被合并到Python 3.10中。

鉴于


import collections, operator

from itertools import chain, repeat


size = 3 + 1
kernel = [1/size] * size                                              

Code

def convolve(signal, kernel):
    # See:  https://betterexplained.com/articles/intuitive-convolution/
    # convolve(data, [0.25, 0.25, 0.25, 0.25]) --> Moving average (blur)
    # convolve(data, [1, -1]) --> 1st finite difference (1st derivative)
    # convolve(data, [1, -2, 1]) --> 2nd finite difference (2nd derivative)
    kernel = list(reversed(kernel))
    n = len(kernel)
    window = collections.deque([0] * n, maxlen=n)
    for x in chain(signal, repeat(0, n-1)):
        window.append(x)
        yield sum(map(operator.mul, kernel, window))

Demo

list(convolve(range(1, 6), kernel))
# [0.25, 0.75, 1.5, 2.5, 3.5, 3.0, 2.25, 1.25]

细节

卷积是一种可以应用于移动平均的一般数学运算。其思想是,给定一些数据,您将数据子集(窗口)作为“掩码”或“内核”在数据中滑动,在每个窗口上执行特定的数学操作。在移动平均的情况下,核是平均值:

现在可以通过more_itertools.convolve使用这个实现。 More_itertools是一个流行的第三方包;通过> PIP Install more_itertools安装。

有关现成的解决方案,请参见https://scipy-cookbook.readthedocs.io/items/SignalSmooth.html。 它提供了平窗类型的运行平均值。请注意,这比简单的do-it-yourself卷积方法要复杂一些,因为它试图通过反射数据来处理数据开头和结尾的问题(在您的情况下可能有效,也可能无效……)。

首先,你可以试着:

a = np.random.random(100)
plt.plot(a)
b = smooth(a, window='flat')
plt.plot(b)

出于教学目的,让我再添加两个Numpy解决方案(比cumsum解决方案慢):

import numpy as np
from numpy.lib.stride_tricks import as_strided

def ra_strides(arr, window):
    ''' Running average using as_strided'''
    n = arr.shape[0] - window + 1
    arr_strided = as_strided(arr, shape=[n, window], strides=2*arr.strides)
    return arr_strided.mean(axis=1)

def ra_add(arr, window):
    ''' Running average using add.reduceat'''
    n = arr.shape[0] - window + 1
    indices = np.array([0, window]*n) + np.repeat(np.arange(n), 2)
    arr = np.append(arr, 0)
    return np.add.reduceat(arr, indices )[::2]/window

使用的函数:as_strided, add.reduceat

上面有很多关于计算运行平均值的答案。我的回答增加了两个额外的特征:

忽略nan值 计算N个相邻值的平均值,不包括兴趣值本身

这第二个特征对于确定哪些值与总体趋势有一定的差异特别有用。

我使用numpy。cumsum,因为这是最省时的方法(参见上面Alleo的回答)。

N=10 # number of points to test on each side of point of interest, best if even
padded_x = np.insert(np.insert( np.insert(x, len(x), np.empty(int(N/2))*np.nan), 0, np.empty(int(N/2))*np.nan ),0,0)
n_nan = np.cumsum(np.isnan(padded_x))
cumsum = np.nancumsum(padded_x) 
window_sum = cumsum[N+1:] - cumsum[:-(N+1)] - x # subtract value of interest from sum of all values within window
window_n_nan = n_nan[N+1:] - n_nan[:-(N+1)] - np.isnan(x)
window_n_values = (N - window_n_nan)
movavg = (window_sum) / (window_n_values)

这段代码只适用于偶数n。它可以通过改变np来调整奇数。插入padded_x和n_nan。

输出示例(黑色为raw,蓝色为movavg):

这段代码可以很容易地修改,以删除从小于cutoff = 3的非nan值计算的所有移动平均值。

window_n_values = (N - window_n_nan).astype(float) # dtype must be float to set some values to nan
cutoff = 3
window_n_values[window_n_values<cutoff] = np.nan
movavg = (window_sum) / (window_n_values)

我还没有检查这有多快,但你可以试试:

from collections import deque

cache = deque() # keep track of seen values
n = 10          # window size
A = xrange(100) # some dummy iterable
cum_sum = 0     # initialize cumulative sum

for t, val in enumerate(A, 1):
    cache.append(val)
    cum_sum += val
    if t < n:
        avg = cum_sum / float(t)
    else:                           # if window is saturated,
        cum_sum -= cache.popleft()  # subtract oldest value
        avg = cum_sum / float(n)