我正在解决一个问题,它涉及到快速排序10个数字(int32)。我的应用程序需要以尽可能快的速度对10个数字进行百万次排序。我正在对一个包含数十亿个元素的数据集进行采样,每次我都需要从中挑选10个数字(简化)并对它们进行排序(并从排序的10个元素列表中得出结论)。
目前我正在使用插入排序,但我想我可以为我的10个数字的特定问题实现一个非常快速的自定义排序算法,这将击败插入排序。
我该如何处理这个问题呢?
我正在解决一个问题,它涉及到快速排序10个数字(int32)。我的应用程序需要以尽可能快的速度对10个数字进行百万次排序。我正在对一个包含数十亿个元素的数据集进行采样,每次我都需要从中挑选10个数字(简化)并对它们进行排序(并从排序的10个元素列表中得出结论)。
目前我正在使用插入排序,但我想我可以为我的10个数字的特定问题实现一个非常快速的自定义排序算法,这将击败插入排序。
我该如何处理这个问题呢?
当前回答
您可以完全展开插入排序。
为了简化这一点,可以在没有函数开销的情况下使用递归模板。因为int已经是模板,所以它也可以是模板形参。这也使得编码10以外的数组大小变得很简单。
注意,要排序int x[10],调用insert_sort<int, 9>::sort(x);因为类使用最后一项的索引。这可以被包装,但是需要阅读更多的代码。
template <class T, int NUM>
class insert_sort;
template <class T>
class insert_sort<T,0>
// Stop template recursion
// Sorting one item is a no operation
{
public:
static void place(T *x) {}
static void sort(T * x) {}
};
template <class T, int NUM>
class insert_sort
// Use template recursion to do insertion sort.
// NUM is the index of the last item, e.g. for x[10] call <9>
{
public:
static void place(T *x)
{
T t1=x[NUM-1];
T t2=x[NUM];
if (t1 > t2)
{
x[NUM-1]=t2;
x[NUM]=t1;
insert_sort<T,NUM-1>::place(x);
}
}
static void sort(T * x)
{
insert_sort<T,NUM-1>::sort(x); // Sort everything before
place(x); // Put this item in
}
};
在我的测试中,这比排序网络的例子要快。
其他回答
那么展开的、无分支的选择排序呢?
#include <iostream>
#include <algorithm>
#include <random>
//return the index of the minimum element in array a
int min(const int * const a) {
int m = a[0];
int indx = 0;
#define TEST(i) (m > a[i]) && (m = a[i], indx = i );
//see http://stackoverflow.com/a/7074042/2140449
TEST(1);
TEST(2);
TEST(3);
TEST(4);
TEST(5);
TEST(6);
TEST(7);
TEST(8);
TEST(9);
#undef TEST
return indx;
}
void sort( int * const a ){
int work[10];
int indx;
#define GET(i) indx = min(a); work[i] = a[indx]; a[indx] = 2147483647;
//get the minimum, copy it to work and set it at max_int in a
GET(0);
GET(1);
GET(2);
GET(3);
GET(4);
GET(5);
GET(6);
GET(7);
GET(8);
GET(9);
#undef GET
#define COPY(i) a[i] = work[i];
//copy back to a
COPY(0);
COPY(1);
COPY(2);
COPY(3);
COPY(4);
COPY(5);
COPY(6);
COPY(7);
COPY(8);
COPY(9);
#undef COPY
}
int main() {
//generating and printing a random array
int a[10] = { 1,2,3,4,5,6,7,8,9,10 };
std::random_device rd;
std::mt19937 g(rd());
std::shuffle( a, a+10, g);
for (int i = 0; i < 10; i++) {
std::cout << a[i] << ' ';
}
std::cout << std::endl;
//sorting and printing again
sort(a);
for (int i = 0; i < 10; i++) {
std::cout << a[i] << ' ';
}
return 0;
}
http://coliru.stacked-crooked.com/a/71e18bc4f7fa18c6
唯一相关的行是前两行#define。
它使用两个列表,并将第一个列表重新检查十次,这将是一个糟糕的选择排序,但它避免了分支和变长循环,这可能会补偿现代处理器和如此小的数据集。
基准
我对排序网络进行了基准测试,我的代码似乎变慢了。然而,我试图删除展开和复制。运行以下代码:
#include <iostream>
#include <algorithm>
#include <random>
#include <chrono>
int min(const int * const a, int i) {
int m = a[i];
int indx = i++;
for ( ; i<10; i++)
//see http://stackoverflow.com/a/7074042/2140449
(m > a[i]) && (m = a[i], indx = i );
return indx;
}
void sort( int * const a ){
for (int i = 0; i<9; i++)
std::swap(a[i], a[min(a,i)]); //search only forward
}
void sortNet10(int * const data) { // ten-input sorting network by Waksman, 1969
int swap;
if (data[0] > data[5]) { swap = data[0]; data[0] = data[5]; data[5] = swap; }
if (data[1] > data[6]) { swap = data[1]; data[1] = data[6]; data[6] = swap; }
if (data[2] > data[7]) { swap = data[2]; data[2] = data[7]; data[7] = swap; }
if (data[3] > data[8]) { swap = data[3]; data[3] = data[8]; data[8] = swap; }
if (data[4] > data[9]) { swap = data[4]; data[4] = data[9]; data[9] = swap; }
if (data[0] > data[3]) { swap = data[0]; data[0] = data[3]; data[3] = swap; }
if (data[5] > data[8]) { swap = data[5]; data[5] = data[8]; data[8] = swap; }
if (data[1] > data[4]) { swap = data[1]; data[1] = data[4]; data[4] = swap; }
if (data[6] > data[9]) { swap = data[6]; data[6] = data[9]; data[9] = swap; }
if (data[0] > data[2]) { swap = data[0]; data[0] = data[2]; data[2] = swap; }
if (data[3] > data[6]) { swap = data[3]; data[3] = data[6]; data[6] = swap; }
if (data[7] > data[9]) { swap = data[7]; data[7] = data[9]; data[9] = swap; }
if (data[0] > data[1]) { swap = data[0]; data[0] = data[1]; data[1] = swap; }
if (data[2] > data[4]) { swap = data[2]; data[2] = data[4]; data[4] = swap; }
if (data[5] > data[7]) { swap = data[5]; data[5] = data[7]; data[7] = swap; }
if (data[8] > data[9]) { swap = data[8]; data[8] = data[9]; data[9] = swap; }
if (data[1] > data[2]) { swap = data[1]; data[1] = data[2]; data[2] = swap; }
if (data[3] > data[5]) { swap = data[3]; data[3] = data[5]; data[5] = swap; }
if (data[4] > data[6]) { swap = data[4]; data[4] = data[6]; data[6] = swap; }
if (data[7] > data[8]) { swap = data[7]; data[7] = data[8]; data[8] = swap; }
if (data[1] > data[3]) { swap = data[1]; data[1] = data[3]; data[3] = swap; }
if (data[4] > data[7]) { swap = data[4]; data[4] = data[7]; data[7] = swap; }
if (data[2] > data[5]) { swap = data[2]; data[2] = data[5]; data[5] = swap; }
if (data[6] > data[8]) { swap = data[6]; data[6] = data[8]; data[8] = swap; }
if (data[2] > data[3]) { swap = data[2]; data[2] = data[3]; data[3] = swap; }
if (data[4] > data[5]) { swap = data[4]; data[4] = data[5]; data[5] = swap; }
if (data[6] > data[7]) { swap = data[6]; data[6] = data[7]; data[7] = swap; }
if (data[3] > data[4]) { swap = data[3]; data[3] = data[4]; data[4] = swap; }
if (data[5] > data[6]) { swap = data[5]; data[5] = data[6]; data[6] = swap; }
}
std::chrono::duration<double> benchmark( void(*func)(int * const), const int seed ) {
std::mt19937 g(seed);
int a[10] = {10,11,12,13,14,15,16,17,18,19};
std::chrono::high_resolution_clock::time_point t1, t2;
t1 = std::chrono::high_resolution_clock::now();
for (long i = 0; i < 1e7; i++) {
std::shuffle( a, a+10, g);
func(a);
}
t2 = std::chrono::high_resolution_clock::now();
return std::chrono::duration_cast<std::chrono::duration<double>>(t2 - t1);
}
int main() {
std::random_device rd;
for (int i = 0; i < 10; i++) {
const int seed = rd();
std::cout << "seed = " << seed << std::endl;
std::cout << "sortNet10: " << benchmark(sortNet10, seed).count() << std::endl;
std::cout << "sort: " << benchmark(sort, seed).count() << std::endl;
}
return 0;
}
与排序网络相比,我一直在为无分支选择排序获得更好的结果。
$ gcc -v
gcc version 5.2.0 (GCC)
$ g++ -std=c++11 -Ofast sort.cpp && ./a.out
seed = -1727396418
sortNet10: 2.24137
sort: 2.21828
seed = 2003959850
sortNet10: 2.23914
sort: 2.21641
seed = 1994540383
sortNet10: 2.23782
sort: 2.21778
seed = 1258259982
sortNet10: 2.25199
sort: 2.21801
seed = 1821086932
sortNet10: 2.25535
sort: 2.2173
seed = 412262735
sortNet10: 2.24489
sort: 2.21776
seed = 1059795817
sortNet10: 2.29226
sort: 2.21777
seed = -188551272
sortNet10: 2.23803
sort: 2.22996
seed = 1043757247
sortNet10: 2.2503
sort: 2.23604
seed = -268332483
sortNet10: 2.24455
sort: 2.24304
使用排序网络,以4为一组进行比较,这样就可以在SIMD寄存器中进行比较。一对打包的最小/最大指令实现了打包比较器函数。对不起,我现在没有时间去找我记得看到过的关于这个的页面,但希望在SIMD或SSE排序网络上搜索会找到一些东西。
x86 SSE确实为4个32位整型的向量提供了打包的32位整型的min和max指令。AVX2 (Haswell及后续版本)具有相同的功能,但用于256b的8个整型向量。还有高效的洗牌指令。
如果你有很多独立的小排序,用向量并行地做4到8个排序是可能的。特别是,如果你随机选择元素(所以要排序的数据在内存中不会连续),你可以避免打乱,只需要按照你需要的顺序进行比较。10个寄存器保存来自4个(AVX2: 8) 10个整数列表的所有数据,仍然留下6个reg作为临时空间。
如果还需要对相关数据进行排序,则向量排序网络的效率较低。在这种情况下,最有效的方法似乎是使用wrapped -compare来获得更改元素的掩码,并使用该掩码来混合相关数据的(引用)向量。
(根据@HelloWorld的建议,研究排序网络。)
似乎29个比较/交换网络是进行10个输入排序的最快方法。在这个例子中,我使用了Waksman在1969年发现的JavaScript网络,它应该直接转换成C语言,因为它只是一个if语句、比较和交换的列表。
function sortNet10(data) { // ten-input sorting network by Waksman, 1969 var swap; if (data[0] > data[5]) { swap = data[0]; data[0] = data[5]; data[5] = swap; } if (data[1] > data[6]) { swap = data[1]; data[1] = data[6]; data[6] = swap; } if (data[2] > data[7]) { swap = data[2]; data[2] = data[7]; data[7] = swap; } if (data[3] > data[8]) { swap = data[3]; data[3] = data[8]; data[8] = swap; } if (data[4] > data[9]) { swap = data[4]; data[4] = data[9]; data[9] = swap; } if (data[0] > data[3]) { swap = data[0]; data[0] = data[3]; data[3] = swap; } if (data[5] > data[8]) { swap = data[5]; data[5] = data[8]; data[8] = swap; } if (data[1] > data[4]) { swap = data[1]; data[1] = data[4]; data[4] = swap; } if (data[6] > data[9]) { swap = data[6]; data[6] = data[9]; data[9] = swap; } if (data[0] > data[2]) { swap = data[0]; data[0] = data[2]; data[2] = swap; } if (data[3] > data[6]) { swap = data[3]; data[3] = data[6]; data[6] = swap; } if (data[7] > data[9]) { swap = data[7]; data[7] = data[9]; data[9] = swap; } if (data[0] > data[1]) { swap = data[0]; data[0] = data[1]; data[1] = swap; } if (data[2] > data[4]) { swap = data[2]; data[2] = data[4]; data[4] = swap; } if (data[5] > data[7]) { swap = data[5]; data[5] = data[7]; data[7] = swap; } if (data[8] > data[9]) { swap = data[8]; data[8] = data[9]; data[9] = swap; } if (data[1] > data[2]) { swap = data[1]; data[1] = data[2]; data[2] = swap; } if (data[3] > data[5]) { swap = data[3]; data[3] = data[5]; data[5] = swap; } if (data[4] > data[6]) { swap = data[4]; data[4] = data[6]; data[6] = swap; } if (data[7] > data[8]) { swap = data[7]; data[7] = data[8]; data[8] = swap; } if (data[1] > data[3]) { swap = data[1]; data[1] = data[3]; data[3] = swap; } if (data[4] > data[7]) { swap = data[4]; data[4] = data[7]; data[7] = swap; } if (data[2] > data[5]) { swap = data[2]; data[2] = data[5]; data[5] = swap; } if (data[6] > data[8]) { swap = data[6]; data[6] = data[8]; data[8] = swap; } if (data[2] > data[3]) { swap = data[2]; data[2] = data[3]; data[3] = swap; } if (data[4] > data[5]) { swap = data[4]; data[4] = data[5]; data[5] = swap; } if (data[6] > data[7]) { swap = data[6]; data[6] = data[7]; data[7] = swap; } if (data[3] > data[4]) { swap = data[3]; data[3] = data[4]; data[4] = swap; } if (data[5] > data[6]) { swap = data[5]; data[5] = data[6]; data[6] = swap; } return(data); } document.write(sortNet10([5,7,1,8,4,3,6,9,2,0]));
这里是网络的图形表示,分为独立的阶段。
为了利用并行处理的优势,可以将5-4-3-3 - 4-4-2 -3-2分组改为4-4-4-2 -4-4-3-2分组。
当您处理这个固定大小时,请查看排序网络。这些算法有固定的运行时间,并且独立于它们的输入。对于您的用例,您没有某些排序算法所具有的这种开销。
二进制排序就是这种网络的一种实现。这个方法在CPU上使用len(n) <= 32时效果最好。对于更大的输入,你可以考虑使用GPU。
顺便说一下,比较排序算法的一个好页面是这个(尽管它缺少二进制排序):
排序算法动画
您可以完全展开插入排序。
为了简化这一点,可以在没有函数开销的情况下使用递归模板。因为int已经是模板,所以它也可以是模板形参。这也使得编码10以外的数组大小变得很简单。
注意,要排序int x[10],调用insert_sort<int, 9>::sort(x);因为类使用最后一项的索引。这可以被包装,但是需要阅读更多的代码。
template <class T, int NUM>
class insert_sort;
template <class T>
class insert_sort<T,0>
// Stop template recursion
// Sorting one item is a no operation
{
public:
static void place(T *x) {}
static void sort(T * x) {}
};
template <class T, int NUM>
class insert_sort
// Use template recursion to do insertion sort.
// NUM is the index of the last item, e.g. for x[10] call <9>
{
public:
static void place(T *x)
{
T t1=x[NUM-1];
T t2=x[NUM];
if (t1 > t2)
{
x[NUM-1]=t2;
x[NUM]=t1;
insert_sort<T,NUM-1>::place(x);
}
}
static void sort(T * x)
{
insert_sort<T,NUM-1>::sort(x); // Sort everything before
place(x); // Put this item in
}
};
在我的测试中,这比排序网络的例子要快。