Python中是否有SciPy函数或NumPy函数或模块来计算给定特定窗口的1D数组的运行平均值?
当前回答
一个新的卷积配方被合并到Python 3.10中。
鉴于
import collections, operator
from itertools import chain, repeat
size = 3 + 1
kernel = [1/size] * size
Code
def convolve(signal, kernel):
# See: https://betterexplained.com/articles/intuitive-convolution/
# convolve(data, [0.25, 0.25, 0.25, 0.25]) --> Moving average (blur)
# convolve(data, [1, -1]) --> 1st finite difference (1st derivative)
# convolve(data, [1, -2, 1]) --> 2nd finite difference (2nd derivative)
kernel = list(reversed(kernel))
n = len(kernel)
window = collections.deque([0] * n, maxlen=n)
for x in chain(signal, repeat(0, n-1)):
window.append(x)
yield sum(map(operator.mul, kernel, window))
Demo
list(convolve(range(1, 6), kernel))
# [0.25, 0.75, 1.5, 2.5, 3.5, 3.0, 2.25, 1.25]
细节
卷积是一种可以应用于移动平均的一般数学运算。其思想是,给定一些数据,您将数据子集(窗口)作为“掩码”或“内核”在数据中滑动,在每个窗口上执行特定的数学操作。在移动平均的情况下,核是平均值:
现在可以通过more_itertools.convolve使用这个实现。 More_itertools是一个流行的第三方包;通过> PIP Install more_itertools安装。
其他回答
对于一个简短、快速的解决方案,在一个循环中完成所有事情,没有依赖关系,下面的代码工作得很好。
mylist = [1, 2, 3, 4, 5, 6, 7]
N = 3
cumsum, moving_aves = [0], []
for i, x in enumerate(mylist, 1):
cumsum.append(cumsum[i-1] + x)
if i>=N:
moving_ave = (cumsum[i] - cumsum[i-N])/N
#can do stuff with moving_ave here
moving_aves.append(moving_ave)
另一种不使用numpy或pandas找到移动平均线的方法
import itertools
sample = [2, 6, 10, 8, 11, 10]
list(itertools.starmap(
lambda a,b: b/a,
enumerate(itertools.accumulate(sample), 1))
)
将打印[2.0,4.0,6.0,6.5,7.4,7.83333333333333333]
2.0 = (2)/1 4.0 is (2 + 6) / 2 6.0 = (2 + 6 + 10) / 3 .
比起numpy或scipy,我建议熊猫们更快地做到这一点:
df['data'].rolling(3).mean()
这取列“数据”的3个周期的移动平均值(MA)。你也可以计算移位的版本,例如排除当前单元格的版本(向后移位一个)可以很容易地计算为:
df['data'].shift(periods=1).rolling(3).mean()
我觉得使用瓶颈可以很好地解决这个问题
参见下面的基本示例:
import numpy as np
import bottleneck as bn
a = np.random.randint(4, 1000, size=100)
mm = bn.move_mean(a, window=5, min_count=1)
“mm”是“a”的移动平均值。 “窗口”是考虑移动均值的最大条目数。 "min_count"是考虑移动平均值的最小条目数(例如,对于前几个元素或如果数组有nan值)。
好在瓶颈有助于处理nan值,而且非常高效。
从其他答案来看,我不认为这是问题所要求的,但我需要保持一个不断增长的值列表的运行平均值。
因此,如果你想保持从某个地方(站点,测量设备等)获取的值的列表和最近n个值更新的平均值,你可以使用下面的代码,这将最大限度地减少添加新元素的工作:
class Running_Average(object):
def __init__(self, buffer_size=10):
"""
Create a new Running_Average object.
This object allows the efficient calculation of the average of the last
`buffer_size` numbers added to it.
Examples
--------
>>> a = Running_Average(2)
>>> a.add(1)
>>> a.get()
1.0
>>> a.add(1) # there are two 1 in buffer
>>> a.get()
1.0
>>> a.add(2) # there's a 1 and a 2 in the buffer
>>> a.get()
1.5
>>> a.add(2)
>>> a.get() # now there's only two 2 in the buffer
2.0
"""
self._buffer_size = int(buffer_size) # make sure it's an int
self.reset()
def add(self, new):
"""
Add a new number to the buffer, or replaces the oldest one there.
"""
new = float(new) # make sure it's a float
n = len(self._buffer)
if n < self.buffer_size: # still have to had numbers to the buffer.
self._buffer.append(new)
if self._average != self._average: # ~ if isNaN().
self._average = new # no previous numbers, so it's new.
else:
self._average *= n # so it's only the sum of numbers.
self._average += new # add new number.
self._average /= (n+1) # divide by new number of numbers.
else: # buffer full, replace oldest value.
old = self._buffer[self._index] # the previous oldest number.
self._buffer[self._index] = new # replace with new one.
self._index += 1 # update the index and make sure it's...
self._index %= self.buffer_size # ... smaller than buffer_size.
self._average -= old/self.buffer_size # remove old one...
self._average += new/self.buffer_size # ...and add new one...
# ... weighted by the number of elements.
def __call__(self):
"""
Return the moving average value, for the lazy ones who don't want
to write .get .
"""
return self._average
def get(self):
"""
Return the moving average value.
"""
return self()
def reset(self):
"""
Reset the moving average.
If for some reason you don't want to just create a new one.
"""
self._buffer = [] # could use np.empty(self.buffer_size)...
self._index = 0 # and use this to keep track of how many numbers.
self._average = float('nan') # could use np.NaN .
def get_buffer_size(self):
"""
Return current buffer_size.
"""
return self._buffer_size
def set_buffer_size(self, buffer_size):
"""
>>> a = Running_Average(10)
>>> for i in range(15):
... a.add(i)
...
>>> a()
9.5
>>> a._buffer # should not access this!!
[10.0, 11.0, 12.0, 13.0, 14.0, 5.0, 6.0, 7.0, 8.0, 9.0]
Decreasing buffer size:
>>> a.buffer_size = 6
>>> a._buffer # should not access this!!
[9.0, 10.0, 11.0, 12.0, 13.0, 14.0]
>>> a.buffer_size = 2
>>> a._buffer
[13.0, 14.0]
Increasing buffer size:
>>> a.buffer_size = 5
Warning: no older data available!
>>> a._buffer
[13.0, 14.0]
Keeping buffer size:
>>> a = Running_Average(10)
>>> for i in range(15):
... a.add(i)
...
>>> a()
9.5
>>> a._buffer # should not access this!!
[10.0, 11.0, 12.0, 13.0, 14.0, 5.0, 6.0, 7.0, 8.0, 9.0]
>>> a.buffer_size = 10 # reorders buffer!
>>> a._buffer
[5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0]
"""
buffer_size = int(buffer_size)
# order the buffer so index is zero again:
new_buffer = self._buffer[self._index:]
new_buffer.extend(self._buffer[:self._index])
self._index = 0
if self._buffer_size < buffer_size:
print('Warning: no older data available!') # should use Warnings!
else:
diff = self._buffer_size - buffer_size
print(diff)
new_buffer = new_buffer[diff:]
self._buffer_size = buffer_size
self._buffer = new_buffer
buffer_size = property(get_buffer_size, set_buffer_size)
你可以测试它,例如:
def graph_test(N=200):
import matplotlib.pyplot as plt
values = list(range(N))
values_average_calculator = Running_Average(N/2)
values_averages = []
for value in values:
values_average_calculator.add(value)
values_averages.append(values_average_calculator())
fig, ax = plt.subplots(1, 1)
ax.plot(values, label='values')
ax.plot(values_averages, label='averages')
ax.grid()
ax.set_xlim(0, N)
ax.set_ylim(0, N)
fig.show()
这使:
推荐文章
- Numpy Max vs amax vs maximum
- 我应该在.gitignore文件中添加Django迁移文件吗?
- 每n行有熊猫
- 实例属性attribute_name定义在__init__之外
- 如何获取在Python中捕获的异常的名称?
- 第一次出现的值大于现有值的Numpy
- 如何从Python函数中返回两个值?
- 前一个月的Python日期
- Python中方括号括起来的列表和圆括号括起来的列表有什么区别?
- Python日志记录不输出任何东西
- 每n秒运行特定代码
- SQLAlchemy是否有与Django的get_or_create等价的函数?
- 如何将python datetime转换为字符串,具有可读格式的日期?
- 美丽的汤和提取div及其内容的ID
- 在Python中重置生成器对象