更新:到目前为止表现最好的算法是这个。


这个问题探讨了在实时时间序列数据中检测突然峰值的稳健算法。

考虑以下示例数据:

这个数据的例子是Matlab格式的(但这个问题不是关于语言,而是关于算法):

p = [1 1 1.1 1 0.9 1 1 1.1 1 0.9 1 1.1 1 1 0.9 1 1 1.1 1 1 1 1 1.1 0.9 1 1.1 1 1 0.9, ...
     1 1.1 1 1 1.1 1 0.8 0.9 1 1.2 0.9 1 1 1.1 1.2 1 1.5 1 3 2 5 3 2 1 1 1 0.9 1 1, ... 
     3 2.6 4 3 3.2 2 1 1 0.8 4 4 2 2.5 1 1 1];

你可以清楚地看到有三个大峰和一些小峰。这个数据集是问题所涉及的时间序列数据集类的一个特定示例。这类数据集有两个一般特征:

有一种具有一般平均值的基本噪声 有很大的“峰值”或“更高的数据点”明显偏离噪声。

让我们假设以下情况:

峰的宽度不能事先确定 峰的高度明显偏离其他值 算法实时更新(因此每个新数据点都会更新)

对于这种情况,需要构造一个触发信号的边值。但是,边界值不能是静态的,必须通过算法实时确定。


我的问题是:什么是实时计算这些阈值的好算法?有没有针对这种情况的特定算法?最著名的算法是什么?


健壮的算法或有用的见解都受到高度赞赏。(可以用任何语言回答:这是关于算法的)


当前回答

函数scipy.signal。Find_peaks,顾名思义,在这方面很有用。但是要得到好的峰值提取,必须了解其参数宽度、阈值、距离和突出度。

根据我的测试和文档,突出的概念是“有用的概念”,可以保留好的峰值,丢弃噪声峰值。

什么是(地形)突出?它是“从山顶下降到任何更高地形所需的最低高度”,如下图所示:

这个想法是:

突出位置越高,山峰就越“重要”。

其他回答

不需要将极大值与平均值进行比较,还可以将极大值与相邻的最小值进行比较,其中最小值仅定义在噪声阈值之上。 如果局部最大值是>的3倍(或其他置信因子)相邻的最小值,那么这个最大值就是一个峰值。 移动窗口越宽,峰值的确定越准确。 上面使用了以窗口中间为中心的计算, 顺便说一下,而不是在窗口结束时计算(== lag)。

请注意,最大值必须被视为信号之前的增加 之后下降。

这是一个Python实现的鲁棒峰值检测算法算法。

初始化和计算部分被分开,只有filtered_y数组被保留,它的最大大小等于延迟,因此内存没有增加。(结果与上述答案相同)。 为了绘制图形,还保留了标签数组。

我做了一个github要点。

import numpy as np
import pylab

def init(x, lag, threshold, influence):
    '''
    Smoothed z-score algorithm
    Implementation of algorithm from https://stackoverflow.com/a/22640362/6029703
    '''

    labels = np.zeros(lag)
    filtered_y = np.array(x[0:lag])
    avg_filter = np.zeros(lag)
    std_filter = np.zeros(lag)
    var_filter = np.zeros(lag)

    avg_filter[lag - 1] = np.mean(x[0:lag])
    std_filter[lag - 1] = np.std(x[0:lag])
    var_filter[lag - 1] = np.var(x[0:lag])

    return dict(avg=avg_filter[lag - 1], var=var_filter[lag - 1],
                std=std_filter[lag - 1], filtered_y=filtered_y,
                labels=labels)


def add(result, single_value, lag, threshold, influence):
    previous_avg = result['avg']
    previous_var = result['var']
    previous_std = result['std']
    filtered_y = result['filtered_y']
    labels = result['labels']

    if abs(single_value - previous_avg) > threshold * previous_std:
        if single_value > previous_avg:
            labels = np.append(labels, 1)
        else:
            labels = np.append(labels, -1)

        # calculate the new filtered element using the influence factor
        filtered_y = np.append(filtered_y, influence * single_value
                               + (1 - influence) * filtered_y[-1])
    else:
        labels = np.append(labels, 0)
        filtered_y = np.append(filtered_y, single_value)

    # update avg as sum of the previuos avg + the lag * (the new calculated item - calculated item at position (i - lag))
    current_avg_filter = previous_avg + 1. / lag * (filtered_y[-1]
            - filtered_y[len(filtered_y) - lag - 1])

    # update variance as the previuos element variance + 1 / lag * new recalculated item - the previous avg -
    current_var_filter = previous_var + 1. / lag * ((filtered_y[-1]
            - previous_avg) ** 2 - (filtered_y[len(filtered_y) - 1
            - lag] - previous_avg) ** 2 - (filtered_y[-1]
            - filtered_y[len(filtered_y) - 1 - lag]) ** 2 / lag)  # the recalculated element at pos (lag) - avg of the previuos - new recalculated element - recalculated element at lag pos ....

    # calculate standard deviation for current element as sqrt (current variance)
    current_std_filter = np.sqrt(current_var_filter)

    return dict(avg=current_avg_filter, var=current_var_filter,
                std=current_std_filter, filtered_y=filtered_y[1:],
                labels=labels)

lag = 30
threshold = 5
influence = 0

y = np.array([1,1,1.1,1,0.9,1,1,1.1,1,0.9,1,1.1,1,1,0.9,1,1,1.1,1,1,1,1,1.1,0.9,1,1.1,1,1,0.9,
       1,1.1,1,1,1.1,1,0.8,0.9,1,1.2,0.9,1,1,1.1,1.2,1,1.5,1,3,2,5,3,2,1,1,1,0.9,1,1,3,
       2.6,4,3,3.2,2,1,1,0.8,4,4,2,2.5,1,1,1])

# Run algo with settings from above
result = init(y[:lag], lag=lag, threshold=threshold, influence=influence)

i = open('quartz2', 'r')
for i in y[lag:]:
    result = add(result, i, lag, threshold, influence)

# Plot result
pylab.subplot(211)
pylab.plot(np.arange(1, len(y) + 1), y)
pylab.subplot(212)
pylab.step(np.arange(1, len(y) + 1), result['labels'], color='red',
           lw=2)
pylab.ylim(-1.5, 1.5)
pylab.show()

假设你的数据来自传感器(所以算法不可能知道未来的任何事情),

我做了这个算法,它与我在自己的项目中获得的数据非常好。

该算法有2个参数:灵敏度和窗口。

最后,只需一行代码就可以得到你的结果:

detected=data.map((a, b, c) => (a > 0) ? c[b] ** 4 * c[b - 1] ** 3 : -0).map((a, b, c) => a > Math.max(...c.slice(2)) / sensitivity).map((a, b, c) => (b > dwindow) && c.slice(b - dwindow, b).indexOf(a) == -1);

因为我是程序员而不是数学家,所以我不能更好地解释它。但我相信有人可以。

sensitivity = 20; dwindow = 4; data = [1., 1., 1., 1., 1., 1., 1., 1.1, 1., 0.8, 0.9, 1., 1.2, 0.9, 1., 1., 1.1, 1.2, 1., 1.5, 1., 3., 2., 5., 3., 2., 1., 1., 1., 0.9, 1., 1., 3., 2.6, 4., 3., 3.2, 2., 1., 1., 1., 1., 1. ]; //data = data.concat(data); //data = data.concat(data); var data1 = [{ name: 'original source', y: data }]; Plotly.newPlot('stage1', data1, { title: 'Sensor data', yaxis: { title: 'signal' } }); filtered = data.map((a, b, c) => (a > 0) ? c[b] ** 4 * c[b - 1] ** 3 : -0); var data2 = [{ name: 'filtered source', y: filtered }]; Plotly.newPlot('stage2', data2, { title: 'Filtered data<br>aₙ = aₙ⁴ * aₙ₋₁³', yaxis: { title: 'signal' } }); dwindow = 6; k = dwindow; detected = filtered.map((a, b, c) => a > Math.max(...c.slice(2)) / sensitivity).map((a, b, c) => (b > k) && c.slice(b - k, b).indexOf(a) == -1) var data3 = [{ name: 'detected peaks', y: detected }]; Plotly.newPlot('stage3', data3, { title: 'Window 6', yaxis: { title: 'signal' } }); dwindow = 10; k = dwindow; detected = filtered.map((a, b, c) => a > Math.max(...c.slice(2)) / 20).map((a, b, c) => (b > k) && c.slice(b - k, b).indexOf(a) == -1) var data4 = [{ name: 'detected peaks', y: detected }]; Plotly.newPlot('stage4', data4, { title: 'Window 10', yaxis: { title: 'signal' } }); <script src="https://cdn.jsdelivr.net/npm/plotly.js@2.16.5/dist/plotly.min.js"></script> <div id="stage1"></div> <div id="stage2"></div> <div id="stage3"></div> <div id="stage4"></div>

@Jean-Paul算法的Perl实现。

#!/usr/bin/perl

use strict;
use Data::Dumper;

sub mean {
    my $data = shift;
    my $sum = 0;
    my $mean_val = 0;
    for my $item (@$data) {
        $sum += $item;
    }
    $mean_val = $sum / (scalar @$data) if @$data;
    return $mean_val;
}

sub variance {
    my $data = shift;
    my $variance_val = 0;
    my $mean_val = mean($data);
    my $sum = 0;
    for my $item (@$data) {
        $sum += ($item - $mean_val)**2;
    }
    $variance_val = $sum / (scalar @$data) if @$data;
    return $variance_val;
}

sub std {
    my $data = shift;
    my $variance_val = variance($data);
    return sqrt($variance_val);
}

# @param y - The input vector to analyze
# @parameter lag - The lag of the moving window
# @parameter threshold - The z-score at which the algorithm signals
# @parameter influence - The influence (between 0 and 1) of new signals on the mean and standard deviation
sub thresholding_algo {
    my ($y, $lag, $threshold, $influence) = @_;

    my @signals = (0) x @$y;
    my @filteredY = @$y;
    my @avgFilter = (0) x @$y;
    my @stdFilter = (0) x @$y;

    $avgFilter[$lag - 1] = mean([@$y[0..$lag-1]]);
    $stdFilter[$lag - 1] = std([@$y[0..$lag-1]]);

    for (my $i=$lag; $i <= @$y - 1; $i++) {
        if (abs($y->[$i] - $avgFilter[$i-1]) > $threshold * $stdFilter[$i-1]) {
            if ($y->[$i] > $avgFilter[$i-1]) {
                $signals[$i] = 1;
            } else {
                $signals[$i] = -1;
            }

            $filteredY[$i] = $influence * $y->[$i] + (1 - $influence) * $filteredY[$i-1];
            $avgFilter[$i] = mean([@filteredY[($i-$lag)..($i-1)]]);
            $stdFilter[$i] = std([@filteredY[($i-$lag)..($i-1)]]);
        }
        else {
            $signals[$i] = 0;
            $filteredY[$i] = $y->[$i];
            $avgFilter[$i] = mean([@filteredY[($i-$lag)..($i-1)]]);
            $stdFilter[$i] = std([@filteredY[($i-$lag)..($i-1)]]);
        }
    }

    return {
        signals => \@signals,
        avgFilter => \@avgFilter,
        stdFilter => \@stdFilter
    };
}

my $y = [1,1,1.1,1,0.9,1,1,1.1,1,0.9,1,1.1,1,1,0.9,1,1,1.1,1,1,1,1,1.1,0.9,1,1.1,1,1,0.9,
       1,1.1,1,1,1.1,1,0.8,0.9,1,1.2,0.9,1,1,1.1,1.2,1,1.5,1,3,2,5,3,2,1,1,1,0.9,1,1,3,
       2.6,4,3,3.2,2,1,1,0.8,4,4,2,2.5,1,1,1];

my $lag = 30;
my $threshold = 5;
my $influence = 0;

my $result = thresholding_algo($y, $lag, $threshold, $influence);

print Dumper $result;

我为Jean-Paul最受欢迎的答案写了一个Go包。它假设y值的类型为float64。

github.com/MicahParks/peakdetect

下面的示例使用了这个包,并基于上面提到的流行答案中的R示例。它在编译时没有任何依赖关系,试图保持较低的内存占用,并且在有新数据点进入时不重新处理过去的点。该项目有100%的测试覆盖率,主要来自上述R示例的输入和输出。但是,如果有人发现任何错误,请打开一个GitHub问题。

编辑:我对v0.0.5进行了性能改进,似乎快了10倍!它使用Welford的方法进行初始化,并使用类似的方法计算滞后期(滑动窗口)的平均值和总体标准偏差。特别感谢另一个帖子的回答:https://stackoverflow.com/a/14638138/14797322

下面是基于R例子的Golang例子:

package main

import (
    "fmt"
    "log"

    "github.com/MicahParks/peakdetect"
)

// This example is the equivalent of the R example from the algorithm's author.
// https://stackoverflow.com/a/54507329/14797322
func main() {
    data := []float64{1, 1, 1.1, 1, 0.9, 1, 1, 1.1, 1, 0.9, 1, 1.1, 1, 1, 0.9, 1, 1, 1.1, 1, 1, 1, 1, 1.1, 0.9, 1, 1.1, 1, 1, 0.9, 1, 1.1, 1, 1, 1.1, 1, 0.8, 0.9, 1, 1.2, 0.9, 1, 1, 1.1, 1.2, 1, 1.5, 1, 3, 2, 5, 3, 2, 1, 1, 1, 0.9, 1, 1, 3, 2.6, 4, 3, 3.2, 2, 1, 1, 0.8, 4, 4, 2, 2.5, 1, 1, 1}

    // Algorithm configuration from example.
    const (
        lag       = 30
        threshold = 5
        influence = 0
    )

    // Create then initialize the peak detector.
    detector := peakdetect.NewPeakDetector()
    err := detector.Initialize(influence, threshold, data[:lag]) // The length of the initial values is the lag.
    if err != nil {
        log.Fatalf("Failed to initialize peak detector.\nError: %s", err)
    }

    // Start processing new data points and determine what signal, if any they produce.
    //
    // This method, .Next(), is best for when data is being processed in a stream, but this simply iterates over a slice.
    nextDataPoints := data[lag:]
    for i, newPoint := range nextDataPoints {
        signal := detector.Next(newPoint)
        var signalType string
        switch signal {
        case peakdetect.SignalNegative:
            signalType = "negative"
        case peakdetect.SignalNeutral:
            signalType = "neutral"
        case peakdetect.SignalPositive:
            signalType = "positive"
        }

        println(fmt.Sprintf("Data point at index %d has the signal: %s", i+lag, signalType))
    }

    // This method, .NextBatch(), is a helper function for processing many data points at once. It's returned slice
    // should produce the same signal outputs as the loop above.
    signals := detector.NextBatch(nextDataPoints)
    println(fmt.Sprintf("1:1 ratio of batch inputs to signal outputs: %t", len(signals) == len(nextDataPoints)))
}