更新:到目前为止表现最好的算法是这个。


这个问题探讨了在实时时间序列数据中检测突然峰值的稳健算法。

考虑以下示例数据:

这个数据的例子是Matlab格式的(但这个问题不是关于语言,而是关于算法):

p = [1 1 1.1 1 0.9 1 1 1.1 1 0.9 1 1.1 1 1 0.9 1 1 1.1 1 1 1 1 1.1 0.9 1 1.1 1 1 0.9, ...
     1 1.1 1 1 1.1 1 0.8 0.9 1 1.2 0.9 1 1 1.1 1.2 1 1.5 1 3 2 5 3 2 1 1 1 0.9 1 1, ... 
     3 2.6 4 3 3.2 2 1 1 0.8 4 4 2 2.5 1 1 1];

你可以清楚地看到有三个大峰和一些小峰。这个数据集是问题所涉及的时间序列数据集类的一个特定示例。这类数据集有两个一般特征:

有一种具有一般平均值的基本噪声 有很大的“峰值”或“更高的数据点”明显偏离噪声。

让我们假设以下情况:

峰的宽度不能事先确定 峰的高度明显偏离其他值 算法实时更新(因此每个新数据点都会更新)

对于这种情况,需要构造一个触发信号的边值。但是,边界值不能是静态的,必须通过算法实时确定。


我的问题是:什么是实时计算这些阈值的好算法?有没有针对这种情况的特定算法?最著名的算法是什么?


健壮的算法或有用的见解都受到高度赞赏。(可以用任何语言回答:这是关于算法的)


当前回答

下面是ZSCORE算法的PHP实现:

<?php
$y = array(1,7,1.1,1,0.9,1,1,1.1,1,0.9,1,1.1,1,1,0.9,1,1,1.1,1,1,1,1,1.1,0.9,1,1.1,1,1,0.9,
       1,1.1,1,1,1.1,1,0.8,0.9,1,1.2,0.9,1,1,1.1,1.2,1,1.5,10,3,2,5,3,2,1,1,1,0.9,1,1,3,
       2.6,4,3,3.2,2,1,1,0.8,4,4,2,2.5,1,1,1);

function mean($data, $start, $len) {
    $avg = 0;
    for ($i = $start; $i < $start+ $len; $i ++)
        $avg += $data[$i];
    return $avg / $len;
}
    
function stddev($data, $start,$len) {
    $mean = mean($data,$start,$len);
    $dev = 0;
    for ($i = $start; $i < $start+$len; $i++) 
        $dev += (($data[$i] - $mean) * ($data[$i] - $mean));
    return sqrt($dev / $len);
}

function zscore($data, $len, $lag= 20, $threshold = 1, $influence = 1) {

    $signals = array();
    $avgFilter = array();
    $stdFilter = array();
    $filteredY = array();
    $avgFilter[$lag - 1] = mean($data, 0, $lag);
    $stdFilter[$lag - 1] = stddev($data, 0, $lag);
    
    for ($i = 0; $i < $len; $i++) {
        $filteredY[$i] = $data[$i];
        $signals[$i] = 0;
    }


    for ($i=$lag; $i < $len; $i++) {
        if (abs($data[$i] - $avgFilter[$i-1]) > $threshold * $stdFilter[$lag - 1]) {
            if ($data[$i] > $avgFilter[$i-1]) {
                $signals[$i] = 1;
            }
            else {
                $signals[$i] = -1;
            }
            $filteredY[$i] = $influence * $data[$i] + (1 - $influence) * $filteredY[$i-1];
        } 
        else {
            $signals[$i] = 0;
            $filteredY[$i] = $data[$i];
        }
        
        $avgFilter[$i] = mean($filteredY, $i - $lag, $lag);
        $stdFilter[$i] = stddev($filteredY, $i - $lag, $lag);
    }
    return $signals;
}

$sig = zscore($y, count($y));

print_r($y); echo "<br><br>";
print_r($sig); echo "<br><br>";

for ($i = 0; $i < count($y); $i++) echo $i. " " . $y[$i]. " ". $sig[$i]."<br>";

其他回答

用现代c++实现的面向对象版z-score算法

template<typename T>
class FindPeaks{
private:
    std::vector<T> m_input_signal;                      // stores input vector
    std::vector<T> m_array_peak_positive;               
    std::vector<T> m_array_peak_negative;               

public:
    FindPeaks(const std::vector<T>& t_input_signal): m_input_signal{t_input_signal}{ }

    void estimate(){
        int lag{5};
        T threshold{ 5 };                                                                                       // set a threshold
        T influence{ 0.5 };                                                                                    // value between 0 to 1, 1 is normal influence and 0.5 is half the influence

        std::vector<T> filtered_signal(m_input_signal.size(), 0.0);                                             // placeholdered for smooth signal, initialie with all zeros
        std::vector<int> signal(m_input_signal.size(), 0);                                                          // vector that stores where the negative and positive located
        std::vector<T> avg_filtered(m_input_signal.size(), 0.0);                                                // moving averages
        std::vector<T> std_filtered(m_input_signal.size(), 0.0);                                                // moving standard deviation

        avg_filtered[lag] = findMean(m_input_signal.begin(), m_input_signal.begin() + lag);                         // pass the iteartor to vector
        std_filtered[lag] = findStandardDeviation(m_input_signal.begin(), m_input_signal.begin() + lag);

        for (size_t iLag = lag + 1; iLag < m_input_signal.size(); ++iLag) {                                         // start index frm 
            if (std::abs(m_input_signal[iLag] - avg_filtered[iLag - 1]) > threshold * std_filtered[iLag - 1]) {     // check if value is above threhold             
                if ((m_input_signal[iLag]) > avg_filtered[iLag - 1]) {
                    signal[iLag] = 1;                                                                               // assign positive signal
                }
                else {
                    signal[iLag] = -1;                                                                                  // assign negative signal
                }
                filtered_signal[iLag] = influence * m_input_signal[iLag] + (1 - influence) * filtered_signal[iLag - 1];        // exponential smoothing
            }
            else {
                signal[iLag] = 0;                                                                                         // no signal
                filtered_signal[iLag] = m_input_signal[iLag];
            }

            avg_filtered[iLag] = findMean(filtered_signal.begin() + (iLag - lag), filtered_signal.begin() + iLag);
            std_filtered[iLag] = findStandardDeviation(filtered_signal.begin() + (iLag - lag), filtered_signal.begin() + iLag);

        }

        for (size_t iSignal = 0; iSignal < m_input_signal.size(); ++iSignal) {
            if (signal[iSignal] == 1) {
                m_array_peak_positive.emplace_back(m_input_signal[iSignal]);                                        // store the positive peaks
            }
            else if (signal[iSignal] == -1) {
                m_array_peak_negative.emplace_back(m_input_signal[iSignal]);                                         // store the negative peaks
            }
        }
        printVoltagePeaks(signal, m_input_signal);

    }

    std::pair< std::vector<T>, std::vector<T> > get_peaks()
    {
        return std::make_pair(m_array_peak_negative, m_array_peak_negative);
    }

};


template<typename T1, typename T2 >
void printVoltagePeaks(std::vector<T1>& m_signal, std::vector<T2>& m_input_signal) {
    std::ofstream output_file("./voltage_peak.csv");
    std::ostream_iterator<T2> output_iterator_voltage(output_file, ",");
    std::ostream_iterator<T1> output_iterator_signal(output_file, ",");
    std::copy(m_input_signal.begin(), m_input_signal.end(), output_iterator_voltage);
    output_file << "\n";
    std::copy(m_signal.begin(), m_signal.end(), output_iterator_signal);
}

template<typename iterator_type>
typename std::iterator_traits<iterator_type>::value_type findMean(iterator_type it, iterator_type end)
{
    /* function that receives iterator to*/
    typename std::iterator_traits<iterator_type>::value_type sum{ 0.0 };
    int counter = 0;
    while (it != end) {
        sum += *(it++);
        counter++;
    }
    return sum / counter;
}

template<typename iterator_type>
typename std::iterator_traits<iterator_type>::value_type findStandardDeviation(iterator_type it, iterator_type end)
{
    auto mean = findMean(it, end);
    typename std::iterator_traits<iterator_type>::value_type sum_squared_error{ 0.0 };
    int counter{ 0 };
    while (it != end) {
        sum_squared_error += std::pow((*(it++) - mean), 2);
        counter++;
    }
    auto standard_deviation = std::sqrt(sum_squared_error / (counter - 1));
    return standard_deviation;
}

如果你的数据在一个数据库表中,这里是一个简单的z-score算法的SQL版本:

with data_with_zscore as (
    select
        date_time,
        value,
        value / (avg(value) over ()) as pct_of_mean,
        (value - avg(value) over ()) / (stdev(value) over ()) as z_score
    from {{tablename}}  where datetime > '2018-11-26' and datetime < '2018-12-03'
)


-- select all
select * from data_with_zscore 

-- select only points greater than a certain threshold
select * from data_with_zscore where z_score > abs(2)

一种方法是根据以下观察来检测峰:

时间t是一个峰值(y (t) > y (t - 1)) & & ((t) > y (t + 1))

它通过等待上升趋势结束来避免误报。它并不完全是“实时”的,因为它会比峰值差一个dt。灵敏度可以通过要求比较的裕度来控制。在噪声检测和时延检测之间存在一种折衷。 您可以通过添加更多参数来丰富模型:

峰如果y (y (t) - (t-dt) > m) && (y (t) - y (t + dt) > m)

dt和m是控制灵敏度和延时的参数

这是你用上述算法得到的结果:

下面是在python中重现图的代码:

import numpy as np
import matplotlib.pyplot as plt
input = np.array([ 1. ,  1. ,  1. ,  1. ,  1. ,  1. ,  1. ,  1.1,  1. ,  0.8,  0.9,
    1. ,  1.2,  0.9,  1. ,  1. ,  1.1,  1.2,  1. ,  1.5,  1. ,  3. ,
    2. ,  5. ,  3. ,  2. ,  1. ,  1. ,  1. ,  0.9,  1. ,  1. ,  3. ,
    2.6,  4. ,  3. ,  3.2,  2. ,  1. ,  1. ,  1. ,  1. ,  1. ])
signal = (input > np.roll(input,1)) & (input > np.roll(input,-1))
plt.plot(input)
plt.plot(signal.nonzero()[0], input[signal], 'ro')
plt.show()

通过设置m = 0.5,你可以得到一个更清晰的信号,只有一个假阳性:

在计算拓扑学中,持久同调的思想导致一个有效的 -快如排序数字-解决方案。它不仅检测峰值,还以一种自然的方式量化峰值的“重要性”,使您能够选择对您重要的峰值。

算法的总结。 在一维设置(时间序列,实值信号)中,算法可以简单地描述为下图:

Think of the function graph (or its sub-level set) as a landscape and consider a decreasing water level starting at level infinity (or 1.8 in this picture). While the level decreases, at local maxima islands pop up. At local minima these islands merge together. One detail in this idea is that the island that appeared later in time is merged into the island that is older. The "persistence" of an island is its birth time minus its death time. The lengths of the blue bars depict the persistence, which is the above mentioned "significance" of a peak.

效率。 在对函数值进行排序之后,找到一个在线性时间内运行的实现并不难——实际上它是一个单一的、简单的循环。因此,这种实现在实践中应该是快速的,也很容易实现。

参考文献 一篇关于整个故事的文章和对持久同调(计算代数拓扑中的一个领域)动机的引用可以在这里找到: https://www.sthu.org/blog/13-perstopology-peakdetection/index.html

下面是这个答案的平滑z-score算法的c++实现

std::vector<int> smoothedZScore(std::vector<float> input)
{   
    //lag 5 for the smoothing functions
    int lag = 5;
    //3.5 standard deviations for signal
    float threshold = 3.5;
    //between 0 and 1, where 1 is normal influence, 0.5 is half
    float influence = .5;

    if (input.size() <= lag + 2)
    {
        std::vector<int> emptyVec;
        return emptyVec;
    }

    //Initialise variables
    std::vector<int> signals(input.size(), 0.0);
    std::vector<float> filteredY(input.size(), 0.0);
    std::vector<float> avgFilter(input.size(), 0.0);
    std::vector<float> stdFilter(input.size(), 0.0);
    std::vector<float> subVecStart(input.begin(), input.begin() + lag);
    avgFilter[lag] = mean(subVecStart);
    stdFilter[lag] = stdDev(subVecStart);

    for (size_t i = lag + 1; i < input.size(); i++)
    {
        if (std::abs(input[i] - avgFilter[i - 1]) > threshold * stdFilter[i - 1])
        {
            if (input[i] > avgFilter[i - 1])
            {
                signals[i] = 1; //# Positive signal
            }
            else
            {
                signals[i] = -1; //# Negative signal
            }
            //Make influence lower
            filteredY[i] = influence* input[i] + (1 - influence) * filteredY[i - 1];
        }
        else
        {
            signals[i] = 0; //# No signal
            filteredY[i] = input[i];
        }
        //Adjust the filters
        std::vector<float> subVec(filteredY.begin() + i - lag, filteredY.begin() + i);
        avgFilter[i] = mean(subVec);
        stdFilter[i] = stdDev(subVec);
    }
    return signals;
}