Python中是否有SciPy函数或NumPy函数或模块来计算给定特定窗口的1D数组的运行平均值?
当前回答
上述所有的解决方案都很差,因为它们缺乏
由于本机python而不是numpy向量化实现, 数值稳定性,由于numpy使用不当。cumsum或 由于O(len(x) * w)实现为卷积的速度。
鉴于
import numpy
m = 10000
x = numpy.random.rand(m)
w = 1000
注意x_[:w].sum()等于x[:w-1].sum()。因此,对于第一个平均值,numpy.cumsum(…)加上x[w] / w(通过x_[w+1] / w),并减去0(从x_[0] / w)。结果是x[0:w].mean()
通过cumsum,您将通过添加x[w+1] / w并减去x[0] / w来更新第二个平均值,从而得到x[1:w+1].mean()。
这将一直进行,直到到达x[-w:].mean()。
x_ = numpy.insert(x, 0, 0)
sliding_average = x_[:w].sum() / w + numpy.cumsum(x_[w:] - x_[:-w]) / w
这个解是向量化的,O(m),可读且数值稳定。
其他回答
从其他答案来看,我不认为这是问题所要求的,但我需要保持一个不断增长的值列表的运行平均值。
因此,如果你想保持从某个地方(站点,测量设备等)获取的值的列表和最近n个值更新的平均值,你可以使用下面的代码,这将最大限度地减少添加新元素的工作:
class Running_Average(object):
def __init__(self, buffer_size=10):
"""
Create a new Running_Average object.
This object allows the efficient calculation of the average of the last
`buffer_size` numbers added to it.
Examples
--------
>>> a = Running_Average(2)
>>> a.add(1)
>>> a.get()
1.0
>>> a.add(1) # there are two 1 in buffer
>>> a.get()
1.0
>>> a.add(2) # there's a 1 and a 2 in the buffer
>>> a.get()
1.5
>>> a.add(2)
>>> a.get() # now there's only two 2 in the buffer
2.0
"""
self._buffer_size = int(buffer_size) # make sure it's an int
self.reset()
def add(self, new):
"""
Add a new number to the buffer, or replaces the oldest one there.
"""
new = float(new) # make sure it's a float
n = len(self._buffer)
if n < self.buffer_size: # still have to had numbers to the buffer.
self._buffer.append(new)
if self._average != self._average: # ~ if isNaN().
self._average = new # no previous numbers, so it's new.
else:
self._average *= n # so it's only the sum of numbers.
self._average += new # add new number.
self._average /= (n+1) # divide by new number of numbers.
else: # buffer full, replace oldest value.
old = self._buffer[self._index] # the previous oldest number.
self._buffer[self._index] = new # replace with new one.
self._index += 1 # update the index and make sure it's...
self._index %= self.buffer_size # ... smaller than buffer_size.
self._average -= old/self.buffer_size # remove old one...
self._average += new/self.buffer_size # ...and add new one...
# ... weighted by the number of elements.
def __call__(self):
"""
Return the moving average value, for the lazy ones who don't want
to write .get .
"""
return self._average
def get(self):
"""
Return the moving average value.
"""
return self()
def reset(self):
"""
Reset the moving average.
If for some reason you don't want to just create a new one.
"""
self._buffer = [] # could use np.empty(self.buffer_size)...
self._index = 0 # and use this to keep track of how many numbers.
self._average = float('nan') # could use np.NaN .
def get_buffer_size(self):
"""
Return current buffer_size.
"""
return self._buffer_size
def set_buffer_size(self, buffer_size):
"""
>>> a = Running_Average(10)
>>> for i in range(15):
... a.add(i)
...
>>> a()
9.5
>>> a._buffer # should not access this!!
[10.0, 11.0, 12.0, 13.0, 14.0, 5.0, 6.0, 7.0, 8.0, 9.0]
Decreasing buffer size:
>>> a.buffer_size = 6
>>> a._buffer # should not access this!!
[9.0, 10.0, 11.0, 12.0, 13.0, 14.0]
>>> a.buffer_size = 2
>>> a._buffer
[13.0, 14.0]
Increasing buffer size:
>>> a.buffer_size = 5
Warning: no older data available!
>>> a._buffer
[13.0, 14.0]
Keeping buffer size:
>>> a = Running_Average(10)
>>> for i in range(15):
... a.add(i)
...
>>> a()
9.5
>>> a._buffer # should not access this!!
[10.0, 11.0, 12.0, 13.0, 14.0, 5.0, 6.0, 7.0, 8.0, 9.0]
>>> a.buffer_size = 10 # reorders buffer!
>>> a._buffer
[5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0]
"""
buffer_size = int(buffer_size)
# order the buffer so index is zero again:
new_buffer = self._buffer[self._index:]
new_buffer.extend(self._buffer[:self._index])
self._index = 0
if self._buffer_size < buffer_size:
print('Warning: no older data available!') # should use Warnings!
else:
diff = self._buffer_size - buffer_size
print(diff)
new_buffer = new_buffer[diff:]
self._buffer_size = buffer_size
self._buffer = new_buffer
buffer_size = property(get_buffer_size, set_buffer_size)
你可以测试它,例如:
def graph_test(N=200):
import matplotlib.pyplot as plt
values = list(range(N))
values_average_calculator = Running_Average(N/2)
values_averages = []
for value in values:
values_average_calculator.add(value)
values_averages.append(values_average_calculator())
fig, ax = plt.subplots(1, 1)
ax.plot(values, label='values')
ax.plot(values_averages, label='averages')
ax.grid()
ax.set_xlim(0, N)
ax.set_ylim(0, N)
fig.show()
这使:
如果你必须为非常小的数组(少于200个元素)重复这样做,我发现只用线性代数就能得到最快的结果。 最慢的部分是建立你的乘法矩阵y,你只需要做一次,但之后可能会更快。
import numpy as np
import random
N = 100 # window size
size =200 # array length
x = np.random.random(size)
y = np.eye(size, dtype=float)
# prepare matrix
for i in range(size):
y[i,i:i+N] = 1./N
# calculate running mean
z = np.inner(x,y.T)[N-1:]
高效的解决方案
卷积比直接的方法好得多,但(我猜)它使用FFT,因此相当慢。但是,下面的方法特别适用于计算运行平均值
def running_mean(x, N):
cumsum = numpy.cumsum(numpy.insert(x, 0, 0))
return (cumsum[N:] - cumsum[:-N]) / float(N)
要检查的代码
In[3]: x = numpy.random.random(100000)
In[4]: N = 1000
In[5]: %timeit result1 = numpy.convolve(x, numpy.ones((N,))/N, mode='valid')
10 loops, best of 3: 41.4 ms per loop
In[6]: %timeit result2 = running_mean(x, N)
1000 loops, best of 3: 1.04 ms per loop
注意numpy。allclose(result1, result2)为True,两个方法等价。 N越大,时间差异越大。
警告:虽然cumsum更快,但会增加浮点错误,这可能导致您的结果无效/不正确/不可接受
这里的评论指出了这个浮点错误问题,但我在回答中让它更明显。
# demonstrate loss of precision with only 100,000 points
np.random.seed(42)
x = np.random.randn(100000)+1e6
y1 = running_mean_convolve(x, 10)
y2 = running_mean_cumsum(x, 10)
assert np.allclose(y1, y2, rtol=1e-12, atol=0)
the more points you accumulate over the greater the floating point error (so 1e5 points is noticable, 1e6 points is more significant, more than 1e6 and you may want to resetting the accumulators) you can cheat by using np.longdouble but your floating point error still will get significant for relatively large number of points (around >1e5 but depends on your data) you can plot the error and see it increasing relatively fast the convolve solution is slower but does not have this floating point loss of precision the uniform_filter1d solution is faster than this cumsum solution AND does not have this floating point loss of precision
Python标准库解决方案
这个生成器函数接受一个可迭代对象和一个窗口大小为N的值,并生成窗口内当前值的平均值。它使用了deque,这是一种类似于列表的数据结构,但针对在两端进行快速修改(弹出、追加)进行了优化。
from collections import deque
from itertools import islice
def sliding_avg(iterable, N):
it = iter(iterable)
window = deque(islice(it, N))
num_vals = len(window)
if num_vals < N:
msg = 'window size {} exceeds total number of values {}'
raise ValueError(msg.format(N, num_vals))
N = float(N) # force floating point division if using Python 2
s = sum(window)
while True:
yield s/N
try:
nxt = next(it)
except StopIteration:
break
s = s - window.popleft() + nxt
window.append(nxt)
下面是函数的运行情况:
>>> values = range(100)
>>> N = 5
>>> window_avg = sliding_avg(values, N)
>>>
>>> next(window_avg) # (0 + 1 + 2 + 3 + 4)/5
>>> 2.0
>>> next(window_avg) # (1 + 2 + 3 + 4 + 5)/5
>>> 3.0
>>> next(window_avg) # (2 + 3 + 4 + 5 + 6)/5
>>> 4.0
我知道这是一个老问题,但这里有一个解决方案,它不使用任何额外的数据结构或库。它在输入列表的元素数量上是线性的,我想不出任何其他方法来使它更有效(实际上,如果有人知道更好的分配结果的方法,请告诉我)。
注意:使用numpy数组而不是列表会快得多,但我想消除所有依赖关系。通过多线程执行也可以提高性能
该函数假设输入列表是一维的,所以要小心。
### Running mean/Moving average
def running_mean(l, N):
sum = 0
result = list( 0 for x in l)
for i in range( 0, N ):
sum = sum + l[i]
result[i] = sum / (i+1)
for i in range( N, len(l) ):
sum = sum - l[i-N] + l[i]
result[i] = sum / N
return result
例子
假设我们有一个列表data =[1,2,3,4,5,6],我们想在它上面计算周期为3的滚动平均值,并且你还想要一个与输入列表相同大小的输出列表(这是最常见的情况)。
第一个元素的索引为0,因此滚动平均值应该在索引为-2、-1和0的元素上计算。显然,我们没有data[-2]和data[-1](除非您想使用特殊的边界条件),因此我们假设这些元素为0。这相当于对列表进行零填充,除了我们实际上不填充它,只是跟踪需要填充的索引(从0到N-1)。
所以,对于前N个元素,我们只是在累加器中不断地把元素加起来。
result[0] = (0 + 0 + 1) / 3 = 0.333 == (sum + 1) / 3
result[1] = (0 + 1 + 2) / 3 = 1 == (sum + 2) / 3
result[2] = (1 + 2 + 3) / 3 = 2 == (sum + 3) / 3
从元素N+1开始,简单的累加是行不通的。我们期望的结果是[3]=(2 + 3 + 4)/3 = 3,但这与(sum + 4)/3 = 3.333不同。
计算正确值的方法是用sum+4减去数据[0]= 1,从而得到sum+4 - 1 = 9。
这是因为目前sum =数据[0]+数据[1]+数据[2],但对于每个i >= N也是如此,因为在减法之前,sum是数据[i-N] +…+ data[i-2] + data[i-1]。
推荐文章
- 证书验证失败:无法获得本地颁发者证书
- 当使用pip3安装包时,“Python中的ssl模块不可用”
- 无法切换Python与pyenv
- Python if not == vs if !=
- 如何从scikit-learn决策树中提取决策规则?
- 为什么在Mac OS X v10.9 (Mavericks)的终端中apt-get功能不起作用?
- 将旋转的xtick标签与各自的xtick对齐
- 为什么元组可以包含可变项?
- 如何合并字典的字典?
- 如何创建类属性?
- 不区分大小写的“in”
- 在Python中获取迭代器中的元素个数
- 解析日期字符串并更改格式
- 使用try和。Python中的if
- 如何在Python中获得所有直接子目录