我正在解决一个问题,它涉及到快速排序10个数字(int32)。我的应用程序需要以尽可能快的速度对10个数字进行百万次排序。我正在对一个包含数十亿个元素的数据集进行采样,每次我都需要从中挑选10个数字(简化)并对它们进行排序(并从排序的10个元素列表中得出结论)。
目前我正在使用插入排序,但我想我可以为我的10个数字的特定问题实现一个非常快速的自定义排序算法,这将击败插入排序。
我该如何处理这个问题呢?
我正在解决一个问题,它涉及到快速排序10个数字(int32)。我的应用程序需要以尽可能快的速度对10个数字进行百万次排序。我正在对一个包含数十亿个元素的数据集进行采样,每次我都需要从中挑选10个数字(简化)并对它们进行排序(并从排序的10个元素列表中得出结论)。
目前我正在使用插入排序,但我想我可以为我的10个数字的特定问题实现一个非常快速的自定义排序算法,这将击败插入排序。
我该如何处理这个问题呢?
当前回答
插入排序平均需要29,6次与排序10个输入的比较,最佳情况为9,最差情况为45(给定输入的顺序是相反的)。
{9,6,1} shellsort平均需要25.5次比较来排序10个输入。最好的情况是14次比较,最坏的情况是34次,对反向输入排序需要22次。
因此,使用shellsort而不是插入排序可以减少14%的平均情况。尽管最佳情况增加了56%,但最坏情况减少了24%,这对于控制最坏情况性能非常重要的应用程序来说意义重大。反之则减少51%。
因为你似乎对插入排序很熟悉,你可以将算法实现为{9,6}的排序网络,然后在那之后附加插入排序({1}):
i[0] with i[9] // {9}
i[0] with i[6] // {6}
i[1] with i[7] // {6}
i[2] with i[8] // {6}
i[3] with i[9] // {6}
i[0 ... 9] // insertion sort
其他回答
当您处理这个固定大小时,请查看排序网络。这些算法有固定的运行时间,并且独立于它们的输入。对于您的用例,您没有某些排序算法所具有的这种开销。
二进制排序就是这种网络的一种实现。这个方法在CPU上使用len(n) <= 32时效果最好。对于更大的输入,你可以考虑使用GPU。
顺便说一下,比较排序算法的一个好页面是这个(尽管它缺少二进制排序):
排序算法动画
(根据@HelloWorld的建议,研究排序网络。)
似乎29个比较/交换网络是进行10个输入排序的最快方法。在这个例子中,我使用了Waksman在1969年发现的JavaScript网络,它应该直接转换成C语言,因为它只是一个if语句、比较和交换的列表。
function sortNet10(data) { // ten-input sorting network by Waksman, 1969 var swap; if (data[0] > data[5]) { swap = data[0]; data[0] = data[5]; data[5] = swap; } if (data[1] > data[6]) { swap = data[1]; data[1] = data[6]; data[6] = swap; } if (data[2] > data[7]) { swap = data[2]; data[2] = data[7]; data[7] = swap; } if (data[3] > data[8]) { swap = data[3]; data[3] = data[8]; data[8] = swap; } if (data[4] > data[9]) { swap = data[4]; data[4] = data[9]; data[9] = swap; } if (data[0] > data[3]) { swap = data[0]; data[0] = data[3]; data[3] = swap; } if (data[5] > data[8]) { swap = data[5]; data[5] = data[8]; data[8] = swap; } if (data[1] > data[4]) { swap = data[1]; data[1] = data[4]; data[4] = swap; } if (data[6] > data[9]) { swap = data[6]; data[6] = data[9]; data[9] = swap; } if (data[0] > data[2]) { swap = data[0]; data[0] = data[2]; data[2] = swap; } if (data[3] > data[6]) { swap = data[3]; data[3] = data[6]; data[6] = swap; } if (data[7] > data[9]) { swap = data[7]; data[7] = data[9]; data[9] = swap; } if (data[0] > data[1]) { swap = data[0]; data[0] = data[1]; data[1] = swap; } if (data[2] > data[4]) { swap = data[2]; data[2] = data[4]; data[4] = swap; } if (data[5] > data[7]) { swap = data[5]; data[5] = data[7]; data[7] = swap; } if (data[8] > data[9]) { swap = data[8]; data[8] = data[9]; data[9] = swap; } if (data[1] > data[2]) { swap = data[1]; data[1] = data[2]; data[2] = swap; } if (data[3] > data[5]) { swap = data[3]; data[3] = data[5]; data[5] = swap; } if (data[4] > data[6]) { swap = data[4]; data[4] = data[6]; data[6] = swap; } if (data[7] > data[8]) { swap = data[7]; data[7] = data[8]; data[8] = swap; } if (data[1] > data[3]) { swap = data[1]; data[1] = data[3]; data[3] = swap; } if (data[4] > data[7]) { swap = data[4]; data[4] = data[7]; data[7] = swap; } if (data[2] > data[5]) { swap = data[2]; data[2] = data[5]; data[5] = swap; } if (data[6] > data[8]) { swap = data[6]; data[6] = data[8]; data[8] = swap; } if (data[2] > data[3]) { swap = data[2]; data[2] = data[3]; data[3] = swap; } if (data[4] > data[5]) { swap = data[4]; data[4] = data[5]; data[5] = swap; } if (data[6] > data[7]) { swap = data[6]; data[6] = data[7]; data[7] = swap; } if (data[3] > data[4]) { swap = data[3]; data[3] = data[4]; data[4] = swap; } if (data[5] > data[6]) { swap = data[5]; data[5] = data[6]; data[6] = swap; } return(data); } document.write(sortNet10([5,7,1,8,4,3,6,9,2,0]));
这里是网络的图形表示,分为独立的阶段。
为了利用并行处理的优势,可以将5-4-3-3 - 4-4-2 -3-2分组改为4-4-4-2 -4-4-3-2分组。
您可以完全展开插入排序。
为了简化这一点,可以在没有函数开销的情况下使用递归模板。因为int已经是模板,所以它也可以是模板形参。这也使得编码10以外的数组大小变得很简单。
注意,要排序int x[10],调用insert_sort<int, 9>::sort(x);因为类使用最后一项的索引。这可以被包装,但是需要阅读更多的代码。
template <class T, int NUM>
class insert_sort;
template <class T>
class insert_sort<T,0>
// Stop template recursion
// Sorting one item is a no operation
{
public:
static void place(T *x) {}
static void sort(T * x) {}
};
template <class T, int NUM>
class insert_sort
// Use template recursion to do insertion sort.
// NUM is the index of the last item, e.g. for x[10] call <9>
{
public:
static void place(T *x)
{
T t1=x[NUM-1];
T t2=x[NUM];
if (t1 > t2)
{
x[NUM-1]=t2;
x[NUM]=t1;
insert_sort<T,NUM-1>::place(x);
}
}
static void sort(T * x)
{
insert_sort<T,NUM-1>::sort(x); // Sort everything before
place(x); // Put this item in
}
};
在我的测试中,这比排序网络的例子要快。
以下是运行在10个CUDA线程上的CUDA内核(秩排序算法),在42毫秒内对1000个数组排序1000次,每次排序42纳秒或每次排序~70个周期(1.7 GHz):
inline
__device__ int findOrder(const int mask, const int data0)
{
const int order1 = data0>__shfl_sync(mask,data0,0);
const int order2 = data0>__shfl_sync(mask,data0,1);
const int order3 = data0>__shfl_sync(mask,data0,2);
const int order4 = data0>__shfl_sync(mask,data0,3);
const int order5 = data0>__shfl_sync(mask,data0,4);
const int order6 = data0>__shfl_sync(mask,data0,5);
const int order7 = data0>__shfl_sync(mask,data0,6);
const int order8 = data0>__shfl_sync(mask,data0,7);
const int order9 = data0>__shfl_sync(mask,data0,8);
const int order10 = data0>__shfl_sync(mask,data0,9);
return order1 + order2 + order3 + order4 + order5 + order6 + order7 + order8 + order9 + order10;
}
// launch this with 10 CUDA threads in 1 block in 1 grid
// sorts 10 elements using only SIMT registers
__global__ void rankSort(int * __restrict__ buffer)
{
const int id = threadIdx.x;
// enable first 10 lanes of warp for shuffling
const int mask = __activemask();
__shared__ int data[10000];
for(int i=0;i<1000;i++)
{
data[id+i*10] = buffer[id+i*10];
}
__syncwarp();
// benchmark 1000 times
for(int k=0;k<1000;k++)
{
// sort 1000 arrays
for(int j=0;j<1000;j+=5)
{
// sort 5 arrays at once to hide latency
const int data1 = data[id+j*10];
const int data2 = data[id+(j+1)*10];
const int data3 = data[id+(j+2)*10];
const int data4 = data[id+(j+3)*10];
const int data5 = data[id+(j+4)*10];
const int order1 = findOrder(mask,data1);
const int order2 = findOrder(mask,data2);
const int order3 = findOrder(mask,data3);
const int order4 = findOrder(mask,data4);
const int order5 = findOrder(mask,data5);
data[order1+j*10]=data1;
data[order2+(j+1)*10]=data2;
data[order3+(j+2)*10]=data3;
data[order4+(j+3)*10]=data4;
data[order5+(j+4)*10]=data5;
}
}
__syncwarp();
for(int i=0;i<1000;i++)
{
buffer[id+i*10] = data[id+i*10];
}
}
由于所有10个线程都在同一个SIMT单元上处理,它类似于运行在矢量寄存器上的AVX512优化版本,但除了更多的寄存器空间以隐藏更多的延迟之外。此外,SIMT单元是32宽的,因此它可以运行线性时间复杂度直到32个元素。
该算法假设元素是唯一的。对于重复的数据,需要一个额外的缩减步骤来将重复的顺序值解压缩为10个元素。首先,它计算每个元素的秩,然后直接将它们复制到数组中作为索引的秩。顺序值需要蛮力(O(N x N))次比较,为了减少延迟,使用warp-shuffles从(向量寄存器的)不同的warp-lanes收集数据。
使用排序网络,以4为一组进行比较,这样就可以在SIMD寄存器中进行比较。一对打包的最小/最大指令实现了打包比较器函数。对不起,我现在没有时间去找我记得看到过的关于这个的页面,但希望在SIMD或SSE排序网络上搜索会找到一些东西。
x86 SSE确实为4个32位整型的向量提供了打包的32位整型的min和max指令。AVX2 (Haswell及后续版本)具有相同的功能,但用于256b的8个整型向量。还有高效的洗牌指令。
如果你有很多独立的小排序,用向量并行地做4到8个排序是可能的。特别是,如果你随机选择元素(所以要排序的数据在内存中不会连续),你可以避免打乱,只需要按照你需要的顺序进行比较。10个寄存器保存来自4个(AVX2: 8) 10个整数列表的所有数据,仍然留下6个reg作为临时空间。
如果还需要对相关数据进行排序,则向量排序网络的效率较低。在这种情况下,最有效的方法似乎是使用wrapped -compare来获得更改元素的掩码,并使用该掩码来混合相关数据的(引用)向量。