How do I get the line count of a large file in the most memory- and time-efficient way?

def file_len(filename):
    with open(filename) as f:
        for i, _ in enumerate(f):
            pass
    return i + 1

Current answer

I had to post this on a similar question until my reputation score went up a bit (thanks to whoever bumped me!).

All of these solutions ignore one way to make this run considerably faster, namely by using the unbuffered (raw) interface, using bytearrays, and doing your own buffering. (This only applies in Python 3. In Python 2, the raw interface may or may not be used by default, but in Python 3 you'll default into Unicode.)

Using a modified version of the timing tool, I believe the following code is faster (and marginally more Pythonic) than any of the solutions offered:

def rawcount(filename):
    f = open(filename, 'rb')
    lines = 0
    buf_size = 1024 * 1024
    read_f = f.raw.read  # read straight from the unbuffered raw stream

    # Read the file in 1 MiB chunks and count newline bytes in each chunk.
    buf = read_f(buf_size)
    while buf:
        lines += buf.count(b'\n')
        buf = read_f(buf_size)

    return lines

Using a separate generator function runs a bit faster:

def _make_gen(reader):
    b = reader(1024 * 1024)
    while b:
        yield b
        b = reader(1024*1024)

def rawgencount(filename):
    f = open(filename, 'rb')
    f_gen = _make_gen(f.raw.read)
    return sum( buf.count(b'\n') for buf in f_gen )

This can be done entirely with generator expressions inline using itertools, but it ends up looking rather strange:

from itertools import (takewhile,repeat)

def rawincount(filename):
    f = open(filename, 'rb')
    bufgen = takewhile(lambda x: x, (f.raw.read(1024*1024) for _ in repeat(None)))
    return sum( buf.count(b'\n') for buf in bufgen )

Here are my timings:

function      average, s  min, s   ratio
rawincount        0.0043  0.0041   1.00
rawgencount       0.0044  0.0042   1.01
rawcount          0.0048  0.0045   1.09
bufcount          0.008   0.0068   1.64
wccount           0.01    0.0097   2.35
itercount         0.014   0.014    3.41
opcount           0.02    0.02     4.83
kylecount         0.021   0.021    5.05
simplecount       0.022   0.022    5.25
mapcount          0.037   0.031    7.46
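
For reference, here is a minimal sketch of how such timings could be reproduced with the standard timeit module. The original timing tool is not shown in this answer; the test file name, the repeat count, and the output format below are assumptions.

import timeit

# Hypothetical benchmark harness (the original timing tool is not shown):
# each counting function defined above is run several times on an assumed
# test file, and the average and minimum wall-clock times are reported.
TEST_FILE = 'test.txt'  # assumption: path to a large test file

def benchmark(count_func, repeat=10):
    times = timeit.repeat(lambda: count_func(TEST_FILE), number=1, repeat=repeat)
    return sum(times) / len(times), min(times)

for count_func in (rawincount, rawgencount, rawcount):
    avg, best = benchmark(count_func)
    print('{:12s} average: {:.4f}s  min: {:.4f}s'.format(count_func.__name__, avg, best))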

Other answers

This is the fastest thing I have found using pure Python. You can use whatever amount of memory you want by setting buffer, though 2**16 appears to be a sweet spot on my computer.

from functools import partial

buffer = 2**16
with open(myfile) as f:
    print(sum(x.count('\n') for x in iter(partial(f.read, buffer), '')))

I found the answer here: Why is reading lines from stdin much slower in C++ than Python? and tweaked it slightly. It's a very good read for understanding how to count lines quickly, though wc -l is still about 75% faster than anything else.

This code is shorter and clearer. It's probably the best way:

num_lines = open('yourfile.ext').read().count('\n')

Here's a Python program that uses the multiprocessing library to distribute the line counting across machines/cores. My test improved counting a 20-million-line file from 26 seconds to 7 seconds using an 8-core Windows 64 server. Note: not using memory mapping makes things much slower.

import multiprocessing, sys, time, os, mmap
import logging, logging.handlers

def init_logger(pid):
    console_format = 'P{0} %(levelname)s %(message)s'.format(pid)
    logger = logging.getLogger()  # New logger at root level
    logger.setLevel( logging.INFO )
    logger.handlers.append( logging.StreamHandler() )
    logger.handlers[0].setFormatter( logging.Formatter( console_format, '%d/%m/%y %H:%M:%S' ) )

def getFileLineCount( queues, pid, processes, file1 ):
    init_logger(pid)
    logging.info( 'start' )

    physical_file = open(file1, "r")
    #  mmap.mmap(fileno, length[, tagname[, access[, offset]]])

    m1 = mmap.mmap( physical_file.fileno(), 0, access=mmap.ACCESS_READ )

    #work out file size to divide up line counting

    fSize = os.stat(file1).st_size
    chunk = (fSize / processes) + 1

    lines = 0

    #get where I start and stop
    _seedStart = chunk * (pid)
    _seekEnd = chunk * (pid+1)
    seekStart = int(_seedStart)
    seekEnd = int(_seekEnd)

    if seekEnd < int(_seekEnd + 1):
        seekEnd += 1

    if _seedStart < int(seekStart + 1):
        seekStart += 1

    if seekEnd > fSize:
        seekEnd = fSize

    #find where to start
    if pid > 0:
        m1.seek( seekStart )
        #read next line
        l1 = m1.readline()  # need to use readline with memory mapped files
        seekStart = m1.tell()

    #tell previous rank my seek start to make their seek end

    if pid > 0:
        queues[pid-1].put( seekStart )
    if pid < processes-1:
        seekEnd = queues[pid].get()

    m1.seek( seekStart )
    l1 = m1.readline()

    while len(l1) > 0:
        lines += 1
        l1 = m1.readline()
        if m1.tell() > seekEnd or len(l1) == 0:
            break

    logging.info( 'done' )
    # add up the results
    if pid == 0:
        for p in range(1,processes):
            lines += queues[0].get()
        queues[0].put(lines) # the total lines counted
    else:
        queues[0].put(lines)

    m1.close()
    physical_file.close()

if __name__ == '__main__':
    init_logger( 'main' )
    if len(sys.argv) > 1:
        file_name = sys.argv[1]
    else:
        logging.fatal( 'parameters required: file-name [processes]' )
        exit()

    t = time.time()
    processes = multiprocessing.cpu_count()
    if len(sys.argv) > 2:
        processes = int(sys.argv[2])
    queues=[] # a queue for each process
    for pid in range(processes):
        queues.append( multiprocessing.Queue() )
    jobs=[]
    prev_pipe = 0
    for pid in range(processes):
        p = multiprocessing.Process( target = getFileLineCount, args=(queues, pid, processes, file_name,) )
        p.start()
        jobs.append(p)

    jobs[0].join() #wait for counting to finish
    lines = queues[0].get()

    logging.info( 'finished {} Lines:{}'.format( time.time() - t, lines ) )

You could execute a subprocess and run wc -l filename:

import subprocess

def file_len(fname):
    p = subprocess.Popen(['wc', '-l', fname], stdout=subprocess.PIPE, 
                                              stderr=subprocess.PIPE)
    result, err = p.communicate()
    if p.returncode != 0:
        raise IOError(err)
    return int(result.strip().split()[0])

There are lots of answers already, but unfortunately most of them are just tiny economies on a barely optimizable problem...

In several projects I have worked on, line counting was a core function of the software, and working as fast as possible with a huge number of files was of paramount importance.

The main bottleneck with line counting is I/O access, since you need to read each line in order to detect the line-return character; there is simply no way around that. The second potential bottleneck is memory management: the more you load at once, the faster you can process, but this bottleneck is negligible compared to the first.

Hence, apart from tiny optimizations such as disabling gc collection and other micro-managing tricks, there are three major ways to reduce the processing time of a line-counting function:

1. Hardware solution: the major and most obvious way is non-programmatic: buy a very fast SSD/flash hard drive. By far, this is how you can get the biggest speed boosts.

2. Data preparation solution: if you generate or can modify how the files you process are generated, or if it's acceptable for you to pre-process them, first convert the line return to Unix style (\n), as this saves one character compared to Windows or MacOS styles (not a big save, but it's an easy gain), and secondly and most importantly, you can potentially write lines of fixed length. If you need variable length, you can always pad smaller lines. This way, you can compute the number of lines instantly from the total file size, which is much faster to access (see the sketch after this list). Often, the best solution to a problem is to pre-process it so that it better fits your end purpose.

3. Parallelization + hardware solution: if you can buy multiple hard disks (and if possible SSD flash disks), then you can even go beyond the speed of one disk by leveraging parallelization: store your files in a balanced way among the disks (easiest is to balance by total size) and then read from all of them in parallel. You can then expect a multiplier boost in proportion to the number of disks you have. If buying multiple disks is not an option, then parallelization likely won't help (except if your disk has multiple reading heads like some professional-grade disks, but even then the disk's internal cache memory and PCB circuitry will likely be a bottleneck and prevent you from fully using all heads in parallel; plus, you would have to devise specific code for the particular drive you use, because you need to know the exact cluster mapping to store your files on clusters under different heads and read them with different heads afterwards). Indeed, it's commonly known that sequential reading is almost always faster than random reading, and parallelization on a single disk behaves more like random reading than sequential reading (you can test your hard drive's speed in both respects using CrystalDiskMark, for example).
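
As a rough sketch of the fixed-length-line idea from the data preparation point above: if every line has been padded to a known record length, the count follows from the file size alone. The record length of 128 bytes and the function name are illustrative assumptions, and handling of a trailing partial record is simplified.

import os

# Sketch only: assumes every line has been padded to exactly LINE_LENGTH bytes,
# newline included, so the line count is derived from the file size without
# reading the file contents at all.
LINE_LENGTH = 128  # assumed fixed record length in bytes

def fixed_length_line_count(filename):
    size = os.path.getsize(filename)
    return size // LINE_LENGTH  # a remainder would indicate a truncated last record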

If none of those are an option, then you can only rely on micro-managing tricks to improve the speed of your line-counting function, but don't expect anything really significant. Rather, you can expect that the time you spend tweaking will be disproportionate compared to the speed improvements you'll actually see.