更新:到目前为止表现最好的算法是这个。


这个问题探讨了在实时时间序列数据中检测突然峰值的稳健算法。

考虑以下示例数据:

这个数据的例子是Matlab格式的(但这个问题不是关于语言,而是关于算法):

p = [1 1 1.1 1 0.9 1 1 1.1 1 0.9 1 1.1 1 1 0.9 1 1 1.1 1 1 1 1 1.1 0.9 1 1.1 1 1 0.9, ...
     1 1.1 1 1 1.1 1 0.8 0.9 1 1.2 0.9 1 1 1.1 1.2 1 1.5 1 3 2 5 3 2 1 1 1 0.9 1 1, ... 
     3 2.6 4 3 3.2 2 1 1 0.8 4 4 2 2.5 1 1 1];

你可以清楚地看到有三个大峰和一些小峰。这个数据集是问题所涉及的时间序列数据集类的一个特定示例。这类数据集有两个一般特征:

- 存在围绕某个总体均值波动的基础噪声;
- 存在明显偏离该噪声的大“峰值”(即“更高的数据点”)。

让我们假设以下情况:

- 峰的宽度无法事先确定;
- 峰的高度明显偏离其他值;
- 算法实时更新(即每出现一个新数据点就更新一次)。

对于这种情况,需要构造一个用于触发信号的边界值(阈值)。但是,该边界值不能是静态的,必须由算法实时确定。


我的问题是:什么是实时计算这些阈值的好算法?有没有针对这种情况的特定算法?最著名的算法是什么?


健壮的算法或有用的见解都受到高度赞赏。(可以用任何语言回答:这是关于算法的)


当前回答

下面是@Jean-Paul的平滑z分数算法的C语言实现,用于Arduino微控制器:它读取加速度计数据,并判断撞击来自左侧还是右侧。由于该设备返回的是带回弹的信号,这个算法表现得非常好。下面是输入峰值检测算法的设备信号——先是来自右侧的撞击,然后是来自左侧的撞击。可以看到最初的峰值,以及随后传感器的振荡。

#include <stdio.h>
#include <math.h>
#include <string.h>


#define SAMPLE_LENGTH 1000

float stddev(float data[], int len);
float mean(float data[], int len);
void thresholding(float y[], int signals[], int lag, float threshold, float influence);


/*
 * Smoothed z-score peak detection over a fixed-size buffer.
 *
 * y         - input samples; the function reads SAMPLE_LENGTH floats.
 * signals   - output; the function writes SAMPLE_LENGTH ints. Each entry is
 *             +1 (sample above the moving band), -1 (below) or 0 (inside).
 * lag       - number of samples in the moving window; values outside
 *             1..SAMPLE_LENGTH leave signals all zero.
 * threshold - number of moving standard deviations a sample must be away
 *             from the moving mean to raise a signal.
 * influence - weight given to a signalling sample when it is folded into the
 *             filtered series (expected to lie in [0, 1]).
 */
void thresholding(float y[], int signals[], int lag, float threshold, float influence) {
    memset(signals, 0, sizeof(int) * SAMPLE_LENGTH);

    /* An out-of-range lag would index outside the work arrays below. */
    if (lag < 1 || lag > SAMPLE_LENGTH) {
        return;
    }

    float filteredY[SAMPLE_LENGTH];
    memcpy(filteredY, y, sizeof(float) * SAMPLE_LENGTH);
    float avgFilter[SAMPLE_LENGTH];
    float stdFilter[SAMPLE_LENGTH];

    /* Seed the moving statistics from the first `lag` raw samples. */
    avgFilter[lag - 1] = mean(y, lag);
    stdFilter[lag - 1] = stddev(y, lag);

    for (int i = lag; i < SAMPLE_LENGTH; i++) {
        if (fabsf(y[i] - avgFilter[i-1]) > threshold * stdFilter[i-1]) {
            signals[i] = (y[i] > avgFilter[i-1]) ? 1 : -1;
            /* Damp the outlier before it enters the moving statistics. */
            filteredY[i] = influence * y[i] + (1 - influence) * filteredY[i-1];
        }
        /* Otherwise signals[i] is already 0 (memset) and filteredY[i] is
           already y[i] (memcpy). */

        /* The window is the last `lag` filtered samples INCLUDING sample i,
           so the value just processed takes effect on the next comparison. */
        avgFilter[i] = mean(filteredY + i - lag + 1, lag);
        stdFilter[i] = stddev(filteredY + i - lag + 1, lag);
    }
}

/* Arithmetic mean of the first `len` elements of `data`. */
float mean(float data[], int len) {
    float total = 0.0;

    for (int k = 0; k < len; k++) {
        total += data[k];
    }

    return total / len;
}

/*
 * Population standard deviation (divides by `len`, not `len - 1`) of the
 * first `len` elements of `data`.
 */
float stddev(float data[], int len) {
    const float center = mean(data, len);
    float sumSquares = 0.0;

    for (int k = 0; k < len; k++) {
        sumSquares += pow(data[k] - center, 2);
    }

    return sqrt(sumSquares / len);
}

int main() {
    printf("Hello, World!\n");
    int lag = 100;
    float threshold = 5;
    float influence = 0;
    float y[]=  {1,1,1.1,1,0.9,1,1,1.1,1,0.9,1,1.1,1,1,0.9,1,1,1.1,1,1,1,1,1.1,0.9,1,1.1,1,1,0.9,
  ....
1,1.1,1,1,1.1,1,0.8,0.9,1,1.2,0.9,1,1,1.1,1.2,1,1.5,1,3,2,5,3,2,1,1,1,0.9,1,1,3,       2.6,4,3,3.2,2,1,1,0.8,4,4,2,2.5,1,1,1,1.2,1,1.5,1,3,2,5,3,2,1,1,1,0.9,1,1,3,
       2.6,4,3,3.2,2,1,1,0.8,4,4,2,2.5,1,1,1}

    int signal[SAMPLE_LENGTH];

    thresholding(y, signal,  lag, threshold, influence);

    return 0;
}

这是影响系数(influence)= 0 时的结果:

效果不是很好,但下面是影响系数(influence)= 1 时的结果:

这很好。

其他回答

下面是用Golang实现的平滑z分数(Smoothed z-score)算法(见上文)。它假定输入是一个[]int16切片(16位PCM样本)。你可以在这里找到对应的gist。

/*
Settings (the ones below are examples: choose what is best for your data)
set lag to 5;          # lag 5 for the smoothing functions
set threshold to 3.5;  # 3.5 standard deviations for signal
set influence to 0.5;  # between 0 and 1, where 1 is normal influence, 0.5 is half
*/

// ZScore runs the smoothed z-score peak detector over 16-bit PCM samples.
//
// The returned slice has the same length as samples. Entry i is 1 when
// samples[i] lies more than threshold moving standard deviations above the
// moving average, -1 when it lies that far below, and 0 otherwise. The first
// lag entries are always 0 because they only seed the moving statistics.
//
// lag is the moving-window length. influence is the weight given to a
// signalling sample when it is folded into the filtered series (the header
// comment of this file describes it as a value between 0 and 1).
//
// If lag is not positive or samples holds no more than lag values, there is
// nothing to classify and an all-zero slice is returned.
func ZScore(samples []int16, lag int, threshold float64, influence float64) (signals []int16) {
	signals = make([]int16, len(samples))
	if lag <= 0 || len(samples) <= lag {
		return signals
	}

	filteredY := make([]int16, len(samples))
	copy(filteredY, samples[:lag])
	avgFilter := make([]int16, len(samples))
	stdFilter := make([]int16, len(samples))

	// The seed statistics describe samples[0:lag], so they belong to the last
	// seed index (lag-1); classification then starts at index lag.
	avgFilter[lag-1] = Average(samples[:lag])
	stdFilter[lag-1] = Std(samples[:lag])

	for i := lag; i < len(samples); i++ {
		f := float64(samples[i])
		prevAvg := float64(avgFilter[i-1])

		// Take the deviation in float64: an int16 subtraction can overflow.
		dev := f - prevAvg
		if dev < 0 {
			dev = -dev
		}

		if dev > threshold*float64(stdFilter[i-1]) {
			if f > prevAvg {
				signals[i] = 1
			} else {
				signals[i] = -1
			}
			// Damp the outlier before it enters the moving statistics.
			filteredY[i] = int16(influence*f + (1-influence)*float64(filteredY[i-1]))
		} else {
			filteredY[i] = samples[i]
		}

		// The window is the last lag filtered values including index i.
		window := filteredY[i-lag+1 : i+1]
		avgFilter[i] = Average(window)
		stdFilter[i] = Std(window)
	}

	return signals
}

// Average returns the mean absolute value of chunk, truncated toward zero.
//
// Samples are rectified (negative values are negated) before summing, so the
// result is never negative. An empty chunk yields 0.
func Average(chunk []int16) (avg int16) {
	if len(chunk) == 0 {
		return 0
	}

	const maxInt16 = 1<<15 - 1

	var sum int64
	for _, sample := range chunk {
		// Widen before negating: -(-32768) does not fit in an int16.
		v := int64(sample)
		if v < 0 {
			v = -v
		}
		sum += v
	}

	mean := sum / int64(len(chunk))
	if mean > maxInt16 {
		// Only reachable when every sample is -32768.
		mean = maxInt16
	}
	return int16(mean)
}

@Jean-Paul Smoothed Z Score算法的Dart版本:

/// Smoothed z-score peak detector.
///
/// Configure [lag] (moving-window length), [threshold] (number of moving
/// standard deviations that counts as a signal) and [influence] (weight of a
/// signalling sample in the filtered series), then call [smoothedZScore].
class SmoothedZScore {
  int lag = 5;
  num threshold = 10;
  num influence = 0.5;

  /// Sum of all elements of [a]; 0 for an empty list.
  num sum(List<num> a) {
    num s = 0;
    for (int i = 0; i < a.length; i++) s += a[i];
    return s;
  }

  /// Arithmetic mean of [a].
  num mean(List<num> a) {
    return sum(a) / a.length;
  }

  /// Population standard deviation of [arr] (divides by the element count).
  num stddev(List<num> arr) {
    num arrMean = mean(arr);
    num dev = 0;
    for (int i = 0; i < arr.length; i++) dev += (arr[i] - arrMean) * (arr[i] - arrMean);
    return sqrt(dev / arr.length);
  }

  /// Classifies every sample of [y].
  ///
  /// Returns a list of the same length as [y] holding 1 (sample above the
  /// moving band), -1 (below) or 0 (inside). The first [lag] entries are
  /// always 0 because they only seed the moving statistics.
  ///
  /// Throws a [String] message when [y] has fewer than `lag + 2` elements.
  List<int> smoothedZScore(List<num> y) {
    if (y.length < lag + 2) {
      // `${...}` is required: `$y.length` would print the whole list
      // followed by the literal text ".length".
      throw 'y data array too short(${y.length}) for given lag of $lag';
    }

    // init variables
    List<int> signals = List.filled(y.length, 0);
    List<num> filteredY = List<num>.from(y);
    List<num> leadIn = y.sublist(0, lag);

    var avgFilter = List<num>.filled(y.length, 0);
    var stdFilter = List<num>.filled(y.length, 0);
    avgFilter[lag - 1] = mean(leadIn);
    stdFilter[lag - 1] = stddev(leadIn);

    for (var i = lag; i < y.length; i++) {
      if ((y[i] - avgFilter[i - 1]).abs() > (threshold * stdFilter[i - 1])) {
        signals[i] = y[i] > avgFilter[i - 1] ? 1 : -1;
        // damp the outlier before it enters the moving statistics
        filteredY[i] = influence * y[i] + (1 - influence) * filteredY[i - 1];
      } else {
        signals[i] = 0; // no signal
        filteredY[i] = y[i];
      }

      // adjust the filters: the window is the last `lag` filtered values
      // INCLUDING index i (sublist's end bound is exclusive)
      List<num> yLag = filteredY.sublist(i - lag + 1, i + 1);
      avgFilter[i] = mean(yLag);
      stdFilter[i] = stddev(yLag);
    }

    return signals;
  }
}

下面是ZSCORE算法的PHP实现:

<?php
// Sample series fed to zscore() further down in this script.
$y = [
    1, 7, 1.1, 1, 0.9, 1, 1, 1.1, 1, 0.9, 1, 1.1, 1, 1, 0.9, 1, 1, 1.1, 1, 1, 1, 1, 1.1, 0.9, 1, 1.1, 1, 1, 0.9,
    1, 1.1, 1, 1, 1.1, 1, 0.8, 0.9, 1, 1.2, 0.9, 1, 1, 1.1, 1.2, 1, 1.5, 10, 3, 2, 5, 3, 2, 1, 1, 1, 0.9, 1, 1, 3,
    2.6, 4, 3, 3.2, 2, 1, 1, 0.8, 4, 4, 2, 2.5, 1, 1, 1,
];

/**
 * Arithmetic mean of $len consecutive elements of $data, starting at
 * index $start.
 */
function mean($data, $start, $len) {
    $total = 0;
    $end = $start + $len;
    for ($idx = $start; $idx < $end; $idx++) {
        $total += $data[$idx];
    }
    return $total / $len;
}
    
/**
 * Population standard deviation (divides by $len) of $len consecutive
 * elements of $data, starting at index $start.
 */
function stddev($data, $start, $len) {
    $center = mean($data, $start, $len);
    $sumSquares = 0;
    $end = $start + $len;
    for ($idx = $start; $idx < $end; $idx++) {
        $delta = $data[$idx] - $center;
        $sumSquares += $delta * $delta;
    }
    return sqrt($sumSquares / $len);
}

/**
 * Smoothed z-score peak detection.
 *
 * @param array $data      Input samples, indexed 0..$len-1.
 * @param int   $len       Number of samples to process.
 * @param int   $lag       Moving-window length.
 * @param float $threshold Number of moving standard deviations a sample must
 *                         be away from the moving mean to raise a signal.
 * @param float $influence Weight given to a signalling sample when it is
 *                         folded into the filtered series.
 * @return array One entry per sample: 1 (above the band), -1 (below) or 0.
 *               The first $lag entries are always 0.
 */
function zscore($data, $len, $lag= 20, $threshold = 1, $influence = 1) {

    $signals = array();
    $filteredY = array();
    for ($i = 0; $i < $len; $i++) {
        $filteredY[$i] = $data[$i];
        $signals[$i] = 0;
    }

    // Without at least $lag samples the moving statistics cannot be seeded.
    if ($lag < 1 || $len < $lag) {
        return $signals;
    }

    $avgFilter = array();
    $stdFilter = array();
    $avgFilter[$lag - 1] = mean($data, 0, $lag);
    $stdFilter[$lag - 1] = stddev($data, 0, $lag);

    for ($i = $lag; $i < $len; $i++) {
        // Compare against the PREVIOUS step's moving deviation, not the
        // initial one, so the band follows the data.
        if (abs($data[$i] - $avgFilter[$i - 1]) > $threshold * $stdFilter[$i - 1]) {
            $signals[$i] = ($data[$i] > $avgFilter[$i - 1]) ? 1 : -1;
            // Damp the outlier before it enters the moving statistics.
            $filteredY[$i] = $influence * $data[$i] + (1 - $influence) * $filteredY[$i - 1];
        }
        else {
            $signals[$i] = 0;
            $filteredY[$i] = $data[$i];
        }

        // The window is the last $lag filtered values including index $i.
        $avgFilter[$i] = mean($filteredY, $i - $lag + 1, $lag);
        $stdFilter[$i] = stddev($filteredY, $i - $lag + 1, $lag);
    }
    return $signals;
}

// Run the detector with its default lag, threshold and influence.
$sig = zscore($y, count($y));

// Dump both arrays, then one "index value signal" row per sample.
print_r($y);
echo "<br><br>";
print_r($sig);
echo "<br><br>";

for ($i = 0, $n = count($y); $i < $n; $i++) {
    echo $i . " " . $y[$i] . " " . $sig[$i] . "<br>";
}

这是鲁棒峰值检测算法的一个Python实现。

初始化和计算部分是分开的,只保留filtered_y数组,其最大长度等于滞后窗口(lag),因此内存不会随数据增长。(结果与上述答案相同。)为了绘制图形,还保留了标签(labels)数组。

我创建了一个GitHub gist。

import numpy as np
import pylab

def init(x, lag, threshold, influence):
    '''
    Smoothed z-score algorithm
    Implementation of algorithm from https://stackoverflow.com/a/22640362/6029703

    Builds the initial state from the first ``lag`` values of ``x``.

    Parameters
    ----------
    x : sequence of numbers; only ``x[0:lag]`` is read.
    lag : int, window length (at least 1, at most ``len(x)``).
    threshold, influence : accepted for symmetry with ``add``; not used here.

    Returns
    -------
    dict with keys ``avg``, ``var``, ``std`` (population statistics of the
    seed window), ``filtered_y`` (copy of the seed window) and ``labels``
    (``lag`` zeros).

    Raises
    ------
    ValueError if ``lag`` is smaller than 1 or ``x`` holds fewer than ``lag``
    values.
    '''
    if lag < 1:
        raise ValueError('lag must be at least 1')
    if len(x) < lag:
        raise ValueError('x must contain at least lag values')

    window = np.array(x[0:lag])

    # Only the statistics of the seed window are needed; there is no reason
    # to allocate lag-sized scratch arrays just to read their last slot.
    return dict(avg=np.float64(np.mean(window)),
                var=np.float64(np.var(window)),
                std=np.float64(np.std(window)),
                filtered_y=window,
                labels=np.zeros(lag))


def add(result, single_value, lag, threshold, influence):
    '''
    Feed one new sample into the detector and return the updated state.

    Parameters
    ----------
    result : dict produced by ``init`` or by a previous ``add`` call; it must
        hold ``avg``, ``var``, ``std``, ``filtered_y`` (the last ``lag``
        filtered values) and ``labels``.
    single_value : the new sample.
    lag : window length used when the state was created.
    threshold : number of standard deviations that counts as a signal.
    influence : weight of a signalling sample in the filtered series.

    Returns
    -------
    A new dict with the same keys. ``labels`` gains one entry (1, -1 or 0);
    ``filtered_y`` again holds ``lag`` values. ``result`` is not modified.
    '''
    previous_avg = result['avg']
    previous_var = result['var']
    previous_std = result['std']
    filtered_y = result['filtered_y']
    labels = result['labels']

    if abs(single_value - previous_avg) > threshold * previous_std:
        if single_value > previous_avg:
            labels = np.append(labels, 1)
        else:
            labels = np.append(labels, -1)

        # calculate the new filtered element using the influence factor
        filtered_y = np.append(filtered_y, influence * single_value
                               + (1 - influence) * filtered_y[-1])
    else:
        labels = np.append(labels, 0)
        filtered_y = np.append(filtered_y, single_value)

    newest = filtered_y[-1]
    # filtered_y now holds lag + 1 values; this is the one leaving the window
    oldest = filtered_y[len(filtered_y) - lag - 1]

    # sliding mean: new_avg = old_avg + (newest - oldest) / lag
    current_avg_filter = previous_avg + 1. / lag * (newest - oldest)

    # sliding population variance, updated relative to the previous mean:
    # new_var = old_var + ((newest - m)^2 - (oldest - m)^2
    #                      - (newest - oldest)^2 / lag) / lag
    current_var_filter = previous_var + 1. / lag * (
        (newest - previous_avg) ** 2
        - (oldest - previous_avg) ** 2
        - (newest - oldest) ** 2 / lag)

    # Rounding in the incremental update can push the variance marginally
    # below zero; sqrt would then return NaN and every later comparison
    # against threshold * std would be False, silencing the detector.
    current_var_filter = max(current_var_filter, 0.0)

    # standard deviation for the current element is sqrt(current variance)
    current_std_filter = np.sqrt(current_var_filter)

    return dict(avg=current_avg_filter, var=current_var_filter,
                std=current_std_filter, filtered_y=filtered_y[1:],
                labels=labels)

# Detector settings used for the demo run below.
lag = 30
threshold = 5
influence = 0

y = np.array([1,1,1.1,1,0.9,1,1,1.1,1,0.9,1,1.1,1,1,0.9,1,1,1.1,1,1,1,1,1.1,0.9,1,1.1,1,1,0.9,
       1,1.1,1,1,1.1,1,0.8,0.9,1,1.2,0.9,1,1,1.1,1.2,1,1.5,1,3,2,5,3,2,1,1,1,0.9,1,1,3,
       2.6,4,3,3.2,2,1,1,0.8,4,4,2,2.5,1,1,1])

# Run algo with settings from above: seed with the first `lag` samples ...
result = init(y[:lag], lag=lag, threshold=threshold, influence=influence)

# ... then stream the remaining samples one at a time. (The earlier version
# opened an unrelated file here whose handle was never used or closed.)
for value in y[lag:]:
    result = add(result, value, lag, threshold, influence)

# Plot result: raw series on top, detected signals below
pylab.subplot(211)
pylab.plot(np.arange(1, len(y) + 1), y)
pylab.subplot(212)
pylab.step(np.arange(1, len(y) + 1), result['labels'], color='red',
           lw=2)
pylab.ylim(-1.5, 1.5)
pylab.show()

根据@Jean-Paul提出的解决方案,我用c#实现了他的算法

/// <summary>
/// Result container filled in by <c>ZScore.StartAlgo</c>.
/// </summary>
public class ZScoreOutput
{
    /// <summary>The input series exactly as it was passed to the algorithm.</summary>
    public List<double> input;
    /// <summary>Per-sample signal: 1, -1 or 0.</summary>
    public List<int> signals;
    /// <summary>Moving average of the filtered series, one entry per sample.</summary>
    public List<double> avgFilter;
    /// <summary>Moving standard deviation of the filtered series, one entry per sample.</summary>
    public List<double> filtered_stddev;
}

/// <summary>
/// Smoothed z-score peak detection.
/// </summary>
public static class ZScore
{
    /// <summary>
    /// Classifies every sample of <paramref name="input"/> against a moving
    /// mean / standard-deviation band.
    /// </summary>
    /// <param name="input">Series to analyse.</param>
    /// <param name="lag">Moving-window length; must be between 1 and <c>input.Count</c>.</param>
    /// <param name="threshold">Number of moving standard deviations that counts as a signal.</param>
    /// <param name="influence">Weight given to a signalling sample when it is folded into the filtered series.</param>
    /// <returns>
    /// The input together with per-sample signals (1, -1 or 0), the moving
    /// average and the moving standard deviation. Entries before index
    /// <c>lag - 1</c> of the two filters are left at 0.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="input"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="lag"/> is outside 1..input.Count.</exception>
    public static ZScoreOutput StartAlgo(List<double> input, int lag, double threshold, double influence)
    {
        if (input == null)
        {
            throw new ArgumentNullException("input");
        }
        if (lag < 1 || lag > input.Count)
        {
            throw new ArgumentOutOfRangeException("lag", "lag must be between 1 and the number of input samples.");
        }

        // init variables!
        int[] signals = new int[input.Count];
        double[] filteredY = input.ToArray();
        double[] avgFilter = new double[input.Count];
        double[] stdFilter = new double[input.Count];

        var initialWindow = Window(filteredY, 0, lag);

        avgFilter[lag - 1] = Mean(initialWindow);
        stdFilter[lag - 1] = StdDev(initialWindow);

        for (int i = lag; i < input.Count; i++)
        {
            if (Math.Abs(input[i] - avgFilter[i - 1]) > threshold * stdFilter[i - 1])
            {
                signals[i] = (input[i] > avgFilter[i - 1]) ? 1 : -1;
                // Damp the outlier before it enters the moving statistics.
                filteredY[i] = influence * input[i] + (1 - influence) * filteredY[i - 1];
            }
            else
            {
                signals[i] = 0;
                filteredY[i] = input[i];
            }

            // Update rolling average and deviation over the last `lag`
            // filtered values including index i, the same window size that
            // seeded the filters above.
            var slidingWindow = Window(filteredY, i - lag + 1, lag);

            avgFilter[i] = Mean(slidingWindow);
            stdFilter[i] = StdDev(slidingWindow);
        }

        // Copy to convenience class
        var result = new ZScoreOutput();
        result.input = input;
        result.avgFilter       = new List<double>(avgFilter);
        result.signals         = new List<int>(signals);
        result.filtered_stddev = new List<double>(stdFilter);

        return result;
    }

    /// <summary>
    /// Copies <paramref name="count"/> consecutive values of
    /// <paramref name="source"/>, starting at <paramref name="start"/>,
    /// without duplicating the whole array first.
    /// </summary>
    private static List<double> Window(double[] source, int start, int count)
    {
        var window = new List<double>(count);
        for (int k = start; k < start + count; k++)
        {
            window.Add(source[k]);
        }
        return window;
    }

    /// <summary>Arithmetic mean of <paramref name="list"/>.</summary>
    private static double Mean(List<double> list)
    {
        // Simple helper function!
        return list.Average();
    }

    /// <summary>
    /// Sample standard deviation (divides by <c>Count - 1</c>). Returns 0
    /// for fewer than two values, where that divisor would be zero or
    /// negative.
    /// </summary>
    private static double StdDev(List<double> values)
    {
        if (values.Count < 2)
        {
            return 0;
        }

        double avg = values.Average();
        double sum = values.Sum(d => Math.Pow(d - avg, 2));
        return Math.Sqrt(sum / (values.Count - 1));
    }
}

使用示例:

// Example series passed to the detector.
List<double> input = new List<double>
{
    1.0, 1.0, 1.1, 1.0, 0.9, 1.0, 1.0, 1.1, 1.0, 0.9, 1.0,
    1.1, 1.0, 1.0, 0.9, 1.0, 1.0, 1.1, 1.0, 1.0, 1.0, 1.0, 1.1, 0.9, 1.0, 1.1, 1.0, 1.0, 0.9,
    1.0, 1.1, 1.0, 1.0, 1.1, 1.0, 0.8, 0.9, 1.0, 1.2, 0.9, 1.0, 1.0, 1.1, 1.2, 1.0, 1.5, 1.0,
    3.0, 2.0, 5.0, 3.0, 2.0, 1.0, 1.0, 1.0, 0.9, 1.0, 1.0, 3.0, 2.6, 4.0, 3.0, 3.2, 2.0, 1.0,
    1.0, 0.8, 4.0, 4.0, 2.0, 2.5, 1.0, 1.0, 1.0
};

// Detector settings: window length, signal threshold, outlier influence.
int lag = 30;
double threshold = 5.0;
double influence = 0.0;

ZScoreOutput output = ZScore.StartAlgo(input, lag, threshold, influence);