更新:到目前为止表现最好的算法是这个。


这个问题探讨了在实时时间序列数据中检测突然峰值的稳健算法。

考虑以下示例数据:

这个数据的例子是Matlab格式的(但这个问题不是关于语言,而是关于算法):

p = [1 1 1.1 1 0.9 1 1 1.1 1 0.9 1 1.1 1 1 0.9 1 1 1.1 1 1 1 1 1.1 0.9 1 1.1 1 1 0.9, ...
     1 1.1 1 1 1.1 1 0.8 0.9 1 1.2 0.9 1 1 1.1 1.2 1 1.5 1 3 2 5 3 2 1 1 1 0.9 1 1, ... 
     3 2.6 4 3 3.2 2 1 1 0.8 4 4 2 2.5 1 1 1];

你可以清楚地看到有三个大峰和一些小峰。这个数据集是问题所涉及的时间序列数据集类的一个特定示例。这类数据集有两个一般特征:

有一种具有一般平均值的基本噪声 有很大的“峰值”或“更高的数据点”明显偏离噪声。

让我们假设以下情况:

峰的宽度不能事先确定 峰的高度明显偏离其他值 算法实时更新(因此每个新数据点都会更新)

对于这种情况,需要构造一个触发信号的边值。但是,边界值不能是静态的,必须通过算法实时确定。


我的问题是:什么是实时计算这些阈值的好算法?有没有针对这种情况的特定算法?最著名的算法是什么?


健壮的算法或有用的见解都受到高度赞赏。(可以用任何语言回答:这是关于算法的)


当前回答

@Jean-Paul Smoothed Z Score算法的Dart版本:

class SmoothedZScore {
  int lag = 5;
  num threshold = 10;
  num influence = 0.5;

  num sum(List<num> a) {
    num s = 0;
    for (int i = 0; i < a.length; i++) s += a[i];
    return s;
  }

  num mean(List<num> a) {
    return sum(a) / a.length;
  }

  num stddev(List<num> arr) {
    num arrMean = mean(arr);
    num dev = 0;
    for (int i = 0; i < arr.length; i++) dev += (arr[i] - arrMean) * (arr[i] - arrMean);
    return sqrt(dev / arr.length);
  }

  List<int> smoothedZScore(List<num> y) {
    if (y.length < lag + 2) {
      throw 'y data array too short($y.length) for given lag of $lag';
    }

    // init variables
    List<int> signals = List.filled(y.length, 0);
    List<num> filteredY = List<num>.from(y);
    List<num> leadIn = y.sublist(0, lag);

    var avgFilter = List<num>.filled(y.length, 0);
    var stdFilter = List<num>.filled(y.length, 0);
    avgFilter[lag - 1] = mean(leadIn);
    stdFilter[lag - 1] = stddev(leadIn);

    for (var i = lag; i < y.length; i++) {
      if ((y[i] - avgFilter[i - 1]).abs() > (threshold * stdFilter[i - 1])) {
        signals[i] = y[i] > avgFilter[i - 1] ? 1 : -1;
        // make influence lower
        filteredY[i] = influence * y[i] + (1 - influence) * filteredY[i - 1];
      } else {
        signals[i] = 0; // no signal
        filteredY[i] = y[i];
      }

      // adjust the filters
      List<num> yLag = filteredY.sublist(i - lag, i);
      avgFilter[i] = mean(yLag);
      stdFilter[i] = stddev(yLag);
    }

    return signals;
  }
}

其他回答

下面是平滑z-score算法的Python / numpy实现(见上面的答案)。你可以在这里找到要点。

#!/usr/bin/env python
# Implementation of algorithm from https://stackoverflow.com/a/22640362/6029703
import numpy as np
import pylab

def thresholding_algo(y, lag, threshold, influence):
    signals = np.zeros(len(y))
    filteredY = np.array(y)
    avgFilter = [0]*len(y)
    stdFilter = [0]*len(y)
    avgFilter[lag - 1] = np.mean(y[0:lag])
    stdFilter[lag - 1] = np.std(y[0:lag])
    for i in range(lag, len(y)):
        if abs(y[i] - avgFilter[i-1]) > threshold * stdFilter [i-1]:
            if y[i] > avgFilter[i-1]:
                signals[i] = 1
            else:
                signals[i] = -1

            filteredY[i] = influence * y[i] + (1 - influence) * filteredY[i-1]
            avgFilter[i] = np.mean(filteredY[(i-lag+1):i+1])
            stdFilter[i] = np.std(filteredY[(i-lag+1):i+1])
        else:
            signals[i] = 0
            filteredY[i] = y[i]
            avgFilter[i] = np.mean(filteredY[(i-lag+1):i+1])
            stdFilter[i] = np.std(filteredY[(i-lag+1):i+1])

    return dict(signals = np.asarray(signals),
                avgFilter = np.asarray(avgFilter),
                stdFilter = np.asarray(stdFilter))

下面是在同一个数据集上的测试,它产生的图与R/Matlab的原始答案相同

# Data
y = np.array([1,1,1.1,1,0.9,1,1,1.1,1,0.9,1,1.1,1,1,0.9,1,1,1.1,1,1,1,1,1.1,0.9,1,1.1,1,1,0.9,
       1,1.1,1,1,1.1,1,0.8,0.9,1,1.2,0.9,1,1,1.1,1.2,1,1.5,1,3,2,5,3,2,1,1,1,0.9,1,1,3,
       2.6,4,3,3.2,2,1,1,0.8,4,4,2,2.5,1,1,1])

# Settings: lag = 30, threshold = 5, influence = 0
lag = 30
threshold = 5
influence = 0

# Run algo with settings from above
result = thresholding_algo(y, lag=lag, threshold=threshold, influence=influence)

# Plot result
pylab.subplot(211)
pylab.plot(np.arange(1, len(y)+1), y)

pylab.plot(np.arange(1, len(y)+1),
           result["avgFilter"], color="cyan", lw=2)

pylab.plot(np.arange(1, len(y)+1),
           result["avgFilter"] + threshold * result["stdFilter"], color="green", lw=2)

pylab.plot(np.arange(1, len(y)+1),
           result["avgFilter"] - threshold * result["stdFilter"], color="green", lw=2)

pylab.subplot(212)
pylab.step(np.arange(1, len(y)+1), result["signals"], color="red", lw=2)
pylab.ylim(-1.5, 1.5)
pylab.show()

下面是@Jean-Paul为Arduino微控制器设计的平滑z分数的C语言实现,用于获取加速度计读数,并判断撞击的方向是来自左边还是右边。这表现得非常好,因为这个设备返回一个反弹信号。这是设备对峰值检测算法的输入-显示了来自右边的冲击,然后是来自左边的冲击。你可以看到最初的峰值然后传感器的振荡。

#include <stdio.h>
#include <math.h>
#include <string.h>


#define SAMPLE_LENGTH 1000

float stddev(float data[], int len);
float mean(float data[], int len);
void thresholding(float y[], int signals[], int lag, float threshold, float influence);


void thresholding(float y[], int signals[], int lag, float threshold, float influence) {
    memset(signals, 0, sizeof(int) * SAMPLE_LENGTH);
    float filteredY[SAMPLE_LENGTH];
    memcpy(filteredY, y, sizeof(float) * SAMPLE_LENGTH);
    float avgFilter[SAMPLE_LENGTH];
    float stdFilter[SAMPLE_LENGTH];

    avgFilter[lag - 1] = mean(y, lag);
    stdFilter[lag - 1] = stddev(y, lag);

    for (int i = lag; i < SAMPLE_LENGTH; i++) {
        if (fabsf(y[i] - avgFilter[i-1]) > threshold * stdFilter[i-1]) {
            if (y[i] > avgFilter[i-1]) {
                signals[i] = 1;
            } else {
                signals[i] = -1;
            }
            filteredY[i] = influence * y[i] + (1 - influence) * filteredY[i-1];
        } else {
            signals[i] = 0;
        }
        avgFilter[i] = mean(filteredY + i-lag, lag);
        stdFilter[i] = stddev(filteredY + i-lag, lag);
    }
}

float mean(float data[], int len) {
    float sum = 0.0, mean = 0.0;

    int i;
    for(i=0; i<len; ++i) {
        sum += data[i];
    }

    mean = sum/len;
    return mean;


}

float stddev(float data[], int len) {
    float the_mean = mean(data, len);
    float standardDeviation = 0.0;

    int i;
    for(i=0; i<len; ++i) {
        standardDeviation += pow(data[i] - the_mean, 2);
    }

    return sqrt(standardDeviation/len);
}

int main() {
    printf("Hello, World!\n");
    int lag = 100;
    float threshold = 5;
    float influence = 0;
    float y[]=  {1,1,1.1,1,0.9,1,1,1.1,1,0.9,1,1.1,1,1,0.9,1,1,1.1,1,1,1,1,1.1,0.9,1,1.1,1,1,0.9,
  ....
1,1.1,1,1,1.1,1,0.8,0.9,1,1.2,0.9,1,1,1.1,1.2,1,1.5,1,3,2,5,3,2,1,1,1,0.9,1,1,3,       2.6,4,3,3.2,2,1,1,0.8,4,4,2,2.5,1,1,1,1.2,1,1.5,1,3,2,5,3,2,1,1,1,0.9,1,1,3,
       2.6,4,3,3.2,2,1,1,0.8,4,4,2,2.5,1,1,1}

    int signal[SAMPLE_LENGTH];

    thresholding(y, signal,  lag, threshold, influence);

    return 0;
}

她的结果是影响= 0

不是很好,但这里的影响力= 1

这很好。

我为Jean-Paul最受欢迎的答案写了一个Go包。它假设y值的类型为float64。

github.com/MicahParks/peakdetect

下面的示例使用了这个包,并基于上面提到的流行答案中的R示例。它在编译时没有任何依赖关系,试图保持较低的内存占用,并且在有新数据点进入时不重新处理过去的点。该项目有100%的测试覆盖率,主要来自上述R示例的输入和输出。但是,如果有人发现任何错误,请打开一个GitHub问题。

编辑:我对v0.0.5进行了性能改进,似乎快了10倍!它使用Welford的方法进行初始化,并使用类似的方法计算滞后期(滑动窗口)的平均值和总体标准偏差。特别感谢另一个帖子的回答:https://stackoverflow.com/a/14638138/14797322

下面是基于R例子的Golang例子:

package main

import (
    "fmt"
    "log"

    "github.com/MicahParks/peakdetect"
)

// This example is the equivalent of the R example from the algorithm's author.
// https://stackoverflow.com/a/54507329/14797322
func main() {
    data := []float64{1, 1, 1.1, 1, 0.9, 1, 1, 1.1, 1, 0.9, 1, 1.1, 1, 1, 0.9, 1, 1, 1.1, 1, 1, 1, 1, 1.1, 0.9, 1, 1.1, 1, 1, 0.9, 1, 1.1, 1, 1, 1.1, 1, 0.8, 0.9, 1, 1.2, 0.9, 1, 1, 1.1, 1.2, 1, 1.5, 1, 3, 2, 5, 3, 2, 1, 1, 1, 0.9, 1, 1, 3, 2.6, 4, 3, 3.2, 2, 1, 1, 0.8, 4, 4, 2, 2.5, 1, 1, 1}

    // Algorithm configuration from example.
    const (
        lag       = 30
        threshold = 5
        influence = 0
    )

    // Create then initialize the peak detector.
    detector := peakdetect.NewPeakDetector()
    err := detector.Initialize(influence, threshold, data[:lag]) // The length of the initial values is the lag.
    if err != nil {
        log.Fatalf("Failed to initialize peak detector.\nError: %s", err)
    }

    // Start processing new data points and determine what signal, if any they produce.
    //
    // This method, .Next(), is best for when data is being processed in a stream, but this simply iterates over a slice.
    nextDataPoints := data[lag:]
    for i, newPoint := range nextDataPoints {
        signal := detector.Next(newPoint)
        var signalType string
        switch signal {
        case peakdetect.SignalNegative:
            signalType = "negative"
        case peakdetect.SignalNeutral:
            signalType = "neutral"
        case peakdetect.SignalPositive:
            signalType = "positive"
        }

        println(fmt.Sprintf("Data point at index %d has the signal: %s", i+lag, signalType))
    }

    // This method, .NextBatch(), is a helper function for processing many data points at once. It's returned slice
    // should produce the same signal outputs as the loop above.
    signals := detector.NextBatch(nextDataPoints)
    println(fmt.Sprintf("1:1 ratio of batch inputs to signal outputs: %t", len(signals) == len(nextDataPoints)))
}

另外,这个算法对我来说也很好…

sensitivity = 4; dwindow = 4; k = dwindow; data = [1., 1., 1., 1., 1., 1., 1., 1.1, 1., 0.8, 0.9, 1., 1.2, 0.9, 1., 1., 1.1, 1.2, 1., 1.5, 1., 3., 2., 5., 3., 2., 1., 1., 1., 0.9, 1., 1., 3., 2.6, 4., 3., 3.2, 2., 1., 1., 1., 1., 1. ]; //data = data.concat(data); //data = data.concat(data); var data1 = [{ name: 'original source', y: data }]; Plotly.newPlot('stage1', data1, { title: 'Sensor data', yaxis: { title: 'signal' } }); filtered = data.map((a,b,c)=>a>=Math.max(...c.slice(b-k,b))?a**3:0); var data2 = [{ name: 'filtered source', y: filtered }]; Plotly.newPlot('stage2', data2, { title: 'Filtered data<br>aₙ = aₙ³', yaxis: { title: 'signal' } }); dwindow = 6; k = dwindow; detected = filtered.map((a,b,c)=>a>Math.max(...c.slice(2))/sensitivity).map((a,b,c)=>(b>k) && c.slice(b-k,b).indexOf(a)==-1 ); var data3 = [{ name: 'detected peaks', y: detected }]; Plotly.newPlot('stage3', data3, { title: 'Maximum in a window of 6', yaxis: { title: 'signal' } }); dwindow = 10; k = dwindow; detected = filtered.map((a, b, c) => a > Math.max(...c.slice(2)) / 20).map((a, b, c) => (b > k) && c.slice(b - k, b).indexOf(a) == -1) var data4 = [{ name: 'detected peaks', y: detected }]; Plotly.newPlot('stage4', data4, { title: 'Maximum in a window of 10', yaxis: { title: 'signal' } }); <script src="https://cdn.jsdelivr.net/npm/plotly.js@2.16.5/dist/plotly.min.js"></script> <div id="stage1"></div> <div id="stage2"></div> <div id="stage3"></div> <div id="stage4"></div>

这是一个修改后的Fortran版本的z-score算法。 它是专门针对频率空间中传递函数的峰值(共振)检测进行修改的(每个更改在代码中都有一个小注释)。

如果在输入向量的下界附近存在共振,则第一个修改会向用户发出警告,该共振由高于某个阈值的标准偏差表示(在本例中为10%)。这仅仅意味着信号不够平坦,不足以使检测正确地初始化滤波器。

第二种修改是只将峰值的最大值添加到已找到的峰值中。这是通过将每个发现的峰值与其(滞后)前辈及其(滞后)后继者的大小进行比较来达到的。

第三个变化是尊重共振峰通常在共振频率周围表现出某种形式的对称性。因此,围绕当前数据点对称地计算平均值和std是很自然的(而不仅仅是针对之前的数据点)。这将导致更好的峰值检测行为。

这些修改的效果是,整个信号必须事先被函数知道,这是共振检测的通常情况(类似于Jean-Paul的Matlab示例,其中数据点是动态生成的,这是行不通的)。

function PeakDetect(y,lag,threshold, influence)
    implicit none
    ! Declaring part
    real, dimension(:), intent(in) :: y
    integer, dimension(size(y)) :: PeakDetect
    real, dimension(size(y)) :: filteredY, avgFilter, stdFilter
    integer :: lag, ii
    real :: threshold, influence

    ! Executing part
    PeakDetect = 0
    filteredY = 0.0
    filteredY(1:lag+1) = y(1:lag+1)
    avgFilter = 0.0
    avgFilter(lag+1) = mean(y(1:2*lag+1))
    stdFilter = 0.0
    stdFilter(lag+1) = std(y(1:2*lag+1))

    if (stdFilter(lag+1)/avgFilter(lag+1)>0.1) then ! If the coefficient of variation exceeds 10%, the signal is too uneven at the start, possibly because of a peak.
        write(unit=*,fmt=1001)
1001        format(1X,'Warning: Peak detection might have failed, as there may be a peak at the edge of the frequency range.',/)
    end if
    do ii = lag+2, size(y)
        if (abs(y(ii) - avgFilter(ii-1)) > threshold * stdFilter(ii-1)) then
            ! Find only the largest outstanding value which is only the one greater than its predecessor and its successor
            if (y(ii) > avgFilter(ii-1) .AND. y(ii) > y(ii-1) .AND. y(ii) > y(ii+1)) then
                PeakDetect(ii) = 1
            end if
            filteredY(ii) = influence * y(ii) + (1 - influence) * filteredY(ii-1)
        else
            filteredY(ii) = y(ii)
        end if
        ! Modified with respect to the original code. Mean and standard deviation are calculted symmetrically around the current point
        avgFilter(ii) = mean(filteredY(ii-lag:ii+lag))
        stdFilter(ii) = std(filteredY(ii-lag:ii+lag))
    end do
end function PeakDetect

real function mean(y)
    !> @brief Calculates the mean of vector y
    implicit none
    ! Declaring part
    real, dimension(:), intent(in) :: y
    integer :: N
    ! Executing part
    N = max(1,size(y))
    mean = sum(y)/N
end function mean

real function std(y)
    !> @brief Calculates the standard deviation of vector y
    implicit none
    ! Declaring part
    real, dimension(:), intent(in) :: y
    integer :: N
    ! Executing part
    N = max(1,size(y))
    std = sqrt((N*dot_product(y,y) - sum(y)**2) / (N*(N-1)))
end function std

对于我的应用程序,算法的工作就像一个魅力!