更新:到目前为止表现最好的算法是这个。


这个问题探讨了在实时时间序列数据中检测突然峰值的稳健算法。

考虑以下示例数据:

这个数据的例子是Matlab格式的(但这个问题不是关于语言,而是关于算法):

p = [1 1 1.1 1 0.9 1 1 1.1 1 0.9 1 1.1 1 1 0.9 1 1 1.1 1 1 1 1 1.1 0.9 1 1.1 1 1 0.9, ...
     1 1.1 1 1 1.1 1 0.8 0.9 1 1.2 0.9 1 1 1.1 1.2 1 1.5 1 3 2 5 3 2 1 1 1 0.9 1 1, ... 
     3 2.6 4 3 3.2 2 1 1 0.8 4 4 2 2.5 1 1 1];

你可以清楚地看到有三个大峰和一些小峰。这个数据集是问题所涉及的时间序列数据集类的一个特定示例。这类数据集有两个一般特征:

有一种具有一般平均值的基本噪声 有很大的“峰值”或“更高的数据点”明显偏离噪声。

让我们假设以下情况:

峰的宽度不能事先确定 峰的高度明显偏离其他值 算法实时更新(因此每个新数据点都会更新)

对于这种情况,需要构造一个触发信号的边值。但是,边界值不能是静态的,必须通过算法实时确定。


我的问题是:什么是实时计算这些阈值的好算法?有没有针对这种情况的特定算法?最著名的算法是什么?


健壮的算法或有用的见解都受到高度赞赏。(可以用任何语言回答:这是关于算法的)


当前回答

c++ (Qt)演示端口,交互式参数

我已经将这个算法的演示应用程序移植到c++ (Qt)上。

代码可以在GitHub上找到这里。带有安装程序的Windows(64位)构建在发布页面上。最后,我将添加一些文档和其他发布版本。

您不能绘制点,但可以从文本文件中导入它们(用空格分隔点——换行也算作空格)。您还可以调整算法参数,实时查看效果。这对于针对特定数据集调整算法以及探索参数如何影响结果非常有用。


上面的截图有些过时;从那以后,我添加了两个原始算法中没有的实验性选项:

反向处理数据集的选项(似乎至少改善了功率谱的结果)。 选项,为峰值设置硬性最小阈值。

我还在窗口中间添加了一个笨拙的缩放/平移条,只需用鼠标拖动它来缩放和平移。

模糊的构建指令:

在发布页面上有一个Windows安装程序(64位),但如果你想从源代码构建它,要点是:

安装Qt的构建工具,然后将qmake && make放在与.pro文件相同的目录下,或者 安装Qt Creator,打开.pro文件,选择任何默认的构建配置,然后按下构建和/或运行按钮(Creator的左下角)。

我只测试过Qt5。我有91%的信心,如果你手动配置组件,Qt Creator安装程序会让你安装Qt5(如果你手动配置组件,你还需要确认是否安装了Qt Charts)。Qt6可能是一个流畅的构建,也可能不是。有一天,我将测试Qt4和Qt6,使这些文档更好。也许吧。

其他回答

我在我的机器人项目中需要这样的东西。我想我可以归还Kotlin实现。

/**
* Smoothed zero-score alogrithm shamelessly copied from https://stackoverflow.com/a/22640362/6029703
* Uses a rolling mean and a rolling deviation (separate) to identify peaks in a vector
*
* @param y - The input vector to analyze
* @param lag - The lag of the moving window (i.e. how big the window is)
* @param threshold - The z-score at which the algorithm signals (i.e. how many standard deviations away from the moving mean a peak (or signal) is)
* @param influence - The influence (between 0 and 1) of new signals on the mean and standard deviation (how much a peak (or signal) should affect other values near it)
* @return - The calculated averages (avgFilter) and deviations (stdFilter), and the signals (signals)
*/
fun smoothedZScore(y: List<Double>, lag: Int, threshold: Double, influence: Double): Triple<List<Int>, List<Double>, List<Double>> {
    val stats = SummaryStatistics()
    // the results (peaks, 1 or -1) of our algorithm
    val signals = MutableList<Int>(y.size, { 0 })
    // filter out the signals (peaks) from our original list (using influence arg)
    val filteredY = ArrayList<Double>(y)
    // the current average of the rolling window
    val avgFilter = MutableList<Double>(y.size, { 0.0 })
    // the current standard deviation of the rolling window
    val stdFilter = MutableList<Double>(y.size, { 0.0 })
    // init avgFilter and stdFilter
    y.take(lag).forEach { s -> stats.addValue(s) }
    avgFilter[lag - 1] = stats.mean
    stdFilter[lag - 1] = Math.sqrt(stats.populationVariance) // getStandardDeviation() uses sample variance (not what we want)
    stats.clear()
    //loop input starting at end of rolling window
    (lag..y.size - 1).forEach { i ->
        //if the distance between the current value and average is enough standard deviations (threshold) away
        if (Math.abs(y[i] - avgFilter[i - 1]) > threshold * stdFilter[i - 1]) {
            //this is a signal (i.e. peak), determine if it is a positive or negative signal
            signals[i] = if (y[i] > avgFilter[i - 1]) 1 else -1
            //filter this signal out using influence
            filteredY[i] = (influence * y[i]) + ((1 - influence) * filteredY[i - 1])
        } else {
            //ensure this signal remains a zero
            signals[i] = 0
            //ensure this value is not filtered
            filteredY[i] = y[i]
        }
        //update rolling average and deviation
        (i - lag..i - 1).forEach { stats.addValue(filteredY[it]) }
        avgFilter[i] = stats.getMean()
        stdFilter[i] = Math.sqrt(stats.getPopulationVariance()) //getStandardDeviation() uses sample variance (not what we want)
        stats.clear()
    }
    return Triple(signals, avgFilter, stdFilter)
}

带有验证图的示例项目可以在github上找到。

一种方法是根据以下观察来检测峰:

时间t是一个峰值(y (t) > y (t - 1)) & & ((t) > y (t + 1))

它通过等待上升趋势结束来避免误报。它并不完全是“实时”的,因为它会比峰值差一个dt。灵敏度可以通过要求比较的裕度来控制。在噪声检测和时延检测之间存在一种折衷。 您可以通过添加更多参数来丰富模型:

峰如果y (y (t) - (t-dt) > m) && (y (t) - y (t + dt) > m)

dt和m是控制灵敏度和延时的参数

这是你用上述算法得到的结果:

下面是在python中重现图的代码:

import numpy as np
import matplotlib.pyplot as plt
input = np.array([ 1. ,  1. ,  1. ,  1. ,  1. ,  1. ,  1. ,  1.1,  1. ,  0.8,  0.9,
    1. ,  1.2,  0.9,  1. ,  1. ,  1.1,  1.2,  1. ,  1.5,  1. ,  3. ,
    2. ,  5. ,  3. ,  2. ,  1. ,  1. ,  1. ,  0.9,  1. ,  1. ,  3. ,
    2.6,  4. ,  3. ,  3.2,  2. ,  1. ,  1. ,  1. ,  1. ,  1. ])
signal = (input > np.roll(input,1)) & (input > np.roll(input,-1))
plt.plot(input)
plt.plot(signal.nonzero()[0], input[signal], 'ro')
plt.show()

通过设置m = 0.5,你可以得到一个更清晰的信号,只有一个假阳性:

不需要将极大值与平均值进行比较,还可以将极大值与相邻的最小值进行比较,其中最小值仅定义在噪声阈值之上。 如果局部最大值是>的3倍(或其他置信因子)相邻的最小值,那么这个最大值就是一个峰值。 移动窗口越宽,峰值的确定越准确。 上面使用了以窗口中间为中心的计算, 顺便说一下,而不是在窗口结束时计算(== lag)。

请注意,最大值必须被视为信号之前的增加 之后下降。

下面是平滑z-score算法的Python / numpy实现(见上面的答案)。你可以在这里找到要点。

#!/usr/bin/env python
# Implementation of algorithm from https://stackoverflow.com/a/22640362/6029703
import numpy as np
import pylab

def thresholding_algo(y, lag, threshold, influence):
    signals = np.zeros(len(y))
    filteredY = np.array(y)
    avgFilter = [0]*len(y)
    stdFilter = [0]*len(y)
    avgFilter[lag - 1] = np.mean(y[0:lag])
    stdFilter[lag - 1] = np.std(y[0:lag])
    for i in range(lag, len(y)):
        if abs(y[i] - avgFilter[i-1]) > threshold * stdFilter [i-1]:
            if y[i] > avgFilter[i-1]:
                signals[i] = 1
            else:
                signals[i] = -1

            filteredY[i] = influence * y[i] + (1 - influence) * filteredY[i-1]
            avgFilter[i] = np.mean(filteredY[(i-lag+1):i+1])
            stdFilter[i] = np.std(filteredY[(i-lag+1):i+1])
        else:
            signals[i] = 0
            filteredY[i] = y[i]
            avgFilter[i] = np.mean(filteredY[(i-lag+1):i+1])
            stdFilter[i] = np.std(filteredY[(i-lag+1):i+1])

    return dict(signals = np.asarray(signals),
                avgFilter = np.asarray(avgFilter),
                stdFilter = np.asarray(stdFilter))

下面是在同一个数据集上的测试,它产生的图与R/Matlab的原始答案相同

# Data
y = np.array([1,1,1.1,1,0.9,1,1,1.1,1,0.9,1,1.1,1,1,0.9,1,1,1.1,1,1,1,1,1.1,0.9,1,1.1,1,1,0.9,
       1,1.1,1,1,1.1,1,0.8,0.9,1,1.2,0.9,1,1,1.1,1.2,1,1.5,1,3,2,5,3,2,1,1,1,0.9,1,1,3,
       2.6,4,3,3.2,2,1,1,0.8,4,4,2,2.5,1,1,1])

# Settings: lag = 30, threshold = 5, influence = 0
lag = 30
threshold = 5
influence = 0

# Run algo with settings from above
result = thresholding_algo(y, lag=lag, threshold=threshold, influence=influence)

# Plot result
pylab.subplot(211)
pylab.plot(np.arange(1, len(y)+1), y)

pylab.plot(np.arange(1, len(y)+1),
           result["avgFilter"], color="cyan", lw=2)

pylab.plot(np.arange(1, len(y)+1),
           result["avgFilter"] + threshold * result["stdFilter"], color="green", lw=2)

pylab.plot(np.arange(1, len(y)+1),
           result["avgFilter"] - threshold * result["stdFilter"], color="green", lw=2)

pylab.subplot(212)
pylab.step(np.arange(1, len(y)+1), result["signals"], color="red", lw=2)
pylab.ylim(-1.5, 1.5)
pylab.show()

如果边界值或其他标准取决于未来值,那么唯一的解决方案(没有时间机器,或其他关于未来值的知识)是推迟任何决定,直到有足够的未来值。如果你想要一个高于均值的水平,例如,20点,那么你必须等到你至少有19点才能做出任何峰值决策,否则下一个新点可能会完全超过你19点之前的阈值。

Added: If the statistical distribution of the peak heights could be heavy tailed, instead of Uniform or Gaussian, then you may need to wait until you see several thousand peaks before it starts to become unlikely that a hidden Pareto distribution won't produce a peak many times larger than any you currently have seen before or have in your current plot. Unless you somehow know in advance that the very next point can't be 1e20, it could appear, which after rescaling your plot's Y dimension, would be flat up until that point.