更新:到目前为止表现最好的算法是这个。


这个问题探讨了在实时时间序列数据中检测突然峰值的稳健算法。

考虑以下示例数据:

这个数据的例子是Matlab格式的(但这个问题不是关于语言,而是关于算法):

p = [1 1 1.1 1 0.9 1 1 1.1 1 0.9 1 1.1 1 1 0.9 1 1 1.1 1 1 1 1 1.1 0.9 1 1.1 1 1 0.9, ...
     1 1.1 1 1 1.1 1 0.8 0.9 1 1.2 0.9 1 1 1.1 1.2 1 1.5 1 3 2 5 3 2 1 1 1 0.9 1 1, ... 
     3 2.6 4 3 3.2 2 1 1 0.8 4 4 2 2.5 1 1 1];

你可以清楚地看到有三个大峰和一些小峰。这个数据集是问题所涉及的时间序列数据集类的一个特定示例。这类数据集有两个一般特征:

有一种具有一般平均值的基本噪声 有很大的“峰值”或“更高的数据点”明显偏离噪声。

让我们假设以下情况:

峰的宽度不能事先确定 峰的高度明显偏离其他值 算法实时更新(因此每个新数据点都会更新)

对于这种情况,需要构造一个触发信号的边值。但是,边界值不能是静态的,必须通过算法实时确定。


我的问题是:什么是实时计算这些阈值的好算法?有没有针对这种情况的特定算法?最著名的算法是什么?


健壮的算法或有用的见解都受到高度赞赏。(可以用任何语言回答:这是关于算法的)


当前回答

@Jean-Paul算法的Perl实现。

#!/usr/bin/perl

use strict;
use Data::Dumper;

sub mean {
    my $data = shift;
    my $sum = 0;
    my $mean_val = 0;
    for my $item (@$data) {
        $sum += $item;
    }
    $mean_val = $sum / (scalar @$data) if @$data;
    return $mean_val;
}

sub variance {
    my $data = shift;
    my $variance_val = 0;
    my $mean_val = mean($data);
    my $sum = 0;
    for my $item (@$data) {
        $sum += ($item - $mean_val)**2;
    }
    $variance_val = $sum / (scalar @$data) if @$data;
    return $variance_val;
}

sub std {
    my $data = shift;
    my $variance_val = variance($data);
    return sqrt($variance_val);
}

# @param y - The input vector to analyze
# @parameter lag - The lag of the moving window
# @parameter threshold - The z-score at which the algorithm signals
# @parameter influence - The influence (between 0 and 1) of new signals on the mean and standard deviation
sub thresholding_algo {
    my ($y, $lag, $threshold, $influence) = @_;

    my @signals = (0) x @$y;
    my @filteredY = @$y;
    my @avgFilter = (0) x @$y;
    my @stdFilter = (0) x @$y;

    $avgFilter[$lag - 1] = mean([@$y[0..$lag-1]]);
    $stdFilter[$lag - 1] = std([@$y[0..$lag-1]]);

    for (my $i=$lag; $i <= @$y - 1; $i++) {
        if (abs($y->[$i] - $avgFilter[$i-1]) > $threshold * $stdFilter[$i-1]) {
            if ($y->[$i] > $avgFilter[$i-1]) {
                $signals[$i] = 1;
            } else {
                $signals[$i] = -1;
            }

            $filteredY[$i] = $influence * $y->[$i] + (1 - $influence) * $filteredY[$i-1];
            $avgFilter[$i] = mean([@filteredY[($i-$lag)..($i-1)]]);
            $stdFilter[$i] = std([@filteredY[($i-$lag)..($i-1)]]);
        }
        else {
            $signals[$i] = 0;
            $filteredY[$i] = $y->[$i];
            $avgFilter[$i] = mean([@filteredY[($i-$lag)..($i-1)]]);
            $stdFilter[$i] = std([@filteredY[($i-$lag)..($i-1)]]);
        }
    }

    return {
        signals => \@signals,
        avgFilter => \@avgFilter,
        stdFilter => \@stdFilter
    };
}

my $y = [1,1,1.1,1,0.9,1,1,1.1,1,0.9,1,1.1,1,1,0.9,1,1,1.1,1,1,1,1,1.1,0.9,1,1.1,1,1,0.9,
       1,1.1,1,1,1.1,1,0.8,0.9,1,1.2,0.9,1,1,1.1,1.2,1,1.5,1,3,2,5,3,2,1,1,1,0.9,1,1,3,
       2.6,4,3,3.2,2,1,1,0.8,4,4,2,2.5,1,1,1];

my $lag = 30;
my $threshold = 5;
my $influence = 0;

my $result = thresholding_algo($y, $lag, $threshold, $influence);

print Dumper $result;

其他回答

我想把我的Julia算法实现提供给其他人。要点可以在这里找到

using Statistics
using Plots
function SmoothedZscoreAlgo(y, lag, threshold, influence)
    # Julia implimentation of http://stackoverflow.com/a/22640362/6029703
    n = length(y)
    signals = zeros(n) # init signal results
    filteredY = copy(y) # init filtered series
    avgFilter = zeros(n) # init average filter
    stdFilter = zeros(n) # init std filter
    avgFilter[lag - 1] = mean(y[1:lag]) # init first value
    stdFilter[lag - 1] = std(y[1:lag]) # init first value

    for i in range(lag, stop=n-1)
        if abs(y[i] - avgFilter[i-1]) > threshold*stdFilter[i-1]
            if y[i] > avgFilter[i-1]
                signals[i] += 1 # postive signal
            else
                signals[i] += -1 # negative signal
            end
            # Make influence lower
            filteredY[i] = influence*y[i] + (1-influence)*filteredY[i-1]
        else
            signals[i] = 0
            filteredY[i] = y[i]
        end
        avgFilter[i] = mean(filteredY[i-lag+1:i])
        stdFilter[i] = std(filteredY[i-lag+1:i])
    end
    return (signals = signals, avgFilter = avgFilter, stdFilter = stdFilter)
end


# Data
y = [1,1,1.1,1,0.9,1,1,1.1,1,0.9,1,1.1,1,1,0.9,1,1,1.1,1,1,1,1,1.1,0.9,1,1.1,1,1,0.9,
       1,1.1,1,1,1.1,1,0.8,0.9,1,1.2,0.9,1,1,1.1,1.2,1,1.5,1,3,2,5,3,2,1,1,1,0.9,1,1,3,
       2.6,4,3,3.2,2,1,1,0.8,4,4,2,2.5,1,1,1]

# Settings: lag = 30, threshold = 5, influence = 0
lag = 30
threshold = 5
influence = 0

results = SmoothedZscoreAlgo(y, lag, threshold, influence)
upper_bound = results[:avgFilter] + threshold * results[:stdFilter]
lower_bound = results[:avgFilter] - threshold * results[:stdFilter]
x = 1:length(y)

yplot = plot(x,y,color="blue", label="Y",legend=:topleft)
yplot = plot!(x,upper_bound, color="green", label="Upper Bound",legend=:topleft)
yplot = plot!(x,results[:avgFilter], color="cyan", label="Average Filter",legend=:topleft)
yplot = plot!(x,lower_bound, color="green", label="Lower Bound",legend=:topleft)
signalplot = plot(x,results[:signals],color="red",label="Signals",legend=:topleft)
plot(yplot,signalplot,layout=(2,1),legend=:topleft)

下面是我尝试为“Smoothed z-score算法”创建一个Ruby解决方案:

module ThresholdingAlgoMixin
  def mean(array)
    array.reduce(&:+) / array.size.to_f
  end

  def stddev(array)
    array_mean = mean(array)
    Math.sqrt(array.reduce(0.0) { |a, b| a.to_f + ((b.to_f - array_mean) ** 2) } / array.size.to_f)
  end

  def thresholding_algo(lag: 5, threshold: 3.5, influence: 0.5)
    return nil if size < lag * 2
    Array.new(size, 0).tap do |signals|
      filtered = Array.new(self)

      initial_slice = take(lag)
      avg_filter = Array.new(lag - 1, 0.0) + [mean(initial_slice)]
      std_filter = Array.new(lag - 1, 0.0) + [stddev(initial_slice)]
      (lag..size-1).each do |idx|
        prev = idx - 1
        if (fetch(idx) - avg_filter[prev]).abs > threshold * std_filter[prev]
          signals[idx] = fetch(idx) > avg_filter[prev] ? 1 : -1
          filtered[idx] = (influence * fetch(idx)) + ((1-influence) * filtered[prev])
        end

        filtered_slice = filtered[idx-lag..prev]
        avg_filter[idx] = mean(filtered_slice)
        std_filter[idx] = stddev(filtered_slice)
      end
    end
  end
end

以及示例用法:

test_data = [
  1, 1, 1.1, 1, 0.9, 1, 1, 1.1, 1, 0.9, 1, 1.1, 1, 1, 0.9, 1,
  1, 1.1, 1, 1, 1, 1, 1.1, 0.9, 1, 1.1, 1, 1, 0.9, 1, 1.1, 1,
  1, 1.1, 1, 0.8, 0.9, 1, 1.2, 0.9, 1, 1, 1.1, 1.2, 1, 1.5,
  1, 3, 2, 5, 3, 2, 1, 1, 1, 0.9, 1, 1, 3, 2.6, 4, 3, 3.2, 2,
  1, 1, 0.8, 4, 4, 2, 2.5, 1, 1, 1
].extend(ThresholdingAlgoMixin)

puts test_data.thresholding_algo.inspect

# Output: [
#   0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
#   0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, 0, 0, 0,
#   0, 0, 0, 0, 0, 0, 1, 0, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1,
#   1, 1, 0, 0, 0, -1, -1, 0, 0, 0, 0, 0, 0, 0, 0
# ]

如果你的数据在一个数据库表中,这里是一个简单的z-score算法的SQL版本:

with data_with_zscore as (
    select
        date_time,
        value,
        value / (avg(value) over ()) as pct_of_mean,
        (value - avg(value) over ()) / (stdev(value) over ()) as z_score
    from {{tablename}}  where datetime > '2018-11-26' and datetime < '2018-12-03'
)


-- select all
select * from data_with_zscore 

-- select only points greater than a certain threshold
select * from data_with_zscore where z_score > abs(2)

用现代c++实现的面向对象版z-score算法

template<typename T>
class FindPeaks{
private:
    std::vector<T> m_input_signal;                      // stores input vector
    std::vector<T> m_array_peak_positive;               
    std::vector<T> m_array_peak_negative;               

public:
    FindPeaks(const std::vector<T>& t_input_signal): m_input_signal{t_input_signal}{ }

    void estimate(){
        int lag{5};
        T threshold{ 5 };                                                                                       // set a threshold
        T influence{ 0.5 };                                                                                    // value between 0 to 1, 1 is normal influence and 0.5 is half the influence

        std::vector<T> filtered_signal(m_input_signal.size(), 0.0);                                             // placeholdered for smooth signal, initialie with all zeros
        std::vector<int> signal(m_input_signal.size(), 0);                                                          // vector that stores where the negative and positive located
        std::vector<T> avg_filtered(m_input_signal.size(), 0.0);                                                // moving averages
        std::vector<T> std_filtered(m_input_signal.size(), 0.0);                                                // moving standard deviation

        avg_filtered[lag] = findMean(m_input_signal.begin(), m_input_signal.begin() + lag);                         // pass the iteartor to vector
        std_filtered[lag] = findStandardDeviation(m_input_signal.begin(), m_input_signal.begin() + lag);

        for (size_t iLag = lag + 1; iLag < m_input_signal.size(); ++iLag) {                                         // start index frm 
            if (std::abs(m_input_signal[iLag] - avg_filtered[iLag - 1]) > threshold * std_filtered[iLag - 1]) {     // check if value is above threhold             
                if ((m_input_signal[iLag]) > avg_filtered[iLag - 1]) {
                    signal[iLag] = 1;                                                                               // assign positive signal
                }
                else {
                    signal[iLag] = -1;                                                                                  // assign negative signal
                }
                filtered_signal[iLag] = influence * m_input_signal[iLag] + (1 - influence) * filtered_signal[iLag - 1];        // exponential smoothing
            }
            else {
                signal[iLag] = 0;                                                                                         // no signal
                filtered_signal[iLag] = m_input_signal[iLag];
            }

            avg_filtered[iLag] = findMean(filtered_signal.begin() + (iLag - lag), filtered_signal.begin() + iLag);
            std_filtered[iLag] = findStandardDeviation(filtered_signal.begin() + (iLag - lag), filtered_signal.begin() + iLag);

        }

        for (size_t iSignal = 0; iSignal < m_input_signal.size(); ++iSignal) {
            if (signal[iSignal] == 1) {
                m_array_peak_positive.emplace_back(m_input_signal[iSignal]);                                        // store the positive peaks
            }
            else if (signal[iSignal] == -1) {
                m_array_peak_negative.emplace_back(m_input_signal[iSignal]);                                         // store the negative peaks
            }
        }
        printVoltagePeaks(signal, m_input_signal);

    }

    std::pair< std::vector<T>, std::vector<T> > get_peaks()
    {
        return std::make_pair(m_array_peak_negative, m_array_peak_negative);
    }

};


template<typename T1, typename T2 >
void printVoltagePeaks(std::vector<T1>& m_signal, std::vector<T2>& m_input_signal) {
    std::ofstream output_file("./voltage_peak.csv");
    std::ostream_iterator<T2> output_iterator_voltage(output_file, ",");
    std::ostream_iterator<T1> output_iterator_signal(output_file, ",");
    std::copy(m_input_signal.begin(), m_input_signal.end(), output_iterator_voltage);
    output_file << "\n";
    std::copy(m_signal.begin(), m_signal.end(), output_iterator_signal);
}

template<typename iterator_type>
typename std::iterator_traits<iterator_type>::value_type findMean(iterator_type it, iterator_type end)
{
    /* function that receives iterator to*/
    typename std::iterator_traits<iterator_type>::value_type sum{ 0.0 };
    int counter = 0;
    while (it != end) {
        sum += *(it++);
        counter++;
    }
    return sum / counter;
}

template<typename iterator_type>
typename std::iterator_traits<iterator_type>::value_type findStandardDeviation(iterator_type it, iterator_type end)
{
    auto mean = findMean(it, end);
    typename std::iterator_traits<iterator_type>::value_type sum_squared_error{ 0.0 };
    int counter{ 0 };
    while (it != end) {
        sum_squared_error += std::pow((*(it++) - mean), 2);
        counter++;
    }
    auto standard_deviation = std::sqrt(sum_squared_error / (counter - 1));
    return standard_deviation;
}

使用实时流的Python版本(不会在每个新数据点到达时重新计算所有数据点)。您可能想要调整类函数返回的内容—对于我的目的,我只需要信号。

import numpy as np


class real_time_peak_detection():
    def __init__(self, array, lag, threshold, influence):
        self.y = list(array)
        self.length = len(self.y)
        self.lag = lag
        self.threshold = threshold
        self.influence = influence
        self.signals = [0] * len(self.y)
        self.filteredY = np.array(self.y).tolist()
        self.avgFilter = [0] * len(self.y)
        self.stdFilter = [0] * len(self.y)
        self.avgFilter[self.lag - 1] = np.mean(self.y[0:self.lag]).tolist()
        self.stdFilter[self.lag - 1] = np.std(self.y[0:self.lag]).tolist()

    def thresholding_algo(self, new_value):
        self.y.append(new_value)
        i = len(self.y) - 1
        self.length = len(self.y)
        if i < self.lag:
            return 0
        elif i == self.lag:
            self.signals = [0] * len(self.y)
            self.filteredY = np.array(self.y).tolist()
            self.avgFilter = [0] * len(self.y)
            self.stdFilter = [0] * len(self.y)
            self.avgFilter[self.lag] = np.mean(self.y[0:self.lag]).tolist()
            self.stdFilter[self.lag] = np.std(self.y[0:self.lag]).tolist()
            return 0

        self.signals += [0]
        self.filteredY += [0]
        self.avgFilter += [0]
        self.stdFilter += [0]

        if abs(self.y[i] - self.avgFilter[i - 1]) > (self.threshold * self.stdFilter[i - 1]):

            if self.y[i] > self.avgFilter[i - 1]:
                self.signals[i] = 1
            else:
                self.signals[i] = -1

            self.filteredY[i] = self.influence * self.y[i] + \
                (1 - self.influence) * self.filteredY[i - 1]
            self.avgFilter[i] = np.mean(self.filteredY[(i - self.lag):i])
            self.stdFilter[i] = np.std(self.filteredY[(i - self.lag):i])
        else:
            self.signals[i] = 0
            self.filteredY[i] = self.y[i]
            self.avgFilter[i] = np.mean(self.filteredY[(i - self.lag):i])
            self.stdFilter[i] = np.std(self.filteredY[(i - self.lag):i])

        return self.signals[i]