Update: the best-performing algorithm so far is this one.


This question is about robust algorithms for detecting sudden peaks in real-time time-series data.

Consider the following example data:

The example data is in Matlab format (but this question is not about the language; it is about the algorithm):

p = [1 1 1.1 1 0.9 1 1 1.1 1 0.9 1 1.1 1 1 0.9 1 1 1.1 1 1 1 1 1.1 0.9 1 1.1 1 1 0.9, ...
     1 1.1 1 1 1.1 1 0.8 0.9 1 1.2 0.9 1 1 1.1 1.2 1 1.5 1 3 2 5 3 2 1 1 1 0.9 1 1, ... 
     3 2.6 4 3 3.2 2 1 1 0.8 4 4 2 2.5 1 1 1];

You can clearly see three large peaks and some smaller ones. This dataset is a specific example of the class of time-series datasets the question is about. This class of datasets has two general characteristics:

- There is basic noise with a general mean.
- There are large "peaks" or "higher data points" that deviate significantly from the noise.

Let's also assume the following:

- The width of the peaks cannot be determined in advance.
- The height of the peaks clearly deviates from the other values.
- The algorithm is updated in real time (i.e. with every new data point).

For such a situation a boundary value is needed that triggers signals. However, this boundary value cannot be static; it must be determined in real time by the algorithm.
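To make that concrete, here is a minimal sketch (Python, with a hypothetical function name; not an answer in itself) of what a dynamically determined boundary could look like: at every step the threshold is derived from a moving window of recent values instead of being fixed in advance.

import numpy as np

def dynamic_threshold_flags(y, window=30, k=5.0):
    # Illustrative only: flag points that exceed a boundary computed from
    # a moving window of the preceding values (mean + k * standard deviation).
    flags = np.zeros(len(y), dtype=int)
    for i in range(window, len(y)):
        recent = y[i - window:i]
        boundary = np.mean(recent) + k * np.std(recent)
        if y[i] > boundary:
            flags[i] = 1
    return flags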


My question is: what is a good algorithm for computing such thresholds in real time? Are there specific algorithms for this kind of situation? What are the best-known ones?


Robust algorithms and useful insights are both highly appreciated. (Answers in any language are welcome: this is about the algorithm.)


Current answer

Here is an iterative Python/NumPy version of the answer at https://stackoverflow.com/a/22640362/6029703. For large data (100,000+ points) this code is faster, because it avoids recomputing the mean and standard deviation over the whole window at every step.

def peak_detection_smoothed_zscore_v2(x, lag, threshold, influence):
    '''
    iterative smoothed z-score algorithm
    Implementation of algorithm from https://stackoverflow.com/a/22640362/6029703
    '''
    import numpy as np
    labels = np.zeros(len(x))
    filtered_y = np.array(x)
    avg_filter = np.zeros(len(x))
    std_filter = np.zeros(len(x))
    var_filter = np.zeros(len(x))

    avg_filter[lag - 1] = np.mean(x[0:lag])
    std_filter[lag - 1] = np.std(x[0:lag])
    var_filter[lag - 1] = np.var(x[0:lag])
    for i in range(lag, len(x)):
        if abs(x[i] - avg_filter[i - 1]) > threshold * std_filter[i - 1]:
            if x[i] > avg_filter[i - 1]:
                labels[i] = 1
            else:
                labels[i] = -1
            filtered_y[i] = influence * x[i] + (1 - influence) * filtered_y[i - 1]
        else:
            labels[i] = 0
            filtered_y[i] = x[i]
        # update avg, var, std
        avg_filter[i] = avg_filter[i - 1] + 1. / lag * (filtered_y[i] - filtered_y[i - lag])
        var_filter[i] = var_filter[i - 1] + 1. / lag * ((filtered_y[i] - avg_filter[i - 1]) ** 2 - (
            filtered_y[i - lag] - avg_filter[i - 1]) ** 2 - (filtered_y[i] - filtered_y[i - lag]) ** 2 / lag)
        std_filter[i] = np.sqrt(var_filter[i])

    return dict(signals=labels,
                avgFilter=avg_filter,
                stdFilter=std_filter)
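For example, with the same settings used elsewhere in this thread (lag = 30, threshold = 5, influence = 0), the function can be called on the example series like this (a usage sketch, not part of the original answer):

import numpy as np

y = np.array([1, 1, 1.1, 1, 0.9, 1, 1, 1.1, 1, 0.9, 1, 1.1, 1, 1, 0.9, 1, 1, 1.1, 1, 1, 1, 1, 1.1, 0.9, 1, 1.1, 1, 1, 0.9,
              1, 1.1, 1, 1, 1.1, 1, 0.8, 0.9, 1, 1.2, 0.9, 1, 1, 1.1, 1.2, 1, 1.5, 1, 3, 2, 5, 3, 2, 1, 1, 1, 0.9, 1, 1, 3,
              2.6, 4, 3, 3.2, 2, 1, 1, 0.8, 4, 4, 2, 2.5, 1, 1, 1])

result = peak_detection_smoothed_zscore_v2(y, lag=30, threshold=5, influence=0)
print(result['signals'])  # 1 = upward peak, -1 = downward peak, 0 = no signal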

Other answers

Below is a Golang implementation of the smoothed z-score algorithm (from the answer above). It assumes a slice of []int16 (16-bit PCM samples). You can find the gist here.

/*
Settings (the ones below are examples: choose what is best for your data)
set lag to 5;          # lag 5 for the smoothing functions
set threshold to 3.5;  # 3.5 standard deviations for signal
set influence to 0.5;  # between 0 and 1, where 1 is normal influence, 0.5 is half
*/

// ZScore on 16bit WAV samples
func ZScore(samples []int16, lag int, threshold float64, influence float64) (signals []int16) {
    //lag := 20
    //threshold := 3.5
    //influence := 0.5

    signals = make([]int16, len(samples))
    filteredY := make([]int16, len(samples))
    for i, sample := range samples[0:lag] {
        filteredY[i] = sample
    }
    avgFilter := make([]int16, len(samples))
    stdFilter := make([]int16, len(samples))

    avgFilter[lag] = Average(samples[0:lag])
    stdFilter[lag] = Std(samples[0:lag])

    for i := lag + 1; i < len(samples); i++ {

        f := float64(samples[i])

        if float64(Abs(samples[i]-avgFilter[i-1])) > threshold*float64(stdFilter[i-1]) {
            if samples[i] > avgFilter[i-1] {
                signals[i] = 1
            } else {
                signals[i] = -1
            }
            filteredY[i] = int16(influence*f + (1-influence)*float64(filteredY[i-1]))
            avgFilter[i] = Average(filteredY[(i - lag):i])
            stdFilter[i] = Std(filteredY[(i - lag):i])
        } else {
            signals[i] = 0
            filteredY[i] = samples[i]
            avgFilter[i] = Average(filteredY[(i - lag):i])
            stdFilter[i] = Std(filteredY[(i - lag):i])
        }
    }

    return
}

// Average returns the mean of the absolute values of a chunk of samples
func Average(chunk []int16) (avg int16) {
    var sum int64
    for _, sample := range chunk {
        if sample < 0 {
            sample *= -1
        }
        sum += int64(sample)
    }
    return int16(sum / int64(len(chunk)))
}
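The Go code above calls Abs and Std, which are not shown here; they are presumably defined in the linked gist. Purely as a sketch of what such helpers could look like for the int16 types used above (this assumes the standard "math" package; the gist may differ):

// Abs returns the absolute value of a 16-bit sample (sketch)
func Abs(value int16) int16 {
    if value < 0 {
        return -value
    }
    return value
}

// Std returns the standard deviation of a chunk of samples, truncated to int16 (sketch)
func Std(chunk []int16) int16 {
    if len(chunk) == 0 {
        return 0
    }
    var sum float64
    for _, sample := range chunk {
        sum += float64(sample)
    }
    mean := sum / float64(len(chunk))
    var sq float64
    for _, sample := range chunk {
        d := float64(sample) - mean
        sq += d * d
    }
    return int16(math.Sqrt(sq / float64(len(chunk))))
}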

The following approach also works well for me: it cubes each value that is a local window maximum (suppressing the rest) to amplify the peaks, then flags the points that exceed a fraction (1/sensitivity) of the overall maximum, keeping only the first such detection within each sliding window (plotted here with Plotly):

sensitivity = 4;
dwindow = 4;
k = dwindow;

data = [1., 1., 1., 1., 1., 1., 1., 1.1, 1., 0.8, 0.9, 1., 1.2, 0.9, 1., 1., 1.1, 1.2, 1., 1.5, 1., 3., 2., 5., 3., 2., 1., 1., 1., 0.9, 1., 1., 3., 2.6, 4., 3., 3.2, 2., 1., 1., 1., 1., 1.];
//data = data.concat(data);
//data = data.concat(data);

var data1 = [{ name: 'original source', y: data }];
Plotly.newPlot('stage1', data1, { title: 'Sensor data', yaxis: { title: 'signal' } });

filtered = data.map((a, b, c) => a >= Math.max(...c.slice(b - k, b)) ? a ** 3 : 0);
var data2 = [{ name: 'filtered source', y: filtered }];
Plotly.newPlot('stage2', data2, { title: 'Filtered data<br>aₙ = aₙ³', yaxis: { title: 'signal' } });

dwindow = 6;
k = dwindow;
detected = filtered.map((a, b, c) => a > Math.max(...c.slice(2)) / sensitivity).map((a, b, c) => (b > k) && c.slice(b - k, b).indexOf(a) == -1);
var data3 = [{ name: 'detected peaks', y: detected }];
Plotly.newPlot('stage3', data3, { title: 'Maximum in a window of 6', yaxis: { title: 'signal' } });

dwindow = 10;
k = dwindow;
detected = filtered.map((a, b, c) => a > Math.max(...c.slice(2)) / 20).map((a, b, c) => (b > k) && c.slice(b - k, b).indexOf(a) == -1);
var data4 = [{ name: 'detected peaks', y: detected }];
Plotly.newPlot('stage4', data4, { title: 'Maximum in a window of 10', yaxis: { title: 'signal' } });

<script src="https://cdn.jsdelivr.net/npm/plotly.js@2.16.5/dist/plotly.min.js"></script>
<div id="stage1"></div>
<div id="stage2"></div>
<div id="stage3"></div>
<div id="stage4"></div>

Here is a Python implementation of the robust peak detection algorithm.

The initialization and the incremental computation are split into separate functions, and only the filtered_y array is kept, whose maximum size equals the lag, so memory use does not grow. (The results are the same as in the answer above.) The labels array is also kept for plotting.

I made a GitHub gist for it.

import numpy as np
import pylab

def init(x, lag, threshold, influence):
    '''
    Smoothed z-score algorithm
    Implementation of algorithm from https://stackoverflow.com/a/22640362/6029703
    '''

    labels = np.zeros(lag)
    filtered_y = np.array(x[0:lag])
    avg_filter = np.zeros(lag)
    std_filter = np.zeros(lag)
    var_filter = np.zeros(lag)

    avg_filter[lag - 1] = np.mean(x[0:lag])
    std_filter[lag - 1] = np.std(x[0:lag])
    var_filter[lag - 1] = np.var(x[0:lag])

    return dict(avg=avg_filter[lag - 1], var=var_filter[lag - 1],
                std=std_filter[lag - 1], filtered_y=filtered_y,
                labels=labels)


def add(result, single_value, lag, threshold, influence):
    previous_avg = result['avg']
    previous_var = result['var']
    previous_std = result['std']
    filtered_y = result['filtered_y']
    labels = result['labels']

    if abs(single_value - previous_avg) > threshold * previous_std:
        if single_value > previous_avg:
            labels = np.append(labels, 1)
        else:
            labels = np.append(labels, -1)

        # calculate the new filtered element using the influence factor
        filtered_y = np.append(filtered_y, influence * single_value
                               + (1 - influence) * filtered_y[-1])
    else:
        labels = np.append(labels, 0)
        filtered_y = np.append(filtered_y, single_value)

    # update the running average: previous avg + (newest filtered value - filtered value leaving the window) / lag
    current_avg_filter = previous_avg + 1. / lag * (filtered_y[-1]
            - filtered_y[len(filtered_y) - lag - 1])

    # update the running variance with the same sliding-window identity as in the iterative version above
    current_var_filter = previous_var + 1. / lag * ((filtered_y[-1]
            - previous_avg) ** 2 - (filtered_y[len(filtered_y) - 1
            - lag] - previous_avg) ** 2 - (filtered_y[-1]
            - filtered_y[len(filtered_y) - 1 - lag]) ** 2 / lag)

    # calculate standard deviation for current element as sqrt (current variance)
    current_std_filter = np.sqrt(current_var_filter)

    return dict(avg=current_avg_filter, var=current_var_filter,
                std=current_std_filter, filtered_y=filtered_y[1:],
                labels=labels)

lag = 30
threshold = 5
influence = 0

y = np.array([1,1,1.1,1,0.9,1,1,1.1,1,0.9,1,1.1,1,1,0.9,1,1,1.1,1,1,1,1,1.1,0.9,1,1.1,1,1,0.9,
       1,1.1,1,1,1.1,1,0.8,0.9,1,1.2,0.9,1,1,1.1,1.2,1,1.5,1,3,2,5,3,2,1,1,1,0.9,1,1,3,
       2.6,4,3,3.2,2,1,1,0.8,4,4,2,2.5,1,1,1])

# Run algo with settings from above
result = init(y[:lag], lag=lag, threshold=threshold, influence=influence)

for value in y[lag:]:
    result = add(result, value, lag, threshold, influence)

# Plot result
pylab.subplot(211)
pylab.plot(np.arange(1, len(y) + 1), y)
pylab.subplot(212)
pylab.step(np.arange(1, len(y) + 1), result['labels'], color='red',
           lw=2)
pylab.ylim(-1.5, 1.5)
pylab.show()

A Perl implementation of @Jean-Paul's algorithm.

#!/usr/bin/perl

use strict;
use Data::Dumper;

sub mean {
    my $data = shift;
    my $sum = 0;
    my $mean_val = 0;
    for my $item (@$data) {
        $sum += $item;
    }
    $mean_val = $sum / (scalar @$data) if @$data;
    return $mean_val;
}

sub variance {
    my $data = shift;
    my $variance_val = 0;
    my $mean_val = mean($data);
    my $sum = 0;
    for my $item (@$data) {
        $sum += ($item - $mean_val)**2;
    }
    $variance_val = $sum / (scalar @$data) if @$data;
    return $variance_val;
}

sub std {
    my $data = shift;
    my $variance_val = variance($data);
    return sqrt($variance_val);
}

# @param y - The input vector to analyze
# @param lag - The lag of the moving window
# @param threshold - The z-score at which the algorithm signals
# @param influence - The influence (between 0 and 1) of new signals on the mean and standard deviation
sub thresholding_algo {
    my ($y, $lag, $threshold, $influence) = @_;

    my @signals = (0) x @$y;
    my @filteredY = @$y;
    my @avgFilter = (0) x @$y;
    my @stdFilter = (0) x @$y;

    $avgFilter[$lag - 1] = mean([@$y[0..$lag-1]]);
    $stdFilter[$lag - 1] = std([@$y[0..$lag-1]]);

    for (my $i=$lag; $i <= @$y - 1; $i++) {
        if (abs($y->[$i] - $avgFilter[$i-1]) > $threshold * $stdFilter[$i-1]) {
            if ($y->[$i] > $avgFilter[$i-1]) {
                $signals[$i] = 1;
            } else {
                $signals[$i] = -1;
            }

            $filteredY[$i] = $influence * $y->[$i] + (1 - $influence) * $filteredY[$i-1];
            $avgFilter[$i] = mean([@filteredY[($i-$lag)..($i-1)]]);
            $stdFilter[$i] = std([@filteredY[($i-$lag)..($i-1)]]);
        }
        else {
            $signals[$i] = 0;
            $filteredY[$i] = $y->[$i];
            $avgFilter[$i] = mean([@filteredY[($i-$lag)..($i-1)]]);
            $stdFilter[$i] = std([@filteredY[($i-$lag)..($i-1)]]);
        }
    }

    return {
        signals => \@signals,
        avgFilter => \@avgFilter,
        stdFilter => \@stdFilter
    };
}

my $y = [1,1,1.1,1,0.9,1,1,1.1,1,0.9,1,1.1,1,1,0.9,1,1,1.1,1,1,1,1,1.1,0.9,1,1.1,1,1,0.9,
       1,1.1,1,1,1.1,1,0.8,0.9,1,1.2,0.9,1,1,1.1,1.2,1,1.5,1,3,2,5,3,2,1,1,1,0.9,1,1,3,
       2.6,4,3,3.2,2,1,1,0.8,4,4,2,2.5,1,1,1];

my $lag = 30;
my $threshold = 5;
my $influence = 0;

my $result = thresholding_algo($y, $lag, $threshold, $influence);

print Dumper $result;

Here is a modified Fortran version of the z-score algorithm. It has been tuned specifically for detecting peaks (resonances) of transfer functions in frequency space (each change has a small comment in the code).

The first modification warns the user if there is a resonance near the lower bound of the input vector, indicated by a standard deviation above a certain threshold (here 10%, measured relative to the mean). In that case the signal is simply not flat enough for the detection to initialize the filters correctly.

The second modification is that only the maximum value of each peak is added to the found peaks. This is achieved by comparing each found peak value to the magnitude of its (lag) predecessors and its (lag) successors.

The third change respects the fact that resonance peaks usually exhibit some form of symmetry around the resonance frequency, so it is natural to compute the mean and std symmetrically around the current data point (rather than only over its predecessors). This results in better peak detection behavior.

The effect of these modifications is that the entire signal must be known to the function in advance, which is the usual case for resonance detection (something like Jean-Paul's Matlab example, where the data points are generated on the fly, would not work here).

function PeakDetect(y,lag,threshold, influence)
    implicit none
    ! Declaring part
    real, dimension(:), intent(in) :: y
    integer, dimension(size(y)) :: PeakDetect
    real, dimension(size(y)) :: filteredY, avgFilter, stdFilter
    integer :: lag, ii
    real :: threshold, influence

    ! Executing part
    PeakDetect = 0
    filteredY = 0.0
    filteredY(1:lag+1) = y(1:lag+1)
    avgFilter = 0.0
    avgFilter(lag+1) = mean(y(1:2*lag+1))
    stdFilter = 0.0
    stdFilter(lag+1) = std(y(1:2*lag+1))

    if (stdFilter(lag+1)/avgFilter(lag+1)>0.1) then ! If the coefficient of variation exceeds 10%, the signal is too uneven at the start, possibly because of a peak.
        write(unit=*,fmt=1001)
1001        format(1X,'Warning: Peak detection might have failed, as there may be a peak at the edge of the frequency range.',/)
    end if
    ! Stop lag points before the end so that y(ii+1) and the symmetric window ii-lag:ii+lag stay within bounds
    do ii = lag+2, size(y)-lag
        if (abs(y(ii) - avgFilter(ii-1)) > threshold * stdFilter(ii-1)) then
            ! Keep only the largest value of a peak: the one greater than both its predecessor and its successor
            if (y(ii) > avgFilter(ii-1) .AND. y(ii) > y(ii-1) .AND. y(ii) > y(ii+1)) then
                PeakDetect(ii) = 1
            end if
            filteredY(ii) = influence * y(ii) + (1 - influence) * filteredY(ii-1)
        else
            filteredY(ii) = y(ii)
        end if
        ! Modified with respect to the original code. Mean and standard deviation are calculted symmetrically around the current point
        avgFilter(ii) = mean(filteredY(ii-lag:ii+lag))
        stdFilter(ii) = std(filteredY(ii-lag:ii+lag))
    end do
end function PeakDetect

real function mean(y)
    !> @brief Calculates the mean of vector y
    implicit none
    ! Declaring part
    real, dimension(:), intent(in) :: y
    integer :: N
    ! Executing part
    N = max(1,size(y))
    mean = sum(y)/N
end function mean

real function std(y)
    !> @brief Calculates the standard deviation of vector y
    implicit none
    ! Declaring part
    real, dimension(:), intent(in) :: y
    integer :: N
    ! Executing part
    N = max(1,size(y))
    std = sqrt((N*dot_product(y,y) - sum(y)**2) / (N*(N-1)))
end function std

For my application the algorithm works like a charm!