如何以最有效的内存和时间方式获取大文件的行数?
def file_len(filename):
with open(filename) as f:
for i, _ in enumerate(f):
pass
return i + 1
如何以最有效的内存和时间方式获取大文件的行数?
def file_len(filename):
with open(filename) as f:
for i, _ in enumerate(f):
pass
return i + 1
您可以执行子进程并运行wc -l filename
import subprocess
def file_len(fname):
p = subprocess.Popen(['wc', '-l', fname], stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
result, err = p.communicate()
if p.returncode != 0:
raise IOError(err)
return int(result.strip().split()[0])
def file_len(full_path):
""" Count number of lines in a file."""
f = open(full_path)
nr_of_lines = sum(1 for line in f)
f.close()
return nr_of_lines
没有比这更好的了。
毕竟,任何解决方案都必须读取整个文件,计算出有多少\n,并返回结果。
在不读取整个文件的情况下,你有更好的方法吗?不确定……最好的解决方案总是I/ o受限,你能做的最好的就是确保不使用不必要的内存,但看起来你已经覆盖了这个问题。
对我来说,这个变体是最快的:
#!/usr/bin/env python
def main():
f = open('filename')
lines = 0
buf_size = 1024 * 1024
read_f = f.read # loop optimization
buf = read_f(buf_size)
while buf:
lines += buf.count('\n')
buf = read_f(buf_size)
print lines
if __name__ == '__main__':
main()
原因:缓冲比逐行和逐字符串读取快。计数也非常快
打开一个文件的结果是一个迭代器,它可以转换为一个序列,它有一个长度:
with open(filename) as f:
return len(list(f))
这比显式循环更简洁,并避免了枚举。
这个呢
def file_len(fname):
counts = itertools.count()
with open(fname) as f:
for _ in f: counts.next()
return counts.next()
我相信内存映射文件将是最快的解决方案。我尝试了四个函数:由OP发布的函数(opcount);对文件中的行进行简单迭代(simplecount);带有内存映射字段(mmap)的Readline (mapcount);以及Mykola Kharechko (buffcount)提供的缓冲区读取解决方案。
我将每个函数运行五次,并计算出120万在线文本文件的平均运行时间。
Windows XP, Python 2.5, 2GB RAM, 2ghz AMD处理器
以下是我的结果:
mapcount : 0.465599966049
simplecount : 0.756399965286
bufcount : 0.546800041199
opcount : 0.718600034714
编辑:Python 2.6的数字:
mapcount : 0.471799945831
simplecount : 0.634400033951
bufcount : 0.468800067902
opcount : 0.602999973297
因此,对于Windows/Python 2.6,缓冲区读取策略似乎是最快的
代码如下:
from __future__ import with_statement
import time
import mmap
import random
from collections import defaultdict
def mapcount(filename):
f = open(filename, "r+")
buf = mmap.mmap(f.fileno(), 0)
lines = 0
readline = buf.readline
while readline():
lines += 1
return lines
def simplecount(filename):
lines = 0
for line in open(filename):
lines += 1
return lines
def bufcount(filename):
f = open(filename)
lines = 0
buf_size = 1024 * 1024
read_f = f.read # loop optimization
buf = read_f(buf_size)
while buf:
lines += buf.count('\n')
buf = read_f(buf_size)
return lines
def opcount(fname):
with open(fname) as f:
for i, l in enumerate(f):
pass
return i + 1
counts = defaultdict(list)
for i in range(5):
for func in [mapcount, simplecount, bufcount, opcount]:
start_time = time.time()
assert func("big_file.txt") == 1209138
counts[func].append(time.time() - start_time)
for key, vals in counts.items():
print key.__name__, ":", sum(vals) / float(len(vals))
为了完成上述方法,我尝试了fileinput模块的一个变体:
import fileinput as fi
def filecount(fname):
for line in fi.input(fname):
pass
return fi.lineno()
并将一个60mil行文件传递给上述所有方法:
mapcount : 6.1331050396
simplecount : 4.588793993
opcount : 4.42918205261
filecount : 43.2780818939
bufcount : 0.170812129974
这让我有点惊讶,fileinput是如此糟糕,比所有其他方法都要糟糕得多…
这个呢?
import sys
sys.stdin=open('fname','r')
data=sys.stdin.readlines()
print "counted",len(data),"lines"
为什么下面的方法行不通呢?
import sys
# input comes from STDIN
file = sys.stdin
data = file.readlines()
# get total number of lines in file
lines = len(data)
print lines
在这种情况下,len函数使用输入行作为确定长度的方法。
这个怎么样?
import fileinput
import sys
counter=0
for line in fileinput.input([sys.argv[1]]):
counter+=1
fileinput.close()
print counter
下面是一个python程序,使用多处理库将行计数分布到不同的机器/核。使用8核windows 64服务器,我的测试将一个2000万行文件的计数从26秒提高到7秒。注意:不使用内存映射会使运行速度变慢。
import multiprocessing, sys, time, os, mmap
import logging, logging.handlers
def init_logger(pid):
console_format = 'P{0} %(levelname)s %(message)s'.format(pid)
logger = logging.getLogger() # New logger at root level
logger.setLevel( logging.INFO )
logger.handlers.append( logging.StreamHandler() )
logger.handlers[0].setFormatter( logging.Formatter( console_format, '%d/%m/%y %H:%M:%S' ) )
def getFileLineCount( queues, pid, processes, file1 ):
init_logger(pid)
logging.info( 'start' )
physical_file = open(file1, "r")
# mmap.mmap(fileno, length[, tagname[, access[, offset]]]
m1 = mmap.mmap( physical_file.fileno(), 0, access=mmap.ACCESS_READ )
#work out file size to divide up line counting
fSize = os.stat(file1).st_size
chunk = (fSize / processes) + 1
lines = 0
#get where I start and stop
_seedStart = chunk * (pid)
_seekEnd = chunk * (pid+1)
seekStart = int(_seedStart)
seekEnd = int(_seekEnd)
if seekEnd < int(_seekEnd + 1):
seekEnd += 1
if _seedStart < int(seekStart + 1):
seekStart += 1
if seekEnd > fSize:
seekEnd = fSize
#find where to start
if pid > 0:
m1.seek( seekStart )
#read next line
l1 = m1.readline() # need to use readline with memory mapped files
seekStart = m1.tell()
#tell previous rank my seek start to make their seek end
if pid > 0:
queues[pid-1].put( seekStart )
if pid < processes-1:
seekEnd = queues[pid].get()
m1.seek( seekStart )
l1 = m1.readline()
while len(l1) > 0:
lines += 1
l1 = m1.readline()
if m1.tell() > seekEnd or len(l1) == 0:
break
logging.info( 'done' )
# add up the results
if pid == 0:
for p in range(1,processes):
lines += queues[0].get()
queues[0].put(lines) # the total lines counted
else:
queues[0].put(lines)
m1.close()
physical_file.close()
if __name__ == '__main__':
init_logger( 'main' )
if len(sys.argv) > 1:
file_name = sys.argv[1]
else:
logging.fatal( 'parameters required: file-name [processes]' )
exit()
t = time.time()
processes = multiprocessing.cpu_count()
if len(sys.argv) > 2:
processes = int(sys.argv[2])
queues=[] # a queue for each process
for pid in range(processes):
queues.append( multiprocessing.Queue() )
jobs=[]
prev_pipe = 0
for pid in range(processes):
p = multiprocessing.Process( target = getFileLineCount, args=(queues, pid, processes, file_name,) )
p.start()
jobs.append(p)
jobs[0].join() #wait for counting to finish
lines = queues[0].get()
logging.info( 'finished {} Lines:{}'.format( time.time() - t, lines ) )
我修改了缓冲区的情况如下:
def CountLines(filename):
f = open(filename)
try:
lines = 1
buf_size = 1024 * 1024
read_f = f.read # loop optimization
buf = read_f(buf_size)
# Empty file
if not buf:
return 0
while buf:
lines += buf.count('\n')
buf = read_f(buf_size)
return lines
finally:
f.close()
现在空文件和最后一行(不带\n)也被计算在内。
我得到了一个小(4-8%)的改进,这个版本重用了一个常量缓冲区,所以它应该避免任何内存或GC开销:
lines = 0
buffer = bytearray(2048)
with open(filename) as f:
while f.readinto(buffer) > 0:
lines += buffer.count('\n')
您可以调整缓冲区大小,可能会看到一些改进。
下面这句话怎么样:
file_length = len(open('myfile.txt','r').read().split('\n'))
用这种方法在一个3900行的文件上计时只需要0.003秒
def c():
import time
s = time.time()
file_length = len(open('myfile.txt','r').read().split('\n'))
print time.time() - s
我会使用Python的文件对象方法readlines,如下所示:
with open(input_file) as foo:
lines = len(foo.readlines())
这将打开文件,在文件中创建一个行列表,计算列表的长度,将其保存到一个变量中,然后再次关闭文件。
def line_count(path):
count = 0
with open(path) as lines:
for count, l in enumerate(lines, start=1):
pass
return count
如果你想在Linux下的Python中廉价地获取行数,我推荐这个方法:
import os
print os.popen("wc -l file_path").readline().split()[0]
File_path可以是抽象文件路径,也可以是相对路径。希望这能有所帮助。
凯尔的回答
num_lines = sum(1 for line in open('my_file.txt'))
最好的替代方案是什么
num_lines = len(open('my_file.txt').read().splitlines())
这里是两者的性能比较
In [20]: timeit sum(1 for line in open('Charts.ipynb'))
100000 loops, best of 3: 9.79 µs per loop
In [21]: timeit len(open('Charts.ipynb').read().splitlines())
100000 loops, best of 3: 12 µs per loop
你可以使用操作系统。路径模块如下所示:
import os
import subprocess
Number_lines = int( (subprocess.Popen( 'wc -l {0}'.format( Filename ), shell=True, stdout=subprocess.PIPE).stdout).readlines()[0].split()[0] )
,其中Filename是文件的绝对路径。
我不得不在类似的问题上发表这篇文章,直到我的声誉分数上升了一点(感谢那些撞了我的人!)。
所有这些解决方案都忽略了一种使其运行得更快的方法,即使用无缓冲(原始)接口,使用字节数组,并进行自己的缓冲。(这只适用于Python 3。在Python 2中,原始接口在默认情况下可以使用,也可以不使用,但在Python 3中,您将默认使用Unicode。)
使用一个修改版本的计时工具,我相信下面的代码比任何提供的解决方案都更快(并且稍微更python化):
def rawcount(filename):
f = open(filename, 'rb')
lines = 0
buf_size = 1024 * 1024
read_f = f.raw.read
buf = read_f(buf_size)
while buf:
lines += buf.count(b'\n')
buf = read_f(buf_size)
return lines
使用单独的生成器函数,运行速度会快一点:
def _make_gen(reader):
b = reader(1024 * 1024)
while b:
yield b
b = reader(1024*1024)
def rawgencount(filename):
f = open(filename, 'rb')
f_gen = _make_gen(f.raw.read)
return sum( buf.count(b'\n') for buf in f_gen )
这完全可以用itertools内嵌的生成器表达式来完成,但它看起来非常奇怪:
from itertools import (takewhile,repeat)
def rawincount(filename):
f = open(filename, 'rb')
bufgen = takewhile(lambda x: x, (f.raw.read(1024*1024) for _ in repeat(None)))
return sum( buf.count(b'\n') for buf in bufgen )
以下是我的时间安排:
function average, s min, s ratio
rawincount 0.0043 0.0041 1.00
rawgencount 0.0044 0.0042 1.01
rawcount 0.0048 0.0045 1.09
bufcount 0.008 0.0068 1.64
wccount 0.01 0.0097 2.35
itercount 0.014 0.014 3.41
opcount 0.02 0.02 4.83
kylecount 0.021 0.021 5.05
simplecount 0.022 0.022 5.25
mapcount 0.037 0.031 7.46
另一种可能性:
import subprocess
def num_lines_in_file(fpath):
return int(subprocess.check_output('wc -l %s' % fpath, shell=True).strip().split()[0])
这是我用纯python发现的最快的东西。 你可以通过设置buffer来使用任意大小的内存,不过在我的电脑上2**16似乎是一个最佳位置。
from functools import partial
buffer=2**16
with open(myfile) as f:
print sum(x.count('\n') for x in iter(partial(f.read,buffer), ''))
我在这里找到了答案为什么在c++中从stdin读取行要比Python慢得多?稍微调整了一下。这是一个非常好的阅读来理解如何快速计数行,尽管wc -l仍然比其他任何方法快75%。
一句话解决方案:
import os
os.system("wc -l filename")
我的代码片段:
>>> os.system('wc -l *.txt')
0 bar.txt
1000 command.txt
3 test_file.txt
1003 total
与此答案类似的一行bash解决方案,使用了现代子进程。check_output功能:
def line_count(filename):
return int(subprocess.check_output(['wc', '-l', filename]).split()[0])
这是我用的,看起来很干净:
import subprocess
def count_file_lines(file_path):
"""
Counts the number of lines in a file using wc utility.
:param file_path: path to file
:return: int, no of lines
"""
num = subprocess.check_output(['wc', '-l', file_path])
num = num.split(' ')
return int(num[0])
更新:这比使用纯python略快,但以内存使用为代价。子进程在执行您的命令时将派生一个与父进程具有相同内存占用的新进程。
def count_text_file_lines(path):
with open(path, 'rt') as file:
line_count = sum(1 for _line in file)
return line_count
创建一个可执行脚本文件count.py:
#!/usr/bin/python
import sys
count = 0
for line in sys.stdin:
count+=1
然后将文件的内容导入python脚本:cat huge.txt | ./count.py。管道也适用于Powershell,因此您将最终计算行数。
对我来说,在Linux上它比简单的解决方案快30%:
count=1
with open('huge.txt') as f:
count+=1
如果你的文件中的所有行都是相同的长度(并且只包含ASCII字符)*,你可以非常便宜地执行以下操作:
fileSize = os.path.getsize( pathToFile ) # file size in bytes
bytesPerLine = someInteger # don't forget to account for the newline character
numLines = fileSize // bytesPerLine
*如果使用像é这样的unicode字符,我怀疑需要更多的努力来确定一行中的字节数。
简单的方法:
1)
>>> f = len(open("myfile.txt").readlines())
>>> f
430
>>> f = open("myfile.txt").read().count('\n')
>>> f
430
>>>
num_lines = len(list(open('myfile.txt')))
大文件的另一种选择是使用xreadlines():
count = 0
for line in open(thefilepath).xreadlines( ): count += 1
对于Python 3,请参阅:在Python 3中什么替代xreadlines() ?
使用Numba
我们可以使用Numba来JIT(及时)编译我们的函数到机器代码。Def numbacountparallel(fname)运行速度快2.8倍 然后从问题中定义file_len(fname)。
注:
在运行基准测试之前,操作系统已经将文件缓存到内存中,因为我在我的PC上没有看到太多的磁盘活动。 第一次读取文件时,时间会慢得多,因此使用Numba的时间优势并不显著。
第一次调用函数时,JIT编译需要额外的时间。
如果我们不只是计算行数,这个就很有用了。
Cython是另一个选择。
http://numba.pydata.org/
结论
因为计算行数是IO绑定的,所以使用问题中的def file_len(fname),除非你想做的不仅仅是计算行数。
import timeit
from numba import jit, prange
import numpy as np
from itertools import (takewhile,repeat)
FILE = '../data/us_confirmed.csv' # 40.6MB, 371755 line file
CR = ord('\n')
# Copied from the question above. Used as a benchmark
def file_len(fname):
with open(fname) as f:
for i, l in enumerate(f):
pass
return i + 1
# Copied from another answer. Used as a benchmark
def rawincount(filename):
f = open(filename, 'rb')
bufgen = takewhile(lambda x: x, (f.read(1024*1024*10) for _ in repeat(None)))
return sum( buf.count(b'\n') for buf in bufgen )
# Single thread
@jit(nopython=True)
def numbacountsingle_chunk(bs):
c = 0
for i in range(len(bs)):
if bs[i] == CR:
c += 1
return c
def numbacountsingle(filename):
f = open(filename, "rb")
total = 0
while True:
chunk = f.read(1024*1024*10)
lines = numbacountsingle_chunk(chunk)
total += lines
if not chunk:
break
return total
# Multi thread
@jit(nopython=True, parallel=True)
def numbacountparallel_chunk(bs):
c = 0
for i in prange(len(bs)):
if bs[i] == CR:
c += 1
return c
def numbacountparallel(filename):
f = open(filename, "rb")
total = 0
while True:
chunk = f.read(1024*1024*10)
lines = numbacountparallel_chunk(np.frombuffer(chunk, dtype=np.uint8))
total += lines
if not chunk:
break
return total
print('numbacountparallel')
print(numbacountparallel(FILE)) # This allows Numba to compile and cache the function without adding to the time.
print(timeit.Timer(lambda: numbacountparallel(FILE)).timeit(number=100))
print('\nnumbacountsingle')
print(numbacountsingle(FILE))
print(timeit.Timer(lambda: numbacountsingle(FILE)).timeit(number=100))
print('\nfile_len')
print(file_len(FILE))
print(timeit.Timer(lambda: rawincount(FILE)).timeit(number=100))
print('\nrawincount')
print(rawincount(FILE))
print(timeit.Timer(lambda: rawincount(FILE)).timeit(number=100))
每个函数调用100次的时间(以秒为单位)
numbacountparallel
371755
2.8007332000000003
numbacountsingle
371755
3.1508585999999994
file_len
371755
6.7945494
rawincount
371755
6.815438
这是对其他一些答案的元评论。
The line-reading and buffered \n-counting techniques won't return the same answer for every file, because some text files have no newline at the end of the last line. You can work around this by checking the last byte of the last nonempty buffer and adding 1 if it's not b'\n'. In Python 3, opening the file in text mode and in binary mode can yield different results, because text mode by default recognizes CR, LF, and CRLF as line endings (converting them all to '\n'), while in binary mode only LF and CRLF will be counted if you count b'\n'. This applies whether you read by lines or into a fixed-size buffer. The classic Mac OS used CR as a line ending; I don't know how common those files are these days. The buffer-reading approach uses a bounded amount of RAM independent of file size, while the line-reading approach could read the entire file into RAM at once in the worst case (especially if the file uses CR line endings). In the worst case it may use substantially more RAM than the file size, because of overhead from dynamic resizing of the line buffer and (if you opened in text mode) Unicode decoding and storage. You can improve the memory usage, and probably the speed, of the buffered approach by pre-allocating a bytearray and using readinto instead of read. One of the existing answers (with few votes) does this, but it's buggy (it double-counts some bytes). The top buffer-reading answer uses a large buffer (1 MiB). Using a smaller buffer can actually be faster because of OS readahead. If you read 32K or 64K at a time, the OS will probably start reading the next 32K/64K into the cache before you ask for it, and each trip to the kernel will return almost immediately. If you read 1 MiB at a time, the OS is unlikely to speculatively read a whole megabyte. It may preread a smaller amount but you will still spend a significant amount of time sitting in the kernel waiting for the disk to return the rest of the data.
已经有很多答案了,但不幸的是,它们中的大多数只是一个几乎不可优化的问题上的微型经济……
在我参与的几个项目中,行数是软件的核心功能,以最快的速度处理大量文件是至关重要的。
行数的主要瓶颈是I/O访问,因为您需要读取每一行以检测行返回字符,因此没有其他方法。第二个潜在的瓶颈是内存管理:一次加载的内存越多,处理的速度就越快,但与第一个瓶颈相比,这个瓶颈可以忽略不计。
因此,除了禁用gc收集和其他微管理技巧等微小优化外,还有3种主要方法可以减少行计数函数的处理时间:
Hardware solution: the major and most obvious way is non-programmatic: buy a very fast SSD/flash hard drive. By far, this is how you can get the biggest speed boosts. Data preparation solution: if you generate or can modify how the files you process are generated, or if it's acceptable that you can pre-process them, first convert the line return to unix style (\n) as this will save 1 character compared to Windows or MacOS styles (not a big save but it's an easy gain), and secondly and most importantly, you can potentially write lines of fixed length. If you need variable length, you can always pad smaller lines. This way, you can calculate instantly the number of lines from the total filesize, which is much faster to access. Often, the best solution to a problem is to pre-process it so that it better fits your end purpose. Parallelization + hardware solution: if you can buy multiple hard disks (and if possible SSD flash disks), then you can even go beyond the speed of one disk by leveraging parallelization, by storing your files in a balanced way (easiest is to balance by total size) among disks, and then read in parallel from all those disks. Then, you can expect to get a multiplier boost in proportion with the number of disks you have. If buying multiple disks is not an option for you, then parallelization likely won't help (except if your disk has multiple reading headers like some professional-grade disks, but even then the disk's internal cache memory and PCB circuitry will likely be a bottleneck and prevent you from fully using all heads in parallel, plus you have to devise a specific code for this hard drive you'll use because you need to know the exact cluster mapping so that you store your files on clusters under different heads, and so that you can read them with different heads after). Indeed, it's commonly known that sequential reading is almost always faster than random reading, and parallelization on a single disk will have a performance more similar to random reading than sequential reading (you can test your hard drive speed in both aspects using CrystalDiskMark for example).
如果这些都不是选择,那么你只能依靠微观管理技巧来提高行数函数的速度,但不要指望有什么真正重要的东西。相反,您可以预期,与您将看到的速度改进回报相比,您花费在调整上的时间将是不均衡的。
在perfplot分析之后,必须推荐缓冲读取解决方案
def buf_count_newlines_gen(fname):
def _make_gen(reader):
while True:
b = reader(2 ** 16)
if not b: break
yield b
with open(fname, "rb") as f:
count = sum(buf.count(b"\n") for buf in _make_gen(f.raw.read))
return count
它速度快,内存效率高。大多数其他解决方案大约要慢20倍。
代码重现情节:
import mmap
import subprocess
from functools import partial
import perfplot
def setup(n):
fname = "t.txt"
with open(fname, "w") as f:
for i in range(n):
f.write(str(i) + "\n")
return fname
def for_enumerate(fname):
i = 0
with open(fname) as f:
for i, _ in enumerate(f):
pass
return i + 1
def sum1(fname):
return sum(1 for _ in open(fname))
def mmap_count(fname):
with open(fname, "r+") as f:
buf = mmap.mmap(f.fileno(), 0)
lines = 0
while buf.readline():
lines += 1
return lines
def for_open(fname):
lines = 0
for _ in open(fname):
lines += 1
return lines
def buf_count_newlines(fname):
lines = 0
buf_size = 2 ** 16
with open(fname) as f:
buf = f.read(buf_size)
while buf:
lines += buf.count("\n")
buf = f.read(buf_size)
return lines
def buf_count_newlines_gen(fname):
def _make_gen(reader):
b = reader(2 ** 16)
while b:
yield b
b = reader(2 ** 16)
with open(fname, "rb") as f:
count = sum(buf.count(b"\n") for buf in _make_gen(f.raw.read))
return count
def wc_l(fname):
return int(subprocess.check_output(["wc", "-l", fname]).split()[0])
def sum_partial(fname):
with open(fname) as f:
count = sum(x.count("\n") for x in iter(partial(f.read, 2 ** 16), ""))
return count
def read_count(fname):
return open(fname).read().count("\n")
b = perfplot.bench(
setup=setup,
kernels=[
for_enumerate,
sum1,
mmap_count,
for_open,
wc_l,
buf_count_newlines,
buf_count_newlines_gen,
sum_partial,
read_count,
],
n_range=[2 ** k for k in range(27)],
xlabel="num lines",
)
b.save("out.png")
b.show()