Python中是否有SciPy函数或NumPy函数或模块来计算给定特定窗口的1D数组的运行平均值?


当前回答

我的解决方案是基于维基百科上的“简单移动平均”。

from numba import jit
@jit
def sma(x, N):
    s = np.zeros_like(x)
    k = 1 / N
    s[0] = x[0] * k
    for i in range(1, N + 1):
        s[i] = s[i - 1] + x[i] * k
    for i in range(N, x.shape[0]):
        s[i] = s[i - 1] + (x[i] - x[i - N]) * k
    s = s[N - 1:]
    return s

与之前建议的解决方案相比,它比scipy最快的解决方案“uniform_filter1d”快两倍,并且具有相同的错误顺序。 速度测试:

import numpy as np    
x = np.random.random(10000000)
N = 1000

from scipy.ndimage.filters import uniform_filter1d
%timeit uniform_filter1d(x, size=N)
95.7 ms ± 9.34 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
%timeit sma(x, N)
47.3 ms ± 3.42 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

错误的比较:

np.max(np.abs(np.convolve(x, np.ones((N,))/N, mode='valid') - uniform_filter1d(x, size=N, mode='constant', origin=-(N//2))[:-(N-1)]))
8.604228440844963e-14
np.max(np.abs(np.convolve(x, np.ones((N,))/N, mode='valid') - sma(x, N)))
1.41886502547095e-13

其他回答

上述所有的解决方案都很差,因为它们缺乏

由于本机python而不是numpy向量化实现, 数值稳定性,由于numpy使用不当。cumsum或 由于O(len(x) * w)实现为卷积的速度。

鉴于

import numpy
m = 10000
x = numpy.random.rand(m)
w = 1000

注意x_[:w].sum()等于x[:w-1].sum()。因此,对于第一个平均值,numpy.cumsum(…)加上x[w] / w(通过x_[w+1] / w),并减去0(从x_[0] / w)。结果是x[0:w].mean()

通过cumsum,您将通过添加x[w+1] / w并减去x[0] / w来更新第二个平均值,从而得到x[1:w+1].mean()。

这将一直进行,直到到达x[-w:].mean()。

x_ = numpy.insert(x, 0, 0)
sliding_average = x_[:w].sum() / w + numpy.cumsum(x_[w:] - x_[:-w]) / w

这个解是向量化的,O(m),可读且数值稳定。

Python标准库解决方案

这个生成器函数接受一个可迭代对象和一个窗口大小为N的值,并生成窗口内当前值的平均值。它使用了deque,这是一种类似于列表的数据结构,但针对在两端进行快速修改(弹出、追加)进行了优化。

from collections import deque
from itertools import islice

def sliding_avg(iterable, N):        
    it = iter(iterable)
    window = deque(islice(it, N))        
    num_vals = len(window)

    if num_vals < N:
        msg = 'window size {} exceeds total number of values {}'
        raise ValueError(msg.format(N, num_vals))

    N = float(N) # force floating point division if using Python 2
    s = sum(window)
    
    while True:
        yield s/N
        try:
            nxt = next(it)
        except StopIteration:
            break
        s = s - window.popleft() + nxt
        window.append(nxt)
        

下面是函数的运行情况:

>>> values = range(100)
>>> N = 5
>>> window_avg = sliding_avg(values, N)
>>> 
>>> next(window_avg) # (0 + 1 + 2 + 3 + 4)/5
>>> 2.0
>>> next(window_avg) # (1 + 2 + 3 + 4 + 5)/5
>>> 3.0
>>> next(window_avg) # (2 + 3 + 4 + 5 + 6)/5
>>> 4.0

我知道这是一个老问题,但这里有一个解决方案,它不使用任何额外的数据结构或库。它在输入列表的元素数量上是线性的,我想不出任何其他方法来使它更有效(实际上,如果有人知道更好的分配结果的方法,请告诉我)。

注意:使用numpy数组而不是列表会快得多,但我想消除所有依赖关系。通过多线程执行也可以提高性能

该函数假设输入列表是一维的,所以要小心。

### Running mean/Moving average
def running_mean(l, N):
    sum = 0
    result = list( 0 for x in l)

    for i in range( 0, N ):
        sum = sum + l[i]
        result[i] = sum / (i+1)

    for i in range( N, len(l) ):
        sum = sum - l[i-N] + l[i]
        result[i] = sum / N

    return result

例子

假设我们有一个列表data =[1,2,3,4,5,6],我们想在它上面计算周期为3的滚动平均值,并且你还想要一个与输入列表相同大小的输出列表(这是最常见的情况)。

第一个元素的索引为0,因此滚动平均值应该在索引为-2、-1和0的元素上计算。显然,我们没有data[-2]和data[-1](除非您想使用特殊的边界条件),因此我们假设这些元素为0。这相当于对列表进行零填充,除了我们实际上不填充它,只是跟踪需要填充的索引(从0到N-1)。

所以,对于前N个元素,我们只是在累加器中不断地把元素加起来。

result[0] = (0 + 0 + 1) / 3  = 0.333    ==   (sum + 1) / 3
result[1] = (0 + 1 + 2) / 3  = 1        ==   (sum + 2) / 3
result[2] = (1 + 2 + 3) / 3  = 2        ==   (sum + 3) / 3

从元素N+1开始,简单的累加是行不通的。我们期望的结果是[3]=(2 + 3 + 4)/3 = 3,但这与(sum + 4)/3 = 3.333不同。

计算正确值的方法是用sum+4减去数据[0]= 1,从而得到sum+4 - 1 = 9。

这是因为目前sum =数据[0]+数据[1]+数据[2],但对于每个i >= N也是如此,因为在减法之前,sum是数据[i-N] +…+ data[i-2] + data[i-1]。

高效的解决方案

卷积比直接的方法好得多,但(我猜)它使用FFT,因此相当慢。但是,下面的方法特别适用于计算运行平均值

def running_mean(x, N):
    cumsum = numpy.cumsum(numpy.insert(x, 0, 0)) 
    return (cumsum[N:] - cumsum[:-N]) / float(N)

要检查的代码

In[3]: x = numpy.random.random(100000)
In[4]: N = 1000
In[5]: %timeit result1 = numpy.convolve(x, numpy.ones((N,))/N, mode='valid')
10 loops, best of 3: 41.4 ms per loop
In[6]: %timeit result2 = running_mean(x, N)
1000 loops, best of 3: 1.04 ms per loop

注意numpy。allclose(result1, result2)为True,两个方法等价。 N越大,时间差异越大。

警告:虽然cumsum更快,但会增加浮点错误,这可能导致您的结果无效/不正确/不可接受

这里的评论指出了这个浮点错误问题,但我在回答中让它更明显。

# demonstrate loss of precision with only 100,000 points
np.random.seed(42)
x = np.random.randn(100000)+1e6
y1 = running_mean_convolve(x, 10)
y2 = running_mean_cumsum(x, 10)
assert np.allclose(y1, y2, rtol=1e-12, atol=0)

the more points you accumulate over the greater the floating point error (so 1e5 points is noticable, 1e6 points is more significant, more than 1e6 and you may want to resetting the accumulators) you can cheat by using np.longdouble but your floating point error still will get significant for relatively large number of points (around >1e5 but depends on your data) you can plot the error and see it increasing relatively fast the convolve solution is slower but does not have this floating point loss of precision the uniform_filter1d solution is faster than this cumsum solution AND does not have this floating point loss of precision

这个问题现在甚至比NeXuS上个月写的时候更古老,但我喜欢他的代码处理边缘情况的方式。然而,因为它是一个“简单移动平均”,它的结果滞后于它们应用的数据。我认为,通过对基于卷积()的方法应用类似的方法,可以以比NumPy的模式valid、same和full更令人满意的方式处理边缘情况。

我的贡献使用了一个中央运行平均值,以使其结果与他们的数据相一致。当可供使用的全尺寸窗口的点太少时,将从数组边缘的连续较小窗口计算运行平均值。[实际上,从连续较大的窗口,但这是一个实现细节。]

import numpy as np

def running_mean(l, N):
    # Also works for the(strictly invalid) cases when N is even.
    if (N//2)*2 == N:
        N = N - 1
    front = np.zeros(N//2)
    back = np.zeros(N//2)

    for i in range(1, (N//2)*2, 2):
        front[i//2] = np.convolve(l[:i], np.ones((i,))/i, mode = 'valid')
    for i in range(1, (N//2)*2, 2):
        back[i//2] = np.convolve(l[-i:], np.ones((i,))/i, mode = 'valid')
    return np.concatenate([front, np.convolve(l, np.ones((N,))/N, mode = 'valid'), back[::-1]])

它相对较慢,因为它使用了卷积(),并且可能会被真正的Pythonista修饰很多,但是,我相信这个想法是成立的。