可能的重复: 滚动中值算法

假设整数是从数据流中读取的。以有效的方式查找到目前为止读取的元素的中位数。

我读过的解决方案:我们可以在左边使用max堆来表示小于有效中位数的元素,在右边使用min堆来表示大于有效中位数的元素。

在处理一个传入元素后,堆中的元素数量最多相差1个元素。当两个堆包含相同数量的元素时,我们发现堆根数据的平均值为有效中位数。当堆不平衡时,我们从包含更多元素的堆根中选择有效中值。

但是我们如何构造最大堆和最小堆也就是说,我们如何知道这里的有效中值?我认为我们应该在max-heap中插入1个元素然后在min-heap中插入下一个元素,如此类推。如果我说错了请指正。


当前回答

下面是我简单但有效的算法(c++),用于从整数流中计算运行中值:

#include<algorithm>
#include<fstream>
#include<vector>
#include<list>

using namespace std;

void runningMedian(std::ifstream& ifs, std::ofstream& ofs, const unsigned bufSize) {
    if (bufSize < 1)
        throw exception("Wrong buffer size.");
    bool evenSize = bufSize % 2 == 0 ? true : false;
    list<int> q;
    vector<int> nums;
    int n;
    unsigned count = 0;
    while (ifs.good()) {
        ifs >> n;
        q.push_back(n);
        auto ub = std::upper_bound(nums.begin(), nums.end(), n);
        nums.insert(ub, n);
        count++;
        if (nums.size() >= bufSize) {
            auto it = std::find(nums.begin(), nums.end(), q.front());
            nums.erase(it);
            q.pop_front();
            if (evenSize)
                ofs << count << ": " << (static_cast<double>(nums[nums.size() / 2 - 1] +
                static_cast<double>(nums[nums.size() / 2]))) / 2.0 << '\n';
            else
                ofs << count << ": " << static_cast<double>(nums[nums.size() / 2]);
        }
    }
}

The bufferSize specifies the size of the numbers sequence, on which the running median must be calculated. When reading numbers from the input stream ifs the vector of the size bufferSize is maintained in sorted order. The median is calculated by taking the middle of the sorted vector, if bufferSize is odd, or the sum of the two middle elements divided by 2, when bufferSize is even. Additinally, I maintain a list of last bufferSize elements read from input. When a new element is added, I put it in the right place in sorted vector and remove from the vector the element added bufferSize steps before (the value of the element retained in the front of the list). In the same time I remove the old element from the list: every new element is placed on the back of the list, every old element is removed from the front. After reaching the bufferSize, both the list and the vector stop to grow, and every insertion of a new element is compensated be deletion of an old element, placed in the list bufferSize steps before. Note, I do not care, whether I remove from the vector exactly the element, placed bufferSize steps before, or just an element that has the same value. For the value of median it does not matter. All calculated median values are output in the output stream.

其他回答

如果您不能一次将所有项保存在内存中,这个问题就会变得更加困难。堆解决方案要求您一次将所有元素保存在内存中。这在这个问题的大多数实际应用中是不可能的。

相反,当您看到数字时,请记录您看到每个整数的次数。假设4个字节整数,即2^32个桶,或最多2^33个整数(每个int的key和count),即2^35字节或32GB。它可能会比这个小得多,因为您不需要存储键或为那些为0的条目计数(例如。就像python中的defaultdict)。插入每个新整数需要常数时间。

然后在任意点,要找到中位数,只需使用计数来确定哪个整数是中间元素。这需要常数时间(虽然是一个很大的常数,但仍然是常数)。

如果我们想要找到n个最近出现的元素的中值,这个问题有一个精确的解决方案,只需要将n个最近出现的元素保存在内存中。它速度快,规模大。

可索引skiplist支持O(ln n)插入、删除和任意元素的索引搜索,同时保持排序顺序。当再加上一个FIFO队列来跟踪第n个最古老的条目时,解决方案很简单:

class RunningMedian:
    'Fast running median with O(lg n) updates where n is the window size'

    def __init__(self, n, iterable):
        self.it = iter(iterable)
        self.queue = deque(islice(self.it, n))
        self.skiplist = IndexableSkiplist(n)
        for elem in self.queue:
            self.skiplist.insert(elem)

    def __iter__(self):
        queue = self.queue
        skiplist = self.skiplist
        midpoint = len(queue) // 2
        yield skiplist[midpoint]
        for newelem in self.it:
            oldelem = queue.popleft()
            skiplist.remove(oldelem)
            queue.append(newelem)
            skiplist.insert(newelem)
            yield skiplist[midpoint]

以下是完整工作代码的链接(一个易于理解的类版本和一个内联可索引的skiplist代码的优化生成器版本):

http://code.activestate.com/recipes/576930-efficient-running-median-using-an-indexable-skipli/ http://code.activestate.com/recipes/577073。

我可以确认@schmil-the-cat的答案是正确的。

下面是一个JS的实现。我不是算法专家,但我认为它可能对其他人有用。


class Heap {
  constructor(isMin) {
    this.heap = [];
    this.isMin = isMin;
  }

  heapify() {
    if (this.heap.length === 1) {
      return;
    }

    let currentIndex = this.heap.length - 1; 

    while (true) {
      if (currentIndex === 0) {
        break;
      }

      const parentIndex = Math.floor((currentIndex - 1) / 2);
      const parentValue = this.heap[parentIndex];
      const currentValue = this.heap[currentIndex];

      if (
        (this.isMin && parentValue < currentValue) ||
        (!this.isMin && parentValue > currentValue)
      ) {
        break;
      }

      this.heap[parentIndex] = currentValue;
      this.heap[currentIndex] = parentValue;

      currentIndex = parentIndex;
    }
  }

  insert(val) {
    this.heap.push(val);

    this.heapify();
  }

  pop() {
    const val = this.heap.shift();
    this.heapify();
    return val;
  }

  top() {
    return this.heap[0];
  }

  length() {
    return this.heap.length;
  }
}

function findMedian(arr) {
  const topHeap = new Heap(true);
  const bottomHeap = new Heap(false);

  const output = [];

  if (arr.length === 1) {
    return arr[0];
  }

  topHeap.insert(Math.max(arr[0], arr[1]));
  bottomHeap.insert(Math.min(arr[0], arr[1]));

  for (let i = 0; i < arr.length; i++) {
    const currentVal = arr[i];

    if (i === 0) {
      output.push(currentVal);
      continue;
    }

    if (i > 1) {
      if (currentVal < bottomHeap.top()) {
        bottomHeap.insert(currentVal);
      } else {
        topHeap.insert(currentVal);
      }
    }

    if (bottomHeap.length() - topHeap.length() > 1) {
      const bottomVal = bottomHeap.pop();
      topHeap.insert(bottomVal);
    }

    if (topHeap.length() - bottomHeap.length() > 1) {
      const topVal = topHeap.pop();
      bottomHeap.insert(topVal);
    }

    if (bottomHeap.length() === topHeap.length()) {
      output.push(Math.floor((bottomHeap.top() + topHeap.top()) / 2));
      continue;
    }

    if (bottomHeap.length() > topHeap.length()) {
      output.push(bottomHeap.top());
    } else {
      output.push(topHeap.top());
    }
  }

  return output;
}

一种直观的思考方法是如果你有一棵完全平衡的二叉搜索树,那么根就是中值元素,因为这里有相同数量的较大和较小的元素。 现在,如果树没有满,情况就不一样了,因为上一层中会有元素缺失。

所以我们可以用中值和两棵平衡二叉树,一棵表示小于中值的元素,另一棵表示大于中值的元素。这两棵树必须保持相同的大小。

当我们从数据流中获得一个新整数时,我们将其与中位数进行比较。如果它大于中值,我们就把它加到右边的树上。如果两个树的大小相差超过1,我们删除右边树的最小元素,使其成为新的中值,并将旧的中值放在左边树中。更小的也一样。

从流数据中找到运行中值有许多不同的解决方案,我将在答案的最后简要地讨论它们。

这个问题是关于特定解决方案(最大堆/最小堆解决方案)的细节,以及基于堆的解决方案如何工作的解释如下:

对于前两个元素,将较小的元素添加到左边的maxHeap中,将较大的元素添加到右边的minHeap中。然后逐个处理流数据,

Step 1: Add next item to one of the heaps

   if next item is smaller than maxHeap root add it to maxHeap,
   else add it to minHeap

Step 2: Balance the heaps (after this step heaps will be either balanced or
   one of them will contain 1 more item)

   if number of elements in one of the heaps is greater than the other by
   more than 1, remove the root element from the one containing more elements and
   add to the other one

然后在任何给定的时间,你都可以像这样计算中值:

   If the heaps contain equal amount of elements;
     median = (root of maxHeap + root of minHeap)/2
   Else
     median = root of the heap with more elements

Now I will talk about the problem in general as promised in the beginning of the answer. Finding running median from a stream of data is a tough problem, and finding an exact solution with memory constraints efficiently is probably impossible for the general case. On the other hand, if the data has some characteristics we can exploit, we can develop efficient specialized solutions. For example, if we know that the data is an integral type, then we can use counting sort, which can give you a constant memory constant time algorithm. Heap based solution is a more general solution because it can be used for other data types (doubles) as well. And finally, if the exact median is not required and an approximation is enough, you can just try to estimate a probability density function for the data and estimate median using that.