更新:到目前为止表现最好的算法是这个。
这个问题探讨了在实时时间序列数据中检测突然峰值的稳健算法。
考虑以下示例数据:
这个数据的例子是Matlab格式的(但这个问题不是关于语言,而是关于算法):
p = [1 1 1.1 1 0.9 1 1 1.1 1 0.9 1 1.1 1 1 0.9 1 1 1.1 1 1 1 1 1.1 0.9 1 1.1 1 1 0.9, ...
1 1.1 1 1 1.1 1 0.8 0.9 1 1.2 0.9 1 1 1.1 1.2 1 1.5 1 3 2 5 3 2 1 1 1 0.9 1 1, ...
3 2.6 4 3 3.2 2 1 1 0.8 4 4 2 2.5 1 1 1];
你可以清楚地看到有三个大峰和一些小峰。这个数据集是问题所涉及的时间序列数据集类的一个特定示例。这类数据集有两个一般特征:
有一种具有一般平均值的基本噪声
有很大的“峰值”或“更高的数据点”明显偏离噪声。
让我们假设以下情况:
峰的宽度不能事先确定
峰的高度明显偏离其他值
算法实时更新(因此每个新数据点都会更新)
对于这种情况,需要构造一个触发信号的边值。但是,边界值不能是静态的,必须通过算法实时确定。
我的问题是:什么是实时计算这些阈值的好算法?有没有针对这种情况的特定算法?最著名的算法是什么?
健壮的算法或有用的见解都受到高度赞赏。(可以用任何语言回答:这是关于算法的)
在Palshikar(2009)中发现了另一个算法:
Palshikar, G.(2009)。时间序列中峰值检测的简单算法。在Proc. 1st Int。高级数据分析,商业分析和智能(卷122)。
论文可以从这里下载。
算法是这样的:
algorithm peak1 // one peak detection algorithms that uses peak function S1
input T = x1, x2, …, xN, N // input time-series of N points
input k // window size around the peak
input h // typically 1 <= h <= 3
output O // set of peaks detected in T
begin
O = empty set // initially empty
for (i = 1; i < n; i++) do
// compute peak function value for each of the N points in T
a[i] = S1(k,i,xi,T);
end for
Compute the mean m' and standard deviation s' of all positive values in array a;
for (i = 1; i < n; i++) do // remove local peaks which are “small” in global context
if (a[i] > 0 && (a[i] – m') >( h * s')) then O = O + {xi};
end if
end for
Order peaks in O in terms of increasing index in T
// retain only one peak out of any set of peaks within distance k of each other
for every adjacent pair of peaks xi and xj in O do
if |j – i| <= k then remove the smaller value of {xi, xj} from O
end if
end for
end
优势
本文提出了5种不同的峰值检测算法
算法在原始时间序列数据上工作(不需要平滑)
缺点
很难事先确定k和h
峰不能是平的(就像我测试数据中的第三个峰)
例子:
下面是这个答案的平滑z-score算法的c++实现
std::vector<int> smoothedZScore(std::vector<float> input)
{
//lag 5 for the smoothing functions
int lag = 5;
//3.5 standard deviations for signal
float threshold = 3.5;
//between 0 and 1, where 1 is normal influence, 0.5 is half
float influence = .5;
if (input.size() <= lag + 2)
{
std::vector<int> emptyVec;
return emptyVec;
}
//Initialise variables
std::vector<int> signals(input.size(), 0.0);
std::vector<float> filteredY(input.size(), 0.0);
std::vector<float> avgFilter(input.size(), 0.0);
std::vector<float> stdFilter(input.size(), 0.0);
std::vector<float> subVecStart(input.begin(), input.begin() + lag);
avgFilter[lag] = mean(subVecStart);
stdFilter[lag] = stdDev(subVecStart);
for (size_t i = lag + 1; i < input.size(); i++)
{
if (std::abs(input[i] - avgFilter[i - 1]) > threshold * stdFilter[i - 1])
{
if (input[i] > avgFilter[i - 1])
{
signals[i] = 1; //# Positive signal
}
else
{
signals[i] = -1; //# Negative signal
}
//Make influence lower
filteredY[i] = influence* input[i] + (1 - influence) * filteredY[i - 1];
}
else
{
signals[i] = 0; //# No signal
filteredY[i] = input[i];
}
//Adjust the filters
std::vector<float> subVec(filteredY.begin() + i - lag, filteredY.begin() + i);
avgFilter[i] = mean(subVec);
stdFilter[i] = stdDev(subVec);
}
return signals;
}
下面是在Golang中实现的Smoothed z-score算法(上图)。它假设一个[]int16 (PCM 16bit样本)的切片。你可以在这里找到要点。
/*
Settings (the ones below are examples: choose what is best for your data)
set lag to 5; # lag 5 for the smoothing functions
set threshold to 3.5; # 3.5 standard deviations for signal
set influence to 0.5; # between 0 and 1, where 1 is normal influence, 0.5 is half
*/
// ZScore on 16bit WAV samples
func ZScore(samples []int16, lag int, threshold float64, influence float64) (signals []int16) {
//lag := 20
//threshold := 3.5
//influence := 0.5
signals = make([]int16, len(samples))
filteredY := make([]int16, len(samples))
for i, sample := range samples[0:lag] {
filteredY[i] = sample
}
avgFilter := make([]int16, len(samples))
stdFilter := make([]int16, len(samples))
avgFilter[lag] = Average(samples[0:lag])
stdFilter[lag] = Std(samples[0:lag])
for i := lag + 1; i < len(samples); i++ {
f := float64(samples[i])
if float64(Abs(samples[i]-avgFilter[i-1])) > threshold*float64(stdFilter[i-1]) {
if samples[i] > avgFilter[i-1] {
signals[i] = 1
} else {
signals[i] = -1
}
filteredY[i] = int16(influence*f + (1-influence)*float64(filteredY[i-1]))
avgFilter[i] = Average(filteredY[(i - lag):i])
stdFilter[i] = Std(filteredY[(i - lag):i])
} else {
signals[i] = 0
filteredY[i] = samples[i]
avgFilter[i] = Average(filteredY[(i - lag):i])
stdFilter[i] = Std(filteredY[(i - lag):i])
}
}
return
}
// Average a chunk of values
func Average(chunk []int16) (avg int16) {
var sum int64
for _, sample := range chunk {
if sample < 0 {
sample *= -1
}
sum += int64(sample)
}
return int16(sum / int64(len(chunk)))
}