更新:到目前为止表现最好的算法是这个。


这个问题探讨了在实时时间序列数据中检测突然峰值的稳健算法。

考虑以下示例数据:

这个数据的例子是Matlab格式的(但这个问题不是关于语言,而是关于算法):

p = [1 1 1.1 1 0.9 1 1 1.1 1 0.9 1 1.1 1 1 0.9 1 1 1.1 1 1 1 1 1.1 0.9 1 1.1 1 1 0.9, ...
     1 1.1 1 1 1.1 1 0.8 0.9 1 1.2 0.9 1 1 1.1 1.2 1 1.5 1 3 2 5 3 2 1 1 1 0.9 1 1, ... 
     3 2.6 4 3 3.2 2 1 1 0.8 4 4 2 2.5 1 1 1];

你可以清楚地看到有三个大峰和一些小峰。这个数据集是问题所涉及的时间序列数据集类的一个特定示例。这类数据集有两个一般特征:

有一种具有一般平均值的基本噪声 有很大的“峰值”或“更高的数据点”明显偏离噪声。

让我们假设以下情况:

峰的宽度不能事先确定 峰的高度明显偏离其他值 算法实时更新(因此每个新数据点都会更新)

对于这种情况,需要构造一个触发信号的边值。但是,边界值不能是静态的,必须通过算法实时确定。


我的问题是:什么是实时计算这些阈值的好算法?有没有针对这种情况的特定算法?最著名的算法是什么?


健壮的算法或有用的见解都受到高度赞赏。(可以用任何语言回答:这是关于算法的)


当前回答

在计算拓扑学中,持久同调的思想导致一个有效的 -快如排序数字-解决方案。它不仅检测峰值,还以一种自然的方式量化峰值的“重要性”,使您能够选择对您重要的峰值。

算法的总结。 在一维设置(时间序列,实值信号)中,算法可以简单地描述为下图:

Think of the function graph (or its sub-level set) as a landscape and consider a decreasing water level starting at level infinity (or 1.8 in this picture). While the level decreases, at local maxima islands pop up. At local minima these islands merge together. One detail in this idea is that the island that appeared later in time is merged into the island that is older. The "persistence" of an island is its birth time minus its death time. The lengths of the blue bars depict the persistence, which is the above mentioned "significance" of a peak.

效率。 在对函数值进行排序之后,找到一个在线性时间内运行的实现并不难——实际上它是一个单一的、简单的循环。因此,这种实现在实践中应该是快速的,也很容易实现。

参考文献 一篇关于整个故事的文章和对持久同调(计算代数拓扑中的一个领域)动机的引用可以在这里找到: https://www.sthu.org/blog/13-perstopology-peakdetection/index.html

其他回答

假设你的数据来自传感器(所以算法不可能知道未来的任何事情),

我做了这个算法,它与我在自己的项目中获得的数据非常好。

该算法有2个参数:灵敏度和窗口。

最后,只需一行代码就可以得到你的结果:

detected=data.map((a, b, c) => (a > 0) ? c[b] ** 4 * c[b - 1] ** 3 : -0).map((a, b, c) => a > Math.max(...c.slice(2)) / sensitivity).map((a, b, c) => (b > dwindow) && c.slice(b - dwindow, b).indexOf(a) == -1);

因为我是程序员而不是数学家,所以我不能更好地解释它。但我相信有人可以。

sensitivity = 20; dwindow = 4; data = [1., 1., 1., 1., 1., 1., 1., 1.1, 1., 0.8, 0.9, 1., 1.2, 0.9, 1., 1., 1.1, 1.2, 1., 1.5, 1., 3., 2., 5., 3., 2., 1., 1., 1., 0.9, 1., 1., 3., 2.6, 4., 3., 3.2, 2., 1., 1., 1., 1., 1. ]; //data = data.concat(data); //data = data.concat(data); var data1 = [{ name: 'original source', y: data }]; Plotly.newPlot('stage1', data1, { title: 'Sensor data', yaxis: { title: 'signal' } }); filtered = data.map((a, b, c) => (a > 0) ? c[b] ** 4 * c[b - 1] ** 3 : -0); var data2 = [{ name: 'filtered source', y: filtered }]; Plotly.newPlot('stage2', data2, { title: 'Filtered data<br>aₙ = aₙ⁴ * aₙ₋₁³', yaxis: { title: 'signal' } }); dwindow = 6; k = dwindow; detected = filtered.map((a, b, c) => a > Math.max(...c.slice(2)) / sensitivity).map((a, b, c) => (b > k) && c.slice(b - k, b).indexOf(a) == -1) var data3 = [{ name: 'detected peaks', y: detected }]; Plotly.newPlot('stage3', data3, { title: 'Window 6', yaxis: { title: 'signal' } }); dwindow = 10; k = dwindow; detected = filtered.map((a, b, c) => a > Math.max(...c.slice(2)) / 20).map((a, b, c) => (b > k) && c.slice(b - k, b).indexOf(a) == -1) var data4 = [{ name: 'detected peaks', y: detected }]; Plotly.newPlot('stage4', data4, { title: 'Window 10', yaxis: { title: 'signal' } }); <script src="https://cdn.jsdelivr.net/npm/plotly.js@2.16.5/dist/plotly.min.js"></script> <div id="stage1"></div> <div id="stage2"></div> <div id="stage3"></div> <div id="stage4"></div>

函数scipy.signal。Find_peaks,顾名思义,在这方面很有用。但是要得到好的峰值提取,必须了解其参数宽度、阈值、距离和突出度。

根据我的测试和文档,突出的概念是“有用的概念”,可以保留好的峰值,丢弃噪声峰值。

什么是(地形)突出?它是“从山顶下降到任何更高地形所需的最低高度”,如下图所示:

这个想法是:

突出位置越高,山峰就越“重要”。

在计算拓扑学中,持久同调的思想导致一个有效的 -快如排序数字-解决方案。它不仅检测峰值,还以一种自然的方式量化峰值的“重要性”,使您能够选择对您重要的峰值。

算法的总结。 在一维设置(时间序列,实值信号)中,算法可以简单地描述为下图:

Think of the function graph (or its sub-level set) as a landscape and consider a decreasing water level starting at level infinity (or 1.8 in this picture). While the level decreases, at local maxima islands pop up. At local minima these islands merge together. One detail in this idea is that the island that appeared later in time is merged into the island that is older. The "persistence" of an island is its birth time minus its death time. The lengths of the blue bars depict the persistence, which is the above mentioned "significance" of a peak.

效率。 在对函数值进行排序之后,找到一个在线性时间内运行的实现并不难——实际上它是一个单一的、简单的循环。因此,这种实现在实践中应该是快速的,也很容易实现。

参考文献 一篇关于整个故事的文章和对持久同调(计算代数拓扑中的一个领域)动机的引用可以在这里找到: https://www.sthu.org/blog/13-perstopology-peakdetection/index.html

下面是我尝试为“Smoothed z-score算法”创建一个Ruby解决方案:

module ThresholdingAlgoMixin
  def mean(array)
    array.reduce(&:+) / array.size.to_f
  end

  def stddev(array)
    array_mean = mean(array)
    Math.sqrt(array.reduce(0.0) { |a, b| a.to_f + ((b.to_f - array_mean) ** 2) } / array.size.to_f)
  end

  def thresholding_algo(lag: 5, threshold: 3.5, influence: 0.5)
    return nil if size < lag * 2
    Array.new(size, 0).tap do |signals|
      filtered = Array.new(self)

      initial_slice = take(lag)
      avg_filter = Array.new(lag - 1, 0.0) + [mean(initial_slice)]
      std_filter = Array.new(lag - 1, 0.0) + [stddev(initial_slice)]
      (lag..size-1).each do |idx|
        prev = idx - 1
        if (fetch(idx) - avg_filter[prev]).abs > threshold * std_filter[prev]
          signals[idx] = fetch(idx) > avg_filter[prev] ? 1 : -1
          filtered[idx] = (influence * fetch(idx)) + ((1-influence) * filtered[prev])
        end

        filtered_slice = filtered[idx-lag..prev]
        avg_filter[idx] = mean(filtered_slice)
        std_filter[idx] = stddev(filtered_slice)
      end
    end
  end
end

以及示例用法:

test_data = [
  1, 1, 1.1, 1, 0.9, 1, 1, 1.1, 1, 0.9, 1, 1.1, 1, 1, 0.9, 1,
  1, 1.1, 1, 1, 1, 1, 1.1, 0.9, 1, 1.1, 1, 1, 0.9, 1, 1.1, 1,
  1, 1.1, 1, 0.8, 0.9, 1, 1.2, 0.9, 1, 1, 1.1, 1.2, 1, 1.5,
  1, 3, 2, 5, 3, 2, 1, 1, 1, 0.9, 1, 1, 3, 2.6, 4, 3, 3.2, 2,
  1, 1, 0.8, 4, 4, 2, 2.5, 1, 1, 1
].extend(ThresholdingAlgoMixin)

puts test_data.thresholding_algo.inspect

# Output: [
#   0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
#   0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, 0, 0, 0,
#   0, 0, 0, 0, 0, 0, 1, 0, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1,
#   1, 1, 0, 0, 0, -1, -1, 0, 0, 0, 0, 0, 0, 0, 0
# ]

在Palshikar(2009)中发现了另一个算法:

Palshikar, G.(2009)。时间序列中峰值检测的简单算法。在Proc. 1st Int。高级数据分析,商业分析和智能(卷122)。

论文可以从这里下载。

算法是这样的:

algorithm peak1 // one peak detection algorithms that uses peak function S1 

input T = x1, x2, …, xN, N // input time-series of N points 
input k // window size around the peak 
input h // typically 1 <= h <= 3 
output O // set of peaks detected in T 

begin 
O = empty set // initially empty 

    for (i = 1; i < n; i++) do
        // compute peak function value for each of the N points in T 
        a[i] = S1(k,i,xi,T); 
    end for 

    Compute the mean m' and standard deviation s' of all positive values in array a; 

    for (i = 1; i < n; i++) do // remove local peaks which are “small” in global context 
        if (a[i] > 0 && (a[i] – m') >( h * s')) then O = O + {xi}; 
        end if 
    end for 

    Order peaks in O in terms of increasing index in T 

    // retain only one peak out of any set of peaks within distance k of each other 

    for every adjacent pair of peaks xi and xj in O do 
        if |j – i| <= k then remove the smaller value of {xi, xj} from O 
        end if 
    end for 
end

优势

本文提出了5种不同的峰值检测算法 算法在原始时间序列数据上工作(不需要平滑)

缺点

很难事先确定k和h 峰不能是平的(就像我测试数据中的第三个峰)

例子: