This may be a trivial question, but how do I parallelize the following loop in Python?

# setup output lists
output1 = list()
output2 = list()
output3 = list()

for j in range(0, 10):
    # calc individual parameter value
    parameter = j * offset
    # call the calculation
    out1, out2, out3 = calc_stuff(parameter = parameter)

    # put results into correct output list
    output1.append(out1)
    output2.append(out2)
    output3.append(out3)

I know how to start single threads in Python, but I don't know how to "collect" the results.

Multiple processes would be fine too; in this case, whatever is easiest. I'm currently using Linux, but the code should run on Windows and Mac as well.

What's the easiest way to parallelize this code?


Current answer

from joblib import Parallel, delayed
def process(i):
    return i * i
    
results = Parallel(n_jobs=2)(delayed(process)(i) for i in range(10))
print(results)  # prints [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

The above works beautifully on my machine (Ubuntu; the joblib package came preinstalled, but it can be installed via pip install joblib).

Taken from https://blog.dominodatalab.com/simple-parallelization/


Edit of March 31, 2021: on joblib, multiprocessing, threading, and asyncio

joblib in the above code uses the multiprocessing package under the hood, and thus multiple processes, which is typically the best way to run CPU-bound work across cores, because of the GIL.

You can let joblib use multiple threads instead of multiple processes, but this (or using the threading module directly) is only beneficial if the threads spend considerable time on I/O (e.g. reading/writing to disk, sending an HTTP request). For I/O work, the GIL does not block the execution of other threads; a thread-based sketch follows the benchmark below.

Since Python 3.7, as an alternative to threading, you can parallelize work with asyncio, and the same advice applies as for threading (though in contrast to the latter, only one thread is used; on the plus side, asyncio has a lot of nice features that are helpful for async programming).

Using multiple processes incurs overhead: typically, each process needs to initialize/load everything required to run your calculation, so you need to check yourself whether the above code snippet actually improves your wall time. Here is another example, for which I confirmed that joblib produces better results:

import time
from joblib import Parallel, delayed

def countdown(n):
    while n > 0:
        n -= 1
    return n


t = time.time()
for _ in range(20):
    print(countdown(10**7), end=" ")
print(time.time() - t)  
# takes ~10.5 seconds on medium sized Macbook Pro


t = time.time()
results = Parallel(n_jobs=2)(delayed(countdown)(10**7) for _ in range(20))
print(results)
print(time.time() - t)
# takes ~6.3 seconds on medium sized Macbook Pro
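
As a minimal sketch of the thread-based variant mentioned above (the sleep is just a stand-in for real I/O work; this snippet is not from the original answer), joblib's prefer="threads" option switches the backend to a thread pool:

import time
from joblib import Parallel, delayed

def io_task(i):
    time.sleep(0.5)  # simulated I/O wait; the GIL is released while sleeping
    return i

# prefer="threads" asks joblib for a thread pool instead of worker processes,
# which avoids process start-up overhead for I/O-bound work
results = Parallel(n_jobs=4, prefer="threads")(delayed(io_task)(i) for i in range(8))
print(results)  # prints [0, 1, 2, 3, 4, 5, 6, 7]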

Other answers

To parallelize a simple for loop, joblib brings a lot of value over the raw use of multiprocessing: not only the short syntax, but also transparent bunching of iterations when they are very fast (to remove the overhead) and capturing of tracebacks from the child processes, for better error reporting.

Disclaimer: I am the original author of joblib.
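
As a small illustration of the batching point above (the toy task is my own; batch_size and verbose are documented parameters of joblib's Parallel):

from joblib import Parallel, delayed

def tiny_task(i):
    return i * i

# batch_size="auto" (the default) transparently groups very fast iterations
# into larger batches to amortize dispatch overhead; verbose=5 logs progress
results = Parallel(n_jobs=2, batch_size="auto", verbose=5)(
    delayed(tiny_task)(i) for i in range(10_000))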

The concurrent wrappers from the tqdm library are a nice way to parallelize longer-running code. tqdm provides feedback on the current progress and remaining time through a smart progress meter, which I find very useful for long computations.

With a simple call to thread_map, the loop can be rewritten to run as concurrent threads, or with a simple call to process_map, as concurrent processes:

from tqdm.contrib.concurrent import thread_map, process_map


def calc_stuff(num, multiplier):
    import time

    time.sleep(1)

    return num, num * multiplier


if __name__ == "__main__":

    # let's parallelize this for loop:
    # results = [calc_stuff(i, 2) for i in range(64)]

    loop_idx = range(64)
    multiplier = [2] * len(loop_idx)

    # either with threading:
    results_threading = thread_map(calc_stuff, loop_idx, multiplier)

    # or with multi-processing:
    results_processes = process_map(calc_stuff, loop_idx, multiplier)

Suppose we have an async function:

async def work_async(self, student_name: str, code: str, loop):
    """
    Some async function
    """
    # do some async processing

It needs to be run over a large array. Some of the attributes are passed in to the program, and some come from the attributes of the dictionary elements in the array.

import asyncio
import sys

async def process_students(self, student_name: str, loop):
    market = sys.argv[2]
    subjects = [...]  # some large array
    batchsize = 5
    for i in range(0, len(subjects), batchsize):
        batch = subjects[i:i + batchsize]
        # run one batch of coroutines concurrently before starting the next
        await asyncio.gather(*(self.work_async(student_name, sub['Code'], loop)
                               for sub in batch))
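
A minimal, self-contained version of the same batching pattern (all names and the 0.1-second sleep are placeholders, not from the original answer):

import asyncio

async def work_async(student_name: str, code: str):
    await asyncio.sleep(0.1)  # placeholder for real async work
    return f"{student_name}:{code}"

async def process_students(student_name: str, subjects: list):
    batchsize = 5
    results = []
    for i in range(0, len(subjects), batchsize):
        # at most `batchsize` coroutines are in flight at any time
        batch = subjects[i:i + batchsize]
        results += await asyncio.gather(
            *(work_async(student_name, sub["Code"]) for sub in batch))
    return results

print(asyncio.run(process_students("alice", [{"Code": str(n)} for n in range(12)])))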

Thanks to @iuryxavier:

from multiprocessing import Pool
from multiprocessing import cpu_count


def add_1(x):
    return x + 1

if __name__ == "__main__":
    pool = Pool(cpu_count())
    results = pool.map(add_1, range(10**5))  # the original 10**12 would exhaust time and memory
    pool.close()  # stop accepting new tasks
    pool.join()   # wait for the worker processes to exit
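
For what it's worth, a sketch of the same Pool usage in its context-manager form (standard multiprocessing API; the input size is my choice):

from multiprocessing import Pool, cpu_count

def add_1(x):
    return x + 1

if __name__ == "__main__":
    # the with-block calls terminate() on exit, replacing the manual
    # close()/join() bookkeeping above
    with Pool(cpu_count()) as pool:
        results = pool.map(add_1, range(10**5))
    print(results[:5])  # prints [1, 2, 3, 4, 5]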