似乎没有函数可以简单地计算numpy/scipy的移动平均值,这导致了复杂的解决方案。
我的问题有两个方面:
用numpy(正确地)实现移动平均的最简单方法是什么? 既然这似乎不是小事,而且容易出错,有没有一个很好的理由不包括电池在这种情况下?
似乎没有函数可以简单地计算numpy/scipy的移动平均值,这导致了复杂的解决方案。
我的问题有两个方面:
用numpy(正确地)实现移动平均的最简单方法是什么? 既然这似乎不是小事,而且容易出错,有没有一个很好的理由不包括电池在这种情况下?
当前回答
这里有许多实现这一点的方法,以及一些基准测试。最好的方法是使用来自其他库的优化代码。瓶颈。Move_mean方法可能是最好的方法。scipy。卷积方法也非常快,可扩展,并且语法和概念简单,但是对于非常大的窗口值不能很好地扩展。numpy。如果你需要一个纯numpy方法,Cumsum方法是很好的。
注意:其中一些(例如:瓶颈。move_mean)不是居中的,并且会转移你的数据。
import numpy as np
import scipy as sci
import scipy.signal as sig
import pandas as pd
import bottleneck as bn
import time as time
def rollavg_direct(a,n):
'Direct "for" loop'
assert n%2==1
b = a*0.0
for i in range(len(a)) :
b[i]=a[max(i-n//2,0):min(i+n//2+1,len(a))].mean()
return b
def rollavg_comprehension(a,n):
'List comprehension'
assert n%2==1
r,N = int(n/2),len(a)
return np.array([a[max(i-r,0):min(i+r+1,N)].mean() for i in range(N)])
def rollavg_convolve(a,n):
'scipy.convolve'
assert n%2==1
return sci.convolve(a,np.ones(n,dtype='float')/n, 'same')[n//2:-n//2+1]
def rollavg_convolve_edges(a,n):
'scipy.convolve, edge handling'
assert n%2==1
return sci.convolve(a,np.ones(n,dtype='float'), 'same')/sci.convolve(np.ones(len(a)),np.ones(n), 'same')
def rollavg_cumsum(a,n):
'numpy.cumsum'
assert n%2==1
cumsum_vec = np.cumsum(np.insert(a, 0, 0))
return (cumsum_vec[n:] - cumsum_vec[:-n]) / n
def rollavg_cumsum_edges(a,n):
'numpy.cumsum, edge handling'
assert n%2==1
N = len(a)
cumsum_vec = np.cumsum(np.insert(np.pad(a,(n-1,n-1),'constant'), 0, 0))
d = np.hstack((np.arange(n//2+1,n),np.ones(N-n)*n,np.arange(n,n//2,-1)))
return (cumsum_vec[n+n//2:-n//2+1] - cumsum_vec[n//2:-n-n//2]) / d
def rollavg_roll(a,n):
'Numpy array rolling'
assert n%2==1
N = len(a)
rolling_idx = np.mod((N-1)*np.arange(n)[:,None] + np.arange(N), N)
return a[rolling_idx].mean(axis=0)[n-1:]
def rollavg_roll_edges(a,n):
# see https://stackoverflow.com/questions/42101082/fast-numpy-roll
'Numpy array rolling, edge handling'
assert n%2==1
a = np.pad(a,(0,n-1-n//2), 'constant')*np.ones(n)[:,None]
m = a.shape[1]
idx = np.mod((m-1)*np.arange(n)[:,None] + np.arange(m), m) # Rolling index
out = a[np.arange(-n//2,n//2)[:,None], idx]
d = np.hstack((np.arange(1,n),np.ones(m-2*n+1+n//2)*n,np.arange(n,n//2,-1)))
return (out.sum(axis=0)/d)[n//2:]
def rollavg_pandas(a,n):
'Pandas rolling average'
return pd.DataFrame(a).rolling(n, center=True, min_periods=1).mean().to_numpy()
def rollavg_bottlneck(a,n):
'bottleneck.move_mean'
return bn.move_mean(a, window=n, min_count=1)
N = 10**6
a = np.random.rand(N)
functions = [rollavg_direct, rollavg_comprehension, rollavg_convolve,
rollavg_convolve_edges, rollavg_cumsum, rollavg_cumsum_edges,
rollavg_pandas, rollavg_bottlneck, rollavg_roll, rollavg_roll_edges]
print('Small window (n=3)')
%load_ext memory_profiler
for f in functions :
print('\n'+f.__doc__+ ' : ')
%timeit b=f(a,3)
print('\nLarge window (n=1001)')
for f in functions[0:-2] :
print('\n'+f.__doc__+ ' : ')
%timeit b=f(a,1001)
print('\nMemory\n')
print('Small window (n=3)')
N = 10**7
a = np.random.rand(N)
%load_ext memory_profiler
for f in functions[2:] :
print('\n'+f.__doc__+ ' : ')
%memit b=f(a,3)
print('\nLarge window (n=1001)')
for f in functions[2:-2] :
print('\n'+f.__doc__+ ' : ')
%memit b=f(a,1001)
定时,小窗口(n=3)
Direct "for" loop :
4.14 s ± 23.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
List comprehension :
3.96 s ± 27.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
scipy.convolve :
1.07 ms ± 26.7 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
scipy.convolve, edge handling :
4.68 ms ± 9.69 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
numpy.cumsum :
5.31 ms ± 5.11 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
numpy.cumsum, edge handling :
8.52 ms ± 11.1 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
Pandas rolling average :
9.85 ms ± 9.63 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
bottleneck.move_mean :
1.3 ms ± 12.2 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
Numpy array rolling :
31.3 ms ± 91.9 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
Numpy array rolling, edge handling :
61.1 ms ± 55.9 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
定时,大窗口(n=1001)
Direct "for" loop :
4.67 s ± 34 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
List comprehension :
4.46 s ± 14.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
scipy.convolve :
103 ms ± 165 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
scipy.convolve, edge handling :
272 ms ± 1.23 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
numpy.cumsum :
5.19 ms ± 12.4 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
numpy.cumsum, edge handling :
8.7 ms ± 11.5 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
Pandas rolling average :
9.67 ms ± 199 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
bottleneck.move_mean :
1.31 ms ± 15.7 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
内存,小窗口(n=3)
The memory_profiler extension is already loaded. To reload it, use:
%reload_ext memory_profiler
scipy.convolve :
peak memory: 362.66 MiB, increment: 73.61 MiB
scipy.convolve, edge handling :
peak memory: 510.24 MiB, increment: 221.19 MiB
numpy.cumsum :
peak memory: 441.81 MiB, increment: 152.76 MiB
numpy.cumsum, edge handling :
peak memory: 518.14 MiB, increment: 228.84 MiB
Pandas rolling average :
peak memory: 449.34 MiB, increment: 160.02 MiB
bottleneck.move_mean :
peak memory: 374.17 MiB, increment: 75.54 MiB
Numpy array rolling :
peak memory: 661.29 MiB, increment: 362.65 MiB
Numpy array rolling, edge handling :
peak memory: 1111.25 MiB, increment: 812.61 MiB
内存,大窗口(n=1001)
scipy.convolve :
peak memory: 370.62 MiB, increment: 71.83 MiB
scipy.convolve, edge handling :
peak memory: 521.98 MiB, increment: 223.18 MiB
numpy.cumsum :
peak memory: 451.32 MiB, increment: 152.52 MiB
numpy.cumsum, edge handling :
peak memory: 527.51 MiB, increment: 228.71 MiB
Pandas rolling average :
peak memory: 451.25 MiB, increment: 152.50 MiB
bottleneck.move_mean :
peak memory: 374.64 MiB, increment: 75.85 MiB
其他回答
所有的答案似乎都集中在预先计算的列表的情况下。对于实际运行的用例,数字一个接一个地进来,这里有一个简单的类,它提供了对最后N个值求平均的服务:
import numpy as np
class RunningAverage():
def __init__(self, stack_size):
self.stack = [0 for _ in range(stack_size)]
self.ptr = 0
self.full_cycle = False
def add(self,value):
self.stack[self.ptr] = value
self.ptr += 1
if self.ptr == len(self.stack):
self.full_cycle = True
self.ptr = 0
def get_avg(self):
if self.full_cycle:
return np.mean(self.stack)
else:
return np.mean(self.stack[:self.ptr])
用法:
N = 50 # size of the averaging window
run_avg = RunningAverage(N)
for i in range(1000):
value = <my computation>
run_avg.add(value)
if i % 20 ==0: # print once in 20 iters:
print(f'the average value is {run_avg.get_avg()}')
从Numpy 1.20开始,sliding_window_view提供了一种在元素窗口中滑动/滚动的方法。然后你可以分别取平均值。
例如,对于一个4元素的窗口:
from numpy.lib.stride_tricks import sliding_window_view
# values = np.array([5, 3, 8, 10, 2, 1, 5, 1, 0, 2])
np.average(sliding_window_view(values, window_shape = 4), axis=1)
# array([6.5, 5.75, 5.25, 4.5, 2.25, 1.75, 2])
注意sliding_window_view的中间结果:
# values = np.array([5, 3, 8, 10, 2, 1, 5, 1, 0, 2])
sliding_window_view(values, window_shape = 4)
# array([[ 5, 3, 8, 10],
# [ 3, 8, 10, 2],
# [ 8, 10, 2, 1],
# [10, 2, 1, 5],
# [ 2, 1, 5, 1],
# [ 1, 5, 1, 0],
# [ 5, 1, 0, 2]])
下面是一个使用numba的快速实现(注意类型)。注意它确实包含移位的nan。
import numpy as np
import numba as nb
@nb.jit(nb.float64[:](nb.float64[:],nb.int64),
fastmath=True,nopython=True)
def moving_average( array, window ):
ret = np.cumsum(array)
ret[window:] = ret[window:] - ret[:-window]
ma = ret[window - 1:] / window
n = np.empty(window-1); n.fill(np.nan)
return np.concatenate((n.ravel(), ma.ravel()))
如果你已经有一个已知大小的数组
import numpy as np
M=np.arange(12)
avg=[]
i=0
while i<len(M)-2: #for n point average len(M) - (n-1)
avg.append((M[i]+M[i+1]+M[i+2])/3) #n is denominator
i+=1
print(avg)
您也可以编写自己的Python C扩展。
这当然不是最简单的方法,但与使用np相比,这将使您运行得更快,内存效率更高。堆积:作为建筑块的堆积
// moving_average.c
#define NPY_NO_DEPRECATED_API NPY_1_7_API_VERSION
#include <Python.h>
#include <numpy/arrayobject.h>
static PyObject *moving_average(PyObject *self, PyObject *args) {
PyObject *input;
int64_t window_size;
PyArg_ParseTuple(args, "Ol", &input, &window_size);
if (PyErr_Occurred()) return NULL;
if (!PyArray_Check(input) || !PyArray_ISNUMBER((PyArrayObject *)input)) {
PyErr_SetString(PyExc_TypeError, "First argument must be a numpy array with numeric dtype");
return NULL;
}
int64_t input_size = PyObject_Size(input);
double *input_data;
if (PyArray_AsCArray(&input, &input_data, (npy_intp[]){ [0] = input_size }, 1, PyArray_DescrFromType(NPY_DOUBLE)) != 0) {
PyErr_SetString(PyExc_TypeError, "Failed to simulate C array of type double");
return NULL;
}
int64_t output_size = input_size - window_size + 1;
PyObject *output = PyArray_SimpleNew(1, (npy_intp[]){ [0] = output_size }, NPY_DOUBLE);
double *output_data = PyArray_DATA((PyArrayObject *)output);
double cumsum_before = 0;
double cumsum_after = 0;
for (int i = 0; i < window_size; ++i) {
cumsum_after += input_data[i];
}
for (int i = 0; i < output_size - 1; ++i) {
output_data[i] = (cumsum_after - cumsum_before) / window_size;
cumsum_after += input_data[i + window_size];
cumsum_before += input_data[i];
}
output_data[output_size - 1] = (cumsum_after - cumsum_before) / window_size;
return output;
}
static PyMethodDef methods[] = {
{
"moving_average",
moving_average,
METH_VARARGS,
"Rolling mean of numpy array with specified window size"
},
{NULL, NULL, 0, NULL}
};
static struct PyModuleDef moduledef = {
PyModuleDef_HEAD_INIT,
"moving_average",
"C extension for finding the rolling mean of a numpy array",
-1,
methods
};
PyMODINIT_FUNC PyInit_moving_average(void) {
PyObject *module = PyModule_Create(&moduledef);
import_array();
return module;
}
METH_VARARGS specifies that the method only takes positional arguments. PyArg_ParseTuple allows you to parse these positional arguments. By using PyErr_SetString and returning NULL from the method, you can signal that an exception has occurred to the Python interpreter from the C extension. PyArray_AsCArray allows your method to be polymorphic when it comes to input array dtype, alignment, whether the array is C-contiguous (See "Can a numpy 1d array not be contiguous?") etc. without needing to create a copy of the array. If you instead used PyArray_DATA, you'd need to deal with this yourself. PyArray_SimpleNew allows you to create a new numpy array. This is similar to using np.empty. The array will not be initialized, and might contain non-deterministic junk which could surprise you if you forget to overwrite it.
构建C扩展
# setup.py
from setuptools import setup, Extension
import numpy
setup(
ext_modules=[
Extension(
'moving_average',
['moving_average.c'],
include_dirs=[numpy.get_include()]
)
]
)
# python setup.py build_ext --build-lib=.
基准
import numpy as np
# Our compiled C extension:
from moving_average import moving_average as moving_average_c
# Answer by Jaime using npcumsum
def moving_average_cumsum(a, n) :
ret = np.cumsum(a, dtype=float)
ret[n:] = ret[n:] - ret[:-n]
return ret[n - 1:] / n
# Answer by yatu using np.convolve
def moving_average_convolve(a, n):
return np.convolve(a, np.ones(n), 'valid') / n
a = np.random.rand(1_000_000)
print('window_size = 3')
%timeit moving_average_c(a, 3)
%timeit moving_average_cumsum(a, 3)
%timeit moving_average_convolve(a, 3)
print('\nwindow_size = 100')
%timeit moving_average_c(a, 100)
%timeit moving_average_cumsum(a, 100)
%timeit moving_average_convolve(a, 100)
window_size = 3
958 µs ± 4.68 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
4.52 ms ± 15.4 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
809 µs ± 463 ns per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
window_size = 100
977 µs ± 937 ns per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
6.16 ms ± 19.1 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
14.2 ms ± 12.4 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)