Python中是否有SciPy函数或NumPy函数或模块来计算给定特定窗口的1D数组的运行平均值?


当前回答

另一个解决方案是使用标准库和deque:

from collections import deque
import itertools

def moving_average(iterable, n=3):
    # http://en.wikipedia.org/wiki/Moving_average
    it = iter(iterable) 
    # create an iterable object from input argument
    d = deque(itertools.islice(it, n-1))  
    # create deque object by slicing iterable
    d.appendleft(0)
    s = sum(d)
    for elem in it:
        s += elem - d.popleft()
        d.append(elem)
        yield s / n

# example on how to use it
for i in  moving_average([40, 30, 50, 46, 39, 44]):
    print(i)

# 40.0
# 42.0
# 45.0
# 43.0

其他回答

或用于python计算的模块

在我在Tradewave.net的测试中,TA-lib总是赢:

import talib as ta
import numpy as np
import pandas as pd
import scipy
from scipy import signal
import time as t

PAIR = info.primary_pair
PERIOD = 30

def initialize():
    storage.reset()
    storage.elapsed = storage.get('elapsed', [0,0,0,0,0,0])

def cumsum_sma(array, period):
    ret = np.cumsum(array, dtype=float)
    ret[period:] = ret[period:] - ret[:-period]
    return ret[period - 1:] / period

def pandas_sma(array, period):
    return pd.rolling_mean(array, period)

def api_sma(array, period):
    # this method is native to Tradewave and does NOT return an array
    return (data[PAIR].ma(PERIOD))

def talib_sma(array, period):
    return ta.MA(array, period)

def convolve_sma(array, period):
    return np.convolve(array, np.ones((period,))/period, mode='valid')

def fftconvolve_sma(array, period):    
    return scipy.signal.fftconvolve(
        array, np.ones((period,))/period, mode='valid')    

def tick():

    close = data[PAIR].warmup_period('close')

    t1 = t.time()
    sma_api = api_sma(close, PERIOD)
    t2 = t.time()
    sma_cumsum = cumsum_sma(close, PERIOD)
    t3 = t.time()
    sma_pandas = pandas_sma(close, PERIOD)
    t4 = t.time()
    sma_talib = talib_sma(close, PERIOD)
    t5 = t.time()
    sma_convolve = convolve_sma(close, PERIOD)
    t6 = t.time()
    sma_fftconvolve = fftconvolve_sma(close, PERIOD)
    t7 = t.time()

    storage.elapsed[-1] = storage.elapsed[-1] + t2-t1
    storage.elapsed[-2] = storage.elapsed[-2] + t3-t2
    storage.elapsed[-3] = storage.elapsed[-3] + t4-t3
    storage.elapsed[-4] = storage.elapsed[-4] + t5-t4
    storage.elapsed[-5] = storage.elapsed[-5] + t6-t5    
    storage.elapsed[-6] = storage.elapsed[-6] + t7-t6        

    plot('sma_api', sma_api)  
    plot('sma_cumsum', sma_cumsum[-5])
    plot('sma_pandas', sma_pandas[-10])
    plot('sma_talib', sma_talib[-15])
    plot('sma_convolve', sma_convolve[-20])    
    plot('sma_fftconvolve', sma_fftconvolve[-25])

def stop():

    log('ticks....: %s' % info.max_ticks)

    log('api......: %.5f' % storage.elapsed[-1])
    log('cumsum...: %.5f' % storage.elapsed[-2])
    log('pandas...: %.5f' % storage.elapsed[-3])
    log('talib....: %.5f' % storage.elapsed[-4])
    log('convolve.: %.5f' % storage.elapsed[-5])    
    log('fft......: %.5f' % storage.elapsed[-6])

结果:

[2015-01-31 23:00:00] ticks....: 744
[2015-01-31 23:00:00] api......: 0.16445
[2015-01-31 23:00:00] cumsum...: 0.03189
[2015-01-31 23:00:00] pandas...: 0.03677
[2015-01-31 23:00:00] talib....: 0.00700  # <<< Winner!
[2015-01-31 23:00:00] convolve.: 0.04871
[2015-01-31 23:00:00] fft......: 0.22306

你可以用以下方法计算运行平均值:

import numpy as np

def runningMean(x, N):
    y = np.zeros((len(x),))
    for ctr in range(len(x)):
         y[ctr] = np.sum(x[ctr:(ctr+N)])
    return y/N

但是速度很慢。

幸运的是,numpy包含一个卷积函数,我们可以用它来加快速度。运行均值相当于将x与一个长度为N的向量进行卷积,其中所有元素都等于1/N。卷积的numpy实现包括起始瞬态,所以你必须删除前N-1点:

def runningMeanFast(x, N):
    return np.convolve(x, np.ones((N,))/N)[(N-1):]

在我的机器上,快速版本要快20-30倍,这取决于输入向量的长度和平均窗口的大小。

请注意,卷积确实包括一个“相同”模式,它似乎应该解决开始的瞬态问题,但它在开始和结束之间分割。

我知道这是一个老问题,但这里有一个解决方案,它不使用任何额外的数据结构或库。它在输入列表的元素数量上是线性的,我想不出任何其他方法来使它更有效(实际上,如果有人知道更好的分配结果的方法,请告诉我)。

注意:使用numpy数组而不是列表会快得多,但我想消除所有依赖关系。通过多线程执行也可以提高性能

该函数假设输入列表是一维的,所以要小心。

### Running mean/Moving average
def running_mean(l, N):
    sum = 0
    result = list( 0 for x in l)

    for i in range( 0, N ):
        sum = sum + l[i]
        result[i] = sum / (i+1)

    for i in range( N, len(l) ):
        sum = sum - l[i-N] + l[i]
        result[i] = sum / N

    return result

例子

假设我们有一个列表data =[1,2,3,4,5,6],我们想在它上面计算周期为3的滚动平均值,并且你还想要一个与输入列表相同大小的输出列表(这是最常见的情况)。

第一个元素的索引为0,因此滚动平均值应该在索引为-2、-1和0的元素上计算。显然,我们没有data[-2]和data[-1](除非您想使用特殊的边界条件),因此我们假设这些元素为0。这相当于对列表进行零填充,除了我们实际上不填充它,只是跟踪需要填充的索引(从0到N-1)。

所以,对于前N个元素,我们只是在累加器中不断地把元素加起来。

result[0] = (0 + 0 + 1) / 3  = 0.333    ==   (sum + 1) / 3
result[1] = (0 + 1 + 2) / 3  = 1        ==   (sum + 2) / 3
result[2] = (1 + 2 + 3) / 3  = 2        ==   (sum + 3) / 3

从元素N+1开始,简单的累加是行不通的。我们期望的结果是[3]=(2 + 3 + 4)/3 = 3,但这与(sum + 4)/3 = 3.333不同。

计算正确值的方法是用sum+4减去数据[0]= 1,从而得到sum+4 - 1 = 9。

这是因为目前sum =数据[0]+数据[1]+数据[2],但对于每个i >= N也是如此,因为在减法之前,sum是数据[i-N] +…+ data[i-2] + data[i-1]。

另一种不使用numpy或pandas找到移动平均线的方法

import itertools
sample = [2, 6, 10, 8, 11, 10]
list(itertools.starmap(
    lambda a,b: b/a, 
    enumerate(itertools.accumulate(sample), 1))
)

将打印[2.0,4.0,6.0,6.5,7.4,7.83333333333333333]

2.0 = (2)/1 4.0 is (2 + 6) / 2 6.0 = (2 + 6 + 10) / 3 .

上述所有的解决方案都很差,因为它们缺乏

由于本机python而不是numpy向量化实现, 数值稳定性,由于numpy使用不当。cumsum或 由于O(len(x) * w)实现为卷积的速度。

鉴于

import numpy
m = 10000
x = numpy.random.rand(m)
w = 1000

注意x_[:w].sum()等于x[:w-1].sum()。因此,对于第一个平均值,numpy.cumsum(…)加上x[w] / w(通过x_[w+1] / w),并减去0(从x_[0] / w)。结果是x[0:w].mean()

通过cumsum,您将通过添加x[w+1] / w并减去x[0] / w来更新第二个平均值,从而得到x[1:w+1].mean()。

这将一直进行,直到到达x[-w:].mean()。

x_ = numpy.insert(x, 0, 0)
sliding_average = x_[:w].sum() / w + numpy.cumsum(x_[w:] - x_[:-w]) / w

这个解是向量化的,O(m),可读且数值稳定。