我正在解决一个问题,它涉及到快速排序10个数字(int32)。我的应用程序需要以尽可能快的速度对10个数字进行百万次排序。我正在对一个包含数十亿个元素的数据集进行采样,每次我都需要从中挑选10个数字(简化)并对它们进行排序(并从排序的10个元素列表中得出结论)。
目前我正在使用插入排序,但我想我可以为我的10个数字的特定问题实现一个非常快速的自定义排序算法,这将击败插入排序。
我该如何处理这个问题呢?
我正在解决一个问题,它涉及到快速排序10个数字(int32)。我的应用程序需要以尽可能快的速度对10个数字进行百万次排序。我正在对一个包含数十亿个元素的数据集进行采样,每次我都需要从中挑选10个数字(简化)并对它们进行排序(并从排序的10个元素列表中得出结论)。
目前我正在使用插入排序,但我想我可以为我的10个数字的特定问题实现一个非常快速的自定义排序算法,这将击败插入排序。
我该如何处理这个问题呢?
当前回答
我最近写了一个小类,它使用Bose-Nelson算法在编译时生成排序网络。
它可以用来为10个数字创建一个非常快速的排序。
/**
* A Functor class to create a sort for fixed sized arrays/containers with a
* compile time generated Bose-Nelson sorting network.
* \tparam NumElements The number of elements in the array or container to sort.
* \tparam T The element type.
* \tparam Compare A comparator functor class that returns true if lhs < rhs.
*/
template <unsigned NumElements, class Compare = void> class StaticSort
{
template <class A, class C> struct Swap
{
template <class T> inline void s(T &v0, T &v1)
{
T t = Compare()(v0, v1) ? v0 : v1; // Min
v1 = Compare()(v0, v1) ? v1 : v0; // Max
v0 = t;
}
inline Swap(A &a, const int &i0, const int &i1) { s(a[i0], a[i1]); }
};
template <class A> struct Swap <A, void>
{
template <class T> inline void s(T &v0, T &v1)
{
// Explicitly code out the Min and Max to nudge the compiler
// to generate branchless code.
T t = v0 < v1 ? v0 : v1; // Min
v1 = v0 < v1 ? v1 : v0; // Max
v0 = t;
}
inline Swap(A &a, const int &i0, const int &i1) { s(a[i0], a[i1]); }
};
template <class A, class C, int I, int J, int X, int Y> struct PB
{
inline PB(A &a)
{
enum { L = X >> 1, M = (X & 1 ? Y : Y + 1) >> 1, IAddL = I + L, XSubL = X - L };
PB<A, C, I, J, L, M> p0(a);
PB<A, C, IAddL, J + M, XSubL, Y - M> p1(a);
PB<A, C, IAddL, J, XSubL, M> p2(a);
}
};
template <class A, class C, int I, int J> struct PB <A, C, I, J, 1, 1>
{
inline PB(A &a) { Swap<A, C> s(a, I - 1, J - 1); }
};
template <class A, class C, int I, int J> struct PB <A, C, I, J, 1, 2>
{
inline PB(A &a) { Swap<A, C> s0(a, I - 1, J); Swap<A, C> s1(a, I - 1, J - 1); }
};
template <class A, class C, int I, int J> struct PB <A, C, I, J, 2, 1>
{
inline PB(A &a) { Swap<A, C> s0(a, I - 1, J - 1); Swap<A, C> s1(a, I, J - 1); }
};
template <class A, class C, int I, int M, bool Stop = false> struct PS
{
inline PS(A &a)
{
enum { L = M >> 1, IAddL = I + L, MSubL = M - L};
PS<A, C, I, L, (L <= 1)> ps0(a);
PS<A, C, IAddL, MSubL, (MSubL <= 1)> ps1(a);
PB<A, C, I, IAddL, L, MSubL> pb(a);
}
};
template <class A, class C, int I, int M> struct PS <A, C, I, M, true>
{
inline PS(A &a) {}
};
public:
/**
* Sorts the array/container arr.
* \param arr The array/container to be sorted.
*/
template <class Container> inline void operator() (Container &arr) const
{
PS<Container, Compare, 1, NumElements, (NumElements <= 1)> ps(arr);
};
/**
* Sorts the array arr.
* \param arr The array to be sorted.
*/
template <class T> inline void operator() (T *arr) const
{
PS<T*, Compare, 1, NumElements, (NumElements <= 1)> ps(arr);
};
};
#include <iostream>
#include <vector>
int main(int argc, const char * argv[])
{
enum { NumValues = 10 };
// Arrays
{
int rands[NumValues];
for (int i = 0; i < NumValues; ++i) rands[i] = rand() % 100;
std::cout << "Before Sort: \t";
for (int i = 0; i < NumValues; ++i) std::cout << rands[i] << " ";
std::cout << "\n";
StaticSort<NumValues> staticSort;
staticSort(rands);
std::cout << "After Sort: \t";
for (int i = 0; i < NumValues; ++i) std::cout << rands[i] << " ";
std::cout << "\n";
}
std::cout << "\n";
// STL Vector
{
std::vector<int> rands(NumValues);
for (int i = 0; i < NumValues; ++i) rands[i] = rand() % 100;
std::cout << "Before Sort: \t";
for (int i = 0; i < NumValues; ++i) std::cout << rands[i] << " ";
std::cout << "\n";
StaticSort<NumValues> staticSort;
staticSort(rands);
std::cout << "After Sort: \t";
for (int i = 0; i < NumValues; ++i) std::cout << rands[i] << " ";
std::cout << "\n";
}
return 0;
}
注意,我们没有使用if (compare)交换语句,而是显式地为min和max编写了三元运算符。这有助于推动编译器使用无分支代码。
# #基准
下面的基准测试是用clang -O3编译的,并在我2012年年中的MacBook Air上运行。
###排序随机数据
将它与DarioP的代码进行比较,下面是排序100万个大小为10的32位整型数组所需的毫秒数:
硬编码排序网络10:88.774毫秒 模板化玻色-纳尔逊排序10:27.815毫秒
使用这种模板化方法,我们还可以在编译时为其他数量的元素生成排序网络。
排序100万个不同大小的数组的时间(以毫秒为单位)。
大小为2、4、8的数组的毫秒数分别为1.943、8.655、20.246。
感谢格伦·泰特鲍姆的展开插入排序。
下面是6个元素的小数组每次排序的平均时钟。基准测试代码和示例可以在以下问题中找到:
最快的固定长度6 int数组排序
Direct call to qsort library function : 326.81
Naive implementation (insertion sort) : 132.98
Insertion Sort (Daniel Stutzbach) : 104.04
Insertion Sort Unrolled : 99.64
Insertion Sort Unrolled (Glenn Teitelbaum) : 81.55
Rank Order : 44.01
Rank Order with registers : 42.40
Sorting Networks (Daniel Stutzbach) : 88.06
Sorting Networks (Paul R) : 31.64
Sorting Networks 12 with Fast Swap : 29.68
Sorting Networks 12 reordered Swap : 28.61
Reordered Sorting Network w/ fast swap : 24.63
Templated Sorting Network (this class) : 25.37
对于6个元素,它的执行速度和问题中最快的例子一样快。
###排序数据的性能
通常,输入数组可能已经排序或基本排序。 在这种情况下,插入排序可能是更好的选择。
您可能希望根据数据选择合适的排序算法。
用于基准测试的代码可以在这里找到。
其他回答
既然可以移动,为什么要交换?一条x86高速缓存线有足够的额外内存供您执行归并排序。
我可能会分别插入排序索引0-1、2-4、5-6、7-9,或者更好的是在插入时保持这些小组的排序,这样每次插入最多需要一到两次移位。
然后合并5、6和7-9 -> 10-14,合并0-1和2-4 -> 5-9,最后合并5-9和10-14 -> 0-9
以下是运行在10个CUDA线程上的CUDA内核(秩排序算法),在42毫秒内对1000个数组排序1000次,每次排序42纳秒或每次排序~70个周期(1.7 GHz):
inline
__device__ int findOrder(const int mask, const int data0)
{
const int order1 = data0>__shfl_sync(mask,data0,0);
const int order2 = data0>__shfl_sync(mask,data0,1);
const int order3 = data0>__shfl_sync(mask,data0,2);
const int order4 = data0>__shfl_sync(mask,data0,3);
const int order5 = data0>__shfl_sync(mask,data0,4);
const int order6 = data0>__shfl_sync(mask,data0,5);
const int order7 = data0>__shfl_sync(mask,data0,6);
const int order8 = data0>__shfl_sync(mask,data0,7);
const int order9 = data0>__shfl_sync(mask,data0,8);
const int order10 = data0>__shfl_sync(mask,data0,9);
return order1 + order2 + order3 + order4 + order5 + order6 + order7 + order8 + order9 + order10;
}
// launch this with 10 CUDA threads in 1 block in 1 grid
// sorts 10 elements using only SIMT registers
__global__ void rankSort(int * __restrict__ buffer)
{
const int id = threadIdx.x;
// enable first 10 lanes of warp for shuffling
const int mask = __activemask();
__shared__ int data[10000];
for(int i=0;i<1000;i++)
{
data[id+i*10] = buffer[id+i*10];
}
__syncwarp();
// benchmark 1000 times
for(int k=0;k<1000;k++)
{
// sort 1000 arrays
for(int j=0;j<1000;j+=5)
{
// sort 5 arrays at once to hide latency
const int data1 = data[id+j*10];
const int data2 = data[id+(j+1)*10];
const int data3 = data[id+(j+2)*10];
const int data4 = data[id+(j+3)*10];
const int data5 = data[id+(j+4)*10];
const int order1 = findOrder(mask,data1);
const int order2 = findOrder(mask,data2);
const int order3 = findOrder(mask,data3);
const int order4 = findOrder(mask,data4);
const int order5 = findOrder(mask,data5);
data[order1+j*10]=data1;
data[order2+(j+1)*10]=data2;
data[order3+(j+2)*10]=data3;
data[order4+(j+3)*10]=data4;
data[order5+(j+4)*10]=data5;
}
}
__syncwarp();
for(int i=0;i<1000;i++)
{
buffer[id+i*10] = data[id+i*10];
}
}
由于所有10个线程都在同一个SIMT单元上处理,它类似于运行在矢量寄存器上的AVX512优化版本,但除了更多的寄存器空间以隐藏更多的延迟之外。此外,SIMT单元是32宽的,因此它可以运行线性时间复杂度直到32个元素。
该算法假设元素是唯一的。对于重复的数据,需要一个额外的缩减步骤来将重复的顺序值解压缩为10个元素。首先,它计算每个元素的秩,然后直接将它们复制到数组中作为索引的秩。顺序值需要蛮力(O(N x N))次比较,为了减少延迟,使用warp-shuffles从(向量寄存器的)不同的warp-lanes收集数据。
我最近写了一个小类,它使用Bose-Nelson算法在编译时生成排序网络。
它可以用来为10个数字创建一个非常快速的排序。
/**
* A Functor class to create a sort for fixed sized arrays/containers with a
* compile time generated Bose-Nelson sorting network.
* \tparam NumElements The number of elements in the array or container to sort.
* \tparam T The element type.
* \tparam Compare A comparator functor class that returns true if lhs < rhs.
*/
template <unsigned NumElements, class Compare = void> class StaticSort
{
template <class A, class C> struct Swap
{
template <class T> inline void s(T &v0, T &v1)
{
T t = Compare()(v0, v1) ? v0 : v1; // Min
v1 = Compare()(v0, v1) ? v1 : v0; // Max
v0 = t;
}
inline Swap(A &a, const int &i0, const int &i1) { s(a[i0], a[i1]); }
};
template <class A> struct Swap <A, void>
{
template <class T> inline void s(T &v0, T &v1)
{
// Explicitly code out the Min and Max to nudge the compiler
// to generate branchless code.
T t = v0 < v1 ? v0 : v1; // Min
v1 = v0 < v1 ? v1 : v0; // Max
v0 = t;
}
inline Swap(A &a, const int &i0, const int &i1) { s(a[i0], a[i1]); }
};
template <class A, class C, int I, int J, int X, int Y> struct PB
{
inline PB(A &a)
{
enum { L = X >> 1, M = (X & 1 ? Y : Y + 1) >> 1, IAddL = I + L, XSubL = X - L };
PB<A, C, I, J, L, M> p0(a);
PB<A, C, IAddL, J + M, XSubL, Y - M> p1(a);
PB<A, C, IAddL, J, XSubL, M> p2(a);
}
};
template <class A, class C, int I, int J> struct PB <A, C, I, J, 1, 1>
{
inline PB(A &a) { Swap<A, C> s(a, I - 1, J - 1); }
};
template <class A, class C, int I, int J> struct PB <A, C, I, J, 1, 2>
{
inline PB(A &a) { Swap<A, C> s0(a, I - 1, J); Swap<A, C> s1(a, I - 1, J - 1); }
};
template <class A, class C, int I, int J> struct PB <A, C, I, J, 2, 1>
{
inline PB(A &a) { Swap<A, C> s0(a, I - 1, J - 1); Swap<A, C> s1(a, I, J - 1); }
};
template <class A, class C, int I, int M, bool Stop = false> struct PS
{
inline PS(A &a)
{
enum { L = M >> 1, IAddL = I + L, MSubL = M - L};
PS<A, C, I, L, (L <= 1)> ps0(a);
PS<A, C, IAddL, MSubL, (MSubL <= 1)> ps1(a);
PB<A, C, I, IAddL, L, MSubL> pb(a);
}
};
template <class A, class C, int I, int M> struct PS <A, C, I, M, true>
{
inline PS(A &a) {}
};
public:
/**
* Sorts the array/container arr.
* \param arr The array/container to be sorted.
*/
template <class Container> inline void operator() (Container &arr) const
{
PS<Container, Compare, 1, NumElements, (NumElements <= 1)> ps(arr);
};
/**
* Sorts the array arr.
* \param arr The array to be sorted.
*/
template <class T> inline void operator() (T *arr) const
{
PS<T*, Compare, 1, NumElements, (NumElements <= 1)> ps(arr);
};
};
#include <iostream>
#include <vector>
int main(int argc, const char * argv[])
{
enum { NumValues = 10 };
// Arrays
{
int rands[NumValues];
for (int i = 0; i < NumValues; ++i) rands[i] = rand() % 100;
std::cout << "Before Sort: \t";
for (int i = 0; i < NumValues; ++i) std::cout << rands[i] << " ";
std::cout << "\n";
StaticSort<NumValues> staticSort;
staticSort(rands);
std::cout << "After Sort: \t";
for (int i = 0; i < NumValues; ++i) std::cout << rands[i] << " ";
std::cout << "\n";
}
std::cout << "\n";
// STL Vector
{
std::vector<int> rands(NumValues);
for (int i = 0; i < NumValues; ++i) rands[i] = rand() % 100;
std::cout << "Before Sort: \t";
for (int i = 0; i < NumValues; ++i) std::cout << rands[i] << " ";
std::cout << "\n";
StaticSort<NumValues> staticSort;
staticSort(rands);
std::cout << "After Sort: \t";
for (int i = 0; i < NumValues; ++i) std::cout << rands[i] << " ";
std::cout << "\n";
}
return 0;
}
注意,我们没有使用if (compare)交换语句,而是显式地为min和max编写了三元运算符。这有助于推动编译器使用无分支代码。
# #基准
下面的基准测试是用clang -O3编译的,并在我2012年年中的MacBook Air上运行。
###排序随机数据
将它与DarioP的代码进行比较,下面是排序100万个大小为10的32位整型数组所需的毫秒数:
硬编码排序网络10:88.774毫秒 模板化玻色-纳尔逊排序10:27.815毫秒
使用这种模板化方法,我们还可以在编译时为其他数量的元素生成排序网络。
排序100万个不同大小的数组的时间(以毫秒为单位)。
大小为2、4、8的数组的毫秒数分别为1.943、8.655、20.246。
感谢格伦·泰特鲍姆的展开插入排序。
下面是6个元素的小数组每次排序的平均时钟。基准测试代码和示例可以在以下问题中找到:
最快的固定长度6 int数组排序
Direct call to qsort library function : 326.81
Naive implementation (insertion sort) : 132.98
Insertion Sort (Daniel Stutzbach) : 104.04
Insertion Sort Unrolled : 99.64
Insertion Sort Unrolled (Glenn Teitelbaum) : 81.55
Rank Order : 44.01
Rank Order with registers : 42.40
Sorting Networks (Daniel Stutzbach) : 88.06
Sorting Networks (Paul R) : 31.64
Sorting Networks 12 with Fast Swap : 29.68
Sorting Networks 12 reordered Swap : 28.61
Reordered Sorting Network w/ fast swap : 24.63
Templated Sorting Network (this class) : 25.37
对于6个元素,它的执行速度和问题中最快的例子一样快。
###排序数据的性能
通常,输入数组可能已经排序或基本排序。 在这种情况下,插入排序可能是更好的选择。
您可能希望根据数据选择合适的排序算法。
用于基准测试的代码可以在这里找到。
当您处理这个固定大小时,请查看排序网络。这些算法有固定的运行时间,并且独立于它们的输入。对于您的用例,您没有某些排序算法所具有的这种开销。
二进制排序就是这种网络的一种实现。这个方法在CPU上使用len(n) <= 32时效果最好。对于更大的输入,你可以考虑使用GPU。
顺便说一下,比较排序算法的一个好页面是这个(尽管它缺少二进制排序):
排序算法动画
那么展开的、无分支的选择排序呢?
#include <iostream>
#include <algorithm>
#include <random>
//return the index of the minimum element in array a
int min(const int * const a) {
int m = a[0];
int indx = 0;
#define TEST(i) (m > a[i]) && (m = a[i], indx = i );
//see http://stackoverflow.com/a/7074042/2140449
TEST(1);
TEST(2);
TEST(3);
TEST(4);
TEST(5);
TEST(6);
TEST(7);
TEST(8);
TEST(9);
#undef TEST
return indx;
}
void sort( int * const a ){
int work[10];
int indx;
#define GET(i) indx = min(a); work[i] = a[indx]; a[indx] = 2147483647;
//get the minimum, copy it to work and set it at max_int in a
GET(0);
GET(1);
GET(2);
GET(3);
GET(4);
GET(5);
GET(6);
GET(7);
GET(8);
GET(9);
#undef GET
#define COPY(i) a[i] = work[i];
//copy back to a
COPY(0);
COPY(1);
COPY(2);
COPY(3);
COPY(4);
COPY(5);
COPY(6);
COPY(7);
COPY(8);
COPY(9);
#undef COPY
}
int main() {
//generating and printing a random array
int a[10] = { 1,2,3,4,5,6,7,8,9,10 };
std::random_device rd;
std::mt19937 g(rd());
std::shuffle( a, a+10, g);
for (int i = 0; i < 10; i++) {
std::cout << a[i] << ' ';
}
std::cout << std::endl;
//sorting and printing again
sort(a);
for (int i = 0; i < 10; i++) {
std::cout << a[i] << ' ';
}
return 0;
}
http://coliru.stacked-crooked.com/a/71e18bc4f7fa18c6
唯一相关的行是前两行#define。
它使用两个列表,并将第一个列表重新检查十次,这将是一个糟糕的选择排序,但它避免了分支和变长循环,这可能会补偿现代处理器和如此小的数据集。
基准
我对排序网络进行了基准测试,我的代码似乎变慢了。然而,我试图删除展开和复制。运行以下代码:
#include <iostream>
#include <algorithm>
#include <random>
#include <chrono>
int min(const int * const a, int i) {
int m = a[i];
int indx = i++;
for ( ; i<10; i++)
//see http://stackoverflow.com/a/7074042/2140449
(m > a[i]) && (m = a[i], indx = i );
return indx;
}
void sort( int * const a ){
for (int i = 0; i<9; i++)
std::swap(a[i], a[min(a,i)]); //search only forward
}
void sortNet10(int * const data) { // ten-input sorting network by Waksman, 1969
int swap;
if (data[0] > data[5]) { swap = data[0]; data[0] = data[5]; data[5] = swap; }
if (data[1] > data[6]) { swap = data[1]; data[1] = data[6]; data[6] = swap; }
if (data[2] > data[7]) { swap = data[2]; data[2] = data[7]; data[7] = swap; }
if (data[3] > data[8]) { swap = data[3]; data[3] = data[8]; data[8] = swap; }
if (data[4] > data[9]) { swap = data[4]; data[4] = data[9]; data[9] = swap; }
if (data[0] > data[3]) { swap = data[0]; data[0] = data[3]; data[3] = swap; }
if (data[5] > data[8]) { swap = data[5]; data[5] = data[8]; data[8] = swap; }
if (data[1] > data[4]) { swap = data[1]; data[1] = data[4]; data[4] = swap; }
if (data[6] > data[9]) { swap = data[6]; data[6] = data[9]; data[9] = swap; }
if (data[0] > data[2]) { swap = data[0]; data[0] = data[2]; data[2] = swap; }
if (data[3] > data[6]) { swap = data[3]; data[3] = data[6]; data[6] = swap; }
if (data[7] > data[9]) { swap = data[7]; data[7] = data[9]; data[9] = swap; }
if (data[0] > data[1]) { swap = data[0]; data[0] = data[1]; data[1] = swap; }
if (data[2] > data[4]) { swap = data[2]; data[2] = data[4]; data[4] = swap; }
if (data[5] > data[7]) { swap = data[5]; data[5] = data[7]; data[7] = swap; }
if (data[8] > data[9]) { swap = data[8]; data[8] = data[9]; data[9] = swap; }
if (data[1] > data[2]) { swap = data[1]; data[1] = data[2]; data[2] = swap; }
if (data[3] > data[5]) { swap = data[3]; data[3] = data[5]; data[5] = swap; }
if (data[4] > data[6]) { swap = data[4]; data[4] = data[6]; data[6] = swap; }
if (data[7] > data[8]) { swap = data[7]; data[7] = data[8]; data[8] = swap; }
if (data[1] > data[3]) { swap = data[1]; data[1] = data[3]; data[3] = swap; }
if (data[4] > data[7]) { swap = data[4]; data[4] = data[7]; data[7] = swap; }
if (data[2] > data[5]) { swap = data[2]; data[2] = data[5]; data[5] = swap; }
if (data[6] > data[8]) { swap = data[6]; data[6] = data[8]; data[8] = swap; }
if (data[2] > data[3]) { swap = data[2]; data[2] = data[3]; data[3] = swap; }
if (data[4] > data[5]) { swap = data[4]; data[4] = data[5]; data[5] = swap; }
if (data[6] > data[7]) { swap = data[6]; data[6] = data[7]; data[7] = swap; }
if (data[3] > data[4]) { swap = data[3]; data[3] = data[4]; data[4] = swap; }
if (data[5] > data[6]) { swap = data[5]; data[5] = data[6]; data[6] = swap; }
}
std::chrono::duration<double> benchmark( void(*func)(int * const), const int seed ) {
std::mt19937 g(seed);
int a[10] = {10,11,12,13,14,15,16,17,18,19};
std::chrono::high_resolution_clock::time_point t1, t2;
t1 = std::chrono::high_resolution_clock::now();
for (long i = 0; i < 1e7; i++) {
std::shuffle( a, a+10, g);
func(a);
}
t2 = std::chrono::high_resolution_clock::now();
return std::chrono::duration_cast<std::chrono::duration<double>>(t2 - t1);
}
int main() {
std::random_device rd;
for (int i = 0; i < 10; i++) {
const int seed = rd();
std::cout << "seed = " << seed << std::endl;
std::cout << "sortNet10: " << benchmark(sortNet10, seed).count() << std::endl;
std::cout << "sort: " << benchmark(sort, seed).count() << std::endl;
}
return 0;
}
与排序网络相比,我一直在为无分支选择排序获得更好的结果。
$ gcc -v
gcc version 5.2.0 (GCC)
$ g++ -std=c++11 -Ofast sort.cpp && ./a.out
seed = -1727396418
sortNet10: 2.24137
sort: 2.21828
seed = 2003959850
sortNet10: 2.23914
sort: 2.21641
seed = 1994540383
sortNet10: 2.23782
sort: 2.21778
seed = 1258259982
sortNet10: 2.25199
sort: 2.21801
seed = 1821086932
sortNet10: 2.25535
sort: 2.2173
seed = 412262735
sortNet10: 2.24489
sort: 2.21776
seed = 1059795817
sortNet10: 2.29226
sort: 2.21777
seed = -188551272
sortNet10: 2.23803
sort: 2.22996
seed = 1043757247
sortNet10: 2.2503
sort: 2.23604
seed = -268332483
sortNet10: 2.24455
sort: 2.24304