更新:到目前为止表现最好的算法是这个。


这个问题探讨了在实时时间序列数据中检测突然峰值的稳健算法。

考虑以下示例数据:

这个数据的例子是Matlab格式的(但这个问题不是关于语言,而是关于算法):

p = [1 1 1.1 1 0.9 1 1 1.1 1 0.9 1 1.1 1 1 0.9 1 1 1.1 1 1 1 1 1.1 0.9 1 1.1 1 1 0.9, ...
     1 1.1 1 1 1.1 1 0.8 0.9 1 1.2 0.9 1 1 1.1 1.2 1 1.5 1 3 2 5 3 2 1 1 1 0.9 1 1, ... 
     3 2.6 4 3 3.2 2 1 1 0.8 4 4 2 2.5 1 1 1];

你可以清楚地看到有三个大峰和一些小峰。这个数据集是问题所涉及的时间序列数据集类的一个特定示例。这类数据集有两个一般特征:

有一种具有一般平均值的基本噪声 有很大的“峰值”或“更高的数据点”明显偏离噪声。

让我们假设以下情况:

峰的宽度不能事先确定 峰的高度明显偏离其他值 算法实时更新(因此每个新数据点都会更新)

对于这种情况,需要构造一个触发信号的边值。但是,边界值不能是静态的,必须通过算法实时确定。


我的问题是:什么是实时计算这些阈值的好算法?有没有针对这种情况的特定算法?最著名的算法是什么?


健壮的算法或有用的见解都受到高度赞赏。(可以用任何语言回答:这是关于算法的)


当前回答

在信号处理中,峰值检测通常采用小波变换。基本上就是对时间序列数据进行离散小波变换。返回的细节系数中的过零将对应于时间序列信号中的峰值。你会在不同的细节系数水平上检测到不同的峰值振幅,这给了你多层次的分辨率。

其他回答

我在我的机器人项目中需要这样的东西。我想我可以归还Kotlin实现。

/**
* Smoothed zero-score alogrithm shamelessly copied from https://stackoverflow.com/a/22640362/6029703
* Uses a rolling mean and a rolling deviation (separate) to identify peaks in a vector
*
* @param y - The input vector to analyze
* @param lag - The lag of the moving window (i.e. how big the window is)
* @param threshold - The z-score at which the algorithm signals (i.e. how many standard deviations away from the moving mean a peak (or signal) is)
* @param influence - The influence (between 0 and 1) of new signals on the mean and standard deviation (how much a peak (or signal) should affect other values near it)
* @return - The calculated averages (avgFilter) and deviations (stdFilter), and the signals (signals)
*/
fun smoothedZScore(y: List<Double>, lag: Int, threshold: Double, influence: Double): Triple<List<Int>, List<Double>, List<Double>> {
    val stats = SummaryStatistics()
    // the results (peaks, 1 or -1) of our algorithm
    val signals = MutableList<Int>(y.size, { 0 })
    // filter out the signals (peaks) from our original list (using influence arg)
    val filteredY = ArrayList<Double>(y)
    // the current average of the rolling window
    val avgFilter = MutableList<Double>(y.size, { 0.0 })
    // the current standard deviation of the rolling window
    val stdFilter = MutableList<Double>(y.size, { 0.0 })
    // init avgFilter and stdFilter
    y.take(lag).forEach { s -> stats.addValue(s) }
    avgFilter[lag - 1] = stats.mean
    stdFilter[lag - 1] = Math.sqrt(stats.populationVariance) // getStandardDeviation() uses sample variance (not what we want)
    stats.clear()
    //loop input starting at end of rolling window
    (lag..y.size - 1).forEach { i ->
        //if the distance between the current value and average is enough standard deviations (threshold) away
        if (Math.abs(y[i] - avgFilter[i - 1]) > threshold * stdFilter[i - 1]) {
            //this is a signal (i.e. peak), determine if it is a positive or negative signal
            signals[i] = if (y[i] > avgFilter[i - 1]) 1 else -1
            //filter this signal out using influence
            filteredY[i] = (influence * y[i]) + ((1 - influence) * filteredY[i - 1])
        } else {
            //ensure this signal remains a zero
            signals[i] = 0
            //ensure this value is not filtered
            filteredY[i] = y[i]
        }
        //update rolling average and deviation
        (i - lag..i - 1).forEach { stats.addValue(filteredY[it]) }
        avgFilter[i] = stats.getMean()
        stdFilter[i] = Math.sqrt(stats.getPopulationVariance()) //getStandardDeviation() uses sample variance (not what we want)
        stats.clear()
    }
    return Triple(signals, avgFilter, stdFilter)
}

带有验证图的示例项目可以在github上找到。

这是一个修改后的Fortran版本的z-score算法。 它是专门针对频率空间中传递函数的峰值(共振)检测进行修改的(每个更改在代码中都有一个小注释)。

如果在输入向量的下界附近存在共振,则第一个修改会向用户发出警告,该共振由高于某个阈值的标准偏差表示(在本例中为10%)。这仅仅意味着信号不够平坦,不足以使检测正确地初始化滤波器。

第二种修改是只将峰值的最大值添加到已找到的峰值中。这是通过将每个发现的峰值与其(滞后)前辈及其(滞后)后继者的大小进行比较来达到的。

第三个变化是尊重共振峰通常在共振频率周围表现出某种形式的对称性。因此,围绕当前数据点对称地计算平均值和std是很自然的(而不仅仅是针对之前的数据点)。这将导致更好的峰值检测行为。

这些修改的效果是,整个信号必须事先被函数知道,这是共振检测的通常情况(类似于Jean-Paul的Matlab示例,其中数据点是动态生成的,这是行不通的)。

function PeakDetect(y,lag,threshold, influence)
    implicit none
    ! Declaring part
    real, dimension(:), intent(in) :: y
    integer, dimension(size(y)) :: PeakDetect
    real, dimension(size(y)) :: filteredY, avgFilter, stdFilter
    integer :: lag, ii
    real :: threshold, influence

    ! Executing part
    PeakDetect = 0
    filteredY = 0.0
    filteredY(1:lag+1) = y(1:lag+1)
    avgFilter = 0.0
    avgFilter(lag+1) = mean(y(1:2*lag+1))
    stdFilter = 0.0
    stdFilter(lag+1) = std(y(1:2*lag+1))

    if (stdFilter(lag+1)/avgFilter(lag+1)>0.1) then ! If the coefficient of variation exceeds 10%, the signal is too uneven at the start, possibly because of a peak.
        write(unit=*,fmt=1001)
1001        format(1X,'Warning: Peak detection might have failed, as there may be a peak at the edge of the frequency range.',/)
    end if
    do ii = lag+2, size(y)
        if (abs(y(ii) - avgFilter(ii-1)) > threshold * stdFilter(ii-1)) then
            ! Find only the largest outstanding value which is only the one greater than its predecessor and its successor
            if (y(ii) > avgFilter(ii-1) .AND. y(ii) > y(ii-1) .AND. y(ii) > y(ii+1)) then
                PeakDetect(ii) = 1
            end if
            filteredY(ii) = influence * y(ii) + (1 - influence) * filteredY(ii-1)
        else
            filteredY(ii) = y(ii)
        end if
        ! Modified with respect to the original code. Mean and standard deviation are calculted symmetrically around the current point
        avgFilter(ii) = mean(filteredY(ii-lag:ii+lag))
        stdFilter(ii) = std(filteredY(ii-lag:ii+lag))
    end do
end function PeakDetect

real function mean(y)
    !> @brief Calculates the mean of vector y
    implicit none
    ! Declaring part
    real, dimension(:), intent(in) :: y
    integer :: N
    ! Executing part
    N = max(1,size(y))
    mean = sum(y)/N
end function mean

real function std(y)
    !> @brief Calculates the standard deviation of vector y
    implicit none
    ! Declaring part
    real, dimension(:), intent(in) :: y
    integer :: N
    ! Executing part
    N = max(1,size(y))
    std = sqrt((N*dot_product(y,y) - sum(y)**2) / (N*(N-1)))
end function std

对于我的应用程序,算法的工作就像一个魅力!

另外,这个算法对我来说也很好…

sensitivity = 4; dwindow = 4; k = dwindow; data = [1., 1., 1., 1., 1., 1., 1., 1.1, 1., 0.8, 0.9, 1., 1.2, 0.9, 1., 1., 1.1, 1.2, 1., 1.5, 1., 3., 2., 5., 3., 2., 1., 1., 1., 0.9, 1., 1., 3., 2.6, 4., 3., 3.2, 2., 1., 1., 1., 1., 1. ]; //data = data.concat(data); //data = data.concat(data); var data1 = [{ name: 'original source', y: data }]; Plotly.newPlot('stage1', data1, { title: 'Sensor data', yaxis: { title: 'signal' } }); filtered = data.map((a,b,c)=>a>=Math.max(...c.slice(b-k,b))?a**3:0); var data2 = [{ name: 'filtered source', y: filtered }]; Plotly.newPlot('stage2', data2, { title: 'Filtered data<br>aₙ = aₙ³', yaxis: { title: 'signal' } }); dwindow = 6; k = dwindow; detected = filtered.map((a,b,c)=>a>Math.max(...c.slice(2))/sensitivity).map((a,b,c)=>(b>k) && c.slice(b-k,b).indexOf(a)==-1 ); var data3 = [{ name: 'detected peaks', y: detected }]; Plotly.newPlot('stage3', data3, { title: 'Maximum in a window of 6', yaxis: { title: 'signal' } }); dwindow = 10; k = dwindow; detected = filtered.map((a, b, c) => a > Math.max(...c.slice(2)) / 20).map((a, b, c) => (b > k) && c.slice(b - k, b).indexOf(a) == -1) var data4 = [{ name: 'detected peaks', y: detected }]; Plotly.newPlot('stage4', data4, { title: 'Maximum in a window of 10', yaxis: { title: 'signal' } }); <script src="https://cdn.jsdelivr.net/npm/plotly.js@2.16.5/dist/plotly.min.js"></script> <div id="stage1"></div> <div id="stage2"></div> <div id="stage3"></div> <div id="stage4"></div>

一种方法是根据以下观察来检测峰:

时间t是一个峰值(y (t) > y (t - 1)) & & ((t) > y (t + 1))

它通过等待上升趋势结束来避免误报。它并不完全是“实时”的,因为它会比峰值差一个dt。灵敏度可以通过要求比较的裕度来控制。在噪声检测和时延检测之间存在一种折衷。 您可以通过添加更多参数来丰富模型:

峰如果y (y (t) - (t-dt) > m) && (y (t) - y (t + dt) > m)

dt和m是控制灵敏度和延时的参数

这是你用上述算法得到的结果:

下面是在python中重现图的代码:

import numpy as np
import matplotlib.pyplot as plt
input = np.array([ 1. ,  1. ,  1. ,  1. ,  1. ,  1. ,  1. ,  1.1,  1. ,  0.8,  0.9,
    1. ,  1.2,  0.9,  1. ,  1. ,  1.1,  1.2,  1. ,  1.5,  1. ,  3. ,
    2. ,  5. ,  3. ,  2. ,  1. ,  1. ,  1. ,  0.9,  1. ,  1. ,  3. ,
    2.6,  4. ,  3. ,  3.2,  2. ,  1. ,  1. ,  1. ,  1. ,  1. ])
signal = (input > np.roll(input,1)) & (input > np.roll(input,-1))
plt.plot(input)
plt.plot(signal.nonzero()[0], input[signal], 'ro')
plt.show()

通过设置m = 0.5,你可以得到一个更清晰的信号,只有一个假阳性:

在计算拓扑学中,持久同调的思想导致一个有效的 -快如排序数字-解决方案。它不仅检测峰值,还以一种自然的方式量化峰值的“重要性”,使您能够选择对您重要的峰值。

算法的总结。 在一维设置(时间序列,实值信号)中,算法可以简单地描述为下图:

Think of the function graph (or its sub-level set) as a landscape and consider a decreasing water level starting at level infinity (or 1.8 in this picture). While the level decreases, at local maxima islands pop up. At local minima these islands merge together. One detail in this idea is that the island that appeared later in time is merged into the island that is older. The "persistence" of an island is its birth time minus its death time. The lengths of the blue bars depict the persistence, which is the above mentioned "significance" of a peak.

效率。 在对函数值进行排序之后,找到一个在线性时间内运行的实现并不难——实际上它是一个单一的、简单的循环。因此,这种实现在实践中应该是快速的,也很容易实现。

参考文献 一篇关于整个故事的文章和对持久同调(计算代数拓扑中的一个领域)动机的引用可以在这里找到: https://www.sthu.org/blog/13-perstopology-peakdetection/index.html