更新:到目前为止表现最好的算法是这个。


这个问题探讨了在实时时间序列数据中检测突然峰值的稳健算法。

考虑以下示例数据:

这个数据的例子是Matlab格式的(但这个问题不是关于语言,而是关于算法):

p = [1 1 1.1 1 0.9 1 1 1.1 1 0.9 1 1.1 1 1 0.9 1 1 1.1 1 1 1 1 1.1 0.9 1 1.1 1 1 0.9, ...
     1 1.1 1 1 1.1 1 0.8 0.9 1 1.2 0.9 1 1 1.1 1.2 1 1.5 1 3 2 5 3 2 1 1 1 0.9 1 1, ... 
     3 2.6 4 3 3.2 2 1 1 0.8 4 4 2 2.5 1 1 1];

你可以清楚地看到有三个大峰和一些小峰。这个数据集是问题所涉及的时间序列数据集类的一个特定示例。这类数据集有两个一般特征:

有一种具有一般平均值的基本噪声 有很大的“峰值”或“更高的数据点”明显偏离噪声。

让我们假设以下情况:

峰的宽度不能事先确定 峰的高度明显偏离其他值 算法实时更新(因此每个新数据点都会更新)

对于这种情况,需要构造一个触发信号的边值。但是,边界值不能是静态的,必须通过算法实时确定。


我的问题是:什么是实时计算这些阈值的好算法?有没有针对这种情况的特定算法?最著名的算法是什么?


健壮的算法或有用的见解都受到高度赞赏。(可以用任何语言回答:这是关于算法的)


当前回答

如果边界值或其他标准取决于未来值,那么唯一的解决方案(没有时间机器,或其他关于未来值的知识)是推迟任何决定,直到有足够的未来值。如果你想要一个高于均值的水平,例如,20点,那么你必须等到你至少有19点才能做出任何峰值决策,否则下一个新点可能会完全超过你19点之前的阈值。

Added: If the statistical distribution of the peak heights could be heavy tailed, instead of Uniform or Gaussian, then you may need to wait until you see several thousand peaks before it starts to become unlikely that a hidden Pareto distribution won't produce a peak many times larger than any you currently have seen before or have in your current plot. Unless you somehow know in advance that the very next point can't be 1e20, it could appear, which after rescaling your plot's Y dimension, would be flat up until that point.

其他回答

这是一个修改后的Fortran版本的z-score算法。 它是专门针对频率空间中传递函数的峰值(共振)检测进行修改的(每个更改在代码中都有一个小注释)。

如果在输入向量的下界附近存在共振,则第一个修改会向用户发出警告,该共振由高于某个阈值的标准偏差表示(在本例中为10%)。这仅仅意味着信号不够平坦,不足以使检测正确地初始化滤波器。

第二种修改是只将峰值的最大值添加到已找到的峰值中。这是通过将每个发现的峰值与其(滞后)前辈及其(滞后)后继者的大小进行比较来达到的。

第三个变化是尊重共振峰通常在共振频率周围表现出某种形式的对称性。因此,围绕当前数据点对称地计算平均值和std是很自然的(而不仅仅是针对之前的数据点)。这将导致更好的峰值检测行为。

这些修改的效果是,整个信号必须事先被函数知道,这是共振检测的通常情况(类似于Jean-Paul的Matlab示例,其中数据点是动态生成的,这是行不通的)。

function PeakDetect(y,lag,threshold, influence)
    implicit none
    ! Declaring part
    real, dimension(:), intent(in) :: y
    integer, dimension(size(y)) :: PeakDetect
    real, dimension(size(y)) :: filteredY, avgFilter, stdFilter
    integer :: lag, ii
    real :: threshold, influence

    ! Executing part
    PeakDetect = 0
    filteredY = 0.0
    filteredY(1:lag+1) = y(1:lag+1)
    avgFilter = 0.0
    avgFilter(lag+1) = mean(y(1:2*lag+1))
    stdFilter = 0.0
    stdFilter(lag+1) = std(y(1:2*lag+1))

    if (stdFilter(lag+1)/avgFilter(lag+1)>0.1) then ! If the coefficient of variation exceeds 10%, the signal is too uneven at the start, possibly because of a peak.
        write(unit=*,fmt=1001)
1001        format(1X,'Warning: Peak detection might have failed, as there may be a peak at the edge of the frequency range.',/)
    end if
    do ii = lag+2, size(y)
        if (abs(y(ii) - avgFilter(ii-1)) > threshold * stdFilter(ii-1)) then
            ! Find only the largest outstanding value which is only the one greater than its predecessor and its successor
            if (y(ii) > avgFilter(ii-1) .AND. y(ii) > y(ii-1) .AND. y(ii) > y(ii+1)) then
                PeakDetect(ii) = 1
            end if
            filteredY(ii) = influence * y(ii) + (1 - influence) * filteredY(ii-1)
        else
            filteredY(ii) = y(ii)
        end if
        ! Modified with respect to the original code. Mean and standard deviation are calculted symmetrically around the current point
        avgFilter(ii) = mean(filteredY(ii-lag:ii+lag))
        stdFilter(ii) = std(filteredY(ii-lag:ii+lag))
    end do
end function PeakDetect

real function mean(y)
    !> @brief Calculates the mean of vector y
    implicit none
    ! Declaring part
    real, dimension(:), intent(in) :: y
    integer :: N
    ! Executing part
    N = max(1,size(y))
    mean = sum(y)/N
end function mean

real function std(y)
    !> @brief Calculates the standard deviation of vector y
    implicit none
    ! Declaring part
    real, dimension(:), intent(in) :: y
    integer :: N
    ! Executing part
    N = max(1,size(y))
    std = sqrt((N*dot_product(y,y) - sum(y)**2) / (N*(N-1)))
end function std

对于我的应用程序,算法的工作就像一个魅力!

下面是ZSCORE算法的PHP实现:

<?php
$y = array(1,7,1.1,1,0.9,1,1,1.1,1,0.9,1,1.1,1,1,0.9,1,1,1.1,1,1,1,1,1.1,0.9,1,1.1,1,1,0.9,
       1,1.1,1,1,1.1,1,0.8,0.9,1,1.2,0.9,1,1,1.1,1.2,1,1.5,10,3,2,5,3,2,1,1,1,0.9,1,1,3,
       2.6,4,3,3.2,2,1,1,0.8,4,4,2,2.5,1,1,1);

function mean($data, $start, $len) {
    $avg = 0;
    for ($i = $start; $i < $start+ $len; $i ++)
        $avg += $data[$i];
    return $avg / $len;
}
    
function stddev($data, $start,$len) {
    $mean = mean($data,$start,$len);
    $dev = 0;
    for ($i = $start; $i < $start+$len; $i++) 
        $dev += (($data[$i] - $mean) * ($data[$i] - $mean));
    return sqrt($dev / $len);
}

function zscore($data, $len, $lag= 20, $threshold = 1, $influence = 1) {

    $signals = array();
    $avgFilter = array();
    $stdFilter = array();
    $filteredY = array();
    $avgFilter[$lag - 1] = mean($data, 0, $lag);
    $stdFilter[$lag - 1] = stddev($data, 0, $lag);
    
    for ($i = 0; $i < $len; $i++) {
        $filteredY[$i] = $data[$i];
        $signals[$i] = 0;
    }


    for ($i=$lag; $i < $len; $i++) {
        if (abs($data[$i] - $avgFilter[$i-1]) > $threshold * $stdFilter[$lag - 1]) {
            if ($data[$i] > $avgFilter[$i-1]) {
                $signals[$i] = 1;
            }
            else {
                $signals[$i] = -1;
            }
            $filteredY[$i] = $influence * $data[$i] + (1 - $influence) * $filteredY[$i-1];
        } 
        else {
            $signals[$i] = 0;
            $filteredY[$i] = $data[$i];
        }
        
        $avgFilter[$i] = mean($filteredY, $i - $lag, $lag);
        $stdFilter[$i] = stddev($filteredY, $i - $lag, $lag);
    }
    return $signals;
}

$sig = zscore($y, count($y));

print_r($y); echo "<br><br>";
print_r($sig); echo "<br><br>";

for ($i = 0; $i < count($y); $i++) echo $i. " " . $y[$i]. " ". $sig[$i]."<br>";

不需要将极大值与平均值进行比较,还可以将极大值与相邻的最小值进行比较,其中最小值仅定义在噪声阈值之上。 如果局部最大值是>的3倍(或其他置信因子)相邻的最小值,那么这个最大值就是一个峰值。 移动窗口越宽,峰值的确定越准确。 上面使用了以窗口中间为中心的计算, 顺便说一下,而不是在窗口结束时计算(== lag)。

请注意,最大值必须被视为信号之前的增加 之后下降。

如果你的数据在一个数据库表中,这里是一个简单的z-score算法的SQL版本:

with data_with_zscore as (
    select
        date_time,
        value,
        value / (avg(value) over ()) as pct_of_mean,
        (value - avg(value) over ()) / (stdev(value) over ()) as z_score
    from {{tablename}}  where datetime > '2018-11-26' and datetime < '2018-12-03'
)


-- select all
select * from data_with_zscore 

-- select only points greater than a certain threshold
select * from data_with_zscore where z_score > abs(2)

这是一个Python实现的鲁棒峰值检测算法算法。

初始化和计算部分被分开,只有filtered_y数组被保留,它的最大大小等于延迟,因此内存没有增加。(结果与上述答案相同)。 为了绘制图形,还保留了标签数组。

我做了一个github要点。

import numpy as np
import pylab

def init(x, lag, threshold, influence):
    '''
    Smoothed z-score algorithm
    Implementation of algorithm from https://stackoverflow.com/a/22640362/6029703
    '''

    labels = np.zeros(lag)
    filtered_y = np.array(x[0:lag])
    avg_filter = np.zeros(lag)
    std_filter = np.zeros(lag)
    var_filter = np.zeros(lag)

    avg_filter[lag - 1] = np.mean(x[0:lag])
    std_filter[lag - 1] = np.std(x[0:lag])
    var_filter[lag - 1] = np.var(x[0:lag])

    return dict(avg=avg_filter[lag - 1], var=var_filter[lag - 1],
                std=std_filter[lag - 1], filtered_y=filtered_y,
                labels=labels)


def add(result, single_value, lag, threshold, influence):
    previous_avg = result['avg']
    previous_var = result['var']
    previous_std = result['std']
    filtered_y = result['filtered_y']
    labels = result['labels']

    if abs(single_value - previous_avg) > threshold * previous_std:
        if single_value > previous_avg:
            labels = np.append(labels, 1)
        else:
            labels = np.append(labels, -1)

        # calculate the new filtered element using the influence factor
        filtered_y = np.append(filtered_y, influence * single_value
                               + (1 - influence) * filtered_y[-1])
    else:
        labels = np.append(labels, 0)
        filtered_y = np.append(filtered_y, single_value)

    # update avg as sum of the previuos avg + the lag * (the new calculated item - calculated item at position (i - lag))
    current_avg_filter = previous_avg + 1. / lag * (filtered_y[-1]
            - filtered_y[len(filtered_y) - lag - 1])

    # update variance as the previuos element variance + 1 / lag * new recalculated item - the previous avg -
    current_var_filter = previous_var + 1. / lag * ((filtered_y[-1]
            - previous_avg) ** 2 - (filtered_y[len(filtered_y) - 1
            - lag] - previous_avg) ** 2 - (filtered_y[-1]
            - filtered_y[len(filtered_y) - 1 - lag]) ** 2 / lag)  # the recalculated element at pos (lag) - avg of the previuos - new recalculated element - recalculated element at lag pos ....

    # calculate standard deviation for current element as sqrt (current variance)
    current_std_filter = np.sqrt(current_var_filter)

    return dict(avg=current_avg_filter, var=current_var_filter,
                std=current_std_filter, filtered_y=filtered_y[1:],
                labels=labels)

lag = 30
threshold = 5
influence = 0

y = np.array([1,1,1.1,1,0.9,1,1,1.1,1,0.9,1,1.1,1,1,0.9,1,1,1.1,1,1,1,1,1.1,0.9,1,1.1,1,1,0.9,
       1,1.1,1,1,1.1,1,0.8,0.9,1,1.2,0.9,1,1,1.1,1.2,1,1.5,1,3,2,5,3,2,1,1,1,0.9,1,1,3,
       2.6,4,3,3.2,2,1,1,0.8,4,4,2,2.5,1,1,1])

# Run algo with settings from above
result = init(y[:lag], lag=lag, threshold=threshold, influence=influence)

i = open('quartz2', 'r')
for i in y[lag:]:
    result = add(result, i, lag, threshold, influence)

# Plot result
pylab.subplot(211)
pylab.plot(np.arange(1, len(y) + 1), y)
pylab.subplot(212)
pylab.step(np.arange(1, len(y) + 1), result['labels'], color='red',
           lw=2)
pylab.ylim(-1.5, 1.5)
pylab.show()