Python中是否有SciPy函数或NumPy函数或模块来计算给定特定窗口的1D数组的运行平均值?


当前回答

另一种不使用numpy或pandas找到移动平均线的方法

import itertools
sample = [2, 6, 10, 8, 11, 10]
list(itertools.starmap(
    lambda a,b: b/a, 
    enumerate(itertools.accumulate(sample), 1))
)

将打印[2.0,4.0,6.0,6.5,7.4,7.83333333333333333]

2.0 = (2)/1 4.0 is (2 + 6) / 2 6.0 = (2 + 6 + 10) / 3 .

其他回答

如果你必须为非常小的数组(少于200个元素)重复这样做,我发现只用线性代数就能得到最快的结果。 最慢的部分是建立你的乘法矩阵y,你只需要做一次,但之后可能会更快。

import numpy as np
import random 

N = 100      # window size
size =200     # array length

x = np.random.random(size)
y = np.eye(size, dtype=float)

# prepare matrix
for i in range(size):
  y[i,i:i+N] = 1./N
  
# calculate running mean
z = np.inner(x,y.T)[N-1:]

我觉得使用瓶颈可以很好地解决这个问题

参见下面的基本示例:

import numpy as np
import bottleneck as bn

a = np.random.randint(4, 1000, size=100)
mm = bn.move_mean(a, window=5, min_count=1)

“mm”是“a”的移动平均值。 “窗口”是考虑移动均值的最大条目数。 "min_count"是考虑移动平均值的最小条目数(例如,对于前几个元素或如果数组有nan值)。

好在瓶颈有助于处理nan值,而且非常高效。

上面有很多关于计算运行平均值的答案。我的回答增加了两个额外的特征:

忽略nan值 计算N个相邻值的平均值,不包括兴趣值本身

这第二个特征对于确定哪些值与总体趋势有一定的差异特别有用。

我使用numpy。cumsum,因为这是最省时的方法(参见上面Alleo的回答)。

N=10 # number of points to test on each side of point of interest, best if even
padded_x = np.insert(np.insert( np.insert(x, len(x), np.empty(int(N/2))*np.nan), 0, np.empty(int(N/2))*np.nan ),0,0)
n_nan = np.cumsum(np.isnan(padded_x))
cumsum = np.nancumsum(padded_x) 
window_sum = cumsum[N+1:] - cumsum[:-(N+1)] - x # subtract value of interest from sum of all values within window
window_n_nan = n_nan[N+1:] - n_nan[:-(N+1)] - np.isnan(x)
window_n_values = (N - window_n_nan)
movavg = (window_sum) / (window_n_values)

这段代码只适用于偶数n。它可以通过改变np来调整奇数。插入padded_x和n_nan。

输出示例(黑色为raw,蓝色为movavg):

这段代码可以很容易地修改,以删除从小于cutoff = 3的非nan值计算的所有移动平均值。

window_n_values = (N - window_n_nan).astype(float) # dtype must be float to set some values to nan
cutoff = 3
window_n_values[window_n_values<cutoff] = np.nan
movavg = (window_sum) / (window_n_values)

出于教学目的,让我再添加两个Numpy解决方案(比cumsum解决方案慢):

import numpy as np
from numpy.lib.stride_tricks import as_strided

def ra_strides(arr, window):
    ''' Running average using as_strided'''
    n = arr.shape[0] - window + 1
    arr_strided = as_strided(arr, shape=[n, window], strides=2*arr.strides)
    return arr_strided.mean(axis=1)

def ra_add(arr, window):
    ''' Running average using add.reduceat'''
    n = arr.shape[0] - window + 1
    indices = np.array([0, window]*n) + np.repeat(np.arange(n), 2)
    arr = np.append(arr, 0)
    return np.add.reduceat(arr, indices )[::2]/window

使用的函数:as_strided, add.reduceat

一个新的卷积配方被合并到Python 3.10中。

鉴于


import collections, operator

from itertools import chain, repeat


size = 3 + 1
kernel = [1/size] * size                                              

Code

def convolve(signal, kernel):
    # See:  https://betterexplained.com/articles/intuitive-convolution/
    # convolve(data, [0.25, 0.25, 0.25, 0.25]) --> Moving average (blur)
    # convolve(data, [1, -1]) --> 1st finite difference (1st derivative)
    # convolve(data, [1, -2, 1]) --> 2nd finite difference (2nd derivative)
    kernel = list(reversed(kernel))
    n = len(kernel)
    window = collections.deque([0] * n, maxlen=n)
    for x in chain(signal, repeat(0, n-1)):
        window.append(x)
        yield sum(map(operator.mul, kernel, window))

Demo

list(convolve(range(1, 6), kernel))
# [0.25, 0.75, 1.5, 2.5, 3.5, 3.0, 2.25, 1.25]

细节

卷积是一种可以应用于移动平均的一般数学运算。其思想是,给定一些数据,您将数据子集(窗口)作为“掩码”或“内核”在数据中滑动,在每个窗口上执行特定的数学操作。在移动平均的情况下,核是平均值:

现在可以通过more_itertools.convolve使用这个实现。 More_itertools是一个流行的第三方包;通过> PIP Install more_itertools安装。