我正在解决一个问题,它涉及到快速排序10个数字(int32)。我的应用程序需要以尽可能快的速度对10个数字进行百万次排序。我正在对一个包含数十亿个元素的数据集进行采样,每次我都需要从中挑选10个数字(简化)并对它们进行排序(并从排序的10个元素列表中得出结论)。

目前我正在使用插入排序,但我想我可以为我的10个数字的特定问题实现一个非常快速的自定义排序算法,这将击败插入排序。

我该如何处理这个问题呢?


当前回答

使用排序网络,以4为一组进行比较,这样就可以在SIMD寄存器中进行比较。一对打包的最小/最大指令实现了打包比较器函数。对不起,我现在没有时间去找我记得看到过的关于这个的页面,但希望在SIMD或SSE排序网络上搜索会找到一些东西。

x86 SSE确实为4个32位整型的向量提供了打包的32位整型的min和max指令。AVX2 (Haswell及后续版本)具有相同的功能,但用于256b的8个整型向量。还有高效的洗牌指令。

如果你有很多独立的小排序,用向量并行地做4到8个排序是可能的。特别是,如果你随机选择元素(所以要排序的数据在内存中不会连续),你可以避免打乱,只需要按照你需要的顺序进行比较。10个寄存器保存来自4个(AVX2: 8) 10个整数列表的所有数据,仍然留下6个reg作为临时空间。

如果还需要对相关数据进行排序,则向量排序网络的效率较低。在这种情况下,最有效的方法似乎是使用wrapped -compare来获得更改元素的掩码,并使用该掩码来混合相关数据的(引用)向量。

其他回答

这个问题并没有说这是某种基于web的应用程序。有一件事引起了我的注意:

我正在对一个包含数十亿个元素的数据集进行采样,每次我都需要从中挑选10个数字(简化)并对它们进行排序(并从排序的10个元素列表中得出结论)。

As a software and hardware engineer this absolutely screams FPGA to me. I don't know what kind of conclusions you need to draw from the sorted set of numbers or where the data comes from, but I know it would be almost trivial to process somewhere between one hundred million and a billion of these "sort-and-analyze" operations per second. I've done FPGA-assisted DNA sequencing work in the past. It is nearly impossible to beat the massive processing power of FPGAs when the problem is well suited for that type of a solution.

在某种程度上,唯一的限制因素是将数据铲入FPGA的速度有多快,以及取出数据的速度有多快。

As a point of reference, I designed a high performance real-time image processor that received 32 bit RGB image data at a rate of about 300 million pixels per second. The data streamed through FIR filters, matrix multipliers, lookup tables, spatial edge detection blocks and a number of other operations before coming out the other end. All of this on a relatively small Xilinx Virtex2 FPGA with internal clocking spanning from about 33 MHz to, if I remember correctly, 400 MHz. Oh, yes, it also had a DDR2 controller implementation and ran two banks of DDR2 memory.

当工作在数百MHz时,FPGA可以在每次时钟转换中输出10个32位数字。当数据填满处理管道时,操作开始时会有短暂的延迟。在此之后,您应该能够在每个时钟获得一个结果。如果可以通过复制排序和分析管道使处理并行化,则会更多。原则上,解决方案几乎是微不足道的。

关键在于:如果应用程序不是pc绑定的,并且数据流和处理与FPGA解决方案“兼容”(无论是独立的还是作为机器中的协处理器卡),那么无论使用哪种算法,用任何语言编写的软件都无法击败可达到的性能水平。

我只是快速搜索了一下,找到了一篇可能对你有用的论文。看起来要追溯到2012年。在今天(甚至在过去),您可以在性能上做得更好。下面就是:

fpga上的排序网络

尽管网络排序在小数组上有很好的快速几率,但如果适当优化,有时您无法击败插入排序。例如,有2个元素的批量插入:

{
    final int a=in[0]<in[1]?in[0]:in[1];
    final int b=in[0]<in[1]?in[1]:in[0];
    in[0]=a;
    in[1]=b;
}
for(int x=2;x<10;x+=2)
{
    final int a=in[x]<in[x+1]?in[x]:in[x+1];
    final int b=in[x]<in[x+1]?in[x+1]:in[x];
    int y= x-1;

    while(y>=0&&in[y]>b)
    {
        in[y+2]= in[y];
        --y;
    }
    in[y+2]=b;
    while(y>=0&&in[y]>a)
    {
        in[y+1]= in[y];
        --y;
    }
    in[y+1]=a;
}

那么展开的、无分支的选择排序呢?

#include <iostream>
#include <algorithm>
#include <random>

//return the index of the minimum element in array a
int min(const int * const a) {
  int m = a[0];
  int indx = 0;
  #define TEST(i) (m > a[i]) && (m = a[i], indx = i ); 
  //see http://stackoverflow.com/a/7074042/2140449
  TEST(1);
  TEST(2);
  TEST(3);
  TEST(4);
  TEST(5);
  TEST(6);
  TEST(7);
  TEST(8);
  TEST(9);
  #undef TEST
  return indx;
}

void sort( int * const a ){
  int work[10];
  int indx;
  #define GET(i) indx = min(a); work[i] = a[indx]; a[indx] = 2147483647; 
  //get the minimum, copy it to work and set it at max_int in a
  GET(0);
  GET(1);
  GET(2);
  GET(3);
  GET(4);
  GET(5);
  GET(6);
  GET(7);
  GET(8);
  GET(9);
  #undef GET
  #define COPY(i) a[i] = work[i];
  //copy back to a
  COPY(0);
  COPY(1);
  COPY(2);
  COPY(3);
  COPY(4);
  COPY(5);
  COPY(6);
  COPY(7);
  COPY(8);
  COPY(9);
  #undef COPY
}

int main() {
  //generating and printing a random array
  int a[10] = { 1,2,3,4,5,6,7,8,9,10 };
  std::random_device rd;
  std::mt19937 g(rd());
  std::shuffle( a, a+10, g);
  for (int i = 0; i < 10; i++) {
    std::cout << a[i] << ' ';
  }
  std::cout << std::endl;

  //sorting and printing again
  sort(a);
  for (int i = 0; i < 10; i++) {
    std::cout << a[i] << ' ';
  } 

  return 0;
}

http://coliru.stacked-crooked.com/a/71e18bc4f7fa18c6

唯一相关的行是前两行#define。

它使用两个列表,并将第一个列表重新检查十次,这将是一个糟糕的选择排序,但它避免了分支和变长循环,这可能会补偿现代处理器和如此小的数据集。


基准

我对排序网络进行了基准测试,我的代码似乎变慢了。然而,我试图删除展开和复制。运行以下代码:

#include <iostream>
#include <algorithm>
#include <random>
#include <chrono>

int min(const int * const a, int i) {
  int m = a[i];
  int indx = i++;
  for ( ; i<10; i++) 
    //see http://stackoverflow.com/a/7074042/2140449
    (m > a[i]) && (m = a[i], indx = i ); 
  return indx;
}

void sort( int * const a ){
  for (int i = 0; i<9; i++)
    std::swap(a[i], a[min(a,i)]); //search only forward
}


void sortNet10(int * const data) {  // ten-input sorting network by Waksman, 1969
    int swap;
    if (data[0] > data[5]) { swap = data[0]; data[0] = data[5]; data[5] = swap; }
    if (data[1] > data[6]) { swap = data[1]; data[1] = data[6]; data[6] = swap; }
    if (data[2] > data[7]) { swap = data[2]; data[2] = data[7]; data[7] = swap; }
    if (data[3] > data[8]) { swap = data[3]; data[3] = data[8]; data[8] = swap; }
    if (data[4] > data[9]) { swap = data[4]; data[4] = data[9]; data[9] = swap; }
    if (data[0] > data[3]) { swap = data[0]; data[0] = data[3]; data[3] = swap; }
    if (data[5] > data[8]) { swap = data[5]; data[5] = data[8]; data[8] = swap; }
    if (data[1] > data[4]) { swap = data[1]; data[1] = data[4]; data[4] = swap; }
    if (data[6] > data[9]) { swap = data[6]; data[6] = data[9]; data[9] = swap; }
    if (data[0] > data[2]) { swap = data[0]; data[0] = data[2]; data[2] = swap; }
    if (data[3] > data[6]) { swap = data[3]; data[3] = data[6]; data[6] = swap; }
    if (data[7] > data[9]) { swap = data[7]; data[7] = data[9]; data[9] = swap; }
    if (data[0] > data[1]) { swap = data[0]; data[0] = data[1]; data[1] = swap; }
    if (data[2] > data[4]) { swap = data[2]; data[2] = data[4]; data[4] = swap; }
    if (data[5] > data[7]) { swap = data[5]; data[5] = data[7]; data[7] = swap; }
    if (data[8] > data[9]) { swap = data[8]; data[8] = data[9]; data[9] = swap; }
    if (data[1] > data[2]) { swap = data[1]; data[1] = data[2]; data[2] = swap; }
    if (data[3] > data[5]) { swap = data[3]; data[3] = data[5]; data[5] = swap; }
    if (data[4] > data[6]) { swap = data[4]; data[4] = data[6]; data[6] = swap; }
    if (data[7] > data[8]) { swap = data[7]; data[7] = data[8]; data[8] = swap; }
    if (data[1] > data[3]) { swap = data[1]; data[1] = data[3]; data[3] = swap; }
    if (data[4] > data[7]) { swap = data[4]; data[4] = data[7]; data[7] = swap; }
    if (data[2] > data[5]) { swap = data[2]; data[2] = data[5]; data[5] = swap; }
    if (data[6] > data[8]) { swap = data[6]; data[6] = data[8]; data[8] = swap; }
    if (data[2] > data[3]) { swap = data[2]; data[2] = data[3]; data[3] = swap; }
    if (data[4] > data[5]) { swap = data[4]; data[4] = data[5]; data[5] = swap; }
    if (data[6] > data[7]) { swap = data[6]; data[6] = data[7]; data[7] = swap; }
    if (data[3] > data[4]) { swap = data[3]; data[3] = data[4]; data[4] = swap; }
    if (data[5] > data[6]) { swap = data[5]; data[5] = data[6]; data[6] = swap; }
}


std::chrono::duration<double> benchmark( void(*func)(int * const), const int seed ) {
  std::mt19937 g(seed);
  int a[10] = {10,11,12,13,14,15,16,17,18,19};
  std::chrono::high_resolution_clock::time_point t1, t2; 
  t1 = std::chrono::high_resolution_clock::now();
  for (long i = 0; i < 1e7; i++) {
    std::shuffle( a, a+10, g);
    func(a);
  }
  t2 = std::chrono::high_resolution_clock::now();
  return std::chrono::duration_cast<std::chrono::duration<double>>(t2 - t1);
}

int main() {
  std::random_device rd;
  for (int i = 0; i < 10; i++) {
    const int seed = rd();
    std::cout << "seed = " << seed << std::endl;
    std::cout << "sortNet10: " << benchmark(sortNet10, seed).count() << std::endl;
    std::cout << "sort:      " << benchmark(sort,      seed).count() << std::endl;
  }
  return 0;
}

与排序网络相比,我一直在为无分支选择排序获得更好的结果。

$ gcc -v
gcc version 5.2.0 (GCC) 
$ g++ -std=c++11 -Ofast sort.cpp && ./a.out
seed = -1727396418
sortNet10: 2.24137
sort:      2.21828
seed = 2003959850
sortNet10: 2.23914
sort:      2.21641
seed = 1994540383
sortNet10: 2.23782
sort:      2.21778
seed = 1258259982
sortNet10: 2.25199
sort:      2.21801
seed = 1821086932
sortNet10: 2.25535
sort:      2.2173
seed = 412262735
sortNet10: 2.24489
sort:      2.21776
seed = 1059795817
sortNet10: 2.29226
sort:      2.21777
seed = -188551272
sortNet10: 2.23803
sort:      2.22996
seed = 1043757247
sortNet10: 2.2503
sort:      2.23604
seed = -268332483
sortNet10: 2.24455
sort:      2.24304

出于类似于我在这里描述的原因,以下排序函数sort6_iterator()和sort10_iterator_local()应该能很好地执行,其中排序网络是从这里取的:

template<class IterType> 
inline void sort10_iterator(IterType it) 
{
#define SORT2(x,y) {if(data##x>data##y)std::swap(data##x,data##y);}
#define DD1(a)   auto data##a=*(data+a);
#define DD2(a,b) auto data##a=*(data+a), data##b=*(data+b);
#define CB1(a)   *(data+a)=data##a;
#define CB2(a,b) *(data+a)=data##a;*(data+b)=data##b;
  DD2(1,4) SORT2(1,4) DD2(7,8) SORT2(7,8) DD2(2,3) SORT2(2,3) DD2(5,6) SORT2(5,6) DD2(0,9) SORT2(0,9) 
  SORT2(2,5) SORT2(0,7) SORT2(8,9) SORT2(3,6) 
  SORT2(4,9) SORT2(0,1) 
  SORT2(0,2) CB1(0) SORT2(6,9) CB1(9) SORT2(3,5) SORT2(4,7) SORT2(1,8) 
  SORT2(3,4) SORT2(5,8) SORT2(6,7) SORT2(1,2) 
  SORT2(7,8) CB1(8) SORT2(1,3) CB1(1) SORT2(2,5) SORT2(4,6) 
  SORT2(2,3) CB1(2) SORT2(6,7) CB1(7) SORT2(4,5) 
  SORT2(3,4) CB2(3,4) SORT2(5,6) CB2(5,6) 
#undef CB1
#undef CB2
#undef DD1
#undef DD2
#undef SORT2
}

为了调用这个函数,我给它传递了一个std::vector迭代器。

使用排序网络,以4为一组进行比较,这样就可以在SIMD寄存器中进行比较。一对打包的最小/最大指令实现了打包比较器函数。对不起,我现在没有时间去找我记得看到过的关于这个的页面,但希望在SIMD或SSE排序网络上搜索会找到一些东西。

x86 SSE确实为4个32位整型的向量提供了打包的32位整型的min和max指令。AVX2 (Haswell及后续版本)具有相同的功能,但用于256b的8个整型向量。还有高效的洗牌指令。

如果你有很多独立的小排序,用向量并行地做4到8个排序是可能的。特别是,如果你随机选择元素(所以要排序的数据在内存中不会连续),你可以避免打乱,只需要按照你需要的顺序进行比较。10个寄存器保存来自4个(AVX2: 8) 10个整数列表的所有数据,仍然留下6个reg作为临时空间。

如果还需要对相关数据进行排序,则向量排序网络的效率较低。在这种情况下,最有效的方法似乎是使用wrapped -compare来获得更改元素的掩码,并使用该掩码来混合相关数据的(引用)向量。