更新:到目前为止表现最好的算法是这个。


这个问题探讨了在实时时间序列数据中检测突然峰值的稳健算法。

考虑以下示例数据:

这个数据的例子是Matlab格式的(但这个问题不是关于语言,而是关于算法):

p = [1 1 1.1 1 0.9 1 1 1.1 1 0.9 1 1.1 1 1 0.9 1 1 1.1 1 1 1 1 1.1 0.9 1 1.1 1 1 0.9, ...
     1 1.1 1 1 1.1 1 0.8 0.9 1 1.2 0.9 1 1 1.1 1.2 1 1.5 1 3 2 5 3 2 1 1 1 0.9 1 1, ... 
     3 2.6 4 3 3.2 2 1 1 0.8 4 4 2 2.5 1 1 1];

你可以清楚地看到有三个大峰和一些小峰。这个数据集是问题所涉及的时间序列数据集类的一个特定示例。这类数据集有两个一般特征:

有一种具有一般平均值的基本噪声 有很大的“峰值”或“更高的数据点”明显偏离噪声。

让我们假设以下情况:

峰的宽度不能事先确定 峰的高度明显偏离其他值 算法实时更新(因此每个新数据点都会更新)

对于这种情况,需要构造一个触发信号的边值。但是,边界值不能是静态的,必须通过算法实时确定。


我的问题是:什么是实时计算这些阈值的好算法?有没有针对这种情况的特定算法?最著名的算法是什么?


健壮的算法或有用的见解都受到高度赞赏。(可以用任何语言回答:这是关于算法的)


当前回答

使用实时流的Python版本(不会在每个新数据点到达时重新计算所有数据点)。您可能想要调整类函数返回的内容—对于我的目的,我只需要信号。

import numpy as np


class real_time_peak_detection():
    def __init__(self, array, lag, threshold, influence):
        self.y = list(array)
        self.length = len(self.y)
        self.lag = lag
        self.threshold = threshold
        self.influence = influence
        self.signals = [0] * len(self.y)
        self.filteredY = np.array(self.y).tolist()
        self.avgFilter = [0] * len(self.y)
        self.stdFilter = [0] * len(self.y)
        self.avgFilter[self.lag - 1] = np.mean(self.y[0:self.lag]).tolist()
        self.stdFilter[self.lag - 1] = np.std(self.y[0:self.lag]).tolist()

    def thresholding_algo(self, new_value):
        self.y.append(new_value)
        i = len(self.y) - 1
        self.length = len(self.y)
        if i < self.lag:
            return 0
        elif i == self.lag:
            self.signals = [0] * len(self.y)
            self.filteredY = np.array(self.y).tolist()
            self.avgFilter = [0] * len(self.y)
            self.stdFilter = [0] * len(self.y)
            self.avgFilter[self.lag] = np.mean(self.y[0:self.lag]).tolist()
            self.stdFilter[self.lag] = np.std(self.y[0:self.lag]).tolist()
            return 0

        self.signals += [0]
        self.filteredY += [0]
        self.avgFilter += [0]
        self.stdFilter += [0]

        if abs(self.y[i] - self.avgFilter[i - 1]) > (self.threshold * self.stdFilter[i - 1]):

            if self.y[i] > self.avgFilter[i - 1]:
                self.signals[i] = 1
            else:
                self.signals[i] = -1

            self.filteredY[i] = self.influence * self.y[i] + \
                (1 - self.influence) * self.filteredY[i - 1]
            self.avgFilter[i] = np.mean(self.filteredY[(i - self.lag):i])
            self.stdFilter[i] = np.std(self.filteredY[(i - self.lag):i])
        else:
            self.signals[i] = 0
            self.filteredY[i] = self.y[i]
            self.avgFilter[i] = np.mean(self.filteredY[(i - self.lag):i])
            self.stdFilter[i] = np.std(self.filteredY[(i - self.lag):i])

        return self.signals[i]

其他回答

以下是平滑z-score算法的Scala版本(非惯用):

/**
  * Smoothed zero-score alogrithm shamelessly copied from https://stackoverflow.com/a/22640362/6029703
  * Uses a rolling mean and a rolling deviation (separate) to identify peaks in a vector
  *
  * @param y - The input vector to analyze
  * @param lag - The lag of the moving window (i.e. how big the window is)
  * @param threshold - The z-score at which the algorithm signals (i.e. how many standard deviations away from the moving mean a peak (or signal) is)
  * @param influence - The influence (between 0 and 1) of new signals on the mean and standard deviation (how much a peak (or signal) should affect other values near it)
  * @return - The calculated averages (avgFilter) and deviations (stdFilter), and the signals (signals)
  */
private def smoothedZScore(y: Seq[Double], lag: Int, threshold: Double, influence: Double): Seq[Int] = {
  val stats = new SummaryStatistics()

  // the results (peaks, 1 or -1) of our algorithm
  val signals = mutable.ArrayBuffer.fill(y.length)(0)

  // filter out the signals (peaks) from our original list (using influence arg)
  val filteredY = y.to[mutable.ArrayBuffer]

  // the current average of the rolling window
  val avgFilter = mutable.ArrayBuffer.fill(y.length)(0d)

  // the current standard deviation of the rolling window
  val stdFilter = mutable.ArrayBuffer.fill(y.length)(0d)

  // init avgFilter and stdFilter
  y.take(lag).foreach(s => stats.addValue(s))

  avgFilter(lag - 1) = stats.getMean
  stdFilter(lag - 1) = Math.sqrt(stats.getPopulationVariance) // getStandardDeviation() uses sample variance (not what we want)

  // loop input starting at end of rolling window
  y.zipWithIndex.slice(lag, y.length - 1).foreach {
    case (s: Double, i: Int) =>
      // if the distance between the current value and average is enough standard deviations (threshold) away
      if (Math.abs(s - avgFilter(i - 1)) > threshold * stdFilter(i - 1)) {
        // this is a signal (i.e. peak), determine if it is a positive or negative signal
        signals(i) = if (s > avgFilter(i - 1)) 1 else -1
        // filter this signal out using influence
        filteredY(i) = (influence * s) + ((1 - influence) * filteredY(i - 1))
      } else {
        // ensure this signal remains a zero
        signals(i) = 0
        // ensure this value is not filtered
        filteredY(i) = s
      }

      // update rolling average and deviation
      stats.clear()
      filteredY.slice(i - lag, i).foreach(s => stats.addValue(s))
      avgFilter(i) = stats.getMean
      stdFilter(i) = Math.sqrt(stats.getPopulationVariance) // getStandardDeviation() uses sample variance (not what we want)
  }

  println(y.length)
  println(signals.length)
  println(signals)

  signals.zipWithIndex.foreach {
    case(x: Int, idx: Int) =>
      if (x == 1) {
        println(idx + " " + y(idx))
      }
  }

  val data =
    y.zipWithIndex.map { case (s: Double, i: Int) => Map("x" -> i, "y" -> s, "name" -> "y", "row" -> "data") } ++
    avgFilter.zipWithIndex.map { case (s: Double, i: Int) => Map("x" -> i, "y" -> s, "name" -> "avgFilter", "row" -> "data") } ++
    avgFilter.zipWithIndex.map { case (s: Double, i: Int) => Map("x" -> i, "y" -> (s - threshold * stdFilter(i)), "name" -> "lower", "row" -> "data") } ++
    avgFilter.zipWithIndex.map { case (s: Double, i: Int) => Map("x" -> i, "y" -> (s + threshold * stdFilter(i)), "name" -> "upper", "row" -> "data") } ++
    signals.zipWithIndex.map { case (s: Int, i: Int) => Map("x" -> i, "y" -> s, "name" -> "signal", "row" -> "signal") }

  Vegas("Smoothed Z")
    .withData(data)
    .mark(Line)
    .encodeX("x", Quant)
    .encodeY("y", Quant)
    .encodeColor(
      field="name",
      dataType=Nominal
    )
    .encodeRow("row", Ordinal)
    .show

  return signals
}

下面是一个测试,返回与Python和Groovy版本相同的结果:

val y = List(1d, 1d, 1.1d, 1d, 0.9d, 1d, 1d, 1.1d, 1d, 0.9d, 1d, 1.1d, 1d, 1d, 0.9d, 1d, 1d, 1.1d, 1d, 1d,
  1d, 1d, 1.1d, 0.9d, 1d, 1.1d, 1d, 1d, 0.9d, 1d, 1.1d, 1d, 1d, 1.1d, 1d, 0.8d, 0.9d, 1d, 1.2d, 0.9d, 1d,
  1d, 1.1d, 1.2d, 1d, 1.5d, 1d, 3d, 2d, 5d, 3d, 2d, 1d, 1d, 1d, 0.9d, 1d,
  1d, 3d, 2.6d, 4d, 3d, 3.2d, 2d, 1d, 1d, 0.8d, 4d, 4d, 2d, 2.5d, 1d, 1d, 1d)

val lag = 30
val threshold = 5d
val influence = 0d

smoothedZScore(y, lag, threshold, influence)

这里的要点

根据@Jean-Paul提出的解决方案,我用c#实现了他的算法

public class ZScoreOutput
{
    public List<double> input;
    public List<int> signals;
    public List<double> avgFilter;
    public List<double> filtered_stddev;
}

public static class ZScore
{
    public static ZScoreOutput StartAlgo(List<double> input, int lag, double threshold, double influence)
    {
        // init variables!
        int[] signals = new int[input.Count];
        double[] filteredY = new List<double>(input).ToArray();
        double[] avgFilter = new double[input.Count];
        double[] stdFilter = new double[input.Count];

        var initialWindow = new List<double>(filteredY).Skip(0).Take(lag).ToList();

        avgFilter[lag - 1] = Mean(initialWindow);
        stdFilter[lag - 1] = StdDev(initialWindow);

        for (int i = lag; i < input.Count; i++)
        {
            if (Math.Abs(input[i] - avgFilter[i - 1]) > threshold * stdFilter[i - 1])
            {
                signals[i] = (input[i] > avgFilter[i - 1]) ? 1 : -1;
                filteredY[i] = influence * input[i] + (1 - influence) * filteredY[i - 1];
            }
            else
            {
                signals[i] = 0;
                filteredY[i] = input[i];
            }

            // Update rolling average and deviation
            var slidingWindow = new List<double>(filteredY).Skip(i - lag).Take(lag+1).ToList();

            var tmpMean = Mean(slidingWindow);
            var tmpStdDev = StdDev(slidingWindow);

            avgFilter[i] = Mean(slidingWindow);
            stdFilter[i] = StdDev(slidingWindow);
        }

        // Copy to convenience class 
        var result = new ZScoreOutput();
        result.input = input;
        result.avgFilter       = new List<double>(avgFilter);
        result.signals         = new List<int>(signals);
        result.filtered_stddev = new List<double>(stdFilter);

        return result;
    }

    private static double Mean(List<double> list)
    {
        // Simple helper function! 
        return list.Average();
    }

    private static double StdDev(List<double> values)
    {
        double ret = 0;
        if (values.Count() > 0)
        {
            double avg = values.Average();
            double sum = values.Sum(d => Math.Pow(d - avg, 2));
            ret = Math.Sqrt((sum) / (values.Count() - 1));
        }
        return ret;
    }
}

使用示例:

var input = new List<double> {1.0, 1.0, 1.1, 1.0, 0.9, 1.0, 1.0, 1.1, 1.0, 0.9, 1.0,
    1.1, 1.0, 1.0, 0.9, 1.0, 1.0, 1.1, 1.0, 1.0, 1.0, 1.0, 1.1, 0.9, 1.0, 1.1, 1.0, 1.0, 0.9,
    1.0, 1.1, 1.0, 1.0, 1.1, 1.0, 0.8, 0.9, 1.0, 1.2, 0.9, 1.0, 1.0, 1.1, 1.2, 1.0, 1.5, 1.0,
    3.0, 2.0, 5.0, 3.0, 2.0, 1.0, 1.0, 1.0, 0.9, 1.0, 1.0, 3.0, 2.6, 4.0, 3.0, 3.2, 2.0, 1.0,
    1.0, 0.8, 4.0, 4.0, 2.0, 2.5, 1.0, 1.0, 1.0};

int lag = 30;
double threshold = 5.0;
double influence = 0.0;

var output = ZScore.StartAlgo(input, lag, threshold, influence);

下面是平滑z-score算法的Groovy (Java)实现(见上面的答案)。

/**
 * "Smoothed zero-score alogrithm" shamelessly copied from https://stackoverflow.com/a/22640362/6029703
 *  Uses a rolling mean and a rolling deviation (separate) to identify peaks in a vector
 *
 * @param y - The input vector to analyze
 * @param lag - The lag of the moving window (i.e. how big the window is)
 * @param threshold - The z-score at which the algorithm signals (i.e. how many standard deviations away from the moving mean a peak (or signal) is)
 * @param influence - The influence (between 0 and 1) of new signals on the mean and standard deviation (how much a peak (or signal) should affect other values near it)
 * @return - The calculated averages (avgFilter) and deviations (stdFilter), and the signals (signals)
 */

public HashMap<String, List<Object>> thresholdingAlgo(List<Double> y, Long lag, Double threshold, Double influence) {
    //init stats instance
    SummaryStatistics stats = new SummaryStatistics()

    //the results (peaks, 1 or -1) of our algorithm
    List<Integer> signals = new ArrayList<Integer>(Collections.nCopies(y.size(), 0))
    //filter out the signals (peaks) from our original list (using influence arg)
    List<Double> filteredY = new ArrayList<Double>(y)
    //the current average of the rolling window
    List<Double> avgFilter = new ArrayList<Double>(Collections.nCopies(y.size(), 0.0d))
    //the current standard deviation of the rolling window
    List<Double> stdFilter = new ArrayList<Double>(Collections.nCopies(y.size(), 0.0d))
    //init avgFilter and stdFilter
    (0..lag-1).each { stats.addValue(y[it as int]) }
    avgFilter[lag - 1 as int] = stats.getMean()
    stdFilter[lag - 1 as int] = Math.sqrt(stats.getPopulationVariance()) //getStandardDeviation() uses sample variance (not what we want)
    stats.clear()
    //loop input starting at end of rolling window
    (lag..y.size()-1).each { i ->
        //if the distance between the current value and average is enough standard deviations (threshold) away
        if (Math.abs((y[i as int] - avgFilter[i - 1 as int]) as Double) > threshold * stdFilter[i - 1 as int]) {
            //this is a signal (i.e. peak), determine if it is a positive or negative signal
            signals[i as int] = (y[i as int] > avgFilter[i - 1 as int]) ? 1 : -1
            //filter this signal out using influence
            filteredY[i as int] = (influence * y[i as int]) + ((1-influence) * filteredY[i - 1 as int])
        } else {
            //ensure this signal remains a zero
            signals[i as int] = 0
            //ensure this value is not filtered
            filteredY[i as int] = y[i as int]
        }
        //update rolling average and deviation
        (i - lag..i-1).each { stats.addValue(filteredY[it as int] as Double) }
        avgFilter[i as int] = stats.getMean()
        stdFilter[i as int] = Math.sqrt(stats.getPopulationVariance()) //getStandardDeviation() uses sample variance (not what we want)
        stats.clear()
    }

    return [
        signals  : signals,
        avgFilter: avgFilter,
        stdFilter: stdFilter
    ]
}

下面是同一个数据集上的测试,其结果与上面的Python / numpy实现相同。

    // Data
    def y = [1d, 1d, 1.1d, 1d, 0.9d, 1d, 1d, 1.1d, 1d, 0.9d, 1d, 1.1d, 1d, 1d, 0.9d, 1d, 1d, 1.1d, 1d, 1d,
         1d, 1d, 1.1d, 0.9d, 1d, 1.1d, 1d, 1d, 0.9d, 1d, 1.1d, 1d, 1d, 1.1d, 1d, 0.8d, 0.9d, 1d, 1.2d, 0.9d, 1d,
         1d, 1.1d, 1.2d, 1d, 1.5d, 1d, 3d, 2d, 5d, 3d, 2d, 1d, 1d, 1d, 0.9d, 1d,
         1d, 3d, 2.6d, 4d, 3d, 3.2d, 2d, 1d, 1d, 0.8d, 4d, 4d, 2d, 2.5d, 1d, 1d, 1d]

    // Settings
    def lag = 30
    def threshold = 5
    def influence = 0


    def thresholdingResults = thresholdingAlgo((List<Double>) y, (Long) lag, (Double) threshold, (Double) influence)

    println y.size()
    println thresholdingResults.signals.size()
    println thresholdingResults.signals

    thresholdingResults.signals.eachWithIndex { x, idx ->
        if (x) {
            println y[idx]
        }
    }

c++实现

#include <iostream>
#include <vector>
#include <algorithm>
#include <unordered_map>
#include <cmath>
#include <iterator>
#include <numeric>

using namespace std;

typedef long double ld;
typedef unsigned int uint;
typedef std::vector<ld>::iterator vec_iter_ld;

/**
 * Overriding the ostream operator for pretty printing vectors.
 */
template<typename T>
std::ostream &operator<<(std::ostream &os, std::vector<T> vec) {
    os << "[";
    if (vec.size() != 0) {
        std::copy(vec.begin(), vec.end() - 1, std::ostream_iterator<T>(os, " "));
        os << vec.back();
    }
    os << "]";
    return os;
}

/**
 * This class calculates mean and standard deviation of a subvector.
 * This is basically stats computation of a subvector of a window size qual to "lag".
 */
class VectorStats {
public:
    /**
     * Constructor for VectorStats class.
     *
     * @param start - This is the iterator position of the start of the window,
     * @param end   - This is the iterator position of the end of the window,
     */
    VectorStats(vec_iter_ld start, vec_iter_ld end) {
        this->start = start;
        this->end = end;
        this->compute();
    }

    /**
     * This method calculates the mean and standard deviation using STL function.
     * This is the Two-Pass implementation of the Mean & Variance calculation.
     */
    void compute() {
        ld sum = std::accumulate(start, end, 0.0);
        uint slice_size = std::distance(start, end);
        ld mean = sum / slice_size;
        std::vector<ld> diff(slice_size);
        std::transform(start, end, diff.begin(), [mean](ld x) { return x - mean; });
        ld sq_sum = std::inner_product(diff.begin(), diff.end(), diff.begin(), 0.0);
        ld std_dev = std::sqrt(sq_sum / slice_size);

        this->m1 = mean;
        this->m2 = std_dev;
    }

    ld mean() {
        return m1;
    }

    ld standard_deviation() {
        return m2;
    }

private:
    vec_iter_ld start;
    vec_iter_ld end;
    ld m1;
    ld m2;
};

/**
 * This is the implementation of the Smoothed Z-Score Algorithm.
 * This is direction translation of https://stackoverflow.com/a/22640362/1461896.
 *
 * @param input - input signal
 * @param lag - the lag of the moving window
 * @param threshold - the z-score at which the algorithm signals
 * @param influence - the influence (between 0 and 1) of new signals on the mean and standard deviation
 * @return a hashmap containing the filtered signal and corresponding mean and standard deviation.
 */
unordered_map<string, vector<ld>> z_score_thresholding(vector<ld> input, int lag, ld threshold, ld influence) {
    unordered_map<string, vector<ld>> output;

    uint n = (uint) input.size();
    vector<ld> signals(input.size());
    vector<ld> filtered_input(input.begin(), input.end());
    vector<ld> filtered_mean(input.size());
    vector<ld> filtered_stddev(input.size());

    VectorStats lag_subvector_stats(input.begin(), input.begin() + lag);
    filtered_mean[lag - 1] = lag_subvector_stats.mean();
    filtered_stddev[lag - 1] = lag_subvector_stats.standard_deviation();

    for (int i = lag; i < n; i++) {
        if (abs(input[i] - filtered_mean[i - 1]) > threshold * filtered_stddev[i - 1]) {
            signals[i] = (input[i] > filtered_mean[i - 1]) ? 1.0 : -1.0;
            filtered_input[i] = influence * input[i] + (1 - influence) * filtered_input[i - 1];
        } else {
            signals[i] = 0.0;
            filtered_input[i] = input[i];
        }
        VectorStats lag_subvector_stats(filtered_input.begin() + (i - lag), filtered_input.begin() + i);
        filtered_mean[i] = lag_subvector_stats.mean();
        filtered_stddev[i] = lag_subvector_stats.standard_deviation();
    }

    output["signals"] = signals;
    output["filtered_mean"] = filtered_mean;
    output["filtered_stddev"] = filtered_stddev;

    return output;
};

int main() {
    vector<ld> input = {1.0, 1.0, 1.1, 1.0, 0.9, 1.0, 1.0, 1.1, 1.0, 0.9, 1.0, 1.1, 1.0, 1.0, 0.9, 1.0, 1.0, 1.1, 1.0,
                        1.0, 1.0, 1.0, 1.1, 0.9, 1.0, 1.1, 1.0, 1.0, 0.9, 1.0, 1.1, 1.0, 1.0, 1.1, 1.0, 0.8, 0.9, 1.0,
                        1.2, 0.9, 1.0, 1.0, 1.1, 1.2, 1.0, 1.5, 1.0, 3.0, 2.0, 5.0, 3.0, 2.0, 1.0, 1.0, 1.0, 0.9, 1.0,
                        1.0, 3.0, 2.6, 4.0, 3.0, 3.2, 2.0, 1.0, 1.0, 0.8, 4.0, 4.0, 2.0, 2.5, 1.0, 1.0, 1.0};

    int lag = 30;
    ld threshold = 5.0;
    ld influence = 0.0;
    unordered_map<string, vector<ld>> output = z_score_thresholding(input, lag, threshold, influence);
    cout << output["signals"] << endl;
}

一种方法是根据以下观察来检测峰:

时间t是一个峰值(y (t) > y (t - 1)) & & ((t) > y (t + 1))

它通过等待上升趋势结束来避免误报。它并不完全是“实时”的,因为它会比峰值差一个dt。灵敏度可以通过要求比较的裕度来控制。在噪声检测和时延检测之间存在一种折衷。 您可以通过添加更多参数来丰富模型:

峰如果y (y (t) - (t-dt) > m) && (y (t) - y (t + dt) > m)

dt和m是控制灵敏度和延时的参数

这是你用上述算法得到的结果:

下面是在python中重现图的代码:

import numpy as np
import matplotlib.pyplot as plt
input = np.array([ 1. ,  1. ,  1. ,  1. ,  1. ,  1. ,  1. ,  1.1,  1. ,  0.8,  0.9,
    1. ,  1.2,  0.9,  1. ,  1. ,  1.1,  1.2,  1. ,  1.5,  1. ,  3. ,
    2. ,  5. ,  3. ,  2. ,  1. ,  1. ,  1. ,  0.9,  1. ,  1. ,  3. ,
    2.6,  4. ,  3. ,  3.2,  2. ,  1. ,  1. ,  1. ,  1. ,  1. ])
signal = (input > np.roll(input,1)) & (input > np.roll(input,-1))
plt.plot(input)
plt.plot(signal.nonzero()[0], input[signal], 'ro')
plt.show()

通过设置m = 0.5,你可以得到一个更清晰的信号,只有一个假阳性: