Python中是否有SciPy函数或NumPy函数或模块来计算给定特定窗口的1D数组的运行平均值?


当前回答

一个新的卷积配方被合并到Python 3.10中。

鉴于


import collections, operator

from itertools import chain, repeat


size = 3 + 1
kernel = [1/size] * size                                              

Code

def convolve(signal, kernel):
    # See:  https://betterexplained.com/articles/intuitive-convolution/
    # convolve(data, [0.25, 0.25, 0.25, 0.25]) --> Moving average (blur)
    # convolve(data, [1, -1]) --> 1st finite difference (1st derivative)
    # convolve(data, [1, -2, 1]) --> 2nd finite difference (2nd derivative)
    kernel = list(reversed(kernel))
    n = len(kernel)
    window = collections.deque([0] * n, maxlen=n)
    for x in chain(signal, repeat(0, n-1)):
        window.append(x)
        yield sum(map(operator.mul, kernel, window))

Demo

list(convolve(range(1, 6), kernel))
# [0.25, 0.75, 1.5, 2.5, 3.5, 3.0, 2.25, 1.25]

细节

卷积是一种可以应用于移动平均的一般数学运算。其思想是,给定一些数据,您将数据子集(窗口)作为“掩码”或“内核”在数据中滑动,在每个窗口上执行特定的数学操作。在移动平均的情况下,核是平均值:

现在可以通过more_itertools.convolve使用这个实现。 More_itertools是一个流行的第三方包;通过> PIP Install more_itertools安装。

其他回答

更新:下面的例子展示了老熊猫。Rolling_mean函数,该函数在最近版本的pandas中已被删除。该函数调用的现代等价函数将使用pandas.Series.rolling:

In [8]: pd.Series(x).rolling(window=N).mean().iloc[N-1:].values
Out[8]: 
array([ 0.49815397,  0.49844183,  0.49840518, ...,  0.49488191,
        0.49456679,  0.49427121])

pandas比NumPy或SciPy更适合这一点。它的函数rolling_mean很方便地完成了这项工作。当输入是一个数组时,它还返回一个NumPy数组。

使用任何定制的纯Python实现都很难在性能上击败rolling_mean。下面是针对两个提议的解决方案的性能示例:

In [1]: import numpy as np

In [2]: import pandas as pd

In [3]: def running_mean(x, N):
   ...:     cumsum = np.cumsum(np.insert(x, 0, 0)) 
   ...:     return (cumsum[N:] - cumsum[:-N]) / N
   ...:

In [4]: x = np.random.random(100000)

In [5]: N = 1000

In [6]: %timeit np.convolve(x, np.ones((N,))/N, mode='valid')
10 loops, best of 3: 172 ms per loop

In [7]: %timeit running_mean(x, N)
100 loops, best of 3: 6.72 ms per loop

In [8]: %timeit pd.rolling_mean(x, N)[N-1:]
100 loops, best of 3: 4.74 ms per loop

In [9]: np.allclose(pd.rolling_mean(x, N)[N-1:], running_mean(x, N))
Out[9]: True

关于如何处理边缘值,也有很好的选项。

上述所有的解决方案都很差,因为它们缺乏

由于本机python而不是numpy向量化实现, 数值稳定性,由于numpy使用不当。cumsum或 由于O(len(x) * w)实现为卷积的速度。

鉴于

import numpy
m = 10000
x = numpy.random.rand(m)
w = 1000

注意x_[:w].sum()等于x[:w-1].sum()。因此,对于第一个平均值,numpy.cumsum(…)加上x[w] / w(通过x_[w+1] / w),并减去0(从x_[0] / w)。结果是x[0:w].mean()

通过cumsum,您将通过添加x[w+1] / w并减去x[0] / w来更新第二个平均值,从而得到x[1:w+1].mean()。

这将一直进行,直到到达x[-w:].mean()。

x_ = numpy.insert(x, 0, 0)
sliding_average = x_[:w].sum() / w + numpy.cumsum(x_[w:] - x_[:-w]) / w

这个解是向量化的,O(m),可读且数值稳定。

有点晚了,但我已经做了我自己的小函数,它不环绕端点或垫与零,然后用于查找平均值。进一步的处理是,它还在线性间隔点上对信号进行重新采样。随意定制代码以获得其他特性。

该方法是一个简单的矩阵乘法与规范化高斯核。

def running_mean(y_in, x_in, N_out=101, sigma=1):
    '''
    Returns running mean as a Bell-curve weighted average at evenly spaced
    points. Does NOT wrap signal around, or pad with zeros.
    
    Arguments:
    y_in -- y values, the values to be smoothed and re-sampled
    x_in -- x values for array
    
    Keyword arguments:
    N_out -- NoOf elements in resampled array.
    sigma -- 'Width' of Bell-curve in units of param x .
    '''
    import numpy as np
    N_in = len(y_in)

    # Gaussian kernel
    x_out = np.linspace(np.min(x_in), np.max(x_in), N_out)
    x_in_mesh, x_out_mesh = np.meshgrid(x_in, x_out)
    gauss_kernel = np.exp(-np.square(x_in_mesh - x_out_mesh) / (2 * sigma**2))
    # Normalize kernel, such that the sum is one along axis 1
    normalization = np.tile(np.reshape(np.sum(gauss_kernel, axis=1), (N_out, 1)), (1, N_in))
    gauss_kernel_normalized = gauss_kernel / normalization
    # Perform running average as a linear operation
    y_out = gauss_kernel_normalized @ y_in

    return y_out, x_out

正弦信号加正态分布噪声的一个简单用法:

对于一个简短、快速的解决方案,在一个循环中完成所有事情,没有依赖关系,下面的代码工作得很好。

mylist = [1, 2, 3, 4, 5, 6, 7]
N = 3
cumsum, moving_aves = [0], []

for i, x in enumerate(mylist, 1):
    cumsum.append(cumsum[i-1] + x)
    if i>=N:
        moving_ave = (cumsum[i] - cumsum[i-N])/N
        #can do stuff with moving_ave here
        moving_aves.append(moving_ave)

我知道这是一个老问题,但这里有一个解决方案,它不使用任何额外的数据结构或库。它在输入列表的元素数量上是线性的,我想不出任何其他方法来使它更有效(实际上,如果有人知道更好的分配结果的方法,请告诉我)。

注意:使用numpy数组而不是列表会快得多,但我想消除所有依赖关系。通过多线程执行也可以提高性能

该函数假设输入列表是一维的,所以要小心。

### Running mean/Moving average
def running_mean(l, N):
    sum = 0
    result = list( 0 for x in l)

    for i in range( 0, N ):
        sum = sum + l[i]
        result[i] = sum / (i+1)

    for i in range( N, len(l) ):
        sum = sum - l[i-N] + l[i]
        result[i] = sum / N

    return result

例子

假设我们有一个列表data =[1,2,3,4,5,6],我们想在它上面计算周期为3的滚动平均值,并且你还想要一个与输入列表相同大小的输出列表(这是最常见的情况)。

第一个元素的索引为0,因此滚动平均值应该在索引为-2、-1和0的元素上计算。显然,我们没有data[-2]和data[-1](除非您想使用特殊的边界条件),因此我们假设这些元素为0。这相当于对列表进行零填充,除了我们实际上不填充它,只是跟踪需要填充的索引(从0到N-1)。

所以,对于前N个元素,我们只是在累加器中不断地把元素加起来。

result[0] = (0 + 0 + 1) / 3  = 0.333    ==   (sum + 1) / 3
result[1] = (0 + 1 + 2) / 3  = 1        ==   (sum + 2) / 3
result[2] = (1 + 2 + 3) / 3  = 2        ==   (sum + 3) / 3

从元素N+1开始,简单的累加是行不通的。我们期望的结果是[3]=(2 + 3 + 4)/3 = 3,但这与(sum + 4)/3 = 3.333不同。

计算正确值的方法是用sum+4减去数据[0]= 1,从而得到sum+4 - 1 = 9。

这是因为目前sum =数据[0]+数据[1]+数据[2],但对于每个i >= N也是如此,因为在减法之前,sum是数据[i-N] +…+ data[i-2] + data[i-1]。