Python中是否有SciPy函数或NumPy函数或模块来计算给定特定窗口的1D数组的运行平均值?


当前回答

更新:已经提出了更有效的解决方案,scipy的uniform_filter1d可能是“标准”第三方库中最好的,还有一些更新的或专门的库可用。


你可以用np。卷积得到:

np.convolve(x, np.ones(N)/N, mode='valid')

解释

The running mean is a case of the mathematical operation of convolution. For the running mean, you slide a window along the input and compute the mean of the window's contents. For discrete 1D signals, convolution is the same thing, except instead of the mean you compute an arbitrary linear combination, i.e., multiply each element by a corresponding coefficient and add up the results. Those coefficients, one for each position in the window, are sometimes called the convolution kernel. The arithmetic mean of N values is (x_1 + x_2 + ... + x_N) / N, so the corresponding kernel is (1/N, 1/N, ..., 1/N), and that's exactly what we get by using np.ones(N)/N.

边缘

np的模态参数。Convolve指定如何处理边缘。我在这里选择有效模式,因为我认为这是大多数人期望的运行方式,但您可能有其他优先级。下面是一个图表,说明了模式之间的差异:

import numpy as np
import matplotlib.pyplot as plt
modes = ['full', 'same', 'valid']
for m in modes:
    plt.plot(np.convolve(np.ones(200), np.ones(50)/50, mode=m));
plt.axis([-10, 251, -.1, 1.1]);
plt.legend(modes, loc='lower center');
plt.show()

其他回答

仅使用Python标准库(内存高效)

只提供标准库deque的另一个版本。令我惊讶的是,大多数答案都使用pandas或numpy。

def moving_average(iterable, n=3):
    d = deque(maxlen=n)
    for i in iterable:
        d.append(i)
        if len(d) == n:
            yield sum(d)/n

r = moving_average([40, 30, 50, 46, 39, 44])
assert list(r) == [40.0, 42.0, 45.0, 43.0]

实际上,我在python文档中找到了另一个实现

def moving_average(iterable, n=3):
    # moving_average([40, 30, 50, 46, 39, 44]) --> 40.0 42.0 45.0 43.0
    # http://en.wikipedia.org/wiki/Moving_average
    it = iter(iterable)
    d = deque(itertools.islice(it, n-1))
    d.appendleft(0)
    s = sum(d)
    for elem in it:
        s += elem - d.popleft()
        d.append(elem)
        yield s / n

然而,在我看来,实现似乎比它应该的要复杂一些。但它肯定在标准python文档中是有原因的,有人能评论一下我的实现和标准文档吗?

你可以使用scipy. nmage .uniform_filter1d:

import numpy as np
from scipy.ndimage import uniform_filter1d
N = 1000
x = np.random.random(100000)
y = uniform_filter1d(x, size=N)

uniform_filter1d:

给出具有相同numpy形状的输出(即点数) 允许多种方式处理边界,其中'reflect'是默认的,但在我的情况下,我更想要'nearest'

它也相当快(比np快近50倍)。卷积,比上述cumsum方法快2-5倍):

%timeit y1 = np.convolve(x, np.ones((N,))/N, mode='same')
100 loops, best of 3: 9.28 ms per loop

%timeit y2 = uniform_filter1d(x, size=N)
10000 loops, best of 3: 191 µs per loop

这里有3个函数可以让你比较不同实现的错误/速度:

from __future__ import division
import numpy as np
import scipy.ndimage as ndi
def running_mean_convolve(x, N):
    return np.convolve(x, np.ones(N) / float(N), 'valid')
def running_mean_cumsum(x, N):
    cumsum = np.cumsum(np.insert(x, 0, 0))
    return (cumsum[N:] - cumsum[:-N]) / float(N)
def running_mean_uniform_filter1d(x, N):
    return ndi.uniform_filter1d(x, N, mode='constant', origin=-(N//2))[:-(N-1)]

另一个解决方案是使用标准库和deque:

from collections import deque
import itertools

def moving_average(iterable, n=3):
    # http://en.wikipedia.org/wiki/Moving_average
    it = iter(iterable) 
    # create an iterable object from input argument
    d = deque(itertools.islice(it, n-1))  
    # create deque object by slicing iterable
    d.appendleft(0)
    s = sum(d)
    for elem in it:
        s += elem - d.popleft()
        d.append(elem)
        yield s / n

# example on how to use it
for i in  moving_average([40, 30, 50, 46, 39, 44]):
    print(i)

# 40.0
# 42.0
# 45.0
# 43.0

这个问题现在甚至比NeXuS上个月写的时候更古老,但我喜欢他的代码处理边缘情况的方式。然而,因为它是一个“简单移动平均”,它的结果滞后于它们应用的数据。我认为,通过对基于卷积()的方法应用类似的方法,可以以比NumPy的模式valid、same和full更令人满意的方式处理边缘情况。

我的贡献使用了一个中央运行平均值,以使其结果与他们的数据相一致。当可供使用的全尺寸窗口的点太少时,将从数组边缘的连续较小窗口计算运行平均值。[实际上,从连续较大的窗口,但这是一个实现细节。]

import numpy as np

def running_mean(l, N):
    # Also works for the(strictly invalid) cases when N is even.
    if (N//2)*2 == N:
        N = N - 1
    front = np.zeros(N//2)
    back = np.zeros(N//2)

    for i in range(1, (N//2)*2, 2):
        front[i//2] = np.convolve(l[:i], np.ones((i,))/i, mode = 'valid')
    for i in range(1, (N//2)*2, 2):
        back[i//2] = np.convolve(l[-i:], np.ones((i,))/i, mode = 'valid')
    return np.concatenate([front, np.convolve(l, np.ones((N,))/N, mode = 'valid'), back[::-1]])

它相对较慢,因为它使用了卷积(),并且可能会被真正的Pythonista修饰很多,但是,我相信这个想法是成立的。

如果你必须为非常小的数组(少于200个元素)重复这样做,我发现只用线性代数就能得到最快的结果。 最慢的部分是建立你的乘法矩阵y,你只需要做一次,但之后可能会更快。

import numpy as np
import random 

N = 100      # window size
size =200     # array length

x = np.random.random(size)
y = np.eye(size, dtype=float)

# prepare matrix
for i in range(size):
  y[i,i:i+N] = 1./N
  
# calculate running mean
z = np.inner(x,y.T)[N-1:]