Python中是否有SciPy函数或NumPy函数或模块来计算给定特定窗口的1D数组的运行平均值?
当前回答
高效的解决方案
卷积比直接的方法好得多,但(我猜)它使用FFT,因此相当慢。但是,下面的方法特别适用于计算运行平均值
def running_mean(x, N):
cumsum = numpy.cumsum(numpy.insert(x, 0, 0))
return (cumsum[N:] - cumsum[:-N]) / float(N)
要检查的代码
In[3]: x = numpy.random.random(100000)
In[4]: N = 1000
In[5]: %timeit result1 = numpy.convolve(x, numpy.ones((N,))/N, mode='valid')
10 loops, best of 3: 41.4 ms per loop
In[6]: %timeit result2 = running_mean(x, N)
1000 loops, best of 3: 1.04 ms per loop
注意numpy。allclose(result1, result2)为True,两个方法等价。 N越大,时间差异越大。
警告:虽然cumsum更快,但会增加浮点错误,这可能导致您的结果无效/不正确/不可接受
这里的评论指出了这个浮点错误问题,但我在回答中让它更明显。
# demonstrate loss of precision with only 100,000 points
np.random.seed(42)
x = np.random.randn(100000)+1e6
y1 = running_mean_convolve(x, 10)
y2 = running_mean_cumsum(x, 10)
assert np.allclose(y1, y2, rtol=1e-12, atol=0)
the more points you accumulate over the greater the floating point error (so 1e5 points is noticable, 1e6 points is more significant, more than 1e6 and you may want to resetting the accumulators) you can cheat by using np.longdouble but your floating point error still will get significant for relatively large number of points (around >1e5 but depends on your data) you can plot the error and see it increasing relatively fast the convolve solution is slower but does not have this floating point loss of precision the uniform_filter1d solution is faster than this cumsum solution AND does not have this floating point loss of precision
其他回答
出于教学目的,让我再添加两个Numpy解决方案(比cumsum解决方案慢):
import numpy as np
from numpy.lib.stride_tricks import as_strided
def ra_strides(arr, window):
''' Running average using as_strided'''
n = arr.shape[0] - window + 1
arr_strided = as_strided(arr, shape=[n, window], strides=2*arr.strides)
return arr_strided.mean(axis=1)
def ra_add(arr, window):
''' Running average using add.reduceat'''
n = arr.shape[0] - window + 1
indices = np.array([0, window]*n) + np.repeat(np.arange(n), 2)
arr = np.append(arr, 0)
return np.add.reduceat(arr, indices )[::2]/window
使用的函数:as_strided, add.reduceat
这个问题现在甚至比NeXuS上个月写的时候更古老,但我喜欢他的代码处理边缘情况的方式。然而,因为它是一个“简单移动平均”,它的结果滞后于它们应用的数据。我认为,通过对基于卷积()的方法应用类似的方法,可以以比NumPy的模式valid、same和full更令人满意的方式处理边缘情况。
我的贡献使用了一个中央运行平均值,以使其结果与他们的数据相一致。当可供使用的全尺寸窗口的点太少时,将从数组边缘的连续较小窗口计算运行平均值。[实际上,从连续较大的窗口,但这是一个实现细节。]
import numpy as np
def running_mean(l, N):
# Also works for the(strictly invalid) cases when N is even.
if (N//2)*2 == N:
N = N - 1
front = np.zeros(N//2)
back = np.zeros(N//2)
for i in range(1, (N//2)*2, 2):
front[i//2] = np.convolve(l[:i], np.ones((i,))/i, mode = 'valid')
for i in range(1, (N//2)*2, 2):
back[i//2] = np.convolve(l[-i:], np.ones((i,))/i, mode = 'valid')
return np.concatenate([front, np.convolve(l, np.ones((N,))/N, mode = 'valid'), back[::-1]])
它相对较慢,因为它使用了卷积(),并且可能会被真正的Pythonista修饰很多,但是,我相信这个想法是成立的。
上面的一个答案中有一个mab的注释,它有这个方法。瓶颈有move_mean,这是一个简单的移动平均:
import numpy as np
import bottleneck as bn
a = np.arange(10) + np.random.random(10)
mva = bn.move_mean(a, window=2, min_count=1)
Min_count是一个很方便的参数,它可以取数组中该点的移动平均值。如果你不设置min_count,它将等于window,并且直到window points的所有内容都将是nan。
如果你选择自己生成,而不是使用现有的库,请注意浮点错误并尽量减少其影响:
class SumAccumulator:
def __init__(self):
self.values = [0]
self.count = 0
def add( self, val ):
self.values.append( val )
self.count = self.count + 1
i = self.count
while i & 0x01:
i = i >> 1
v0 = self.values.pop()
v1 = self.values.pop()
self.values.append( v0 + v1 )
def get_total(self):
return sum( reversed(self.values) )
def get_size( self ):
return self.count
如果所有的值都是大致相同的数量级,那么这将通过始终添加大致相似的数量级值来帮助保持精度。
从其他答案来看,我不认为这是问题所要求的,但我需要保持一个不断增长的值列表的运行平均值。
因此,如果你想保持从某个地方(站点,测量设备等)获取的值的列表和最近n个值更新的平均值,你可以使用下面的代码,这将最大限度地减少添加新元素的工作:
class Running_Average(object):
def __init__(self, buffer_size=10):
"""
Create a new Running_Average object.
This object allows the efficient calculation of the average of the last
`buffer_size` numbers added to it.
Examples
--------
>>> a = Running_Average(2)
>>> a.add(1)
>>> a.get()
1.0
>>> a.add(1) # there are two 1 in buffer
>>> a.get()
1.0
>>> a.add(2) # there's a 1 and a 2 in the buffer
>>> a.get()
1.5
>>> a.add(2)
>>> a.get() # now there's only two 2 in the buffer
2.0
"""
self._buffer_size = int(buffer_size) # make sure it's an int
self.reset()
def add(self, new):
"""
Add a new number to the buffer, or replaces the oldest one there.
"""
new = float(new) # make sure it's a float
n = len(self._buffer)
if n < self.buffer_size: # still have to had numbers to the buffer.
self._buffer.append(new)
if self._average != self._average: # ~ if isNaN().
self._average = new # no previous numbers, so it's new.
else:
self._average *= n # so it's only the sum of numbers.
self._average += new # add new number.
self._average /= (n+1) # divide by new number of numbers.
else: # buffer full, replace oldest value.
old = self._buffer[self._index] # the previous oldest number.
self._buffer[self._index] = new # replace with new one.
self._index += 1 # update the index and make sure it's...
self._index %= self.buffer_size # ... smaller than buffer_size.
self._average -= old/self.buffer_size # remove old one...
self._average += new/self.buffer_size # ...and add new one...
# ... weighted by the number of elements.
def __call__(self):
"""
Return the moving average value, for the lazy ones who don't want
to write .get .
"""
return self._average
def get(self):
"""
Return the moving average value.
"""
return self()
def reset(self):
"""
Reset the moving average.
If for some reason you don't want to just create a new one.
"""
self._buffer = [] # could use np.empty(self.buffer_size)...
self._index = 0 # and use this to keep track of how many numbers.
self._average = float('nan') # could use np.NaN .
def get_buffer_size(self):
"""
Return current buffer_size.
"""
return self._buffer_size
def set_buffer_size(self, buffer_size):
"""
>>> a = Running_Average(10)
>>> for i in range(15):
... a.add(i)
...
>>> a()
9.5
>>> a._buffer # should not access this!!
[10.0, 11.0, 12.0, 13.0, 14.0, 5.0, 6.0, 7.0, 8.0, 9.0]
Decreasing buffer size:
>>> a.buffer_size = 6
>>> a._buffer # should not access this!!
[9.0, 10.0, 11.0, 12.0, 13.0, 14.0]
>>> a.buffer_size = 2
>>> a._buffer
[13.0, 14.0]
Increasing buffer size:
>>> a.buffer_size = 5
Warning: no older data available!
>>> a._buffer
[13.0, 14.0]
Keeping buffer size:
>>> a = Running_Average(10)
>>> for i in range(15):
... a.add(i)
...
>>> a()
9.5
>>> a._buffer # should not access this!!
[10.0, 11.0, 12.0, 13.0, 14.0, 5.0, 6.0, 7.0, 8.0, 9.0]
>>> a.buffer_size = 10 # reorders buffer!
>>> a._buffer
[5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0]
"""
buffer_size = int(buffer_size)
# order the buffer so index is zero again:
new_buffer = self._buffer[self._index:]
new_buffer.extend(self._buffer[:self._index])
self._index = 0
if self._buffer_size < buffer_size:
print('Warning: no older data available!') # should use Warnings!
else:
diff = self._buffer_size - buffer_size
print(diff)
new_buffer = new_buffer[diff:]
self._buffer_size = buffer_size
self._buffer = new_buffer
buffer_size = property(get_buffer_size, set_buffer_size)
你可以测试它,例如:
def graph_test(N=200):
import matplotlib.pyplot as plt
values = list(range(N))
values_average_calculator = Running_Average(N/2)
values_averages = []
for value in values:
values_average_calculator.add(value)
values_averages.append(values_average_calculator())
fig, ax = plt.subplots(1, 1)
ax.plot(values, label='values')
ax.plot(values_averages, label='averages')
ax.grid()
ax.set_xlim(0, N)
ax.set_ylim(0, N)
fig.show()
这使:
推荐文章
- 证书验证失败:无法获得本地颁发者证书
- 当使用pip3安装包时,“Python中的ssl模块不可用”
- 无法切换Python与pyenv
- Python if not == vs if !=
- 如何从scikit-learn决策树中提取决策规则?
- 为什么在Mac OS X v10.9 (Mavericks)的终端中apt-get功能不起作用?
- 将旋转的xtick标签与各自的xtick对齐
- 为什么元组可以包含可变项?
- 如何合并字典的字典?
- 如何创建类属性?
- 不区分大小写的“in”
- 在Python中获取迭代器中的元素个数
- 解析日期字符串并更改格式
- 使用try和。Python中的if
- 如何在Python中获得所有直接子目录