这可能是一个微不足道的问题,但我如何在python中并行化下面的循环?

# setup output lists
output1 = list()
output2 = list()
output3 = list()

for j in range(0, 10):
    # calc individual parameter value
    parameter = j * offset
    # call the calculation
    out1, out2, out3 = calc_stuff(parameter = parameter)

    # put results into correct output list
    output1.append(out1)
    output2.append(out2)
    output3.append(out3)

我知道如何在Python中启动单个线程,但我不知道如何“收集”结果。

多个进程也可以——在这种情况下,只要是最简单的就行。我目前使用的是Linux,但代码应该在Windows和Mac上运行。

并行化这段代码最简单的方法是什么?


当前回答

from joblib import Parallel, delayed
def process(i):
    return i * i
    
results = Parallel(n_jobs=2)(delayed(process)(i) for i in range(10))
print(results)  # prints [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

以上在我的机器上工作得很漂亮(Ubuntu,包joblib是预安装的,但可以通过pip install joblib安装)。

摘自https://blog.dominodatalab.com/simple-parallelization/


编辑于2021年3月31日:关于joblib, multiprocessing, threading和asyncio

joblib in the above code uses import multiprocessing under the hood (and thus multiple processes, which is typically the best way to run CPU work across cores - because of the GIL) You can let joblib use multiple threads instead of multiple processes, but this (or using import threading directly) is only beneficial if the threads spend considerable time on I/O (e.g. read/write to disk, send an HTTP request). For I/O work, the GIL does not block the execution of another thread Since Python 3.7, as an alternative to threading, you can parallelise work with asyncio, but the same advice applies like for import threading (though in contrast to latter, only 1 thread will be used; on the plus side, asyncio has a lot of nice features which are helpful for async programming) Using multiple processes incurs overhead. Think about it: Typically, each process needs to initialise/load everything you need to run your calculation. You need to check yourself if the above code snippet improves your wall time. Here is another one, for which I confirmed that joblib produces better results:

import time
from joblib import Parallel, delayed

def countdown(n):
    while n>0:
        n -= 1
    return n


t = time.time()
for _ in range(20):
    print(countdown(10**7), end=" ")
print(time.time() - t)  
# takes ~10.5 seconds on medium sized Macbook Pro


t = time.time()
results = Parallel(n_jobs=2)(delayed(countdown)(10**7) for _ in range(20))
print(results)
print(time.time() - t)
# takes ~6.3 seconds on medium sized Macbook Pro

其他回答

由于全局解释器锁(GIL)的存在,在CPython上使用多线程并不能为纯python代码提供更好的性能。我建议使用multiprocessing模块:

pool = multiprocessing.Pool(4)
out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))

注意,这在交互式解释器中不起作用。

为了避免GIL周围常见的FUD:在本例中使用线程没有任何优势。这里要使用进程,而不是线程,因为它们避免了一大堆问题。

tqdm库的并发包装器是并行化长时间运行代码的好方法。tqdm通过智能进度表提供当前进度和剩余时间的反馈,我发现这对于长时间计算非常有用。

通过对thread_map的简单调用,循环可以被重写为并发线程,或者通过对process_map的简单调用,循环可以被重写为并发多进程:

from tqdm.contrib.concurrent import thread_map, process_map


def calc_stuff(num, multiplier):
    import time

    time.sleep(1)

    return num, num * multiplier


if __name__ == "__main__":

    # let's parallelize this for loop:
    # results = [calc_stuff(i, 2) for i in range(64)]

    loop_idx = range(64)
    multiplier = [2] * len(loop_idx)

    # either with threading:
    results_threading = thread_map(calc_stuff, loop_idx, multiplier)

    # or with multi-processing:
    results_processes = process_map(calc_stuff, loop_idx, multiplier)

Dask期货;我很惊讶至今还没有人提起这件事……

from dask.distributed import Client

client = Client(n_workers=8) # In this example I have 8 cores and processes (can also use threads if desired)

def my_function(i):
    output = <code to execute in the for loop here>
    return output

futures = []

for i in <whatever you want to loop across here>:
    future = client.submit(my_function, i)
    futures.append(future)

results = client.gather(futures)
client.close()

我发现joblib对我很有用。请看下面的例子:

from joblib import Parallel, delayed
def yourfunction(k):   
    s=3.14*k*k
    print "Area of a circle with a radius ", k, " is:", s

element_run = Parallel(n_jobs=-1)(delayed(yourfunction)(k) for k in range(1,10))

N_jobs =-1:使用所有可用内核

这在用Python实现多处理和并行/分布式计算时非常有用。

YouTube教程使用techila包

Techila是一种分布式计算中间件,它使用Techila包直接与Python集成。包中的peach函数在并行化循环结构时非常有用。(以下代码片段来自Techila社区论坛)

techila.peach(funcname = 'theheavyalgorithm', # Function that will be called on the compute nodes/ Workers
    files = 'theheavyalgorithm.py', # Python-file that will be sourced on Workers
    jobs = jobcount # Number of Jobs in the Project
    )