更新:到目前为止表现最好的算法是这个。


这个问题探讨了在实时时间序列数据中检测突然峰值的稳健算法。

考虑以下示例数据:

这个数据的例子是Matlab格式的(但这个问题不是关于语言,而是关于算法):

p = [1 1 1.1 1 0.9 1 1 1.1 1 0.9 1 1.1 1 1 0.9 1 1 1.1 1 1 1 1 1.1 0.9 1 1.1 1 1 0.9, ...
     1 1.1 1 1 1.1 1 0.8 0.9 1 1.2 0.9 1 1 1.1 1.2 1 1.5 1 3 2 5 3 2 1 1 1 0.9 1 1, ... 
     3 2.6 4 3 3.2 2 1 1 0.8 4 4 2 2.5 1 1 1];

你可以清楚地看到有三个大峰和一些小峰。这个数据集是问题所涉及的时间序列数据集类的一个特定示例。这类数据集有两个一般特征:

有一种具有一般平均值的基本噪声 有很大的“峰值”或“更高的数据点”明显偏离噪声。

让我们假设以下情况:

峰的宽度不能事先确定 峰的高度明显偏离其他值 算法实时更新(因此每个新数据点都会更新)

对于这种情况,需要构造一个触发信号的边值。但是,边界值不能是静态的,必须通过算法实时确定。


我的问题是:什么是实时计算这些阈值的好算法?有没有针对这种情况的特定算法?最著名的算法是什么?


健壮的算法或有用的见解都受到高度赞赏。(可以用任何语言回答:这是关于算法的)


当前回答

这种z-scores方法在峰值检测方面非常有效,也有助于异常值的去除。异常值对话经常讨论每个点的统计价值和变化数据的伦理。

但是,在来自易出错的串行通信或易出错的传感器的重复错误传感器值的情况下,错误或虚假读数中没有统计值。它们需要被识别并移除。

从视觉上看,错误是显而易见的。下图中的直线显示了需要删除的内容。但是用算法识别和消除错误是相当具有挑战性的。z分数效果很好。

下图是通过串行通信从传感器获得的值。偶尔的串行通信错误,传感器错误或两者都导致重复的,明显错误的数据点。

z-score峰值检测器能够在虚假数据点上发出信号,并生成一个干净的结果数据集,同时保留正确数据的特征:

其他回答

假设你的数据来自传感器(所以算法不可能知道未来的任何事情),

我做了这个算法,它与我在自己的项目中获得的数据非常好。

该算法有2个参数:灵敏度和窗口。

最后,只需一行代码就可以得到你的结果:

detected=data.map((a, b, c) => (a > 0) ? c[b] ** 4 * c[b - 1] ** 3 : -0).map((a, b, c) => a > Math.max(...c.slice(2)) / sensitivity).map((a, b, c) => (b > dwindow) && c.slice(b - dwindow, b).indexOf(a) == -1);

因为我是程序员而不是数学家,所以我不能更好地解释它。但我相信有人可以。

sensitivity = 20; dwindow = 4; data = [1., 1., 1., 1., 1., 1., 1., 1.1, 1., 0.8, 0.9, 1., 1.2, 0.9, 1., 1., 1.1, 1.2, 1., 1.5, 1., 3., 2., 5., 3., 2., 1., 1., 1., 0.9, 1., 1., 3., 2.6, 4., 3., 3.2, 2., 1., 1., 1., 1., 1. ]; //data = data.concat(data); //data = data.concat(data); var data1 = [{ name: 'original source', y: data }]; Plotly.newPlot('stage1', data1, { title: 'Sensor data', yaxis: { title: 'signal' } }); filtered = data.map((a, b, c) => (a > 0) ? c[b] ** 4 * c[b - 1] ** 3 : -0); var data2 = [{ name: 'filtered source', y: filtered }]; Plotly.newPlot('stage2', data2, { title: 'Filtered data<br>aₙ = aₙ⁴ * aₙ₋₁³', yaxis: { title: 'signal' } }); dwindow = 6; k = dwindow; detected = filtered.map((a, b, c) => a > Math.max(...c.slice(2)) / sensitivity).map((a, b, c) => (b > k) && c.slice(b - k, b).indexOf(a) == -1) var data3 = [{ name: 'detected peaks', y: detected }]; Plotly.newPlot('stage3', data3, { title: 'Window 6', yaxis: { title: 'signal' } }); dwindow = 10; k = dwindow; detected = filtered.map((a, b, c) => a > Math.max(...c.slice(2)) / 20).map((a, b, c) => (b > k) && c.slice(b - k, b).indexOf(a) == -1) var data4 = [{ name: 'detected peaks', y: detected }]; Plotly.newPlot('stage4', data4, { title: 'Window 10', yaxis: { title: 'signal' } }); <script src="https://cdn.jsdelivr.net/npm/plotly.js@2.16.5/dist/plotly.min.js"></script> <div id="stage1"></div> <div id="stage2"></div> <div id="stage3"></div> <div id="stage4"></div>

一个python/numpy的迭代版本的答案https://stackoverflow.com/a/22640362/6029703在这里。对于大数据(100000+),此代码比计算平均和标准偏差的速度更快。

def peak_detection_smoothed_zscore_v2(x, lag, threshold, influence):
    '''
    iterative smoothed z-score algorithm
    Implementation of algorithm from https://stackoverflow.com/a/22640362/6029703
    '''
    import numpy as np
    labels = np.zeros(len(x))
    filtered_y = np.array(x)
    avg_filter = np.zeros(len(x))
    std_filter = np.zeros(len(x))
    var_filter = np.zeros(len(x))

    avg_filter[lag - 1] = np.mean(x[0:lag])
    std_filter[lag - 1] = np.std(x[0:lag])
    var_filter[lag - 1] = np.var(x[0:lag])
    for i in range(lag, len(x)):
        if abs(x[i] - avg_filter[i - 1]) > threshold * std_filter[i - 1]:
            if x[i] > avg_filter[i - 1]:
                labels[i] = 1
            else:
                labels[i] = -1
            filtered_y[i] = influence * x[i] + (1 - influence) * filtered_y[i - 1]
        else:
            labels[i] = 0
            filtered_y[i] = x[i]
        # update avg, var, std
        avg_filter[i] = avg_filter[i - 1] + 1. / lag * (filtered_y[i] - filtered_y[i - lag])
        var_filter[i] = var_filter[i - 1] + 1. / lag * ((filtered_y[i] - avg_filter[i - 1]) ** 2 - (
            filtered_y[i - lag] - avg_filter[i - 1]) ** 2 - (filtered_y[i] - filtered_y[i - lag]) ** 2 / lag)
        std_filter[i] = np.sqrt(var_filter[i])

    return dict(signals=labels,
                avgFilter=avg_filter,
                stdFilter=std_filter)

用现代c++实现的面向对象版z-score算法

template<typename T>
class FindPeaks{
private:
    std::vector<T> m_input_signal;                      // stores input vector
    std::vector<T> m_array_peak_positive;               
    std::vector<T> m_array_peak_negative;               

public:
    FindPeaks(const std::vector<T>& t_input_signal): m_input_signal{t_input_signal}{ }

    void estimate(){
        int lag{5};
        T threshold{ 5 };                                                                                       // set a threshold
        T influence{ 0.5 };                                                                                    // value between 0 to 1, 1 is normal influence and 0.5 is half the influence

        std::vector<T> filtered_signal(m_input_signal.size(), 0.0);                                             // placeholdered for smooth signal, initialie with all zeros
        std::vector<int> signal(m_input_signal.size(), 0);                                                          // vector that stores where the negative and positive located
        std::vector<T> avg_filtered(m_input_signal.size(), 0.0);                                                // moving averages
        std::vector<T> std_filtered(m_input_signal.size(), 0.0);                                                // moving standard deviation

        avg_filtered[lag] = findMean(m_input_signal.begin(), m_input_signal.begin() + lag);                         // pass the iteartor to vector
        std_filtered[lag] = findStandardDeviation(m_input_signal.begin(), m_input_signal.begin() + lag);

        for (size_t iLag = lag + 1; iLag < m_input_signal.size(); ++iLag) {                                         // start index frm 
            if (std::abs(m_input_signal[iLag] - avg_filtered[iLag - 1]) > threshold * std_filtered[iLag - 1]) {     // check if value is above threhold             
                if ((m_input_signal[iLag]) > avg_filtered[iLag - 1]) {
                    signal[iLag] = 1;                                                                               // assign positive signal
                }
                else {
                    signal[iLag] = -1;                                                                                  // assign negative signal
                }
                filtered_signal[iLag] = influence * m_input_signal[iLag] + (1 - influence) * filtered_signal[iLag - 1];        // exponential smoothing
            }
            else {
                signal[iLag] = 0;                                                                                         // no signal
                filtered_signal[iLag] = m_input_signal[iLag];
            }

            avg_filtered[iLag] = findMean(filtered_signal.begin() + (iLag - lag), filtered_signal.begin() + iLag);
            std_filtered[iLag] = findStandardDeviation(filtered_signal.begin() + (iLag - lag), filtered_signal.begin() + iLag);

        }

        for (size_t iSignal = 0; iSignal < m_input_signal.size(); ++iSignal) {
            if (signal[iSignal] == 1) {
                m_array_peak_positive.emplace_back(m_input_signal[iSignal]);                                        // store the positive peaks
            }
            else if (signal[iSignal] == -1) {
                m_array_peak_negative.emplace_back(m_input_signal[iSignal]);                                         // store the negative peaks
            }
        }
        printVoltagePeaks(signal, m_input_signal);

    }

    std::pair< std::vector<T>, std::vector<T> > get_peaks()
    {
        return std::make_pair(m_array_peak_negative, m_array_peak_negative);
    }

};


template<typename T1, typename T2 >
void printVoltagePeaks(std::vector<T1>& m_signal, std::vector<T2>& m_input_signal) {
    std::ofstream output_file("./voltage_peak.csv");
    std::ostream_iterator<T2> output_iterator_voltage(output_file, ",");
    std::ostream_iterator<T1> output_iterator_signal(output_file, ",");
    std::copy(m_input_signal.begin(), m_input_signal.end(), output_iterator_voltage);
    output_file << "\n";
    std::copy(m_signal.begin(), m_signal.end(), output_iterator_signal);
}

template<typename iterator_type>
typename std::iterator_traits<iterator_type>::value_type findMean(iterator_type it, iterator_type end)
{
    /* function that receives iterator to*/
    typename std::iterator_traits<iterator_type>::value_type sum{ 0.0 };
    int counter = 0;
    while (it != end) {
        sum += *(it++);
        counter++;
    }
    return sum / counter;
}

template<typename iterator_type>
typename std::iterator_traits<iterator_type>::value_type findStandardDeviation(iterator_type it, iterator_type end)
{
    auto mean = findMean(it, end);
    typename std::iterator_traits<iterator_type>::value_type sum_squared_error{ 0.0 };
    int counter{ 0 };
    while (it != end) {
        sum_squared_error += std::pow((*(it++) - mean), 2);
        counter++;
    }
    auto standard_deviation = std::sqrt(sum_squared_error / (counter - 1));
    return standard_deviation;
}

@Jean-Paul Smoothed Z Score算法的Dart版本:

class SmoothedZScore {
  int lag = 5;
  num threshold = 10;
  num influence = 0.5;

  num sum(List<num> a) {
    num s = 0;
    for (int i = 0; i < a.length; i++) s += a[i];
    return s;
  }

  num mean(List<num> a) {
    return sum(a) / a.length;
  }

  num stddev(List<num> arr) {
    num arrMean = mean(arr);
    num dev = 0;
    for (int i = 0; i < arr.length; i++) dev += (arr[i] - arrMean) * (arr[i] - arrMean);
    return sqrt(dev / arr.length);
  }

  List<int> smoothedZScore(List<num> y) {
    if (y.length < lag + 2) {
      throw 'y data array too short($y.length) for given lag of $lag';
    }

    // init variables
    List<int> signals = List.filled(y.length, 0);
    List<num> filteredY = List<num>.from(y);
    List<num> leadIn = y.sublist(0, lag);

    var avgFilter = List<num>.filled(y.length, 0);
    var stdFilter = List<num>.filled(y.length, 0);
    avgFilter[lag - 1] = mean(leadIn);
    stdFilter[lag - 1] = stddev(leadIn);

    for (var i = lag; i < y.length; i++) {
      if ((y[i] - avgFilter[i - 1]).abs() > (threshold * stdFilter[i - 1])) {
        signals[i] = y[i] > avgFilter[i - 1] ? 1 : -1;
        // make influence lower
        filteredY[i] = influence * y[i] + (1 - influence) * filteredY[i - 1];
      } else {
        signals[i] = 0; // no signal
        filteredY[i] = y[i];
      }

      // adjust the filters
      List<num> yLag = filteredY.sublist(i - lag, i);
      avgFilter[i] = mean(yLag);
      stdFilter[i] = stddev(yLag);
    }

    return signals;
  }
}

我们尝试在我们的数据集上使用平滑的z-score算法,这导致了过度敏感或不敏感(取决于参数如何调整),几乎没有中间地带。在我们站点的交通信号中,我们观察到一个低频基线,它代表了每天的周期,即使有最好的可能参数(如下所示),它仍然在第4天下降,特别是因为大多数数据点被认为是异常的。

在原始z-score算法的基础上,我们提出了一种通过反向滤波来解决这个问题的方法。改进后的算法及其在电视商业流量归因中的应用详见我们的团队博客。