我正在解决一个问题,它涉及到快速排序10个数字(int32)。我的应用程序需要以尽可能快的速度对10个数字进行百万次排序。我正在对一个包含数十亿个元素的数据集进行采样,每次我都需要从中挑选10个数字(简化)并对它们进行排序(并从排序的10个元素列表中得出结论)。

目前我正在使用插入排序,但我想我可以为我的10个数字的特定问题实现一个非常快速的自定义排序算法,这将击败插入排序。

我该如何处理这个问题呢?


当前回答

这个问题并没有说这是某种基于web的应用程序。有一件事引起了我的注意:

我正在对一个包含数十亿个元素的数据集进行采样,每次我都需要从中挑选10个数字(简化)并对它们进行排序(并从排序的10个元素列表中得出结论)。

As a software and hardware engineer this absolutely screams FPGA to me. I don't know what kind of conclusions you need to draw from the sorted set of numbers or where the data comes from, but I know it would be almost trivial to process somewhere between one hundred million and a billion of these "sort-and-analyze" operations per second. I've done FPGA-assisted DNA sequencing work in the past. It is nearly impossible to beat the massive processing power of FPGAs when the problem is well suited for that type of a solution.

在某种程度上,唯一的限制因素是将数据铲入FPGA的速度有多快,以及取出数据的速度有多快。

As a point of reference, I designed a high performance real-time image processor that received 32 bit RGB image data at a rate of about 300 million pixels per second. The data streamed through FIR filters, matrix multipliers, lookup tables, spatial edge detection blocks and a number of other operations before coming out the other end. All of this on a relatively small Xilinx Virtex2 FPGA with internal clocking spanning from about 33 MHz to, if I remember correctly, 400 MHz. Oh, yes, it also had a DDR2 controller implementation and ran two banks of DDR2 memory.

当工作在数百MHz时,FPGA可以在每次时钟转换中输出10个32位数字。当数据填满处理管道时,操作开始时会有短暂的延迟。在此之后,您应该能够在每个时钟获得一个结果。如果可以通过复制排序和分析管道使处理并行化,则会更多。原则上,解决方案几乎是微不足道的。

关键在于:如果应用程序不是pc绑定的,并且数据流和处理与FPGA解决方案“兼容”(无论是独立的还是作为机器中的协处理器卡),那么无论使用哪种算法,用任何语言编写的软件都无法击败可达到的性能水平。

我只是快速搜索了一下,找到了一篇可能对你有用的论文。看起来要追溯到2012年。在今天(甚至在过去),您可以在性能上做得更好。下面就是:

fpga上的排序网络

其他回答

这个问题并没有说这是某种基于web的应用程序。有一件事引起了我的注意:

我正在对一个包含数十亿个元素的数据集进行采样,每次我都需要从中挑选10个数字(简化)并对它们进行排序(并从排序的10个元素列表中得出结论)。

As a software and hardware engineer this absolutely screams FPGA to me. I don't know what kind of conclusions you need to draw from the sorted set of numbers or where the data comes from, but I know it would be almost trivial to process somewhere between one hundred million and a billion of these "sort-and-analyze" operations per second. I've done FPGA-assisted DNA sequencing work in the past. It is nearly impossible to beat the massive processing power of FPGAs when the problem is well suited for that type of a solution.

在某种程度上,唯一的限制因素是将数据铲入FPGA的速度有多快,以及取出数据的速度有多快。

As a point of reference, I designed a high performance real-time image processor that received 32 bit RGB image data at a rate of about 300 million pixels per second. The data streamed through FIR filters, matrix multipliers, lookup tables, spatial edge detection blocks and a number of other operations before coming out the other end. All of this on a relatively small Xilinx Virtex2 FPGA with internal clocking spanning from about 33 MHz to, if I remember correctly, 400 MHz. Oh, yes, it also had a DDR2 controller implementation and ran two banks of DDR2 memory.

当工作在数百MHz时,FPGA可以在每次时钟转换中输出10个32位数字。当数据填满处理管道时,操作开始时会有短暂的延迟。在此之后,您应该能够在每个时钟获得一个结果。如果可以通过复制排序和分析管道使处理并行化,则会更多。原则上,解决方案几乎是微不足道的。

关键在于:如果应用程序不是pc绑定的,并且数据流和处理与FPGA解决方案“兼容”(无论是独立的还是作为机器中的协处理器卡),那么无论使用哪种算法,用任何语言编写的软件都无法击败可达到的性能水平。

我只是快速搜索了一下,找到了一篇可能对你有用的论文。看起来要追溯到2012年。在今天(甚至在过去),您可以在性能上做得更好。下面就是:

fpga上的排序网络

插入排序平均需要29,6次与排序10个输入的比较,最佳情况为9,最差情况为45(给定输入的顺序是相反的)。

{9,6,1} shellsort平均需要25.5次比较来排序10个输入。最好的情况是14次比较,最坏的情况是34次,对反向输入排序需要22次。

因此,使用shellsort而不是插入排序可以减少14%的平均情况。尽管最佳情况增加了56%,但最坏情况减少了24%,这对于控制最坏情况性能非常重要的应用程序来说意义重大。反之则减少51%。

因为你似乎对插入排序很熟悉,你可以将算法实现为{9,6}的排序网络,然后在那之后附加插入排序({1}):

i[0] with i[9]    // {9}

i[0] with i[6]    // {6}
i[1] with i[7]    // {6}
i[2] with i[8]    // {6}
i[3] with i[9]    // {6}

i[0 ... 9]        // insertion sort

当您处理这个固定大小时,请查看排序网络。这些算法有固定的运行时间,并且独立于它们的输入。对于您的用例,您没有某些排序算法所具有的这种开销。

二进制排序就是这种网络的一种实现。这个方法在CPU上使用len(n) <= 32时效果最好。对于更大的输入,你可以考虑使用GPU。

顺便说一下,比较排序算法的一个好页面是这个(尽管它缺少二进制排序):

排序算法动画

(根据@HelloWorld的建议,研究排序网络。)

似乎29个比较/交换网络是进行10个输入排序的最快方法。在这个例子中,我使用了Waksman在1969年发现的JavaScript网络,它应该直接转换成C语言,因为它只是一个if语句、比较和交换的列表。

function sortNet10(data) { // ten-input sorting network by Waksman, 1969 var swap; if (data[0] > data[5]) { swap = data[0]; data[0] = data[5]; data[5] = swap; } if (data[1] > data[6]) { swap = data[1]; data[1] = data[6]; data[6] = swap; } if (data[2] > data[7]) { swap = data[2]; data[2] = data[7]; data[7] = swap; } if (data[3] > data[8]) { swap = data[3]; data[3] = data[8]; data[8] = swap; } if (data[4] > data[9]) { swap = data[4]; data[4] = data[9]; data[9] = swap; } if (data[0] > data[3]) { swap = data[0]; data[0] = data[3]; data[3] = swap; } if (data[5] > data[8]) { swap = data[5]; data[5] = data[8]; data[8] = swap; } if (data[1] > data[4]) { swap = data[1]; data[1] = data[4]; data[4] = swap; } if (data[6] > data[9]) { swap = data[6]; data[6] = data[9]; data[9] = swap; } if (data[0] > data[2]) { swap = data[0]; data[0] = data[2]; data[2] = swap; } if (data[3] > data[6]) { swap = data[3]; data[3] = data[6]; data[6] = swap; } if (data[7] > data[9]) { swap = data[7]; data[7] = data[9]; data[9] = swap; } if (data[0] > data[1]) { swap = data[0]; data[0] = data[1]; data[1] = swap; } if (data[2] > data[4]) { swap = data[2]; data[2] = data[4]; data[4] = swap; } if (data[5] > data[7]) { swap = data[5]; data[5] = data[7]; data[7] = swap; } if (data[8] > data[9]) { swap = data[8]; data[8] = data[9]; data[9] = swap; } if (data[1] > data[2]) { swap = data[1]; data[1] = data[2]; data[2] = swap; } if (data[3] > data[5]) { swap = data[3]; data[3] = data[5]; data[5] = swap; } if (data[4] > data[6]) { swap = data[4]; data[4] = data[6]; data[6] = swap; } if (data[7] > data[8]) { swap = data[7]; data[7] = data[8]; data[8] = swap; } if (data[1] > data[3]) { swap = data[1]; data[1] = data[3]; data[3] = swap; } if (data[4] > data[7]) { swap = data[4]; data[4] = data[7]; data[7] = swap; } if (data[2] > data[5]) { swap = data[2]; data[2] = data[5]; data[5] = swap; } if (data[6] > data[8]) { swap = data[6]; data[6] = data[8]; data[8] = swap; } if (data[2] > data[3]) { swap = data[2]; data[2] = data[3]; data[3] = swap; } if (data[4] > data[5]) { swap = data[4]; data[4] = data[5]; data[5] = swap; } if (data[6] > data[7]) { swap = data[6]; data[6] = data[7]; data[7] = swap; } if (data[3] > data[4]) { swap = data[3]; data[3] = data[4]; data[4] = swap; } if (data[5] > data[6]) { swap = data[5]; data[5] = data[6]; data[6] = swap; } return(data); } document.write(sortNet10([5,7,1,8,4,3,6,9,2,0]));

这里是网络的图形表示,分为独立的阶段。

为了利用并行处理的优势,可以将5-4-3-3 - 4-4-2 -3-2分组改为4-4-4-2 -4-4-3-2分组。

我最近写了一个小类,它使用Bose-Nelson算法在编译时生成排序网络。

它可以用来为10个数字创建一个非常快速的排序。

/**
 * A Functor class to create a sort for fixed sized arrays/containers with a
 * compile time generated Bose-Nelson sorting network.
 * \tparam NumElements  The number of elements in the array or container to sort.
 * \tparam T            The element type.
 * \tparam Compare      A comparator functor class that returns true if lhs < rhs.
 */
template <unsigned NumElements, class Compare = void> class StaticSort
{
    template <class A, class C> struct Swap
    {
        template <class T> inline void s(T &v0, T &v1)
        {
            T t = Compare()(v0, v1) ? v0 : v1; // Min
            v1 = Compare()(v0, v1) ? v1 : v0; // Max
            v0 = t;
        }

        inline Swap(A &a, const int &i0, const int &i1) { s(a[i0], a[i1]); }
    };

    template <class A> struct Swap <A, void>
    {
        template <class T> inline void s(T &v0, T &v1)
        {
            // Explicitly code out the Min and Max to nudge the compiler
            // to generate branchless code.
            T t = v0 < v1 ? v0 : v1; // Min
            v1 = v0 < v1 ? v1 : v0; // Max
            v0 = t;
        }

        inline Swap(A &a, const int &i0, const int &i1) { s(a[i0], a[i1]); }
    };

    template <class A, class C, int I, int J, int X, int Y> struct PB
    {
        inline PB(A &a)
        {
            enum { L = X >> 1, M = (X & 1 ? Y : Y + 1) >> 1, IAddL = I + L, XSubL = X - L };
            PB<A, C, I, J, L, M> p0(a);
            PB<A, C, IAddL, J + M, XSubL, Y - M> p1(a);
            PB<A, C, IAddL, J, XSubL, M> p2(a);
        }
    };

    template <class A, class C, int I, int J> struct PB <A, C, I, J, 1, 1>
    {
        inline PB(A &a) { Swap<A, C> s(a, I - 1, J - 1); }
    };

    template <class A, class C, int I, int J> struct PB <A, C, I, J, 1, 2>
    {
        inline PB(A &a) { Swap<A, C> s0(a, I - 1, J); Swap<A, C> s1(a, I - 1, J - 1); }
    };

    template <class A, class C, int I, int J> struct PB <A, C, I, J, 2, 1>
    {
        inline PB(A &a) { Swap<A, C> s0(a, I - 1, J - 1); Swap<A, C> s1(a, I, J - 1); }
    };

    template <class A, class C, int I, int M, bool Stop = false> struct PS
    {
        inline PS(A &a)
        {
            enum { L = M >> 1, IAddL = I + L, MSubL = M - L};
            PS<A, C, I, L, (L <= 1)> ps0(a);
            PS<A, C, IAddL, MSubL, (MSubL <= 1)> ps1(a);
            PB<A, C, I, IAddL, L, MSubL> pb(a);
        }
    };

    template <class A, class C, int I, int M> struct PS <A, C, I, M, true>
    {
        inline PS(A &a) {}
    };

public:
    /**
     * Sorts the array/container arr.
     * \param  arr  The array/container to be sorted.
     */
    template <class Container> inline void operator() (Container &arr) const
    {
        PS<Container, Compare, 1, NumElements, (NumElements <= 1)> ps(arr);
    };

    /**
     * Sorts the array arr.
     * \param  arr  The array to be sorted.
     */
    template <class T> inline void operator() (T *arr) const
    {
        PS<T*, Compare, 1, NumElements, (NumElements <= 1)> ps(arr);
    };
};

#include <iostream>
#include <vector>

int main(int argc, const char * argv[])
{
    enum { NumValues = 10 };

    // Arrays
    {
        int rands[NumValues];
        for (int i = 0; i < NumValues; ++i) rands[i] = rand() % 100;
        std::cout << "Before Sort: \t";
        for (int i = 0; i < NumValues; ++i) std::cout << rands[i] << " ";
        std::cout << "\n";
        StaticSort<NumValues> staticSort;
        staticSort(rands);
        std::cout << "After Sort: \t";
        for (int i = 0; i < NumValues; ++i) std::cout << rands[i] << " ";
        std::cout << "\n";
    }

    std::cout << "\n";

    // STL Vector
    {
        std::vector<int> rands(NumValues);
        for (int i = 0; i < NumValues; ++i) rands[i] = rand() % 100;
        std::cout << "Before Sort: \t";
        for (int i = 0; i < NumValues; ++i) std::cout << rands[i] << " ";
        std::cout << "\n";
        StaticSort<NumValues> staticSort;
        staticSort(rands);
        std::cout << "After Sort: \t";
        for (int i = 0; i < NumValues; ++i) std::cout << rands[i] << " ";
        std::cout << "\n";
    }

    return 0;
}

注意,我们没有使用if (compare)交换语句,而是显式地为min和max编写了三元运算符。这有助于推动编译器使用无分支代码。

# #基准

下面的基准测试是用clang -O3编译的,并在我2012年年中的MacBook Air上运行。

###排序随机数据

将它与DarioP的代码进行比较,下面是排序100万个大小为10的32位整型数组所需的毫秒数:

硬编码排序网络10:88.774毫秒 模板化玻色-纳尔逊排序10:27.815毫秒

使用这种模板化方法,我们还可以在编译时为其他数量的元素生成排序网络。

排序100万个不同大小的数组的时间(以毫秒为单位)。

大小为2、4、8的数组的毫秒数分别为1.943、8.655、20.246。

感谢格伦·泰特鲍姆的展开插入排序。

下面是6个元素的小数组每次排序的平均时钟。基准测试代码和示例可以在以下问题中找到:

最快的固定长度6 int数组排序

Direct call to qsort library function       : 326.81
Naive implementation (insertion sort)       : 132.98
Insertion Sort (Daniel Stutzbach)           : 104.04
Insertion Sort Unrolled                     : 99.64
Insertion Sort Unrolled (Glenn Teitelbaum)  : 81.55
Rank Order                                  : 44.01
Rank Order with registers                   : 42.40
Sorting Networks (Daniel Stutzbach)         : 88.06
Sorting Networks (Paul R)                   : 31.64
Sorting Networks 12 with Fast Swap          : 29.68
Sorting Networks 12 reordered Swap          : 28.61
Reordered Sorting Network w/ fast swap      : 24.63
Templated Sorting Network (this class)      : 25.37

对于6个元素,它的执行速度和问题中最快的例子一样快。

###排序数据的性能

通常,输入数组可能已经排序或基本排序。 在这种情况下,插入排序可能是更好的选择。

您可能希望根据数据选择合适的排序算法。

用于基准测试的代码可以在这里找到。