更新:到目前为止表现最好的算法是这个。


这个问题探讨了在实时时间序列数据中检测突然峰值的稳健算法。

考虑以下示例数据:

这个数据的例子是Matlab格式的(但这个问题不是关于语言,而是关于算法):

p = [1 1 1.1 1 0.9 1 1 1.1 1 0.9 1 1.1 1 1 0.9 1 1 1.1 1 1 1 1 1.1 0.9 1 1.1 1 1 0.9, ...
     1 1.1 1 1 1.1 1 0.8 0.9 1 1.2 0.9 1 1 1.1 1.2 1 1.5 1 3 2 5 3 2 1 1 1 0.9 1 1, ... 
     3 2.6 4 3 3.2 2 1 1 0.8 4 4 2 2.5 1 1 1];

你可以清楚地看到有三个大峰和一些小峰。这个数据集是问题所涉及的时间序列数据集类的一个特定示例。这类数据集有两个一般特征:

有一种具有一般平均值的基本噪声 有很大的“峰值”或“更高的数据点”明显偏离噪声。

让我们假设以下情况:

峰的宽度不能事先确定 峰的高度明显偏离其他值 算法实时更新(因此每个新数据点都会更新)

对于这种情况,需要构造一个触发信号的边值。但是,边界值不能是静态的,必须通过算法实时确定。


我的问题是:什么是实时计算这些阈值的好算法?有没有针对这种情况的特定算法?最著名的算法是什么?


健壮的算法或有用的见解都受到高度赞赏。(可以用任何语言回答:这是关于算法的)


当前回答

使用实时流的Python版本(不会在每个新数据点到达时重新计算所有数据点)。您可能想要调整类函数返回的内容—对于我的目的,我只需要信号。

import numpy as np


class real_time_peak_detection():
    def __init__(self, array, lag, threshold, influence):
        self.y = list(array)
        self.length = len(self.y)
        self.lag = lag
        self.threshold = threshold
        self.influence = influence
        self.signals = [0] * len(self.y)
        self.filteredY = np.array(self.y).tolist()
        self.avgFilter = [0] * len(self.y)
        self.stdFilter = [0] * len(self.y)
        self.avgFilter[self.lag - 1] = np.mean(self.y[0:self.lag]).tolist()
        self.stdFilter[self.lag - 1] = np.std(self.y[0:self.lag]).tolist()

    def thresholding_algo(self, new_value):
        self.y.append(new_value)
        i = len(self.y) - 1
        self.length = len(self.y)
        if i < self.lag:
            return 0
        elif i == self.lag:
            self.signals = [0] * len(self.y)
            self.filteredY = np.array(self.y).tolist()
            self.avgFilter = [0] * len(self.y)
            self.stdFilter = [0] * len(self.y)
            self.avgFilter[self.lag] = np.mean(self.y[0:self.lag]).tolist()
            self.stdFilter[self.lag] = np.std(self.y[0:self.lag]).tolist()
            return 0

        self.signals += [0]
        self.filteredY += [0]
        self.avgFilter += [0]
        self.stdFilter += [0]

        if abs(self.y[i] - self.avgFilter[i - 1]) > (self.threshold * self.stdFilter[i - 1]):

            if self.y[i] > self.avgFilter[i - 1]:
                self.signals[i] = 1
            else:
                self.signals[i] = -1

            self.filteredY[i] = self.influence * self.y[i] + \
                (1 - self.influence) * self.filteredY[i - 1]
            self.avgFilter[i] = np.mean(self.filteredY[(i - self.lag):i])
            self.stdFilter[i] = np.std(self.filteredY[(i - self.lag):i])
        else:
            self.signals[i] = 0
            self.filteredY[i] = self.y[i]
            self.avgFilter[i] = np.mean(self.filteredY[(i - self.lag):i])
            self.stdFilter[i] = np.std(self.filteredY[(i - self.lag):i])

        return self.signals[i]

其他回答

我为Jean-Paul最受欢迎的答案写了一个Go包。它假设y值的类型为float64。

github.com/MicahParks/peakdetect

下面的示例使用了这个包,并基于上面提到的流行答案中的R示例。它在编译时没有任何依赖关系,试图保持较低的内存占用,并且在有新数据点进入时不重新处理过去的点。该项目有100%的测试覆盖率,主要来自上述R示例的输入和输出。但是,如果有人发现任何错误,请打开一个GitHub问题。

编辑:我对v0.0.5进行了性能改进,似乎快了10倍!它使用Welford的方法进行初始化,并使用类似的方法计算滞后期(滑动窗口)的平均值和总体标准偏差。特别感谢另一个帖子的回答:https://stackoverflow.com/a/14638138/14797322

下面是基于R例子的Golang例子:

package main

import (
    "fmt"
    "log"

    "github.com/MicahParks/peakdetect"
)

// This example is the equivalent of the R example from the algorithm's author.
// https://stackoverflow.com/a/54507329/14797322
func main() {
    data := []float64{1, 1, 1.1, 1, 0.9, 1, 1, 1.1, 1, 0.9, 1, 1.1, 1, 1, 0.9, 1, 1, 1.1, 1, 1, 1, 1, 1.1, 0.9, 1, 1.1, 1, 1, 0.9, 1, 1.1, 1, 1, 1.1, 1, 0.8, 0.9, 1, 1.2, 0.9, 1, 1, 1.1, 1.2, 1, 1.5, 1, 3, 2, 5, 3, 2, 1, 1, 1, 0.9, 1, 1, 3, 2.6, 4, 3, 3.2, 2, 1, 1, 0.8, 4, 4, 2, 2.5, 1, 1, 1}

    // Algorithm configuration from example.
    const (
        lag       = 30
        threshold = 5
        influence = 0
    )

    // Create then initialize the peak detector.
    detector := peakdetect.NewPeakDetector()
    err := detector.Initialize(influence, threshold, data[:lag]) // The length of the initial values is the lag.
    if err != nil {
        log.Fatalf("Failed to initialize peak detector.\nError: %s", err)
    }

    // Start processing new data points and determine what signal, if any they produce.
    //
    // This method, .Next(), is best for when data is being processed in a stream, but this simply iterates over a slice.
    nextDataPoints := data[lag:]
    for i, newPoint := range nextDataPoints {
        signal := detector.Next(newPoint)
        var signalType string
        switch signal {
        case peakdetect.SignalNegative:
            signalType = "negative"
        case peakdetect.SignalNeutral:
            signalType = "neutral"
        case peakdetect.SignalPositive:
            signalType = "positive"
        }

        println(fmt.Sprintf("Data point at index %d has the signal: %s", i+lag, signalType))
    }

    // This method, .NextBatch(), is a helper function for processing many data points at once. It's returned slice
    // should produce the same signal outputs as the loop above.
    signals := detector.NextBatch(nextDataPoints)
    println(fmt.Sprintf("1:1 ratio of batch inputs to signal outputs: %t", len(signals) == len(nextDataPoints)))
}

下面是我尝试为“Smoothed z-score算法”创建一个Ruby解决方案:

module ThresholdingAlgoMixin
  def mean(array)
    array.reduce(&:+) / array.size.to_f
  end

  def stddev(array)
    array_mean = mean(array)
    Math.sqrt(array.reduce(0.0) { |a, b| a.to_f + ((b.to_f - array_mean) ** 2) } / array.size.to_f)
  end

  def thresholding_algo(lag: 5, threshold: 3.5, influence: 0.5)
    return nil if size < lag * 2
    Array.new(size, 0).tap do |signals|
      filtered = Array.new(self)

      initial_slice = take(lag)
      avg_filter = Array.new(lag - 1, 0.0) + [mean(initial_slice)]
      std_filter = Array.new(lag - 1, 0.0) + [stddev(initial_slice)]
      (lag..size-1).each do |idx|
        prev = idx - 1
        if (fetch(idx) - avg_filter[prev]).abs > threshold * std_filter[prev]
          signals[idx] = fetch(idx) > avg_filter[prev] ? 1 : -1
          filtered[idx] = (influence * fetch(idx)) + ((1-influence) * filtered[prev])
        end

        filtered_slice = filtered[idx-lag..prev]
        avg_filter[idx] = mean(filtered_slice)
        std_filter[idx] = stddev(filtered_slice)
      end
    end
  end
end

以及示例用法:

test_data = [
  1, 1, 1.1, 1, 0.9, 1, 1, 1.1, 1, 0.9, 1, 1.1, 1, 1, 0.9, 1,
  1, 1.1, 1, 1, 1, 1, 1.1, 0.9, 1, 1.1, 1, 1, 0.9, 1, 1.1, 1,
  1, 1.1, 1, 0.8, 0.9, 1, 1.2, 0.9, 1, 1, 1.1, 1.2, 1, 1.5,
  1, 3, 2, 5, 3, 2, 1, 1, 1, 0.9, 1, 1, 3, 2.6, 4, 3, 3.2, 2,
  1, 1, 0.8, 4, 4, 2, 2.5, 1, 1, 1
].extend(ThresholdingAlgoMixin)

puts test_data.thresholding_algo.inspect

# Output: [
#   0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
#   0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, 0, 0, 0,
#   0, 0, 0, 0, 0, 0, 1, 0, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1,
#   1, 1, 0, 0, 0, -1, -1, 0, 0, 0, 0, 0, 0, 0, 0
# ]

c++ (Qt)演示端口,交互式参数

我已经将这个算法的演示应用程序移植到c++ (Qt)上。

代码可以在GitHub上找到这里。带有安装程序的Windows(64位)构建在发布页面上。最后,我将添加一些文档和其他发布版本。

您不能绘制点,但可以从文本文件中导入它们(用空格分隔点——换行也算作空格)。您还可以调整算法参数,实时查看效果。这对于针对特定数据集调整算法以及探索参数如何影响结果非常有用。


上面的截图有些过时;从那以后,我添加了两个原始算法中没有的实验性选项:

反向处理数据集的选项(似乎至少改善了功率谱的结果)。 选项,为峰值设置硬性最小阈值。

我还在窗口中间添加了一个笨拙的缩放/平移条,只需用鼠标拖动它来缩放和平移。

模糊的构建指令:

在发布页面上有一个Windows安装程序(64位),但如果你想从源代码构建它,要点是:

安装Qt的构建工具,然后将qmake && make放在与.pro文件相同的目录下,或者 安装Qt Creator,打开.pro文件,选择任何默认的构建配置,然后按下构建和/或运行按钮(Creator的左下角)。

我只测试过Qt5。我有91%的信心,如果你手动配置组件,Qt Creator安装程序会让你安装Qt5(如果你手动配置组件,你还需要确认是否安装了Qt Charts)。Qt6可能是一个流畅的构建,也可能不是。有一天,我将测试Qt4和Qt6,使这些文档更好。也许吧。

下面是一个基于Groovy回答的实际Java实现。(我知道已经发布了Groovy和Kotlin实现,但对于像我这样只做Java的人来说,弄清楚如何在其他语言和Java之间转换真的很麻烦)。

(结果与他人图表相匹配)

算法实现

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;

import org.apache.commons.math3.stat.descriptive.SummaryStatistics;

public class SignalDetector {

    public HashMap<String, List> analyzeDataForSignals(List<Double> data, int lag, Double threshold, Double influence) {

        // init stats instance
        SummaryStatistics stats = new SummaryStatistics();

        // the results (peaks, 1 or -1) of our algorithm
        List<Integer> signals = new ArrayList<Integer>(Collections.nCopies(data.size(), 0));

        // filter out the signals (peaks) from our original list (using influence arg)
        List<Double> filteredData = new ArrayList<Double>(data);

        // the current average of the rolling window
        List<Double> avgFilter = new ArrayList<Double>(Collections.nCopies(data.size(), 0.0d));

        // the current standard deviation of the rolling window
        List<Double> stdFilter = new ArrayList<Double>(Collections.nCopies(data.size(), 0.0d));

        // init avgFilter and stdFilter
        for (int i = 0; i < lag; i++) {
            stats.addValue(data.get(i));
        }
        avgFilter.set(lag - 1, stats.getMean());
        stdFilter.set(lag - 1, Math.sqrt(stats.getPopulationVariance())); // getStandardDeviation() uses sample variance
        stats.clear();

        // loop input starting at end of rolling window
        for (int i = lag; i < data.size(); i++) {

            // if the distance between the current value and average is enough standard deviations (threshold) away
            if (Math.abs((data.get(i) - avgFilter.get(i - 1))) > threshold * stdFilter.get(i - 1)) {

                // this is a signal (i.e. peak), determine if it is a positive or negative signal
                if (data.get(i) > avgFilter.get(i - 1)) {
                    signals.set(i, 1);
                } else {
                    signals.set(i, -1);
                }

                // filter this signal out using influence
                filteredData.set(i, (influence * data.get(i)) + ((1 - influence) * filteredData.get(i - 1)));
            } else {
                // ensure this signal remains a zero
                signals.set(i, 0);
                // ensure this value is not filtered
                filteredData.set(i, data.get(i));
            }

            // update rolling average and deviation
            for (int j = i - lag; j < i; j++) {
                stats.addValue(filteredData.get(j));
            }
            avgFilter.set(i, stats.getMean());
            stdFilter.set(i, Math.sqrt(stats.getPopulationVariance()));
            stats.clear();
        }

        HashMap<String, List> returnMap = new HashMap<String, List>();
        returnMap.put("signals", signals);
        returnMap.put("filteredData", filteredData);
        returnMap.put("avgFilter", avgFilter);
        returnMap.put("stdFilter", stdFilter);

        return returnMap;

    } // end
}

主要方法

import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;

public class Main {

    public static void main(String[] args) throws Exception {
        DecimalFormat df = new DecimalFormat("#0.000");

        ArrayList<Double> data = new ArrayList<Double>(Arrays.asList(1d, 1d, 1.1d, 1d, 0.9d, 1d, 1d, 1.1d, 1d, 0.9d, 1d,
                1.1d, 1d, 1d, 0.9d, 1d, 1d, 1.1d, 1d, 1d, 1d, 1d, 1.1d, 0.9d, 1d, 1.1d, 1d, 1d, 0.9d, 1d, 1.1d, 1d, 1d,
                1.1d, 1d, 0.8d, 0.9d, 1d, 1.2d, 0.9d, 1d, 1d, 1.1d, 1.2d, 1d, 1.5d, 1d, 3d, 2d, 5d, 3d, 2d, 1d, 1d, 1d,
                0.9d, 1d, 1d, 3d, 2.6d, 4d, 3d, 3.2d, 2d, 1d, 1d, 0.8d, 4d, 4d, 2d, 2.5d, 1d, 1d, 1d));

        SignalDetector signalDetector = new SignalDetector();
        int lag = 30;
        double threshold = 5;
        double influence = 0;

        HashMap<String, List> resultsMap = signalDetector.analyzeDataForSignals(data, lag, threshold, influence);
        // print algorithm params
        System.out.println("lag: " + lag + "\t\tthreshold: " + threshold + "\t\tinfluence: " + influence);

        System.out.println("Data size: " + data.size());
        System.out.println("Signals size: " + resultsMap.get("signals").size());

        // print data
        System.out.print("Data:\t\t");
        for (double d : data) {
            System.out.print(df.format(d) + "\t");
        }
        System.out.println();

        // print signals
        System.out.print("Signals:\t");
        List<Integer> signalsList = resultsMap.get("signals");
        for (int i : signalsList) {
            System.out.print(df.format(i) + "\t");
        }
        System.out.println();

        // print filtered data
        System.out.print("Filtered Data:\t");
        List<Double> filteredDataList = resultsMap.get("filteredData");
        for (double d : filteredDataList) {
            System.out.print(df.format(d) + "\t");
        }
        System.out.println();

        // print running average
        System.out.print("Avg Filter:\t");
        List<Double> avgFilterList = resultsMap.get("avgFilter");
        for (double d : avgFilterList) {
            System.out.print(df.format(d) + "\t");
        }
        System.out.println();

        // print running std
        System.out.print("Std filter:\t");
        List<Double> stdFilterList = resultsMap.get("stdFilter");
        for (double d : stdFilterList) {
            System.out.print(df.format(d) + "\t");
        }
        System.out.println();

        System.out.println();
        for (int i = 0; i < signalsList.size(); i++) {
            if (signalsList.get(i) != 0) {
                System.out.println("Point " + i + " gave signal " + signalsList.get(i));
            }
        }
    }
}

结果

lag: 30     threshold: 5.0      influence: 0.0
Data size: 74
Signals size: 74
Data:           1.000   1.000   1.100   1.000   0.900   1.000   1.000   1.100   1.000   0.900   1.000   1.100   1.000   1.000   0.900   1.000   1.000   1.100   1.000   1.000   1.000   1.000   1.100   0.900   1.000   1.100   1.000   1.000   0.900   1.000   1.100   1.000   1.000   1.100   1.000   0.800   0.900   1.000   1.200   0.900   1.000   1.000   1.100   1.200   1.000   1.500   1.000   3.000   2.000   5.000   3.000   2.000   1.000   1.000   1.000   0.900   1.000   1.000   3.000   2.600   4.000   3.000   3.200   2.000   1.000   1.000   0.800   4.000   4.000   2.000   2.500   1.000   1.000   1.000   
Signals:        0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   1.000   0.000   1.000   1.000   1.000   1.000   1.000   0.000   0.000   0.000   0.000   0.000   0.000   1.000   1.000   1.000   1.000   1.000   1.000   0.000   0.000   0.000   1.000   1.000   1.000   1.000   0.000   0.000   0.000   
Filtered Data:  1.000   1.000   1.100   1.000   0.900   1.000   1.000   1.100   1.000   0.900   1.000   1.100   1.000   1.000   0.900   1.000   1.000   1.100   1.000   1.000   1.000   1.000   1.100   0.900   1.000   1.100   1.000   1.000   0.900   1.000   1.100   1.000   1.000   1.100   1.000   0.800   0.900   1.000   1.200   0.900   1.000   1.000   1.100   1.200   1.000   1.000   1.000   1.000   1.000   1.000   1.000   1.000   1.000   1.000   1.000   0.900   1.000   1.000   1.000   1.000   1.000   1.000   1.000   1.000   1.000   1.000   0.800   0.800   0.800   0.800   0.800   1.000   1.000   1.000   
Avg Filter:     0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   1.003   1.003   1.007   1.007   1.003   1.007   1.010   1.003   1.000   0.997   1.003   1.003   1.003   1.000   1.003   1.010   1.013   1.013   1.013   1.010   1.010   1.010   1.010   1.010   1.007   1.010   1.010   1.003   1.003   1.003   1.007   1.007   1.003   1.003   1.003   1.000   1.000   1.007   1.003   0.997   0.983   0.980   0.973   0.973   0.970   
Std filter:     0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.000   0.060   0.060   0.063   0.063   0.060   0.063   0.060   0.071   0.073   0.071   0.080   0.080   0.080   0.077   0.080   0.087   0.085   0.085   0.085   0.083   0.083   0.083   0.083   0.083   0.081   0.079   0.079   0.080   0.080   0.080   0.077   0.077   0.075   0.075   0.075   0.073   0.073   0.063   0.071   0.080   0.078   0.083   0.089   0.089   0.086   

Point 45 gave signal 1
Point 47 gave signal 1
Point 48 gave signal 1
Point 49 gave signal 1
Point 50 gave signal 1
Point 51 gave signal 1
Point 58 gave signal 1
Point 59 gave signal 1
Point 60 gave signal 1
Point 61 gave signal 1
Point 62 gave signal 1
Point 63 gave signal 1
Point 67 gave signal 1
Point 68 gave signal 1
Point 69 gave signal 1
Point 70 gave signal 1

下面是@Jean-Paul为Arduino微控制器设计的平滑z分数的C语言实现,用于获取加速度计读数,并判断撞击的方向是来自左边还是右边。这表现得非常好,因为这个设备返回一个反弹信号。这是设备对峰值检测算法的输入-显示了来自右边的冲击,然后是来自左边的冲击。你可以看到最初的峰值然后传感器的振荡。

#include <stdio.h>
#include <math.h>
#include <string.h>


#define SAMPLE_LENGTH 1000

float stddev(float data[], int len);
float mean(float data[], int len);
void thresholding(float y[], int signals[], int lag, float threshold, float influence);


void thresholding(float y[], int signals[], int lag, float threshold, float influence) {
    memset(signals, 0, sizeof(int) * SAMPLE_LENGTH);
    float filteredY[SAMPLE_LENGTH];
    memcpy(filteredY, y, sizeof(float) * SAMPLE_LENGTH);
    float avgFilter[SAMPLE_LENGTH];
    float stdFilter[SAMPLE_LENGTH];

    avgFilter[lag - 1] = mean(y, lag);
    stdFilter[lag - 1] = stddev(y, lag);

    for (int i = lag; i < SAMPLE_LENGTH; i++) {
        if (fabsf(y[i] - avgFilter[i-1]) > threshold * stdFilter[i-1]) {
            if (y[i] > avgFilter[i-1]) {
                signals[i] = 1;
            } else {
                signals[i] = -1;
            }
            filteredY[i] = influence * y[i] + (1 - influence) * filteredY[i-1];
        } else {
            signals[i] = 0;
        }
        avgFilter[i] = mean(filteredY + i-lag, lag);
        stdFilter[i] = stddev(filteredY + i-lag, lag);
    }
}

float mean(float data[], int len) {
    float sum = 0.0, mean = 0.0;

    int i;
    for(i=0; i<len; ++i) {
        sum += data[i];
    }

    mean = sum/len;
    return mean;


}

float stddev(float data[], int len) {
    float the_mean = mean(data, len);
    float standardDeviation = 0.0;

    int i;
    for(i=0; i<len; ++i) {
        standardDeviation += pow(data[i] - the_mean, 2);
    }

    return sqrt(standardDeviation/len);
}

int main() {
    printf("Hello, World!\n");
    int lag = 100;
    float threshold = 5;
    float influence = 0;
    float y[]=  {1,1,1.1,1,0.9,1,1,1.1,1,0.9,1,1.1,1,1,0.9,1,1,1.1,1,1,1,1,1.1,0.9,1,1.1,1,1,0.9,
  ....
1,1.1,1,1,1.1,1,0.8,0.9,1,1.2,0.9,1,1,1.1,1.2,1,1.5,1,3,2,5,3,2,1,1,1,0.9,1,1,3,       2.6,4,3,3.2,2,1,1,0.8,4,4,2,2.5,1,1,1,1.2,1,1.5,1,3,2,5,3,2,1,1,1,0.9,1,1,3,
       2.6,4,3,3.2,2,1,1,0.8,4,4,2,2.5,1,1,1}

    int signal[SAMPLE_LENGTH];

    thresholding(y, signal,  lag, threshold, influence);

    return 0;
}

她的结果是影响= 0

不是很好,但这里的影响力= 1

这很好。