这可能是一个微不足道的问题,但我如何在python中并行化下面的循环?
# setup output lists
output1 = list()
output2 = list()
output3 = list()
for j in range(0, 10):
# calc individual parameter value
parameter = j * offset
# call the calculation
out1, out2, out3 = calc_stuff(parameter = parameter)
# put results into correct output list
output1.append(out1)
output2.append(out2)
output3.append(out3)
我知道如何在Python中启动单个线程,但我不知道如何“收集”结果。
多个进程也可以——在这种情况下,只要是最简单的就行。我目前使用的是Linux,但代码应该在Windows和Mac上运行。
并行化这段代码最简单的方法是什么?
假设我们有一个async函数
async def work_async(self, student_name: str, code: str, loop):
"""
Some async function
"""
# Do some async procesing
这需要在一个大数组上运行。有些属性被传递给程序,有些则来自数组中字典元素的属性。
async def process_students(self, student_name: str, loop):
market = sys.argv[2]
subjects = [...] #Some large array
batchsize = 5
for i in range(0, len(subjects), batchsize):
batch = subjects[i:i+batchsize]
await asyncio.gather(*(self.work_async(student_name,
sub['Code'],
loop)
for sub in batch))
由于全局解释器锁(GIL)的存在,在CPython上使用多线程并不能为纯python代码提供更好的性能。我建议使用multiprocessing模块:
pool = multiprocessing.Pool(4)
out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))
注意,这在交互式解释器中不起作用。
为了避免GIL周围常见的FUD:在本例中使用线程没有任何优势。这里要使用进程,而不是线程,因为它们避免了一大堆问题。
tqdm库的并发包装器是并行化长时间运行代码的好方法。tqdm通过智能进度表提供当前进度和剩余时间的反馈,我发现这对于长时间计算非常有用。
通过对thread_map的简单调用,循环可以被重写为并发线程,或者通过对process_map的简单调用,循环可以被重写为并发多进程:
from tqdm.contrib.concurrent import thread_map, process_map
def calc_stuff(num, multiplier):
import time
time.sleep(1)
return num, num * multiplier
if __name__ == "__main__":
# let's parallelize this for loop:
# results = [calc_stuff(i, 2) for i in range(64)]
loop_idx = range(64)
multiplier = [2] * len(loop_idx)
# either with threading:
results_threading = thread_map(calc_stuff, loop_idx, multiplier)
# or with multi-processing:
results_processes = process_map(calc_stuff, loop_idx, multiplier)