我怎样才能做得快呢?
当然我可以这样做:
/// <summary>
/// Compares two byte arrays element by element.
/// </summary>
/// <param name="a1">First array; may be null.</param>
/// <param name="a2">Second array; may be null.</param>
/// <returns>
/// <c>true</c> when both references are the same (including both null) or when
/// both arrays have the same length and identical contents; otherwise <c>false</c>.
/// </returns>
static bool ByteArrayCompare(byte[] a1, byte[] a2)
{
    // Same instance, or both null: nothing to compare.
    if (ReferenceEquals(a1, a2))
    {
        return true;
    }

    // Exactly one side is null, or the lengths differ: cannot be equal.
    if (a1 == null || a2 == null || a1.Length != a2.Length)
    {
        return false;
    }

    for (int i = 0; i < a1.Length; i++)
    {
        if (a1[i] != a2[i])
        {
            return false;
        }
    }

    return true;
}
但我正在寻找一个BCL函数或一些高度优化的已证明的方法来做到这一点。
java.util.Arrays.equals((sbyte[])(Array)a1, (sbyte[])(Array)a2);
工作得很好,但这似乎不适用于x64。
注意我的快速回答。
我开发了一个方法,在我的 PC 上它略快于 memcmp()(plinth 的答案),也非常轻微地快于 EqualBytesLongUnrolled()(Arek Bulski 的答案)。基本上,它以 4 而不是 8 展开循环。
2019年3月30日更新:
从 .NET Core 3.0 开始,我们有了 SIMD 支持!
这个解决方案在我的PC上是最快的:
#if NETCOREAPP3_0
using System.Runtime.Intrinsics.X86;
#endif
…
/// <summary>
/// Checks whether two byte arrays hold the same content, dispatching to the
/// widest comparison routine available.
/// </summary>
/// <param name="arr0">First array; may be null.</param>
/// <param name="arr1">Second array; may be null.</param>
/// <returns>
/// <c>true</c> if both references are identical (including both null) or both
/// arrays have equal length and equal bytes; otherwise <c>false</c>.
/// </returns>
/// <remarks>
/// NOTE(review): the SIMD paths are compiled only when NETCOREAPP3_0 is defined;
/// confirm that symbol is set for the target framework actually being built.
/// </remarks>
public static unsafe bool Compare(byte[] arr0, byte[] arr1)
{
    // Identical reference; this also covers the case where both are null.
    if (arr0 == arr1)
    {
        return true;
    }

    // Only one side is null, or the sizes disagree.
    if (arr0 == null || arr1 == null || arr0.Length != arr1.Length)
    {
        return false;
    }

    int byteCount = arr0.Length;
    if (byteCount == 0)
    {
        return true;
    }

    fixed (byte* left = arr0, right = arr1)
    {
#if NETCOREAPP3_0
        if (Avx2.IsSupported)
        {
            return Compare256(left, right, byteCount);
        }

        if (Sse2.IsSupported)
        {
            return Compare128(left, right, byteCount);
        }
#endif
        // Portable fallback: 64-bit word comparison.
        return Compare64(left, right, byteCount);
    }
}
#if NETCOREAPP3_0
/// <summary>
/// Compares two byte buffers using 256-bit AVX2 loads.
/// </summary>
/// <param name="b0">Pointer to the first buffer.</param>
/// <param name="b1">Pointer to the second buffer.</param>
/// <param name="length">Number of bytes to compare; values &lt;= 0 compare nothing.</param>
/// <returns><c>true</c> if the first <paramref name="length"/> bytes match; otherwise <c>false</c>.</returns>
/// <remarks>
/// Uses AVX2 instructions, so the caller must check <c>Avx2.IsSupported</c> first.
/// </remarks>
public static unsafe bool Compare256(byte* b0, byte* b1, int length)
{
    const int VectorSize = 32;              // bytes per 256-bit vector
    const int BlockSize = 4 * VectorSize;   // unrolled: 128 bytes per iteration
    const int AllEqual = -1;                // MoveMask sets one bit per byte; 32 set bits == -1

    // Track the byte count instead of computing "end - 128", which would point
    // before the buffer for short inputs.
    int remaining = length;

    // ">=" so that a final, exactly full block is still handled by the vector path.
    while (remaining >= BlockSize)
    {
        if (Avx2.MoveMask(Avx2.CompareEqual(Avx.LoadVector256(b0), Avx.LoadVector256(b1))) != AllEqual
            || Avx2.MoveMask(Avx2.CompareEqual(Avx.LoadVector256(b0 + 32), Avx.LoadVector256(b1 + 32))) != AllEqual
            || Avx2.MoveMask(Avx2.CompareEqual(Avx.LoadVector256(b0 + 64), Avx.LoadVector256(b1 + 64))) != AllEqual
            || Avx2.MoveMask(Avx2.CompareEqual(Avx.LoadVector256(b0 + 96), Avx.LoadVector256(b1 + 96))) != AllEqual)
        {
            return false;
        }

        b0 += BlockSize;
        b1 += BlockSize;
        remaining -= BlockSize;
    }

    // Up to three whole vectors may be left; compare them one vector at a time.
    while (remaining >= VectorSize)
    {
        if (Avx2.MoveMask(Avx2.CompareEqual(Avx.LoadVector256(b0), Avx.LoadVector256(b1))) != AllEqual)
        {
            return false;
        }

        b0 += VectorSize;
        b1 += VectorSize;
        remaining -= VectorSize;
    }

    // Fewer than 32 bytes left: finish byte by byte.
    while (remaining > 0)
    {
        if (*b0 != *b1)
        {
            return false;
        }

        b0++;
        b1++;
        remaining--;
    }

    return true;
}
/// <summary>
/// Compares two byte buffers using 128-bit SSE2 loads.
/// </summary>
/// <param name="b0">Pointer to the first buffer.</param>
/// <param name="b1">Pointer to the second buffer.</param>
/// <param name="length">Number of bytes to compare; values &lt;= 0 compare nothing.</param>
/// <returns><c>true</c> if the first <paramref name="length"/> bytes match; otherwise <c>false</c>.</returns>
/// <remarks>
/// Uses SSE2 instructions, so the caller must check <c>Sse2.IsSupported</c> first.
/// </remarks>
public static unsafe bool Compare128(byte* b0, byte* b1, int length)
{
    const int VectorSize = 16;              // bytes per 128-bit vector
    const int BlockSize = 4 * VectorSize;   // unrolled: 64 bytes per iteration
    const int AllEqual = 0xFFFF;            // MoveMask sets one bit per byte; 16 set bits

    // Track the byte count instead of computing "end - 64", which would point
    // before the buffer for short inputs.
    int remaining = length;

    // ">=" so that a final, exactly full block is still handled by the vector path.
    while (remaining >= BlockSize)
    {
        if (Sse2.MoveMask(Sse2.CompareEqual(Sse2.LoadVector128(b0), Sse2.LoadVector128(b1))) != AllEqual
            || Sse2.MoveMask(Sse2.CompareEqual(Sse2.LoadVector128(b0 + 16), Sse2.LoadVector128(b1 + 16))) != AllEqual
            || Sse2.MoveMask(Sse2.CompareEqual(Sse2.LoadVector128(b0 + 32), Sse2.LoadVector128(b1 + 32))) != AllEqual
            || Sse2.MoveMask(Sse2.CompareEqual(Sse2.LoadVector128(b0 + 48), Sse2.LoadVector128(b1 + 48))) != AllEqual)
        {
            return false;
        }

        b0 += BlockSize;
        b1 += BlockSize;
        remaining -= BlockSize;
    }

    // Up to three whole vectors may be left; compare them one vector at a time.
    while (remaining >= VectorSize)
    {
        if (Sse2.MoveMask(Sse2.CompareEqual(Sse2.LoadVector128(b0), Sse2.LoadVector128(b1))) != AllEqual)
        {
            return false;
        }

        b0 += VectorSize;
        b1 += VectorSize;
        remaining -= VectorSize;
    }

    // Fewer than 16 bytes left: finish byte by byte.
    while (remaining > 0)
    {
        if (*b0 != *b1)
        {
            return false;
        }

        b0++;
        b1++;
        remaining--;
    }

    return true;
}
#endif
/// <summary>
/// Compares two byte buffers using 64-bit word reads.
/// </summary>
/// <param name="b0">Pointer to the first buffer.</param>
/// <param name="b1">Pointer to the second buffer.</param>
/// <param name="length">Number of bytes to compare; values &lt;= 0 compare nothing.</param>
/// <returns><c>true</c> if the first <paramref name="length"/> bytes match; otherwise <c>false</c>.</returns>
public static unsafe bool Compare64(byte* b0, byte* b1, int length)
{
    const int WordSize = sizeof(ulong);     // 8 bytes per read
    const int BlockSize = 4 * WordSize;     // unrolled: 32 bytes per iteration

    // Track the byte count instead of computing "end - 32", which would point
    // before the buffer for short inputs.
    int remaining = length;

    // ">=" so that a final, exactly full block is still handled by the word path.
    while (remaining >= BlockSize)
    {
        if (*(ulong*)b0 != *(ulong*)b1
            || *(ulong*)(b0 + 8) != *(ulong*)(b1 + 8)
            || *(ulong*)(b0 + 16) != *(ulong*)(b1 + 16)
            || *(ulong*)(b0 + 24) != *(ulong*)(b1 + 24))
        {
            return false;
        }

        b0 += BlockSize;
        b1 += BlockSize;
        remaining -= BlockSize;
    }

    // Up to three whole words may be left; compare them one word at a time.
    while (remaining >= WordSize)
    {
        if (*(ulong*)b0 != *(ulong*)b1)
        {
            return false;
        }

        b0 += WordSize;
        b1 += WordSize;
        remaining -= WordSize;
    }

    // Fewer than 8 bytes left: finish byte by byte.
    while (remaining > 0)
    {
        if (*b0 != *b1)
        {
            return false;
        }

        b0++;
        b1++;
        remaining--;
    }

    return true;
}
受到 ArekBulski 发布的 EqualBytesLongUnrolled 方法的启发,我确定了一个带有附加优化的解决方案。在我的使用场景中,数组之间的差异往往出现在数组的尾部附近。在测试中,我发现当这种情况发生在大型数组中时,以相反的顺序比较数组元素能使这种解决方案比基于 memcmp 的解决方案获得巨大的性能提升。下面是解决方案:
/// <summary>
/// Order in which <c>UnsafeEquals</c> walks the arrays: <c>Forward</c> compares from
/// the first byte towards the last, <c>Backward</c> from the last byte towards the first.
/// </summary>
public enum CompareDirection { Forward, Backward }
/// <summary>
/// Compares two byte arrays for equality, 128 bytes (16 longs) at a time, with a
/// byte-wise pass over the bytes that do not fill a whole 128-byte block.
/// </summary>
/// <param name="a">First array; may be null.</param>
/// <param name="b">Second array; may be null.</param>
/// <param name="direction">
/// <see cref="CompareDirection.Forward"/> scans from the start of the arrays;
/// any other value scans from the end towards the start, which finds a
/// difference sooner when mismatches are near the tail.
/// </param>
/// <returns>
/// <c>true</c> if both references are identical (including both null) or both
/// arrays have equal length and equal bytes; otherwise <c>false</c>.
/// </returns>
private static unsafe bool UnsafeEquals(byte[] a, byte[] b, CompareDirection direction = CompareDirection.Forward)
{
    // Same instance, or both null.
    if (a == b)
    {
        return true;
    }

    // Exactly one is null, or the lengths differ.
    if (a == null || b == null || a.Length != b.Length)
    {
        return false;
    }

    const int LongsPerBlock = 16;                           // longs compared per unrolled iteration
    const int BlockSize = sizeof(long) * LongsPerBlock;     // 128 bytes

    int length = a.Length;
    int tailBytes = length % BlockSize;                     // bytes that do not fill a whole block
    int blockBytes = length - tailBytes;                    // bytes covered by whole blocks

    // An enum variable can hold values that are not named members. Treat anything
    // other than Forward as Backward so the tail bytes are always compared;
    // previously such values skipped the tail and could report unequal arrays as equal.
    bool forward = direction == CompareDirection.Forward;

    // Pin both arrays and access them through raw pointers.
    fixed (byte* pinnedA = a, pinnedB = b)
    {
        if (forward)
        {
            // Whole blocks first, front to back.
            long* pa = (long*)pinnedA;
            long* pb = (long*)pinnedB;
            long* blocksEnd = (long*)(pinnedA + blockBytes);
            while (pa < blocksEnd)
            {
                if (*(pa + 0) != *(pb + 0) || *(pa + 1) != *(pb + 1) ||
                    *(pa + 2) != *(pb + 2) || *(pa + 3) != *(pb + 3) ||
                    *(pa + 4) != *(pb + 4) || *(pa + 5) != *(pb + 5) ||
                    *(pa + 6) != *(pb + 6) || *(pa + 7) != *(pb + 7) ||
                    *(pa + 8) != *(pb + 8) || *(pa + 9) != *(pb + 9) ||
                    *(pa + 10) != *(pb + 10) || *(pa + 11) != *(pb + 11) ||
                    *(pa + 12) != *(pb + 12) || *(pa + 13) != *(pb + 13) ||
                    *(pa + 14) != *(pb + 14) || *(pa + 15) != *(pb + 15))
                {
                    return false;
                }

                pa += LongsPerBlock;
                pb += LongsPerBlock;
            }

            // Then the remaining bytes, front to back.
            byte* ta = pinnedA + blockBytes;
            byte* tb = pinnedB + blockBytes;
            byte* tailEnd = pinnedA + length;
            while (ta < tailEnd)
            {
                if (*ta != *tb)
                {
                    return false;
                }

                ta++;
                tb++;
            }
        }
        else
        {
            // Remaining bytes first, back to front.
            byte* ta = pinnedA + length;
            byte* tb = pinnedB + length;
            byte* tailStart = pinnedA + blockBytes;
            while (ta > tailStart)
            {
                ta--;
                tb--;
                if (*ta != *tb)
                {
                    return false;
                }
            }

            // Then whole blocks, back to front.
            long* pa = (long*)(pinnedA + blockBytes);
            long* pb = (long*)(pinnedB + blockBytes);
            long* head = (long*)pinnedA;
            while (head < pa)
            {
                if (*(pa - 1) != *(pb - 1) || *(pa - 2) != *(pb - 2) ||
                    *(pa - 3) != *(pb - 3) || *(pa - 4) != *(pb - 4) ||
                    *(pa - 5) != *(pb - 5) || *(pa - 6) != *(pb - 6) ||
                    *(pa - 7) != *(pb - 7) || *(pa - 8) != *(pb - 8) ||
                    *(pa - 9) != *(pb - 9) || *(pa - 10) != *(pb - 10) ||
                    *(pa - 11) != *(pb - 11) || *(pa - 12) != *(pb - 12) ||
                    *(pa - 13) != *(pb - 13) || *(pa - 14) != *(pb - 14) ||
                    *(pa - 15) != *(pb - 15) || *(pa - 16) != *(pb - 16))
                {
                    return false;
                }

                pa -= LongsPerBlock;
                pb -= LongsPerBlock;
            }
        }
    }

    return true;
}
我想到了许多显卡内置的块传输加速方法。但是这样你就必须按字节复制所有的数据,所以如果你不想在非托管和依赖硬件的代码中实现你的整个逻辑,这对你没有多大帮助……
Another way of optimization similar to the approach shown above would be to store as much of your data as possible in a long[] rather than a byte[] right from the start, for example if you are reading it sequentially from a binary file, or if you use a memory mapped file, read in data as long[] or single long values. Then, your comparison loop will only need 1/8th of the number of iterations it would have to do for a byte[] containing the same amount of data.
It is a matter of when and how often you need to compare vs. when and how often you need to access the data in a byte-by-byte manner, e.g. to use it in an API call as a parameter in a method that expects a byte[]. In the end, you only can tell if you really know the use case...