似乎没有函数可以简单地计算numpy/scipy的移动平均值,这导致了复杂的解决方案。
我的问题有两个方面:
用numpy(正确地)实现移动平均的最简单方法是什么? 既然这似乎不是小事,而且容易出错,有没有一个很好的理由不包括电池在这种情况下?
似乎没有函数可以简单地计算numpy/scipy的移动平均值,这导致了复杂的解决方案。
我的问题有两个方面:
用numpy(正确地)实现移动平均的最简单方法是什么? 既然这似乎不是小事,而且容易出错,有没有一个很好的理由不包括电池在这种情况下?
当前回答
我要么使用公认答案的解决方案,稍微修改以使输出和输入的长度相同,要么使用另一个答案的评论中提到的熊猫版本。我在这里用一个可重复的例子来总结两者,以供将来参考:
import numpy as np
import pandas as pd
def moving_average(a, n):
ret = np.cumsum(a, dtype=float)
ret[n:] = ret[n:] - ret[:-n]
return ret / n
def moving_average_centered(a, n):
return pd.Series(a).rolling(window=n, center=True).mean().to_numpy()
A = [0, 0, 1, 2, 4, 5, 4]
print(moving_average(A, 3))
# [0. 0. 0.33333333 1. 2.33333333 3.66666667 4.33333333]
print(moving_average_centered(A, 3))
# [nan 0.33333333 1. 2.33333333 3.66666667 4.33333333 nan ]
其他回答
实现这一点的一个简单方法是使用np.卷积。 这背后的思想是利用离散卷积的计算方式,并使用它来返回滚动平均值。这可以通过与np序列进行卷积来实现。长度等于我们想要的滑动窗口长度。
为了做到这一点,我们可以定义以下函数:
def moving_average(x, w):
return np.convolve(x, np.ones(w), 'valid') / w
该函数将对序列x和长度为w的序列进行卷积。注意,所选模式是有效的,因此卷积积只对序列完全重叠的点给出。
一些例子:
x = np.array([5,3,8,10,2,1,5,1,0,2])
对于窗口长度为2的移动平均线,我们有:
moving_average(x, 2)
# array([4. , 5.5, 9. , 6. , 1.5, 3. , 3. , 0.5, 1. ])
对于长度为4的窗口:
moving_average(x, 4)
# array([6.5 , 5.75, 5.25, 4.5 , 2.25, 1.75, 2. ])
卷积是怎么工作的?
让我们更深入地看看离散卷积是如何计算的。 下面的函数旨在复制np。卷积计算输出值:
def mov_avg(x, w):
for m in range(len(x)-(w-1)):
yield sum(np.ones(w) * x[m:m+w]) / w
对于上面的同一个例子,也会得到:
list(mov_avg(x, 2))
# [4.0, 5.5, 9.0, 6.0, 1.5, 3.0, 3.0, 0.5, 1.0]
所以每一步要做的就是求1数组和当前窗口之间的内积。在这种情况下,乘以np.ones(w)是多余的,因为我们直接取序列的和。
下面是一个计算第一个输出的例子,这样会更清楚一些。假设我们想要一个w=4的窗口:
[1,1,1,1]
[5,3,8,10,2,1,5,1,0,2]
= (1*5 + 1*3 + 1*8 + 1*10) / w = 6.5
下面的输出将被计算为:
[1,1,1,1]
[5,3,8,10,2,1,5,1,0,2]
= (1*3 + 1*8 + 1*10 + 1*2) / w = 5.75
依此类推,在所有重叠完成后返回序列的移动平均值。
如果你想仔细考虑边缘条件(只从边缘的可用元素计算平均值),下面的函数可以解决这个问题。
import numpy as np
def running_mean(x, N):
out = np.zeros_like(x, dtype=np.float64)
dim_len = x.shape[0]
for i in range(dim_len):
if N%2 == 0:
a, b = i - (N-1)//2, i + (N-1)//2 + 2
else:
a, b = i - (N-1)//2, i + (N-1)//2 + 1
#cap indices to min and max indices
a = max(0, a)
b = min(dim_len, b)
out[i] = np.mean(x[a:b])
return out
>>> running_mean(np.array([1,2,3,4]), 2)
array([1.5, 2.5, 3.5, 4. ])
>>> running_mean(np.array([1,2,3,4]), 3)
array([1.5, 2. , 3. , 3.5])
如果你已经有一个已知大小的数组
import numpy as np
M=np.arange(12)
avg=[]
i=0
while i<len(M)-2: #for n point average len(M) - (n-1)
avg.append((M[i]+M[i+1]+M[i+2])/3) #n is denominator
i+=1
print(avg)
如果你只想要一个简单的非加权移动平均,你可以很容易地用np实现它。cumsum,可能比基于FFT的方法更快:
修正了Bean在代码中发现的偏离一的错误索引。编辑
def moving_average(a, n=3) :
ret = np.cumsum(a, dtype=float)
ret[n:] = ret[n:] - ret[:-n]
return ret[n - 1:] / n
>>> a = np.arange(20)
>>> moving_average(a)
array([ 1., 2., 3., 4., 5., 6., 7., 8., 9., 10., 11.,
12., 13., 14., 15., 16., 17., 18.])
>>> moving_average(a, n=4)
array([ 1.5, 2.5, 3.5, 4.5, 5.5, 6.5, 7.5, 8.5, 9.5,
10.5, 11.5, 12.5, 13.5, 14.5, 15.5, 16.5, 17.5])
所以我猜答案是:它真的很容易实现,也许numpy已经有了一些专门的功能。
您也可以编写自己的Python C扩展。
这当然不是最简单的方法,但与使用np相比,这将使您运行得更快,内存效率更高。堆积:作为建筑块的堆积
// moving_average.c
#define NPY_NO_DEPRECATED_API NPY_1_7_API_VERSION
#include <Python.h>
#include <numpy/arrayobject.h>
static PyObject *moving_average(PyObject *self, PyObject *args) {
PyObject *input;
int64_t window_size;
PyArg_ParseTuple(args, "Ol", &input, &window_size);
if (PyErr_Occurred()) return NULL;
if (!PyArray_Check(input) || !PyArray_ISNUMBER((PyArrayObject *)input)) {
PyErr_SetString(PyExc_TypeError, "First argument must be a numpy array with numeric dtype");
return NULL;
}
int64_t input_size = PyObject_Size(input);
double *input_data;
if (PyArray_AsCArray(&input, &input_data, (npy_intp[]){ [0] = input_size }, 1, PyArray_DescrFromType(NPY_DOUBLE)) != 0) {
PyErr_SetString(PyExc_TypeError, "Failed to simulate C array of type double");
return NULL;
}
int64_t output_size = input_size - window_size + 1;
PyObject *output = PyArray_SimpleNew(1, (npy_intp[]){ [0] = output_size }, NPY_DOUBLE);
double *output_data = PyArray_DATA((PyArrayObject *)output);
double cumsum_before = 0;
double cumsum_after = 0;
for (int i = 0; i < window_size; ++i) {
cumsum_after += input_data[i];
}
for (int i = 0; i < output_size - 1; ++i) {
output_data[i] = (cumsum_after - cumsum_before) / window_size;
cumsum_after += input_data[i + window_size];
cumsum_before += input_data[i];
}
output_data[output_size - 1] = (cumsum_after - cumsum_before) / window_size;
return output;
}
static PyMethodDef methods[] = {
{
"moving_average",
moving_average,
METH_VARARGS,
"Rolling mean of numpy array with specified window size"
},
{NULL, NULL, 0, NULL}
};
static struct PyModuleDef moduledef = {
PyModuleDef_HEAD_INIT,
"moving_average",
"C extension for finding the rolling mean of a numpy array",
-1,
methods
};
PyMODINIT_FUNC PyInit_moving_average(void) {
PyObject *module = PyModule_Create(&moduledef);
import_array();
return module;
}
METH_VARARGS specifies that the method only takes positional arguments. PyArg_ParseTuple allows you to parse these positional arguments. By using PyErr_SetString and returning NULL from the method, you can signal that an exception has occurred to the Python interpreter from the C extension. PyArray_AsCArray allows your method to be polymorphic when it comes to input array dtype, alignment, whether the array is C-contiguous (See "Can a numpy 1d array not be contiguous?") etc. without needing to create a copy of the array. If you instead used PyArray_DATA, you'd need to deal with this yourself. PyArray_SimpleNew allows you to create a new numpy array. This is similar to using np.empty. The array will not be initialized, and might contain non-deterministic junk which could surprise you if you forget to overwrite it.
构建C扩展
# setup.py
from setuptools import setup, Extension
import numpy
setup(
ext_modules=[
Extension(
'moving_average',
['moving_average.c'],
include_dirs=[numpy.get_include()]
)
]
)
# python setup.py build_ext --build-lib=.
基准
import numpy as np
# Our compiled C extension:
from moving_average import moving_average as moving_average_c
# Answer by Jaime using npcumsum
def moving_average_cumsum(a, n) :
ret = np.cumsum(a, dtype=float)
ret[n:] = ret[n:] - ret[:-n]
return ret[n - 1:] / n
# Answer by yatu using np.convolve
def moving_average_convolve(a, n):
return np.convolve(a, np.ones(n), 'valid') / n
a = np.random.rand(1_000_000)
print('window_size = 3')
%timeit moving_average_c(a, 3)
%timeit moving_average_cumsum(a, 3)
%timeit moving_average_convolve(a, 3)
print('\nwindow_size = 100')
%timeit moving_average_c(a, 100)
%timeit moving_average_cumsum(a, 100)
%timeit moving_average_convolve(a, 100)
window_size = 3
958 µs ± 4.68 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
4.52 ms ± 15.4 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
809 µs ± 463 ns per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
window_size = 100
977 µs ± 937 ns per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
6.16 ms ± 19.1 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
14.2 ms ± 12.4 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)