似乎没有函数可以简单地计算numpy/scipy的移动平均值,这导致了复杂的解决方案。
我的问题有两个方面:
用numpy(正确地)实现移动平均的最简单方法是什么? 既然这似乎不是小事,而且容易出错,有没有一个很好的理由不包括电池在这种情况下?
似乎没有函数可以简单地计算numpy/scipy的移动平均值,这导致了复杂的解决方案。
我的问题有两个方面:
用numpy(正确地)实现移动平均的最简单方法是什么? 既然这似乎不是小事,而且容易出错,有没有一个很好的理由不包括电池在这种情况下?
当前回答
Talib包含一个简单的移动平均工具,以及其他类似的平均工具(即指数移动平均)。下面将该方法与其他一些解决方案进行比较。
%timeit pd.Series(np.arange(100000)).rolling(3).mean()
2.53 ms ± 40.5 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
%timeit talib.SMA(real = np.arange(100000.), timeperiod = 3)
348 µs ± 3.5 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
%timeit moving_average(np.arange(100000))
638 µs ± 45.1 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
需要注意的是,real必须有dtype = float的元素。否则将引发以下错误
例外:实不是双的
其他回答
如果你想仔细考虑边缘条件(只从边缘的可用元素计算平均值),下面的函数可以解决这个问题。
import numpy as np
def running_mean(x, N):
out = np.zeros_like(x, dtype=np.float64)
dim_len = x.shape[0]
for i in range(dim_len):
if N%2 == 0:
a, b = i - (N-1)//2, i + (N-1)//2 + 2
else:
a, b = i - (N-1)//2, i + (N-1)//2 + 1
#cap indices to min and max indices
a = max(0, a)
b = min(dim_len, b)
out[i] = np.mean(x[a:b])
return out
>>> running_mean(np.array([1,2,3,4]), 2)
array([1.5, 2.5, 3.5, 4. ])
>>> running_mean(np.array([1,2,3,4]), 3)
array([1.5, 2. , 3. , 3.5])
NumPy缺乏特定领域的函数可能是由于核心团队的纪律和对NumPy主要指令的忠实:提供n维数组类型,以及用于创建和索引这些数组的函数。像许多基本目标一样,这个目标并不小,NumPy出色地完成了它。
更大的SciPy包含更大的特定于领域的库集合(被SciPy开发人员称为子包)——例如,数值优化(optimize)、信号处理(signal)和积分(integrate)。
我的猜测是,您要查找的函数至少在SciPy子包中的一个(SciPy。也许信号);然而,我将首先在SciPy scikit集合中查找,确定相关的scikit并在其中寻找感兴趣的函数。
Scikits是基于NumPy/SciPy独立开发的包,并针对特定的技术规程(例如,Scikits -image, Scikits -learn等),其中几个(特别是用于数值优化的令人钦佩的OpenOpt)在选择位于相对较新的Scikits主题之下很久以前就得到了高度重视,成熟的项目。Scikits主页上列出了大约30个这样的Scikits,尽管其中至少有几个已经不再处于积极的开发中。
按照这个建议,你会发现scikits-timeseries;但是,该软件包已不再处于积极开发阶段;实际上,Pandas已经成为AFAIK,事实上的基于numpy的时间序列库。
Pandas有几个函数可以用来计算移动平均线;其中最简单的可能是rolling_mean,你可以这样使用:
>>> # the recommended syntax to import pandas
>>> import pandas as PD
>>> import numpy as NP
>>> # prepare some fake data:
>>> # the date-time indices:
>>> t = PD.date_range('1/1/2010', '12/31/2012', freq='D')
>>> # the data:
>>> x = NP.arange(0, t.shape[0])
>>> # combine the data & index into a Pandas 'Series' object
>>> D = PD.Series(x, t)
现在,只需调用函数rolling_mean,传入Series对象和窗口大小,在下面的例子中是10天。
>>> d_mva = PD.rolling_mean(D, 10)
>>> # d_mva is the same size as the original Series
>>> d_mva.shape
(1096,)
>>> # though obviously the first w values are NaN where w is the window size
>>> d_mva[:3]
2010-01-01 NaN
2010-01-02 NaN
2010-01-03 NaN
验证它是否有效。,将原系列中的值10 - 15与用滚动平均值平滑的新系列进行比较
>>> D[10:15]
2010-01-11 2.041076
2010-01-12 2.041076
2010-01-13 2.720585
2010-01-14 2.720585
2010-01-15 3.656987
Freq: D
>>> d_mva[10:20]
2010-01-11 3.131125
2010-01-12 3.035232
2010-01-13 2.923144
2010-01-14 2.811055
2010-01-15 2.785824
Freq: D
The function rolling_mean, along with about a dozen or so other function are informally grouped in the Pandas documentation under the rubric moving window functions; a second, related group of functions in Pandas is referred to as exponentially-weighted functions (e.g., ewma, which calculates exponentially moving weighted average). The fact that this second group is not included in the first (moving window functions) is perhaps because the exponentially-weighted transforms don't rely on a fixed-length window
for i in range(len(Data)):
Data[i, 1] = Data[i-lookback:i, 0].sum() / lookback
试试这段代码。我认为这样更简单,也能达到目的。 回望是移动平均线的窗口。
在Data[i-lookback:i, 0].sum()中,我放了0来指代数据集的第一列,但如果你有多个列,你可以放任何你喜欢的列。
下面是一个使用numba的快速实现(注意类型)。注意它确实包含移位的nan。
import numpy as np
import numba as nb
@nb.jit(nb.float64[:](nb.float64[:],nb.int64),
fastmath=True,nopython=True)
def moving_average( array, window ):
ret = np.cumsum(array)
ret[window:] = ret[window:] - ret[:-window]
ma = ret[window - 1:] / window
n = np.empty(window-1); n.fill(np.nan)
return np.concatenate((n.ravel(), ma.ravel()))
所有的答案似乎都集中在预先计算的列表的情况下。对于实际运行的用例,数字一个接一个地进来,这里有一个简单的类,它提供了对最后N个值求平均的服务:
import numpy as np
class RunningAverage():
def __init__(self, stack_size):
self.stack = [0 for _ in range(stack_size)]
self.ptr = 0
self.full_cycle = False
def add(self,value):
self.stack[self.ptr] = value
self.ptr += 1
if self.ptr == len(self.stack):
self.full_cycle = True
self.ptr = 0
def get_avg(self):
if self.full_cycle:
return np.mean(self.stack)
else:
return np.mean(self.stack[:self.ptr])
用法:
N = 50 # size of the averaging window
run_avg = RunningAverage(N)
for i in range(1000):
value = <my computation>
run_avg.add(value)
if i % 20 ==0: # print once in 20 iters:
print(f'the average value is {run_avg.get_avg()}')