我想逐行读取一个大文件(>5GB),而不将其全部内容加载到内存中。我不能使用readlines(),因为它在内存中创建了一个非常大的列表。
当前回答
我意识到这个问题在很久以前就已经回答过了,但是这里有一种并行的方法,而不会杀死您的内存开销(如果您试图将每一行放入池中,就会出现这种情况)。显然,将readJSON_line2函数替换为一些合理的函数——这只是为了说明这一点!
加速将取决于文件大小和你对每一行所做的事情-但最坏的情况是,对于一个小文件,只是用JSON阅读器读取它,我看到下面设置的性能与ST相似。
希望对大家有用:
def readJSON_line2(linesIn):
#Function for reading a chunk of json lines
'''
Note, this function is nonsensical. A user would never use the approach suggested
for reading in a JSON file,
its role is to evaluate the MT approach for full line by line processing to both
increase speed and reduce memory overhead
'''
import json
linesRtn = []
for lineIn in linesIn:
if lineIn.strip() != 0:
lineRtn = json.loads(lineIn)
else:
lineRtn = ""
linesRtn.append(lineRtn)
return linesRtn
# -------------------------------------------------------------------
if __name__ == "__main__":
import multiprocessing as mp
path1 = "C:\\user\\Documents\\"
file1 = "someBigJson.json"
nBuffer = 20*nCPUs # How many chunks are queued up (so cpus aren't waiting on processes spawning)
nChunk = 1000 # How many lines are in each chunk
#Both of the above will require balancing speed against memory overhead
iJob = 0 #Tracker for SMP jobs submitted into pool
iiJob = 0 #Tracker for SMP jobs extracted back out of pool
jobs = [] #SMP job holder
MTres3 = [] #Final result holder
chunk = []
iBuffer = 0 # Buffer line count
with open(path1+file1) as f:
for line in f:
#Send to the chunk
if len(chunk) < nChunk:
chunk.append(line)
else:
#Chunk full
#Don't forget to add the current line to chunk
chunk.append(line)
#Then add the chunk to the buffer (submit to SMP pool)
jobs.append(pool.apply_async(readJSON_line2, args=(chunk,)))
iJob +=1
iBuffer +=1
#Clear the chunk for the next batch of entries
chunk = []
#Buffer is full, any more chunks submitted would cause undue memory overhead
#(Partially) empty the buffer
if iBuffer >= nBuffer:
temp1 = jobs[iiJob].get()
for rtnLine1 in temp1:
MTres3.append(rtnLine1)
iBuffer -=1
iiJob+=1
#Submit the last chunk if it exists (as it would not have been submitted to SMP buffer)
if chunk:
jobs.append(pool.apply_async(readJSON_line2, args=(chunk,)))
iJob +=1
iBuffer +=1
#And gather up the last of the buffer, including the final chunk
while iiJob < iJob:
temp1 = jobs[iiJob].get()
for rtnLine1 in temp1:
MTres3.append(rtnLine1)
iiJob+=1
#Cleanup
del chunk, jobs, temp1
pool.close()
其他回答
我意识到这个问题在很久以前就已经回答过了,但是这里有一种并行的方法,而不会杀死您的内存开销(如果您试图将每一行放入池中,就会出现这种情况)。显然,将readJSON_line2函数替换为一些合理的函数——这只是为了说明这一点!
加速将取决于文件大小和你对每一行所做的事情-但最坏的情况是,对于一个小文件,只是用JSON阅读器读取它,我看到下面设置的性能与ST相似。
希望对大家有用:
def readJSON_line2(linesIn):
#Function for reading a chunk of json lines
'''
Note, this function is nonsensical. A user would never use the approach suggested
for reading in a JSON file,
its role is to evaluate the MT approach for full line by line processing to both
increase speed and reduce memory overhead
'''
import json
linesRtn = []
for lineIn in linesIn:
if lineIn.strip() != 0:
lineRtn = json.loads(lineIn)
else:
lineRtn = ""
linesRtn.append(lineRtn)
return linesRtn
# -------------------------------------------------------------------
if __name__ == "__main__":
import multiprocessing as mp
path1 = "C:\\user\\Documents\\"
file1 = "someBigJson.json"
nBuffer = 20*nCPUs # How many chunks are queued up (so cpus aren't waiting on processes spawning)
nChunk = 1000 # How many lines are in each chunk
#Both of the above will require balancing speed against memory overhead
iJob = 0 #Tracker for SMP jobs submitted into pool
iiJob = 0 #Tracker for SMP jobs extracted back out of pool
jobs = [] #SMP job holder
MTres3 = [] #Final result holder
chunk = []
iBuffer = 0 # Buffer line count
with open(path1+file1) as f:
for line in f:
#Send to the chunk
if len(chunk) < nChunk:
chunk.append(line)
else:
#Chunk full
#Don't forget to add the current line to chunk
chunk.append(line)
#Then add the chunk to the buffer (submit to SMP pool)
jobs.append(pool.apply_async(readJSON_line2, args=(chunk,)))
iJob +=1
iBuffer +=1
#Clear the chunk for the next batch of entries
chunk = []
#Buffer is full, any more chunks submitted would cause undue memory overhead
#(Partially) empty the buffer
if iBuffer >= nBuffer:
temp1 = jobs[iiJob].get()
for rtnLine1 in temp1:
MTres3.append(rtnLine1)
iBuffer -=1
iiJob+=1
#Submit the last chunk if it exists (as it would not have been submitted to SMP buffer)
if chunk:
jobs.append(pool.apply_async(readJSON_line2, args=(chunk,)))
iJob +=1
iBuffer +=1
#And gather up the last of the buffer, including the final chunk
while iiJob < iJob:
temp1 = jobs[iiJob].get()
for rtnLine1 in temp1:
MTres3.append(rtnLine1)
iiJob+=1
#Cleanup
del chunk, jobs, temp1
pool.close()
这个怎么样? 将文件划分为块,然后逐行读取,因为当您读取文件时,操作系统将缓存下一行。如果逐行读取文件,则不能有效利用缓存的信息。
相反,将文件划分为块,并将整个块加载到内存中,然后进行处理。
def chunks(file,size=1024):
while 1:
startat=fh.tell()
print startat #file's object current position from the start
fh.seek(size,1) #offset from current postion -->1
data=fh.readline()
yield startat,fh.tell()-startat #doesnt store whole list in memory
if not data:
break
if os.path.isfile(fname):
try:
fh=open(fname,'rb')
except IOError as e: #file --> permission denied
print "I/O error({0}): {1}".format(e.errno, e.strerror)
except Exception as e1: #handle other exceptions such as attribute errors
print "Unexpected error: {0}".format(e1)
for ele in chunks(fh):
fh.seek(ele[0])#startat
data=fh.read(ele[1])#endat
print data
我不敢相信这能像@john-la-rooy的回答看起来那么简单。因此,我使用逐行读写重新创建了cp命令。这是疯狂的快。
#!/usr/bin/env python3.6
import sys
with open(sys.argv[2], 'w') as outfile:
with open(sys.argv[1]) as infile:
for line in infile:
outfile.write(line)
老派方法:
fh = open(file_name, 'rt')
line = fh.readline()
while line:
# do stuff with line
line = fh.readline()
fh.close()
blaze项目在过去6年里取得了长足的进展。它有一个简单的API,涵盖了pandas功能的一个有用子集。
dask。Dataframe内部负责分块,支持许多可并行操作,并允许您轻松地将切片导出回pandas,以便在内存中操作。
import dask.dataframe as dd
df = dd.read_csv('filename.csv')
df.head(10) # return first 10 rows
df.tail(10) # return last 10 rows
# iterate rows
for idx, row in df.iterrows():
...
# group by my_field and return mean
df.groupby(df.my_field).value.mean().compute()
# slice by column
df[df.my_field=='XYZ'].compute()
推荐文章
- 有没有办法在python中做HTTP PUT
- “foo Is None”和“foo == None”之间有什么区别吗?
- 类没有对象成员
- Django模型“没有显式声明app_label”
- 熊猫能自动从CSV文件中读取日期吗?
- 在python中zip的逆函数是什么?
- 有效的方法应用多个过滤器的熊猫数据框架或系列
- 如何检索插入id后插入行在SQLite使用Python?
- 我如何在Django中添加一个CharField占位符?
- 如何在Python中获取当前执行文件的路径?
- 我如何得到“id”后插入到MySQL数据库与Python?
- super()失败,错误:TypeError "参数1必须是类型,而不是classobj"当父不继承对象
- Python内存泄漏
- 实现嵌套字典的最佳方法是什么?
- 如何在tensorflow中获得当前可用的gpu ?