一些关于我来自哪里的背景信息。代码片段在最后。
当我可以的时候,我更喜欢使用像H2O这样的开源工具来进行超高性能的并行CSV文件读取,但这个工具的功能集有限。最后,我写了很多代码来创建数据科学管道,然后才将其提供给H2O集群进行监督学习。
我一直在从UCI repo读取8GB的HIGGS数据集,甚至40GB的CSV文件,用于数据科学目的,通过使用多处理库的池对象和映射函数添加大量并行性,从而显著加快了读取速度。例如,使用最近邻搜索的聚类以及DBSCAN和Markov聚类算法需要一些并行编程技巧来绕过一些严重的内存和挂钟时间问题。
我通常喜欢先使用gnu工具将文件按行分解成各个部分,然后在python程序中并行地查找和读取它们。我通常使用1000多个部分文件。使用这些技巧对处理速度和内存限制有极大的帮助。
熊猫数据框架。Read_csv是单线程的,所以你可以通过运行map()并行执行来使pandas更快。您可以使用htop来查看普通的顺序熊猫数据框架。Read_csv,一个核上100%的CPU是pd的实际瓶颈。Read_csv,而不是磁盘。
我应该补充一句,我使用的是快速显卡总线上的SSD,而不是SATA6总线上的旋转HD,外加16个CPU内核。
Also, another technique that I discovered works great in some applications is parallel CSV file reads all within one giant file, starting each worker at different offset into the file, rather than pre-splitting one big file into many part files. Use python's file seek() and tell() in each parallel worker to read the big text file in strips, at different byte offset start-byte and end-byte locations in the big file, all at the same time concurrently. You can do a regex findall on the bytes, and return the count of linefeeds. This is a partial sum. Finally sum up the partial sums to get the global sum when the map function returns after the workers finished.
下面是一些使用并行字节偏移技巧的示例基准测试:
我使用了2个文件:HIGGS.csv是8gb。它来自UCI机器学习库。all_bin .csv是40.4 GB,来自我当前的项目。
我使用两个程序:Linux附带的GNU wc程序,以及我开发的纯python fastread.py程序。
HP-Z820:/mnt/fastssd/fast_file_reader$ ls -l /mnt/fastssd/nzv/HIGGS.csv
-rw-rw-r-- 1 8035497980 Jan 24 16:00 /mnt/fastssd/nzv/HIGGS.csv
HP-Z820:/mnt/fastssd$ ls -l all_bin.csv
-rw-rw-r-- 1 40412077758 Feb 2 09:00 all_bin.csv
ga@ga-HP-Z820:/mnt/fastssd$ time python fastread.py --fileName="all_bin.csv" --numProcesses=32 --balanceFactor=2
2367496
real 0m8.920s
user 1m30.056s
sys 2m38.744s
In [1]: 40412077758. / 8.92
Out[1]: 4530501990.807175
这大约是4.5 GB/s,或者45 GB/s的文件读取速度。那不是旋转硬盘,我的朋友。那其实是三星Pro 950 SSD。
下面是gnu wc(一个纯C编译程序)对同一文件进行行计数的速度基准。
What is cool is you can see my pure python program essentially matched the speed of the gnu wc compiled C program in this case. Python is interpreted but C is compiled, so this is a pretty interesting feat of speed, I think you would agree. Of course, wc really needs to be changed to a parallel program, and then it would really beat the socks off my python program. But as it stands today, gnu wc is just a sequential program. You do what you can, and python can do parallel today. Cython compiling might be able to help me (for some other time). Also memory mapped files was not explored yet.
HP-Z820:/mnt/fastssd$ time wc -l all_bin.csv
2367496 all_bin.csv
real 0m8.807s
user 0m1.168s
sys 0m7.636s
HP-Z820:/mnt/fastssd/fast_file_reader$ time python fastread.py --fileName="HIGGS.csv" --numProcesses=16 --balanceFactor=2
11000000
real 0m2.257s
user 0m12.088s
sys 0m20.512s
HP-Z820:/mnt/fastssd/fast_file_reader$ time wc -l HIGGS.csv
11000000 HIGGS.csv
real 0m1.820s
user 0m0.364s
sys 0m1.456s
结论:与C程序相比,纯python程序的速度更好。但是,在C程序上使用纯python程序还不够好,至少在行计数的目的上是这样。一般来说,该技术可以用于其他文件处理,所以这段python代码仍然很好。
问:只编译一次正则表达式并将其传递给所有工作人员是否会提高速度?答:Regex预编译在这个应用程序中没有帮助。我认为原因是进程序列化和为所有worker创建的开销占主导地位。
还有一件事。
并行CSV文件读取甚至有帮助吗?磁盘是瓶颈,还是CPU?他们说,stackoverflow上许多所谓的顶级答案都包含了常见的开发智慧,即您只需要一个线程就可以读取一个文件,这是您所能做到的最好的。他们确定吗?
让我们来看看:
HP-Z820:/mnt/fastssd/fast_file_reader$ time python fastread.py --fileName="HIGGS.csv" --numProcesses=16 --balanceFactor=2
11000000
real 0m2.256s
user 0m10.696s
sys 0m19.952s
HP-Z820:/mnt/fastssd/fast_file_reader$ time python fastread.py --fileName="HIGGS.csv" --numProcesses=1 --balanceFactor=1
11000000
real 0m17.380s
user 0m11.124s
sys 0m6.272s
哦,是的,是的。并行文件读取工作得很好。好吧,这就对了!
Ps.如果你们中的一些人想知道,当使用单个工作进程时,如果balanceFactor是2会怎样?嗯,这太可怕了:
HP-Z820:/mnt/fastssd/fast_file_reader$ time python fastread.py --fileName="HIGGS.csv" --numProcesses=1 --balanceFactor=2
11000000
real 1m37.077s
user 0m12.432s
sys 1m24.700s
fastread.py python程序的关键部分:
fileBytes = stat(fileName).st_size # Read quickly from OS how many bytes are in a text file
startByte, endByte = PartitionDataToWorkers(workers=numProcesses, items=fileBytes, balanceFactor=balanceFactor)
p = Pool(numProcesses)
partialSum = p.starmap(ReadFileSegment, zip(startByte, endByte, repeat(fileName))) # startByte is already a list. fileName is made into a same-length list of duplicates values.
globalSum = sum(partialSum)
print(globalSum)
def ReadFileSegment(startByte, endByte, fileName, searchChar='\n'): # counts number of searchChar appearing in the byte range
with open(fileName, 'r') as f:
f.seek(startByte-1) # seek is initially at byte 0 and then moves forward the specified amount, so seek(5) points at the 6th byte.
bytes = f.read(endByte - startByte + 1)
cnt = len(re.findall(searchChar, bytes)) # findall with implicit compiling runs just as fast here as re.compile once + re.finditer many times.
return cnt
PartitionDataToWorkers的def只是普通的顺序代码。我省略了它,以防其他人想要练习并行编程是什么样的。我免费提供了比较困难的部分:经过测试和工作的并行代码,以帮助您学习。
感谢:开源的H2O项目,由Arno和Cliff以及H2O工作人员为他们伟大的软件和教学视频,这为我提供了这个纯python高性能并行字节偏移阅读器的灵感,如上所示。H2O使用java进行并行文件读取,可由python和R程序调用,并且速度非常快,比地球上任何读取大型CSV文件的程序都快。