Python中是否有SciPy函数或NumPy函数或模块来计算给定特定窗口的1D数组的运行平均值?


当前回答

上面有很多关于计算运行平均值的答案。我的回答增加了两个额外的特征:

忽略nan值 计算N个相邻值的平均值,不包括兴趣值本身

这第二个特征对于确定哪些值与总体趋势有一定的差异特别有用。

我使用numpy。cumsum,因为这是最省时的方法(参见上面Alleo的回答)。

N=10 # number of points to test on each side of point of interest, best if even
padded_x = np.insert(np.insert( np.insert(x, len(x), np.empty(int(N/2))*np.nan), 0, np.empty(int(N/2))*np.nan ),0,0)
n_nan = np.cumsum(np.isnan(padded_x))
cumsum = np.nancumsum(padded_x) 
window_sum = cumsum[N+1:] - cumsum[:-(N+1)] - x # subtract value of interest from sum of all values within window
window_n_nan = n_nan[N+1:] - n_nan[:-(N+1)] - np.isnan(x)
window_n_values = (N - window_n_nan)
movavg = (window_sum) / (window_n_values)

这段代码只适用于偶数n。它可以通过改变np来调整奇数。插入padded_x和n_nan。

输出示例(黑色为raw,蓝色为movavg):

这段代码可以很容易地修改,以删除从小于cutoff = 3的非nan值计算的所有移动平均值。

window_n_values = (N - window_n_nan).astype(float) # dtype must be float to set some values to nan
cutoff = 3
window_n_values[window_n_values<cutoff] = np.nan
movavg = (window_sum) / (window_n_values)

其他回答

高效的解决方案

卷积比直接的方法好得多,但(我猜)它使用FFT,因此相当慢。但是,下面的方法特别适用于计算运行平均值

def running_mean(x, N):
    cumsum = numpy.cumsum(numpy.insert(x, 0, 0)) 
    return (cumsum[N:] - cumsum[:-N]) / float(N)

要检查的代码

In[3]: x = numpy.random.random(100000)
In[4]: N = 1000
In[5]: %timeit result1 = numpy.convolve(x, numpy.ones((N,))/N, mode='valid')
10 loops, best of 3: 41.4 ms per loop
In[6]: %timeit result2 = running_mean(x, N)
1000 loops, best of 3: 1.04 ms per loop

注意numpy。allclose(result1, result2)为True,两个方法等价。 N越大,时间差异越大。

警告:虽然cumsum更快,但会增加浮点错误,这可能导致您的结果无效/不正确/不可接受

这里的评论指出了这个浮点错误问题,但我在回答中让它更明显。

# demonstrate loss of precision with only 100,000 points
np.random.seed(42)
x = np.random.randn(100000)+1e6
y1 = running_mean_convolve(x, 10)
y2 = running_mean_cumsum(x, 10)
assert np.allclose(y1, y2, rtol=1e-12, atol=0)

the more points you accumulate over the greater the floating point error (so 1e5 points is noticable, 1e6 points is more significant, more than 1e6 and you may want to resetting the accumulators) you can cheat by using np.longdouble but your floating point error still will get significant for relatively large number of points (around >1e5 but depends on your data) you can plot the error and see it increasing relatively fast the convolve solution is slower but does not have this floating point loss of precision the uniform_filter1d solution is faster than this cumsum solution AND does not have this floating point loss of precision

更新:已经提出了更有效的解决方案,scipy的uniform_filter1d可能是“标准”第三方库中最好的,还有一些更新的或专门的库可用。


你可以用np。卷积得到:

np.convolve(x, np.ones(N)/N, mode='valid')

解释

The running mean is a case of the mathematical operation of convolution. For the running mean, you slide a window along the input and compute the mean of the window's contents. For discrete 1D signals, convolution is the same thing, except instead of the mean you compute an arbitrary linear combination, i.e., multiply each element by a corresponding coefficient and add up the results. Those coefficients, one for each position in the window, are sometimes called the convolution kernel. The arithmetic mean of N values is (x_1 + x_2 + ... + x_N) / N, so the corresponding kernel is (1/N, 1/N, ..., 1/N), and that's exactly what we get by using np.ones(N)/N.

边缘

np的模态参数。Convolve指定如何处理边缘。我在这里选择有效模式,因为我认为这是大多数人期望的运行方式,但您可能有其他优先级。下面是一个图表,说明了模式之间的差异:

import numpy as np
import matplotlib.pyplot as plt
modes = ['full', 'same', 'valid']
for m in modes:
    plt.plot(np.convolve(np.ones(200), np.ones(50)/50, mode=m));
plt.axis([-10, 251, -.1, 1.1]);
plt.legend(modes, loc='lower center');
plt.show()

仅使用Python标准库(内存高效)

只提供标准库deque的另一个版本。令我惊讶的是,大多数答案都使用pandas或numpy。

def moving_average(iterable, n=3):
    d = deque(maxlen=n)
    for i in iterable:
        d.append(i)
        if len(d) == n:
            yield sum(d)/n

r = moving_average([40, 30, 50, 46, 39, 44])
assert list(r) == [40.0, 42.0, 45.0, 43.0]

实际上,我在python文档中找到了另一个实现

def moving_average(iterable, n=3):
    # moving_average([40, 30, 50, 46, 39, 44]) --> 40.0 42.0 45.0 43.0
    # http://en.wikipedia.org/wiki/Moving_average
    it = iter(iterable)
    d = deque(itertools.islice(it, n-1))
    d.appendleft(0)
    s = sum(d)
    for elem in it:
        s += elem - d.popleft()
        d.append(elem)
        yield s / n

然而,在我看来,实现似乎比它应该的要复杂一些。但它肯定在标准python文档中是有原因的,有人能评论一下我的实现和标准文档吗?

上面有很多关于计算运行平均值的答案。我的回答增加了两个额外的特征:

忽略nan值 计算N个相邻值的平均值,不包括兴趣值本身

这第二个特征对于确定哪些值与总体趋势有一定的差异特别有用。

我使用numpy。cumsum,因为这是最省时的方法(参见上面Alleo的回答)。

N=10 # number of points to test on each side of point of interest, best if even
padded_x = np.insert(np.insert( np.insert(x, len(x), np.empty(int(N/2))*np.nan), 0, np.empty(int(N/2))*np.nan ),0,0)
n_nan = np.cumsum(np.isnan(padded_x))
cumsum = np.nancumsum(padded_x) 
window_sum = cumsum[N+1:] - cumsum[:-(N+1)] - x # subtract value of interest from sum of all values within window
window_n_nan = n_nan[N+1:] - n_nan[:-(N+1)] - np.isnan(x)
window_n_values = (N - window_n_nan)
movavg = (window_sum) / (window_n_values)

这段代码只适用于偶数n。它可以通过改变np来调整奇数。插入padded_x和n_nan。

输出示例(黑色为raw,蓝色为movavg):

这段代码可以很容易地修改,以删除从小于cutoff = 3的非nan值计算的所有移动平均值。

window_n_values = (N - window_n_nan).astype(float) # dtype must be float to set some values to nan
cutoff = 3
window_n_values[window_n_values<cutoff] = np.nan
movavg = (window_sum) / (window_n_values)

比起numpy或scipy,我建议熊猫们更快地做到这一点:

df['data'].rolling(3).mean()

这取列“数据”的3个周期的移动平均值(MA)。你也可以计算移位的版本,例如排除当前单元格的版本(向后移位一个)可以很容易地计算为:

df['data'].shift(periods=1).rolling(3).mean()