更新:到目前为止表现最好的算法是这个。


这个问题探讨了在实时时间序列数据中检测突然峰值的稳健算法。

考虑以下示例数据:

这个数据的例子是Matlab格式的(但这个问题不是关于语言,而是关于算法):

p = [1 1 1.1 1 0.9 1 1 1.1 1 0.9 1 1.1 1 1 0.9 1 1 1.1 1 1 1 1 1.1 0.9 1 1.1 1 1 0.9, ...
     1 1.1 1 1 1.1 1 0.8 0.9 1 1.2 0.9 1 1 1.1 1.2 1 1.5 1 3 2 5 3 2 1 1 1 0.9 1 1, ... 
     3 2.6 4 3 3.2 2 1 1 0.8 4 4 2 2.5 1 1 1];

你可以清楚地看到有三个大峰和一些小峰。这个数据集是问题所涉及的时间序列数据集类的一个特定示例。这类数据集有两个一般特征:

有一种具有一般平均值的基本噪声 有很大的“峰值”或“更高的数据点”明显偏离噪声。

让我们假设以下情况:

峰的宽度不能事先确定 峰的高度明显偏离其他值 算法实时更新(因此每个新数据点都会更新)

对于这种情况,需要构造一个触发信号的边值。但是,边界值不能是静态的,必须通过算法实时确定。


我的问题是:什么是实时计算这些阈值的好算法?有没有针对这种情况的特定算法?最著名的算法是什么?


健壮的算法或有用的见解都受到高度赞赏。(可以用任何语言回答:这是关于算法的)


当前回答

不需要将极大值与平均值进行比较,还可以将极大值与相邻的最小值进行比较,其中最小值仅定义在噪声阈值之上。 如果局部最大值是>的3倍(或其他置信因子)相邻的最小值,那么这个最大值就是一个峰值。 移动窗口越宽,峰值的确定越准确。 上面使用了以窗口中间为中心的计算, 顺便说一下,而不是在窗口结束时计算(== lag)。

请注意,最大值必须被视为信号之前的增加 之后下降。

其他回答

This problem looks similar to one I encountered in a hybrid/embedded systems course, but that was related to detecting faults when the input from a sensor is noisy. We used a Kalman filter to estimate/predict the hidden state of the system, then used statistical analysis to determine the likelihood that a fault had occurred. We were working with linear systems, but nonlinear variants exist. I remember the approach being surprisingly adaptive, but it required a model of the dynamics of the system.

以下是平滑z-score算法的Scala版本(非惯用):

/**
  * Smoothed zero-score alogrithm shamelessly copied from https://stackoverflow.com/a/22640362/6029703
  * Uses a rolling mean and a rolling deviation (separate) to identify peaks in a vector
  *
  * @param y - The input vector to analyze
  * @param lag - The lag of the moving window (i.e. how big the window is)
  * @param threshold - The z-score at which the algorithm signals (i.e. how many standard deviations away from the moving mean a peak (or signal) is)
  * @param influence - The influence (between 0 and 1) of new signals on the mean and standard deviation (how much a peak (or signal) should affect other values near it)
  * @return - The calculated averages (avgFilter) and deviations (stdFilter), and the signals (signals)
  */
private def smoothedZScore(y: Seq[Double], lag: Int, threshold: Double, influence: Double): Seq[Int] = {
  val stats = new SummaryStatistics()

  // the results (peaks, 1 or -1) of our algorithm
  val signals = mutable.ArrayBuffer.fill(y.length)(0)

  // filter out the signals (peaks) from our original list (using influence arg)
  val filteredY = y.to[mutable.ArrayBuffer]

  // the current average of the rolling window
  val avgFilter = mutable.ArrayBuffer.fill(y.length)(0d)

  // the current standard deviation of the rolling window
  val stdFilter = mutable.ArrayBuffer.fill(y.length)(0d)

  // init avgFilter and stdFilter
  y.take(lag).foreach(s => stats.addValue(s))

  avgFilter(lag - 1) = stats.getMean
  stdFilter(lag - 1) = Math.sqrt(stats.getPopulationVariance) // getStandardDeviation() uses sample variance (not what we want)

  // loop input starting at end of rolling window
  y.zipWithIndex.slice(lag, y.length - 1).foreach {
    case (s: Double, i: Int) =>
      // if the distance between the current value and average is enough standard deviations (threshold) away
      if (Math.abs(s - avgFilter(i - 1)) > threshold * stdFilter(i - 1)) {
        // this is a signal (i.e. peak), determine if it is a positive or negative signal
        signals(i) = if (s > avgFilter(i - 1)) 1 else -1
        // filter this signal out using influence
        filteredY(i) = (influence * s) + ((1 - influence) * filteredY(i - 1))
      } else {
        // ensure this signal remains a zero
        signals(i) = 0
        // ensure this value is not filtered
        filteredY(i) = s
      }

      // update rolling average and deviation
      stats.clear()
      filteredY.slice(i - lag, i).foreach(s => stats.addValue(s))
      avgFilter(i) = stats.getMean
      stdFilter(i) = Math.sqrt(stats.getPopulationVariance) // getStandardDeviation() uses sample variance (not what we want)
  }

  println(y.length)
  println(signals.length)
  println(signals)

  signals.zipWithIndex.foreach {
    case(x: Int, idx: Int) =>
      if (x == 1) {
        println(idx + " " + y(idx))
      }
  }

  val data =
    y.zipWithIndex.map { case (s: Double, i: Int) => Map("x" -> i, "y" -> s, "name" -> "y", "row" -> "data") } ++
    avgFilter.zipWithIndex.map { case (s: Double, i: Int) => Map("x" -> i, "y" -> s, "name" -> "avgFilter", "row" -> "data") } ++
    avgFilter.zipWithIndex.map { case (s: Double, i: Int) => Map("x" -> i, "y" -> (s - threshold * stdFilter(i)), "name" -> "lower", "row" -> "data") } ++
    avgFilter.zipWithIndex.map { case (s: Double, i: Int) => Map("x" -> i, "y" -> (s + threshold * stdFilter(i)), "name" -> "upper", "row" -> "data") } ++
    signals.zipWithIndex.map { case (s: Int, i: Int) => Map("x" -> i, "y" -> s, "name" -> "signal", "row" -> "signal") }

  Vegas("Smoothed Z")
    .withData(data)
    .mark(Line)
    .encodeX("x", Quant)
    .encodeY("y", Quant)
    .encodeColor(
      field="name",
      dataType=Nominal
    )
    .encodeRow("row", Ordinal)
    .show

  return signals
}

下面是一个测试,返回与Python和Groovy版本相同的结果:

val y = List(1d, 1d, 1.1d, 1d, 0.9d, 1d, 1d, 1.1d, 1d, 0.9d, 1d, 1.1d, 1d, 1d, 0.9d, 1d, 1d, 1.1d, 1d, 1d,
  1d, 1d, 1.1d, 0.9d, 1d, 1.1d, 1d, 1d, 0.9d, 1d, 1.1d, 1d, 1d, 1.1d, 1d, 0.8d, 0.9d, 1d, 1.2d, 0.9d, 1d,
  1d, 1.1d, 1.2d, 1d, 1.5d, 1d, 3d, 2d, 5d, 3d, 2d, 1d, 1d, 1d, 0.9d, 1d,
  1d, 3d, 2.6d, 4d, 3d, 3.2d, 2d, 1d, 1d, 0.8d, 4d, 4d, 2d, 2.5d, 1d, 1d, 1d)

val lag = 30
val threshold = 5d
val influence = 0d

smoothedZScore(y, lag, threshold, influence)

这里的要点

一个python/numpy的迭代版本的答案https://stackoverflow.com/a/22640362/6029703在这里。对于大数据(100000+),此代码比计算平均和标准偏差的速度更快。

def peak_detection_smoothed_zscore_v2(x, lag, threshold, influence):
    '''
    iterative smoothed z-score algorithm
    Implementation of algorithm from https://stackoverflow.com/a/22640362/6029703
    '''
    import numpy as np
    labels = np.zeros(len(x))
    filtered_y = np.array(x)
    avg_filter = np.zeros(len(x))
    std_filter = np.zeros(len(x))
    var_filter = np.zeros(len(x))

    avg_filter[lag - 1] = np.mean(x[0:lag])
    std_filter[lag - 1] = np.std(x[0:lag])
    var_filter[lag - 1] = np.var(x[0:lag])
    for i in range(lag, len(x)):
        if abs(x[i] - avg_filter[i - 1]) > threshold * std_filter[i - 1]:
            if x[i] > avg_filter[i - 1]:
                labels[i] = 1
            else:
                labels[i] = -1
            filtered_y[i] = influence * x[i] + (1 - influence) * filtered_y[i - 1]
        else:
            labels[i] = 0
            filtered_y[i] = x[i]
        # update avg, var, std
        avg_filter[i] = avg_filter[i - 1] + 1. / lag * (filtered_y[i] - filtered_y[i - lag])
        var_filter[i] = var_filter[i - 1] + 1. / lag * ((filtered_y[i] - avg_filter[i - 1]) ** 2 - (
            filtered_y[i - lag] - avg_filter[i - 1]) ** 2 - (filtered_y[i] - filtered_y[i - lag]) ** 2 / lag)
        std_filter[i] = np.sqrt(var_filter[i])

    return dict(signals=labels,
                avgFilter=avg_filter,
                stdFilter=std_filter)

下面是平滑z-score算法的Groovy (Java)实现(见上面的答案)。

/**
 * "Smoothed zero-score alogrithm" shamelessly copied from https://stackoverflow.com/a/22640362/6029703
 *  Uses a rolling mean and a rolling deviation (separate) to identify peaks in a vector
 *
 * @param y - The input vector to analyze
 * @param lag - The lag of the moving window (i.e. how big the window is)
 * @param threshold - The z-score at which the algorithm signals (i.e. how many standard deviations away from the moving mean a peak (or signal) is)
 * @param influence - The influence (between 0 and 1) of new signals on the mean and standard deviation (how much a peak (or signal) should affect other values near it)
 * @return - The calculated averages (avgFilter) and deviations (stdFilter), and the signals (signals)
 */

public HashMap<String, List<Object>> thresholdingAlgo(List<Double> y, Long lag, Double threshold, Double influence) {
    //init stats instance
    SummaryStatistics stats = new SummaryStatistics()

    //the results (peaks, 1 or -1) of our algorithm
    List<Integer> signals = new ArrayList<Integer>(Collections.nCopies(y.size(), 0))
    //filter out the signals (peaks) from our original list (using influence arg)
    List<Double> filteredY = new ArrayList<Double>(y)
    //the current average of the rolling window
    List<Double> avgFilter = new ArrayList<Double>(Collections.nCopies(y.size(), 0.0d))
    //the current standard deviation of the rolling window
    List<Double> stdFilter = new ArrayList<Double>(Collections.nCopies(y.size(), 0.0d))
    //init avgFilter and stdFilter
    (0..lag-1).each { stats.addValue(y[it as int]) }
    avgFilter[lag - 1 as int] = stats.getMean()
    stdFilter[lag - 1 as int] = Math.sqrt(stats.getPopulationVariance()) //getStandardDeviation() uses sample variance (not what we want)
    stats.clear()
    //loop input starting at end of rolling window
    (lag..y.size()-1).each { i ->
        //if the distance between the current value and average is enough standard deviations (threshold) away
        if (Math.abs((y[i as int] - avgFilter[i - 1 as int]) as Double) > threshold * stdFilter[i - 1 as int]) {
            //this is a signal (i.e. peak), determine if it is a positive or negative signal
            signals[i as int] = (y[i as int] > avgFilter[i - 1 as int]) ? 1 : -1
            //filter this signal out using influence
            filteredY[i as int] = (influence * y[i as int]) + ((1-influence) * filteredY[i - 1 as int])
        } else {
            //ensure this signal remains a zero
            signals[i as int] = 0
            //ensure this value is not filtered
            filteredY[i as int] = y[i as int]
        }
        //update rolling average and deviation
        (i - lag..i-1).each { stats.addValue(filteredY[it as int] as Double) }
        avgFilter[i as int] = stats.getMean()
        stdFilter[i as int] = Math.sqrt(stats.getPopulationVariance()) //getStandardDeviation() uses sample variance (not what we want)
        stats.clear()
    }

    return [
        signals  : signals,
        avgFilter: avgFilter,
        stdFilter: stdFilter
    ]
}

下面是同一个数据集上的测试,其结果与上面的Python / numpy实现相同。

    // Data
    def y = [1d, 1d, 1.1d, 1d, 0.9d, 1d, 1d, 1.1d, 1d, 0.9d, 1d, 1.1d, 1d, 1d, 0.9d, 1d, 1d, 1.1d, 1d, 1d,
         1d, 1d, 1.1d, 0.9d, 1d, 1.1d, 1d, 1d, 0.9d, 1d, 1.1d, 1d, 1d, 1.1d, 1d, 0.8d, 0.9d, 1d, 1.2d, 0.9d, 1d,
         1d, 1.1d, 1.2d, 1d, 1.5d, 1d, 3d, 2d, 5d, 3d, 2d, 1d, 1d, 1d, 0.9d, 1d,
         1d, 3d, 2.6d, 4d, 3d, 3.2d, 2d, 1d, 1d, 0.8d, 4d, 4d, 2d, 2.5d, 1d, 1d, 1d]

    // Settings
    def lag = 30
    def threshold = 5
    def influence = 0


    def thresholdingResults = thresholdingAlgo((List<Double>) y, (Long) lag, (Double) threshold, (Double) influence)

    println y.size()
    println thresholdingResults.signals.size()
    println thresholdingResults.signals

    thresholdingResults.signals.eachWithIndex { x, idx ->
        if (x) {
            println y[idx]
        }
    }

@Jean-Paul Smoothed Z Score算法的Dart版本:

class SmoothedZScore {
  int lag = 5;
  num threshold = 10;
  num influence = 0.5;

  num sum(List<num> a) {
    num s = 0;
    for (int i = 0; i < a.length; i++) s += a[i];
    return s;
  }

  num mean(List<num> a) {
    return sum(a) / a.length;
  }

  num stddev(List<num> arr) {
    num arrMean = mean(arr);
    num dev = 0;
    for (int i = 0; i < arr.length; i++) dev += (arr[i] - arrMean) * (arr[i] - arrMean);
    return sqrt(dev / arr.length);
  }

  List<int> smoothedZScore(List<num> y) {
    if (y.length < lag + 2) {
      throw 'y data array too short($y.length) for given lag of $lag';
    }

    // init variables
    List<int> signals = List.filled(y.length, 0);
    List<num> filteredY = List<num>.from(y);
    List<num> leadIn = y.sublist(0, lag);

    var avgFilter = List<num>.filled(y.length, 0);
    var stdFilter = List<num>.filled(y.length, 0);
    avgFilter[lag - 1] = mean(leadIn);
    stdFilter[lag - 1] = stddev(leadIn);

    for (var i = lag; i < y.length; i++) {
      if ((y[i] - avgFilter[i - 1]).abs() > (threshold * stdFilter[i - 1])) {
        signals[i] = y[i] > avgFilter[i - 1] ? 1 : -1;
        // make influence lower
        filteredY[i] = influence * y[i] + (1 - influence) * filteredY[i - 1];
      } else {
        signals[i] = 0; // no signal
        filteredY[i] = y[i];
      }

      // adjust the filters
      List<num> yLag = filteredY.sublist(i - lag, i);
      avgFilter[i] = mean(yLag);
      stdFilter[i] = stddev(yLag);
    }

    return signals;
  }
}