Python中是否有SciPy函数或NumPy函数或模块来计算给定特定窗口的1D数组的运行平均值?


当前回答

我还没有检查这有多快,但你可以试试:

from collections import deque

cache = deque() # keep track of seen values
n = 10          # window size
A = xrange(100) # some dummy iterable
cum_sum = 0     # initialize cumulative sum

for t, val in enumerate(A, 1):
    cache.append(val)
    cum_sum += val
    if t < n:
        avg = cum_sum / float(t)
    else:                           # if window is saturated,
        cum_sum -= cache.popleft()  # subtract oldest value
        avg = cum_sum / float(n)

其他回答

仅使用Python标准库(内存高效)

只提供标准库deque的另一个版本。令我惊讶的是,大多数答案都使用pandas或numpy。

def moving_average(iterable, n=3):
    d = deque(maxlen=n)
    for i in iterable:
        d.append(i)
        if len(d) == n:
            yield sum(d)/n

r = moving_average([40, 30, 50, 46, 39, 44])
assert list(r) == [40.0, 42.0, 45.0, 43.0]

实际上,我在python文档中找到了另一个实现

def moving_average(iterable, n=3):
    # moving_average([40, 30, 50, 46, 39, 44]) --> 40.0 42.0 45.0 43.0
    # http://en.wikipedia.org/wiki/Moving_average
    it = iter(iterable)
    d = deque(itertools.islice(it, n-1))
    d.appendleft(0)
    s = sum(d)
    for elem in it:
        s += elem - d.popleft()
        d.append(elem)
        yield s / n

然而,在我看来,实现似乎比它应该的要复杂一些。但它肯定在标准python文档中是有原因的,有人能评论一下我的实现和标准文档吗?

这个问题现在甚至比NeXuS上个月写的时候更古老,但我喜欢他的代码处理边缘情况的方式。然而,因为它是一个“简单移动平均”,它的结果滞后于它们应用的数据。我认为,通过对基于卷积()的方法应用类似的方法,可以以比NumPy的模式valid、same和full更令人满意的方式处理边缘情况。

我的贡献使用了一个中央运行平均值,以使其结果与他们的数据相一致。当可供使用的全尺寸窗口的点太少时,将从数组边缘的连续较小窗口计算运行平均值。[实际上,从连续较大的窗口,但这是一个实现细节。]

import numpy as np

def running_mean(l, N):
    # Also works for the(strictly invalid) cases when N is even.
    if (N//2)*2 == N:
        N = N - 1
    front = np.zeros(N//2)
    back = np.zeros(N//2)

    for i in range(1, (N//2)*2, 2):
        front[i//2] = np.convolve(l[:i], np.ones((i,))/i, mode = 'valid')
    for i in range(1, (N//2)*2, 2):
        back[i//2] = np.convolve(l[-i:], np.ones((i,))/i, mode = 'valid')
    return np.concatenate([front, np.convolve(l, np.ones((N,))/N, mode = 'valid'), back[::-1]])

它相对较慢,因为它使用了卷积(),并且可能会被真正的Pythonista修饰很多,但是,我相信这个想法是成立的。

更新:下面的例子展示了老熊猫。Rolling_mean函数,该函数在最近版本的pandas中已被删除。该函数调用的现代等价函数将使用pandas.Series.rolling:

In [8]: pd.Series(x).rolling(window=N).mean().iloc[N-1:].values
Out[8]: 
array([ 0.49815397,  0.49844183,  0.49840518, ...,  0.49488191,
        0.49456679,  0.49427121])

pandas比NumPy或SciPy更适合这一点。它的函数rolling_mean很方便地完成了这项工作。当输入是一个数组时,它还返回一个NumPy数组。

使用任何定制的纯Python实现都很难在性能上击败rolling_mean。下面是针对两个提议的解决方案的性能示例:

In [1]: import numpy as np

In [2]: import pandas as pd

In [3]: def running_mean(x, N):
   ...:     cumsum = np.cumsum(np.insert(x, 0, 0)) 
   ...:     return (cumsum[N:] - cumsum[:-N]) / N
   ...:

In [4]: x = np.random.random(100000)

In [5]: N = 1000

In [6]: %timeit np.convolve(x, np.ones((N,))/N, mode='valid')
10 loops, best of 3: 172 ms per loop

In [7]: %timeit running_mean(x, N)
100 loops, best of 3: 6.72 ms per loop

In [8]: %timeit pd.rolling_mean(x, N)[N-1:]
100 loops, best of 3: 4.74 ms per loop

In [9]: np.allclose(pd.rolling_mean(x, N)[N-1:], running_mean(x, N))
Out[9]: True

关于如何处理边缘值,也有很好的选项。

从其他答案来看,我不认为这是问题所要求的,但我需要保持一个不断增长的值列表的运行平均值。

因此,如果你想保持从某个地方(站点,测量设备等)获取的值的列表和最近n个值更新的平均值,你可以使用下面的代码,这将最大限度地减少添加新元素的工作:

class Running_Average(object):
    def __init__(self, buffer_size=10):
        """
        Create a new Running_Average object.

        This object allows the efficient calculation of the average of the last
        `buffer_size` numbers added to it.

        Examples
        --------
        >>> a = Running_Average(2)
        >>> a.add(1)
        >>> a.get()
        1.0
        >>> a.add(1)  # there are two 1 in buffer
        >>> a.get()
        1.0
        >>> a.add(2)  # there's a 1 and a 2 in the buffer
        >>> a.get()
        1.5
        >>> a.add(2)
        >>> a.get()  # now there's only two 2 in the buffer
        2.0
        """
        self._buffer_size = int(buffer_size)  # make sure it's an int
        self.reset()

    def add(self, new):
        """
        Add a new number to the buffer, or replaces the oldest one there.
        """
        new = float(new)  # make sure it's a float
        n = len(self._buffer)
        if n < self.buffer_size:  # still have to had numbers to the buffer.
            self._buffer.append(new)
            if self._average != self._average:  # ~ if isNaN().
                self._average = new  # no previous numbers, so it's new.
            else:
                self._average *= n  # so it's only the sum of numbers.
                self._average += new  # add new number.
                self._average /= (n+1)  # divide by new number of numbers.
        else:  # buffer full, replace oldest value.
            old = self._buffer[self._index]  # the previous oldest number.
            self._buffer[self._index] = new  # replace with new one.
            self._index += 1  # update the index and make sure it's...
            self._index %= self.buffer_size  # ... smaller than buffer_size.
            self._average -= old/self.buffer_size  # remove old one...
            self._average += new/self.buffer_size  # ...and add new one...
            # ... weighted by the number of elements.

    def __call__(self):
        """
        Return the moving average value, for the lazy ones who don't want
        to write .get .
        """
        return self._average

    def get(self):
        """
        Return the moving average value.
        """
        return self()

    def reset(self):
        """
        Reset the moving average.

        If for some reason you don't want to just create a new one.
        """
        self._buffer = []  # could use np.empty(self.buffer_size)...
        self._index = 0  # and use this to keep track of how many numbers.
        self._average = float('nan')  # could use np.NaN .

    def get_buffer_size(self):
        """
        Return current buffer_size.
        """
        return self._buffer_size

    def set_buffer_size(self, buffer_size):
        """
        >>> a = Running_Average(10)
        >>> for i in range(15):
        ...     a.add(i)
        ...
        >>> a()
        9.5
        >>> a._buffer  # should not access this!!
        [10.0, 11.0, 12.0, 13.0, 14.0, 5.0, 6.0, 7.0, 8.0, 9.0]

        Decreasing buffer size:
        >>> a.buffer_size = 6
        >>> a._buffer  # should not access this!!
        [9.0, 10.0, 11.0, 12.0, 13.0, 14.0]
        >>> a.buffer_size = 2
        >>> a._buffer
        [13.0, 14.0]

        Increasing buffer size:
        >>> a.buffer_size = 5
        Warning: no older data available!
        >>> a._buffer
        [13.0, 14.0]

        Keeping buffer size:
        >>> a = Running_Average(10)
        >>> for i in range(15):
        ...     a.add(i)
        ...
        >>> a()
        9.5
        >>> a._buffer  # should not access this!!
        [10.0, 11.0, 12.0, 13.0, 14.0, 5.0, 6.0, 7.0, 8.0, 9.0]
        >>> a.buffer_size = 10  # reorders buffer!
        >>> a._buffer
        [5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0]
        """
        buffer_size = int(buffer_size)
        # order the buffer so index is zero again:
        new_buffer = self._buffer[self._index:]
        new_buffer.extend(self._buffer[:self._index])
        self._index = 0
        if self._buffer_size < buffer_size:
            print('Warning: no older data available!')  # should use Warnings!
        else:
            diff = self._buffer_size - buffer_size
            print(diff)
            new_buffer = new_buffer[diff:]
        self._buffer_size = buffer_size
        self._buffer = new_buffer

    buffer_size = property(get_buffer_size, set_buffer_size)

你可以测试它,例如:

def graph_test(N=200):
    import matplotlib.pyplot as plt
    values = list(range(N))
    values_average_calculator = Running_Average(N/2)
    values_averages = []
    for value in values:
        values_average_calculator.add(value)
        values_averages.append(values_average_calculator())
    fig, ax = plt.subplots(1, 1)
    ax.plot(values, label='values')
    ax.plot(values_averages, label='averages')
    ax.grid()
    ax.set_xlim(0, N)
    ax.set_ylim(0, N)
    fig.show()

这使:

你可以使用scipy. nmage .uniform_filter1d:

import numpy as np
from scipy.ndimage import uniform_filter1d
N = 1000
x = np.random.random(100000)
y = uniform_filter1d(x, size=N)

uniform_filter1d:

给出具有相同numpy形状的输出(即点数) 允许多种方式处理边界,其中'reflect'是默认的,但在我的情况下,我更想要'nearest'

它也相当快(比np快近50倍)。卷积,比上述cumsum方法快2-5倍):

%timeit y1 = np.convolve(x, np.ones((N,))/N, mode='same')
100 loops, best of 3: 9.28 ms per loop

%timeit y2 = uniform_filter1d(x, size=N)
10000 loops, best of 3: 191 µs per loop

这里有3个函数可以让你比较不同实现的错误/速度:

from __future__ import division
import numpy as np
import scipy.ndimage as ndi
def running_mean_convolve(x, N):
    return np.convolve(x, np.ones(N) / float(N), 'valid')
def running_mean_cumsum(x, N):
    cumsum = np.cumsum(np.insert(x, 0, 0))
    return (cumsum[N:] - cumsum[:-N]) / float(N)
def running_mean_uniform_filter1d(x, N):
    return ndi.uniform_filter1d(x, N, mode='constant', origin=-(N//2))[:-(N-1)]