这可能是一个微不足道的问题,但我如何在python中并行化下面的循环?
# setup output lists
output1 = list()
output2 = list()
output3 = list()
for j in range(0, 10):
# calc individual parameter value
parameter = j * offset
# call the calculation
out1, out2, out3 = calc_stuff(parameter = parameter)
# put results into correct output list
output1.append(out1)
output2.append(out2)
output3.append(out3)
我知道如何在Python中启动单个线程,但我不知道如何“收集”结果。
多个进程也可以——在这种情况下,只要是最简单的就行。我目前使用的是Linux,但代码应该在Windows和Mac上运行。
并行化这段代码最简单的方法是什么?
这在用Python实现多处理和并行/分布式计算时非常有用。
YouTube教程使用techila包
Techila是一种分布式计算中间件,它使用Techila包直接与Python集成。包中的peach函数在并行化循环结构时非常有用。(以下代码片段来自Techila社区论坛)
techila.peach(funcname = 'theheavyalgorithm', # Function that will be called on the compute nodes/ Workers
files = 'theheavyalgorithm.py', # Python-file that will be sourced on Workers
jobs = jobcount # Number of Jobs in the Project
)
使用Ray有很多优点:
除了多个核(使用相同的代码)之外,还可以在多台机器上并行。
通过共享内存(和零拷贝序列化)有效地处理数值数据。
具有分布式调度的高任务吞吐量。
容错。
在本例中,您可以启动Ray并定义一个远程函数
import ray
ray.init()
@ray.remote(num_return_vals=3)
def calc_stuff(parameter=None):
# Do something.
return 1, 2, 3
然后并行地调用它
output1, output2, output3 = [], [], []
# Launch the tasks.
for j in range(10):
id1, id2, id3 = calc_stuff.remote(parameter=j)
output1.append(id1)
output2.append(id2)
output3.append(id3)
# Block until the results have finished and get the results.
output1 = ray.get(output1)
output2 = ray.get(output2)
output3 = ray.get(output3)
要在集群上运行相同的示例,唯一需要更改的行是对ray.init()的调用。相关文档可以在这里找到。
请注意,我正在帮助开发雷。
tqdm库的并发包装器是并行化长时间运行代码的好方法。tqdm通过智能进度表提供当前进度和剩余时间的反馈,我发现这对于长时间计算非常有用。
通过对thread_map的简单调用,循环可以被重写为并发线程,或者通过对process_map的简单调用,循环可以被重写为并发多进程:
from tqdm.contrib.concurrent import thread_map, process_map
def calc_stuff(num, multiplier):
import time
time.sleep(1)
return num, num * multiplier
if __name__ == "__main__":
# let's parallelize this for loop:
# results = [calc_stuff(i, 2) for i in range(64)]
loop_idx = range(64)
multiplier = [2] * len(loop_idx)
# either with threading:
results_threading = thread_map(calc_stuff, loop_idx, multiplier)
# or with multi-processing:
results_processes = process_map(calc_stuff, loop_idx, multiplier)
由于全局解释器锁(GIL)的存在,在CPython上使用多线程并不能为纯python代码提供更好的性能。我建议使用multiprocessing模块:
pool = multiprocessing.Pool(4)
out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))
注意,这在交互式解释器中不起作用。
为了避免GIL周围常见的FUD:在本例中使用线程没有任何优势。这里要使用进程,而不是线程,因为它们避免了一大堆问题。