更新:到目前为止表现最好的算法是这个。
这个问题探讨了在实时时间序列数据中检测突然峰值的稳健算法。
考虑以下示例数据:
这个数据的例子是Matlab格式的(但这个问题不是关于语言,而是关于算法):
p = [1 1 1.1 1 0.9 1 1 1.1 1 0.9 1 1.1 1 1 0.9 1 1 1.1 1 1 1 1 1.1 0.9 1 1.1 1 1 0.9, ...
1 1.1 1 1 1.1 1 0.8 0.9 1 1.2 0.9 1 1 1.1 1.2 1 1.5 1 3 2 5 3 2 1 1 1 0.9 1 1, ...
3 2.6 4 3 3.2 2 1 1 0.8 4 4 2 2.5 1 1 1];
你可以清楚地看到有三个大峰和一些小峰。这个数据集是问题所涉及的时间序列数据集类的一个特定示例。这类数据集有两个一般特征:
有一种具有一般平均值的基本噪声
有很大的“峰值”或“更高的数据点”明显偏离噪声。
让我们假设以下情况:
峰的宽度不能事先确定
峰的高度明显偏离其他值
算法实时更新(因此每个新数据点都会更新)
对于这种情况,需要构造一个触发信号的边值。但是,边界值不能是静态的,必须通过算法实时确定。
我的问题是:什么是实时计算这些阈值的好算法?有没有针对这种情况的特定算法?最著名的算法是什么?
健壮的算法或有用的见解都受到高度赞赏。(可以用任何语言回答:这是关于算法的)
我在我的机器人项目中需要这样的东西。我想我可以归还Kotlin实现。
/**
* Smoothed zero-score alogrithm shamelessly copied from https://stackoverflow.com/a/22640362/6029703
* Uses a rolling mean and a rolling deviation (separate) to identify peaks in a vector
*
* @param y - The input vector to analyze
* @param lag - The lag of the moving window (i.e. how big the window is)
* @param threshold - The z-score at which the algorithm signals (i.e. how many standard deviations away from the moving mean a peak (or signal) is)
* @param influence - The influence (between 0 and 1) of new signals on the mean and standard deviation (how much a peak (or signal) should affect other values near it)
* @return - The calculated averages (avgFilter) and deviations (stdFilter), and the signals (signals)
*/
fun smoothedZScore(y: List<Double>, lag: Int, threshold: Double, influence: Double): Triple<List<Int>, List<Double>, List<Double>> {
val stats = SummaryStatistics()
// the results (peaks, 1 or -1) of our algorithm
val signals = MutableList<Int>(y.size, { 0 })
// filter out the signals (peaks) from our original list (using influence arg)
val filteredY = ArrayList<Double>(y)
// the current average of the rolling window
val avgFilter = MutableList<Double>(y.size, { 0.0 })
// the current standard deviation of the rolling window
val stdFilter = MutableList<Double>(y.size, { 0.0 })
// init avgFilter and stdFilter
y.take(lag).forEach { s -> stats.addValue(s) }
avgFilter[lag - 1] = stats.mean
stdFilter[lag - 1] = Math.sqrt(stats.populationVariance) // getStandardDeviation() uses sample variance (not what we want)
stats.clear()
//loop input starting at end of rolling window
(lag..y.size - 1).forEach { i ->
//if the distance between the current value and average is enough standard deviations (threshold) away
if (Math.abs(y[i] - avgFilter[i - 1]) > threshold * stdFilter[i - 1]) {
//this is a signal (i.e. peak), determine if it is a positive or negative signal
signals[i] = if (y[i] > avgFilter[i - 1]) 1 else -1
//filter this signal out using influence
filteredY[i] = (influence * y[i]) + ((1 - influence) * filteredY[i - 1])
} else {
//ensure this signal remains a zero
signals[i] = 0
//ensure this value is not filtered
filteredY[i] = y[i]
}
//update rolling average and deviation
(i - lag..i - 1).forEach { stats.addValue(filteredY[it]) }
avgFilter[i] = stats.getMean()
stdFilter[i] = Math.sqrt(stats.getPopulationVariance()) //getStandardDeviation() uses sample variance (not what we want)
stats.clear()
}
return Triple(signals, avgFilter, stdFilter)
}
带有验证图的示例项目可以在github上找到。
用现代c++实现的面向对象版z-score算法
template<typename T>
class FindPeaks{
private:
std::vector<T> m_input_signal; // stores input vector
std::vector<T> m_array_peak_positive;
std::vector<T> m_array_peak_negative;
public:
FindPeaks(const std::vector<T>& t_input_signal): m_input_signal{t_input_signal}{ }
void estimate(){
int lag{5};
T threshold{ 5 }; // set a threshold
T influence{ 0.5 }; // value between 0 to 1, 1 is normal influence and 0.5 is half the influence
std::vector<T> filtered_signal(m_input_signal.size(), 0.0); // placeholdered for smooth signal, initialie with all zeros
std::vector<int> signal(m_input_signal.size(), 0); // vector that stores where the negative and positive located
std::vector<T> avg_filtered(m_input_signal.size(), 0.0); // moving averages
std::vector<T> std_filtered(m_input_signal.size(), 0.0); // moving standard deviation
avg_filtered[lag] = findMean(m_input_signal.begin(), m_input_signal.begin() + lag); // pass the iteartor to vector
std_filtered[lag] = findStandardDeviation(m_input_signal.begin(), m_input_signal.begin() + lag);
for (size_t iLag = lag + 1; iLag < m_input_signal.size(); ++iLag) { // start index frm
if (std::abs(m_input_signal[iLag] - avg_filtered[iLag - 1]) > threshold * std_filtered[iLag - 1]) { // check if value is above threhold
if ((m_input_signal[iLag]) > avg_filtered[iLag - 1]) {
signal[iLag] = 1; // assign positive signal
}
else {
signal[iLag] = -1; // assign negative signal
}
filtered_signal[iLag] = influence * m_input_signal[iLag] + (1 - influence) * filtered_signal[iLag - 1]; // exponential smoothing
}
else {
signal[iLag] = 0; // no signal
filtered_signal[iLag] = m_input_signal[iLag];
}
avg_filtered[iLag] = findMean(filtered_signal.begin() + (iLag - lag), filtered_signal.begin() + iLag);
std_filtered[iLag] = findStandardDeviation(filtered_signal.begin() + (iLag - lag), filtered_signal.begin() + iLag);
}
for (size_t iSignal = 0; iSignal < m_input_signal.size(); ++iSignal) {
if (signal[iSignal] == 1) {
m_array_peak_positive.emplace_back(m_input_signal[iSignal]); // store the positive peaks
}
else if (signal[iSignal] == -1) {
m_array_peak_negative.emplace_back(m_input_signal[iSignal]); // store the negative peaks
}
}
printVoltagePeaks(signal, m_input_signal);
}
std::pair< std::vector<T>, std::vector<T> > get_peaks()
{
return std::make_pair(m_array_peak_negative, m_array_peak_negative);
}
};
template<typename T1, typename T2 >
void printVoltagePeaks(std::vector<T1>& m_signal, std::vector<T2>& m_input_signal) {
std::ofstream output_file("./voltage_peak.csv");
std::ostream_iterator<T2> output_iterator_voltage(output_file, ",");
std::ostream_iterator<T1> output_iterator_signal(output_file, ",");
std::copy(m_input_signal.begin(), m_input_signal.end(), output_iterator_voltage);
output_file << "\n";
std::copy(m_signal.begin(), m_signal.end(), output_iterator_signal);
}
template<typename iterator_type>
typename std::iterator_traits<iterator_type>::value_type findMean(iterator_type it, iterator_type end)
{
/* function that receives iterator to*/
typename std::iterator_traits<iterator_type>::value_type sum{ 0.0 };
int counter = 0;
while (it != end) {
sum += *(it++);
counter++;
}
return sum / counter;
}
template<typename iterator_type>
typename std::iterator_traits<iterator_type>::value_type findStandardDeviation(iterator_type it, iterator_type end)
{
auto mean = findMean(it, end);
typename std::iterator_traits<iterator_type>::value_type sum_squared_error{ 0.0 };
int counter{ 0 };
while (it != end) {
sum_squared_error += std::pow((*(it++) - mean), 2);
counter++;
}
auto standard_deviation = std::sqrt(sum_squared_error / (counter - 1));
return standard_deviation;
}
我认为delica的Python回答器有一个bug。我不能评论他的帖子,因为我没有代表来做这件事,编辑队列已经满了,所以我可能不是第一个注意到它的人。
avgFilter[lag - 1]和stdFilter[lag - 1]在init中设置,然后在lag == i时再次设置,而不是改变[lag]值。这个结果使得第一个信号总是1。
以下是带有轻微修正的代码:
import numpy as np
class real_time_peak_detection():
def __init__(self, array, lag, threshold, influence):
self.y = list(array)
self.length = len(self.y)
self.lag = lag
self.threshold = threshold
self.influence = influence
self.signals = [0] * len(self.y)
self.filteredY = np.array(self.y).tolist()
self.avgFilter = [0] * len(self.y)
self.stdFilter = [0] * len(self.y)
self.avgFilter[self.lag - 1] = np.mean(self.y[0:self.lag]).tolist()
self.stdFilter[self.lag - 1] = np.std(self.y[0:self.lag]).tolist()
def thresholding_algo(self, new_value):
self.y.append(new_value)
i = len(self.y) - 1
self.length = len(self.y)
if i < self.lag:
return 0
elif i == self.lag:
self.signals = [0] * len(self.y)
self.filteredY = np.array(self.y).tolist()
self.avgFilter = [0] * len(self.y)
self.stdFilter = [0] * len(self.y)
self.avgFilter[self.lag] = np.mean(self.y[0:self.lag]).tolist()
self.stdFilter[self.lag] = np.std(self.y[0:self.lag]).tolist()
return 0
self.signals += [0]
self.filteredY += [0]
self.avgFilter += [0]
self.stdFilter += [0]
if abs(self.y[i] - self.avgFilter[i - 1]) > self.threshold * self.stdFilter[i - 1]:
if self.y[i] > self.avgFilter[i - 1]:
self.signals[i] = 1
else:
self.signals[i] = -1
self.filteredY[i] = self.influence * self.y[i] + (1 - self.influence) * self.filteredY[i - 1]
self.avgFilter[i] = np.mean(self.filteredY[(i - self.lag):i])
self.stdFilter[i] = np.std(self.filteredY[(i - self.lag):i])
else:
self.signals[i] = 0
self.filteredY[i] = self.y[i]
self.avgFilter[i] = np.mean(self.filteredY[(i - self.lag):i])
self.stdFilter[i] = np.std(self.filteredY[(i - self.lag):i])
return self.signals[i]