这是一个Python实现的鲁棒峰值检测算法算法。
初始化和计算部分被分开,只有filtered_y数组被保留,它的最大大小等于延迟,因此内存没有增加。(结果与上述答案相同)。
为了绘制图形,还保留了标签数组。
我做了一个github要点。
import numpy as np
import pylab
def init(x, lag, threshold, influence):
'''
Smoothed z-score algorithm
Implementation of algorithm from https://stackoverflow.com/a/22640362/6029703
'''
labels = np.zeros(lag)
filtered_y = np.array(x[0:lag])
avg_filter = np.zeros(lag)
std_filter = np.zeros(lag)
var_filter = np.zeros(lag)
avg_filter[lag - 1] = np.mean(x[0:lag])
std_filter[lag - 1] = np.std(x[0:lag])
var_filter[lag - 1] = np.var(x[0:lag])
return dict(avg=avg_filter[lag - 1], var=var_filter[lag - 1],
std=std_filter[lag - 1], filtered_y=filtered_y,
labels=labels)
def add(result, single_value, lag, threshold, influence):
previous_avg = result['avg']
previous_var = result['var']
previous_std = result['std']
filtered_y = result['filtered_y']
labels = result['labels']
if abs(single_value - previous_avg) > threshold * previous_std:
if single_value > previous_avg:
labels = np.append(labels, 1)
else:
labels = np.append(labels, -1)
# calculate the new filtered element using the influence factor
filtered_y = np.append(filtered_y, influence * single_value
+ (1 - influence) * filtered_y[-1])
else:
labels = np.append(labels, 0)
filtered_y = np.append(filtered_y, single_value)
# update avg as sum of the previuos avg + the lag * (the new calculated item - calculated item at position (i - lag))
current_avg_filter = previous_avg + 1. / lag * (filtered_y[-1]
- filtered_y[len(filtered_y) - lag - 1])
# update variance as the previuos element variance + 1 / lag * new recalculated item - the previous avg -
current_var_filter = previous_var + 1. / lag * ((filtered_y[-1]
- previous_avg) ** 2 - (filtered_y[len(filtered_y) - 1
- lag] - previous_avg) ** 2 - (filtered_y[-1]
- filtered_y[len(filtered_y) - 1 - lag]) ** 2 / lag) # the recalculated element at pos (lag) - avg of the previuos - new recalculated element - recalculated element at lag pos ....
# calculate standard deviation for current element as sqrt (current variance)
current_std_filter = np.sqrt(current_var_filter)
return dict(avg=current_avg_filter, var=current_var_filter,
std=current_std_filter, filtered_y=filtered_y[1:],
labels=labels)
lag = 30
threshold = 5
influence = 0
y = np.array([1,1,1.1,1,0.9,1,1,1.1,1,0.9,1,1.1,1,1,0.9,1,1,1.1,1,1,1,1,1.1,0.9,1,1.1,1,1,0.9,
1,1.1,1,1,1.1,1,0.8,0.9,1,1.2,0.9,1,1,1.1,1.2,1,1.5,1,3,2,5,3,2,1,1,1,0.9,1,1,3,
2.6,4,3,3.2,2,1,1,0.8,4,4,2,2.5,1,1,1])
# Run algo with settings from above
result = init(y[:lag], lag=lag, threshold=threshold, influence=influence)
i = open('quartz2', 'r')
for i in y[lag:]:
result = add(result, i, lag, threshold, influence)
# Plot result
pylab.subplot(211)
pylab.plot(np.arange(1, len(y) + 1), y)
pylab.subplot(212)
pylab.step(np.arange(1, len(y) + 1), result['labels'], color='red',
lw=2)
pylab.ylim(-1.5, 1.5)
pylab.show()