How do I get the line count of a large file in the most memory- and time-efficient way?
def file_len(filename):
    with open(filename) as f:
        for i, _ in enumerate(f):
            pass
    return i + 1
Current answer
Here is a Python program that uses the multiprocessing library to distribute the line counting across machines/cores. On an 8-core 64-bit Windows server, my test improved the count of a 20-million-line file from 26 seconds to 7 seconds. Note: not using memory mapping makes it run slower.
import multiprocessing, sys, time, os, mmap
import logging, logging.handlers

def init_logger(pid):
    console_format = 'P{0} %(levelname)s %(message)s'.format(pid)
    logger = logging.getLogger()  # New logger at root level
    logger.setLevel(logging.INFO)
    logger.handlers.append(logging.StreamHandler())
    logger.handlers[0].setFormatter(logging.Formatter(console_format, '%d/%m/%y %H:%M:%S'))

def getFileLineCount(queues, pid, processes, file1):
    init_logger(pid)
    logging.info('start')

    physical_file = open(file1, "r")
    # mmap.mmap(fileno, length[, tagname[, access[, offset]]])
    m1 = mmap.mmap(physical_file.fileno(), 0, access=mmap.ACCESS_READ)

    # work out file size to divide up line counting
    fSize = os.stat(file1).st_size
    chunk = (fSize / processes) + 1

    lines = 0

    # get where I start and stop
    _seedStart = chunk * (pid)
    _seekEnd = chunk * (pid + 1)
    seekStart = int(_seedStart)
    seekEnd = int(_seekEnd)

    if seekEnd < int(_seekEnd + 1):
        seekEnd += 1

    if _seedStart < int(seekStart + 1):
        seekStart += 1

    if seekEnd > fSize:
        seekEnd = fSize

    # find where to start
    if pid > 0:
        m1.seek(seekStart)
        # read next line
        l1 = m1.readline()  # need to use readline with memory mapped files
        seekStart = m1.tell()

    # tell previous rank my seek start to make their seek end
    if pid > 0:
        queues[pid - 1].put(seekStart)
    if pid < processes - 1:
        seekEnd = queues[pid].get()

    m1.seek(seekStart)
    l1 = m1.readline()

    while len(l1) > 0:
        lines += 1
        l1 = m1.readline()
        if m1.tell() > seekEnd or len(l1) == 0:
            break

    logging.info('done')
    # add up the results
    if pid == 0:
        for p in range(1, processes):
            lines += queues[0].get()
        queues[0].put(lines)  # the total lines counted
    else:
        queues[0].put(lines)

    m1.close()
    physical_file.close()

if __name__ == '__main__':
    init_logger('main')
    if len(sys.argv) > 1:
        file_name = sys.argv[1]
    else:
        logging.fatal('parameters required: file-name [processes]')
        exit()

    t = time.time()
    processes = multiprocessing.cpu_count()
    if len(sys.argv) > 2:
        processes = int(sys.argv[2])
    queues = []  # a queue for each process
    for pid in range(processes):
        queues.append(multiprocessing.Queue())
    jobs = []
    prev_pipe = 0
    for pid in range(processes):
        p = multiprocessing.Process(target=getFileLineCount, args=(queues, pid, processes, file_name,))
        p.start()
        jobs.append(p)

    jobs[0].join()  # wait for counting to finish
    lines = queues[0].get()
    logging.info('finished {} Lines:{}'.format(time.time() - t, lines))
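For comparison, here is a minimal single-process sketch of the same memory-mapped idea (the function name mmap_count and its scanning loop are illustrative additions, not part of the answer above). It counts newline bytes, so a last line without a trailing newline is not counted:
import mmap

def mmap_count(filename):
    # Map the file read-only and scan for b'\n' without loading it all at once.
    with open(filename, 'rb') as f:
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as m:
            lines = 0
            pos = m.find(b'\n')
            while pos != -1:
                lines += 1
                pos = m.find(b'\n', pos + 1)
            return lines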
Other answers
I found that you can just do:
f = open("data.txt")
linecount = len(f.readlines())
which will give you the answer.
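Keep in mind that readlines() builds a list of every line in memory. If memory matters more than raw speed, a lazy generator-based count avoids that; a minimal sketch (gen_count is an illustrative name, not from the answer above):
def gen_count(filename):
    # Iterate lazily; only one line is held in memory at a time.
    with open(filename) as f:
        return sum(1 for _ in f)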
What about this?
import sys
sys.stdin = open('fname', 'r')
data = sys.stdin.readlines()
print("counted", len(data), "lines")
I couldn't post this on a similar question until my reputation score went up a bit (thanks to whoever bumped me!).
All of these solutions ignore one way to make this run considerably faster, namely by using the unbuffered (raw) interface, using bytearrays, and doing your own buffering. (This only applies in Python 3. In Python 2, the raw interface may or may not be used by default, but in Python 3 you'll default into Unicode.)
Using a modified version of the timing tool, I believe the following code is faster (and marginally more pythonic) than any of the solutions offered:
def rawcount(filename):
    f = open(filename, 'rb')
    lines = 0
    buf_size = 1024 * 1024
    read_f = f.raw.read
    buf = read_f(buf_size)
    while buf:
        lines += buf.count(b'\n')
        buf = read_f(buf_size)
    return lines
Using a separate generator function, this runs a smidge faster:
def _make_gen(reader):
    b = reader(1024 * 1024)
    while b:
        yield b
        b = reader(1024 * 1024)

def rawgencount(filename):
    f = open(filename, 'rb')
    f_gen = _make_gen(f.raw.read)
    return sum(buf.count(b'\n') for buf in f_gen)
This can be done completely with generator expressions inline using itertools, but it ends up looking pretty weird:
from itertools import (takewhile, repeat)

def rawincount(filename):
    f = open(filename, 'rb')
    bufgen = takewhile(lambda x: x, (f.raw.read(1024*1024) for _ in repeat(None)))
    return sum(buf.count(b'\n') for buf in bufgen)
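To try any of these on a real file, a small harness along these lines should work (a sketch only; the path below is a placeholder):
import timeit

path = 'big.log'  # placeholder path, substitute your own file
print(rawgencount(path))
print(timeit.timeit(lambda: rawgencount(path), number=10))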
Here are my timings:
function      average, s   min, s   ratio
rawincount    0.0043       0.0041   1.00
rawgencount   0.0044       0.0042   1.01
rawcount      0.0048       0.0045   1.09
bufcount      0.008        0.0068   1.64
wccount       0.01         0.0097   2.35
itercount     0.014        0.014    3.41
opcount       0.02         0.02     4.83
kylecount     0.021        0.021    5.05
simplecount   0.022        0.022    5.25
mapcount      0.037        0.031    7.46
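bufcount, wccount, and the other names in the table refer to counting approaches from other answers to the original question and are not defined here. As an illustration only, and assuming wccount simply shells out to wc -l, a minimal sketch of that idea (Unix only) might look like:
import subprocess

def wccount(filename):
    # Let the external wc utility do the counting; assumes wc is on PATH.
    out = subprocess.run(['wc', '-l', filename], capture_output=True, check=True)
    return int(out.stdout.split()[0])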
Using Numba
We can use Numba to JIT (just-in-time) compile our function to machine code. numbacountparallel(fname) below runs 2.8 times faster than file_len(fname) from the question.
Notes:
The OS had already cached the file to memory before the benchmarks were run, as I don't see much disk activity on my PC. The time would be much slower when reading the file for the first time, making the time advantage of using Numba insignificant.
The JIT compilation takes extra time the first time the function is called.
This would be useful if we were doing more than just counting lines.
Cython is another option.
http://numba.pydata.org/
Conclusion
Since counting lines is IO bound, use file_len(fname) from the question unless you want to do more than just count lines.
import timeit
from numba import jit, prange
import numpy as np
from itertools import (takewhile, repeat)

FILE = '../data/us_confirmed.csv'  # 40.6MB, 371755 line file
CR = ord('\n')

# Copied from the question above. Used as a benchmark
def file_len(fname):
    with open(fname) as f:
        for i, l in enumerate(f):
            pass
    return i + 1

# Copied from another answer. Used as a benchmark
def rawincount(filename):
    f = open(filename, 'rb')
    bufgen = takewhile(lambda x: x, (f.read(1024*1024*10) for _ in repeat(None)))
    return sum(buf.count(b'\n') for buf in bufgen)

# Single thread
@jit(nopython=True)
def numbacountsingle_chunk(bs):
    c = 0
    for i in range(len(bs)):
        if bs[i] == CR:
            c += 1
    return c

def numbacountsingle(filename):
    f = open(filename, "rb")
    total = 0
    while True:
        chunk = f.read(1024*1024*10)
        lines = numbacountsingle_chunk(chunk)
        total += lines
        if not chunk:
            break
    return total

# Multi thread
@jit(nopython=True, parallel=True)
def numbacountparallel_chunk(bs):
    c = 0
    for i in prange(len(bs)):
        if bs[i] == CR:
            c += 1
    return c

def numbacountparallel(filename):
    f = open(filename, "rb")
    total = 0
    while True:
        chunk = f.read(1024*1024*10)
        lines = numbacountparallel_chunk(np.frombuffer(chunk, dtype=np.uint8))
        total += lines
        if not chunk:
            break
    return total
print('numbacountparallel')
print(numbacountparallel(FILE)) # This allows Numba to compile and cache the function without adding to the time.
print(timeit.Timer(lambda: numbacountparallel(FILE)).timeit(number=100))
print('\nnumbacountsingle')
print(numbacountsingle(FILE))
print(timeit.Timer(lambda: numbacountsingle(FILE)).timeit(number=100))
print('\nfile_len')
print(file_len(FILE))
print(timeit.Timer(lambda: file_len(FILE)).timeit(number=100))
print('\nrawincount')
print(rawincount(FILE))
print(timeit.Timer(lambda: rawincount(FILE)).timeit(number=100))
Time in seconds for 100 calls to each function:
numbacountparallel
371755
2.8007332000000003
numbacountsingle
371755
3.1508585999999994
file_len
371755
6.7945494
rawincount
371755
6.815438
For me, this variant was the fastest:
#!/usr/bin/env python

def main():
    f = open('filename')
    lines = 0
    buf_size = 1024 * 1024
    read_f = f.read  # loop optimization
    buf = read_f(buf_size)
    while buf:
        lines += buf.count('\n')
        buf = read_f(buf_size)
    print(lines)

if __name__ == '__main__':
    main()
The reason: buffering is faster than reading line by line or string by string, and counting is also very fast.