似乎没有函数可以简单地计算numpy/scipy的移动平均值,这导致了复杂的解决方案。

我的问题有两个方面:

用numpy(正确地)实现移动平均的最简单方法是什么? 既然这似乎不是小事,而且容易出错,有没有一个很好的理由不包括电池在这种情况下?


当前回答

我觉得使用瓶颈可以很容易地解决这个问题

参见下面的基本示例:

import numpy as np
import bottleneck as bn

a = np.random.randint(4, 1000, size=(5, 7))
mm = bn.move_mean(a, window=2, min_count=1)

这就给出了每个轴上的移动平均值。

“mm”是“a”的移动平均值。 “窗口”是考虑移动均值的最大条目数。 "min_count"是考虑移动平均值的最小条目数(例如,对于第一个元素或如果数组有nan值)。

好在瓶颈有助于处理nan值,而且非常高效。

其他回答

从Numpy 1.20开始,sliding_window_view提供了一种在元素窗口中滑动/滚动的方法。然后你可以分别取平均值。

例如,对于一个4元素的窗口:

from numpy.lib.stride_tricks import sliding_window_view

# values = np.array([5, 3, 8, 10, 2, 1, 5, 1, 0, 2])
np.average(sliding_window_view(values, window_shape = 4), axis=1)
# array([6.5, 5.75, 5.25, 4.5, 2.25, 1.75, 2])

注意sliding_window_view的中间结果:

# values = np.array([5, 3, 8, 10, 2, 1, 5, 1, 0, 2])
sliding_window_view(values, window_shape = 4)
# array([[ 5,  3,  8, 10],
#        [ 3,  8, 10,  2],
#        [ 8, 10,  2,  1],
#        [10,  2,  1,  5],
#        [ 2,  1,  5,  1],
#        [ 1,  5,  1,  0],
#        [ 5,  1,  0,  2]])

所有的答案似乎都集中在预先计算的列表的情况下。对于实际运行的用例,数字一个接一个地进来,这里有一个简单的类,它提供了对最后N个值求平均的服务:

import numpy as np
class RunningAverage():
    def __init__(self, stack_size):
        self.stack = [0 for _ in range(stack_size)]
        self.ptr = 0
        self.full_cycle = False
    def add(self,value):
        self.stack[self.ptr] = value
        self.ptr += 1
        if self.ptr == len(self.stack):
            self.full_cycle = True
            self.ptr = 0
    def get_avg(self):
        if self.full_cycle:
            return np.mean(self.stack)
        else:
            return np.mean(self.stack[:self.ptr])

用法:

N = 50  # size of the averaging window
run_avg = RunningAverage(N)
for i in range(1000):
    value = <my computation>
    run_avg.add(value)
    if i % 20 ==0: # print once in 20 iters:
        print(f'the average value is {run_avg.get_avg()}')

如果你已经有一个已知大小的数组

import numpy as np                                         
M=np.arange(12)
                                                               
avg=[]                                                         
i=0
while i<len(M)-2: #for n point average len(M) - (n-1)
        avg.append((M[i]+M[i+1]+M[i+2])/3) #n is denominator                       
        i+=1     
                                                                                                    
print(avg)

下面是一个使用numba的快速实现(注意类型)。注意它确实包含移位的nan。

import numpy as np
import numba as nb

@nb.jit(nb.float64[:](nb.float64[:],nb.int64),
        fastmath=True,nopython=True)
def moving_average( array, window ):    
    ret = np.cumsum(array)
    ret[window:] = ret[window:] - ret[:-window]
    ma = ret[window - 1:] / window
    n = np.empty(window-1); n.fill(np.nan)
    return np.concatenate((n.ravel(), ma.ravel())) 

我觉得使用瓶颈可以很容易地解决这个问题

参见下面的基本示例:

import numpy as np
import bottleneck as bn

a = np.random.randint(4, 1000, size=(5, 7))
mm = bn.move_mean(a, window=2, min_count=1)

这就给出了每个轴上的移动平均值。

“mm”是“a”的移动平均值。 “窗口”是考虑移动均值的最大条目数。 "min_count"是考虑移动平均值的最小条目数(例如,对于第一个元素或如果数组有nan值)。

好在瓶颈有助于处理nan值,而且非常高效。