This is probably a trivial question, but how do I parallelize the following loop in Python?

# setup output lists
output1 = list()
output2 = list()
output3 = list()

for j in range(0, 10):
    # calc individual parameter value
    parameter = j * offset
    # call the calculation
    out1, out2, out3 = calc_stuff(parameter = parameter)

    # put results into correct output list
    output1.append(out1)
    output2.append(out2)
    output3.append(out3)

I know how to start single threads in Python, but I don't know how to "collect" the results.

Multiple processes would be fine too - whatever is easiest in that case. I'm using Linux at the moment, but the code should run on Windows and Mac as well.

What's the easiest way to parallelize this code?


Current answer

What's the easiest way to parallelize this code?

Use a PoolExecutor (ProcessPoolExecutor or ThreadPoolExecutor) from concurrent.futures. Compare the original code with this, side by side. First, the most concise way to do it is with executor.map:

...
with ProcessPoolExecutor() as executor:
    for out1, out2, out3 in executor.map(calc_stuff, parameters):
        ...
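
Adapted to the question's loop, here is a minimal complete sketch of the map approach, assuming calc_stuff and offset are defined as in the question:

from concurrent.futures import ProcessPoolExecutor

parameters = [j * offset for j in range(10)]

output1, output2, output3 = [], [], []
with ProcessPoolExecutor() as executor:
    # map yields results in input order, so they line up with parameters
    for out1, out2, out3 in executor.map(calc_stuff, parameters):
        output1.append(out1)
        output2.append(out2)
        output3.append(out3)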

Or broken out by submitting each call individually:

...
with ThreadPoolExecutor() as executor:
    futures = []
    for parameter in parameters:
        futures.append(executor.submit(calc_stuff, parameter))

    for future in futures:
        out1, out2, out3 = future.result() # this will block
        ...

Leaving the context signals the executor to free up resources.

You can use threads or processes, both with the exact same interface.

A working example

Here is working example code that will demonstrate the value of both executors.

Put this in a file, futuretest.py:

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from time import time
from http.client import HTTPSConnection

def processor_intensive(arg):
    def fib(n): # recursive, processor intensive calculation (avoid n > 36)
        return fib(n-1) + fib(n-2) if n > 1 else n
    start = time()
    result = fib(arg)
    return time() - start, result

def io_bound(arg):
    start = time()
    con = HTTPSConnection(arg)
    con.request('GET', '/')
    result = con.getresponse().getcode()
    return time() - start, result

def manager(PoolExecutor, calc_stuff):
    if calc_stuff is io_bound:
        inputs = ('python.org', 'stackoverflow.com', 'stackexchange.com',
                  'noaa.gov', 'parler.com', 'aaronhall.dev')
    else:
        inputs = range(25, 32)
    timings, results = list(), list()
    start = time()
    with PoolExecutor() as executor:
        for timing, result in executor.map(calc_stuff, inputs):
            # put results into correct output list:
            timings.append(timing), results.append(result)
    finish = time()
    print(f'{calc_stuff.__name__}, {PoolExecutor.__name__}')
    print(f'wall time to execute: {finish-start}')
    print(f'total of timings for each call: {sum(timings)}')
    print(f'time saved by parallelizing: {sum(timings) - (finish-start)}')
    print(dict(zip(inputs, results)), end = '\n\n')

def main():
    for computation in (processor_intensive, io_bound):
        for pool_executor in (ProcessPoolExecutor, ThreadPoolExecutor):
            manager(pool_executor, calc_stuff=computation)

if __name__ == '__main__':
    main()

And here's the output of one run of python -m futuretest:

processor_intensive, ProcessPoolExecutor
wall time to execute: 0.7326343059539795
total of timings for each call: 1.8033506870269775
time saved by parallelizing: 1.070716381072998
{25: 75025, 26: 121393, 27: 196418, 28: 317811, 29: 514229, 30: 832040, 31: 1346269}

processor_intensive, ThreadPoolExecutor
wall time to execute: 1.190223217010498
total of timings for each call: 3.3561410903930664
time saved by parallelizing: 2.1659178733825684
{25: 75025, 26: 121393, 27: 196418, 28: 317811, 29: 514229, 30: 832040, 31: 1346269}

io_bound, ProcessPoolExecutor
wall time to execute: 0.533886194229126
total of timings for each call: 1.2977914810180664
time saved by parallelizing: 0.7639052867889404
{'python.org': 301, 'stackoverflow.com': 200, 'stackexchange.com': 200, 'noaa.gov': 301, 'parler.com': 200, 'aaronhall.dev': 200}

io_bound, ThreadPoolExecutor
wall time to execute: 0.38941240310668945
total of timings for each call: 1.6049387454986572
time saved by parallelizing: 1.2155263423919678
{'python.org': 301, 'stackoverflow.com': 200, 'stackexchange.com': 200, 'noaa.gov': 301, 'parler.com': 200, 'aaronhall.dev': 200}

Processor-intensive analysis

When performing processor-intensive calculations in Python, expect the ProcessPoolExecutor to be more performant than the ThreadPoolExecutor.

Due to the Global Interpreter Lock (a.k.a. the GIL), threads cannot use multiple processors, so expect the time for each calculation and the wall time (elapsed real time) to be greater.

IO-bound analysis

On the other hand, when performing IO-bound operations, expect the ThreadPoolExecutor to be more performant than the ProcessPoolExecutor.

Python's threads are real, OS-level threads. The operating system can put them to sleep and reawaken them when their information arrives.

Final thoughts

I suspect that multiprocessing will be slower on Windows, since Windows doesn't support forking, so each new process has to take time to launch.

You can nest multiple threads inside multiple processes, but it's recommended not to use multiple threads to spin off multiple processes.
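
For reference, a rough sketch of the recommended direction of nesting (threads inside processes), reusing the io_bound function from futuretest.py above; the two-host chunks here are purely illustrative:

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from futuretest import io_bound  # the IO-bound function defined above

def io_heavy_chunk(hosts):
    # each worker process runs its own thread pool over its share of the I/O
    with ThreadPoolExecutor() as threads:
        return list(threads.map(io_bound, hosts))

if __name__ == '__main__':
    chunks = [('python.org', 'stackoverflow.com'), ('noaa.gov', 'aaronhall.dev')]
    with ProcessPoolExecutor() as processes:
        results = [result for chunk in processes.map(io_heavy_chunk, chunks)
                   for result in chunk]
    print(results)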

If faced with a heavy processing problem in Python, you can trivially scale out with additional processes - but not so much with threading.

Other answers

Using multiple threads on CPython won't give you better performance for pure-Python code, due to the Global Interpreter Lock (GIL). I suggest using the multiprocessing module instead:

import multiprocessing

pool = multiprocessing.Pool(4)
out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))

Note that this won't work in the interactive interpreter.

To avoid the usual FUD around the GIL: there wouldn't be any advantage to using threads for this example anyway. You want to use processes here, not threads, because they avoid a whole bunch of problems.
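
A self-contained sketch of the same approach as a standalone script; the __main__ guard is what lets worker processes import the module safely, and it is required on Windows, where new processes are spawned rather than forked. The calc_stuff body here is a hypothetical stand-in for the question's calculation:

import multiprocessing

def calc_stuff(parameter):
    # hypothetical stand-in for the question's calculation
    return parameter, parameter ** 2, parameter ** 3

if __name__ == '__main__':
    offset = 1
    with multiprocessing.Pool(4) as pool:
        out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))
    print(out1, out2, out3)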

Why don't you use threads, with one mutex to protect one global list?

from threading import Thread, Lock

class thread_it(Thread):
    def __init__(self, param):
        Thread.__init__(self)
        self.param = param
    def run(self):
        # serialize access to the shared output list
        mutex.acquire()
        output.append(calc_stuff(self.param))
        mutex.release()


threads = []
output = []
mutex = Lock()

for j in range(0, 10):
    current = thread_it(j * offset)
    threads.append(current)
    current.start()

for t in threads:
    t.join()

# here you have the output list filled with data

Keep in mind that you will only be as fast as your slowest thread.

To parallelize a simple for loop, joblib brings a lot of value beyond the raw use of multiprocessing. Not only the short syntax, but also things like transparent bunching of iterations when they are very fast (to remove the overhead), or capturing the traceback of the child process for better error reporting. A minimal sketch follows below.

Disclaimer: I am the original author of joblib.
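
A minimal sketch for the question's loop, assuming calc_stuff and offset are defined as in the question (n_jobs=-1 uses all cores):

from joblib import Parallel, delayed

# run the ten calls across worker processes and unzip the result triples
results = Parallel(n_jobs=-1)(delayed(calc_stuff)(j * offset) for j in range(10))
output1, output2, output3 = zip(*results)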

This is the easiest way to do it!

You can use asyncio. (Documentation can be found here). It is used as a foundation for multiple Python asynchronous frameworks that provide high-performance network and web servers, database connection libraries, distributed task queues, etc. Plus it has both high-level and low-level APIs to accommodate any kind of problem.

import asyncio
import functools

def background(f):
    def wrapped(*args, **kwargs):
        # run_in_executor() only forwards positional args; bind kwargs with partial
        return asyncio.get_event_loop().run_in_executor(
            None, functools.partial(f, *args, **kwargs))

    return wrapped

@background
def your_function(argument):
    pass  # your code here

Now this function will run in parallel whenever it is called, without putting the main program into a wait state. You can use it to parallelize a for loop as well: although the loop itself is sequential, every iteration runs in parallel with the main program as soon as the interpreter gets there.

1. Fire the loop in parallel to the main thread without any waiting

import time

@background
def your_function(argument):
    time.sleep(5)
    print('function finished for ' + str(argument))


for i in range(10):
    your_function(i)


print('loop finished')

This will produce the following output:

loop finished
function finished for 4
function finished for 8
function finished for 0
function finished for 3
function finished for 6
function finished for 2
function finished for 5
function finished for 7
function finished for 9
function finished for 1

Update: May 2022

Although this answers the original question, there are ways to wait for the loop to finish, as requested by upvoted comments. Adding them here as well. The keys to the implementation are asyncio.gather() and run_until_complete(). Consider the following functions:

import asyncio
import functools
import time

def background(f):
    def wrapped(*args, **kwargs):
        # run_in_executor() only forwards positional args; bind kwargs with partial
        return asyncio.get_event_loop().run_in_executor(
            None, functools.partial(f, *args, **kwargs))

    return wrapped

@background
def your_function(argument, other_argument): # Added another argument
    time.sleep(5)
    print(f"function finished for {argument=} and {other_argument=}")

def code_to_run_before():
    print('This runs Before Loop!')

def code_to_run_after():
    print('This runs After Loop!')

2. Run in parallel, but wait for it to finish

code_to_run_before()                                                         # Anything you want to run before, run here!

loop = asyncio.get_event_loop()                                              # Have a new event loop

looper = asyncio.gather(*[your_function(i, 1) for i in range(1, 5)])         # Run the loop
                               
results = loop.run_until_complete(looper)                                    # Wait until finish

code_to_run_after()                                                          # Anything you want to run after, run here!

This will produce the following output:

This runs Before Loop!
function finished for argument=2 and other_argument=1
function finished for argument=3 and other_argument=1
function finished for argument=1 and other_argument=1
function finished for argument=4 and other_argument=1
This runs After Loop!

3. Run multiple loops in parallel and wait for them to finish

code_to_run_before()                                                         # Anything you want to run before, run here!   

loop = asyncio.get_event_loop()                                              # Have a new event loop

group1 = asyncio.gather(*[your_function(i, 1) for i in range(1, 2)])         # Run all the loops you want
group2 = asyncio.gather(*[your_function(i, 2) for i in range(3, 5)])         # Run all the loops you want
group3 = asyncio.gather(*[your_function(i, 3) for i in range(6, 9)])         # Run all the loops you want

all_groups = asyncio.gather(group1, group2, group3)                          # Gather them all                                    
results = loop.run_until_complete(all_groups)                                # Wait until finish

code_to_run_after()                                                          # Anything you want to run after, run here!

This will produce the following output:

This runs Before Loop!
function finished for argument=3 and other_argument=2
function finished for argument=1 and other_argument=1
function finished for argument=6 and other_argument=3
function finished for argument=4 and other_argument=2
function finished for argument=7 and other_argument=3
function finished for argument=8 and other_argument=3
This runs After Loop!

4. Run the loops sequentially, but each loop's iterations run in parallel with one another

code_to_run_before()                                                               # Anything you want to run before, run here!

for loop_number in range(3):

    loop = asyncio.get_event_loop()                                                # Have a new event loop

    looper = asyncio.gather(*[your_function(i, loop_number) for i in range(1, 5)]) # Run the loop
                             
    results = loop.run_until_complete(looper)                                      # Wait until finish

    print(f"finished for {loop_number=}")       

code_to_run_after()                                                                # Anything you want to run after, run here!

This will produce the following output:

This runs Before Loop!
function finished for argument=3 and other_argument=0
function finished for argument=4 and other_argument=0
function finished for argument=1 and other_argument=0
function finished for argument=2 and other_argument=0
finished for loop_number=0
function finished for argument=4 and other_argument=1
function finished for argument=3 and other_argument=1
function finished for argument=2 and other_argument=1
function finished for argument=1 and other_argument=1
finished for loop_number=1
function finished for argument=1 and other_argument=2
function finished for argument=4 and other_argument=2
function finished for argument=3 and other_argument=2
function finished for argument=2 and other_argument=2
finished for loop_number=2
This runs After Loop!

Update: June 2022

This, in its current form, may not run on some versions of Jupyter Notebook. The reason is that Jupyter Notebook already utilizes an event loop. To make it work on such Jupyter versions, nest_asyncio (which, as the name suggests, nests the event loop) is the way to go. Just import and apply it at the top of the cell:

import nest_asyncio
nest_asyncio.apply()

All the functionality discussed above should be accessible in a notebook environment as well.

from joblib import Parallel, delayed
def process(i):
    return i * i
    
results = Parallel(n_jobs=2)(delayed(process)(i) for i in range(10))
print(results)  # prints [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

The above works beautifully on my machine (Ubuntu; the joblib package was pre-installed, but it can be installed via pip install joblib).

Taken from https://blog.dominodatalab.com/simple-parallelization/


Edit of March 31, 2021: on joblib, multiprocessing, threading and asyncio

- joblib in the above code uses import multiprocessing under the hood (and thus multiple processes, which is typically the best way to run CPU work across cores - because of the GIL)
- You can let joblib use multiple threads instead of multiple processes, but this (or using import threading directly) is only beneficial if the threads spend considerable time on I/O (e.g. read/write to disk, send an HTTP request). For I/O work, the GIL does not block the execution of another thread
- Since Python 3.7, as an alternative to threading, you can parallelise work with asyncio, but the same advice applies as for import threading (though in contrast to the latter, only 1 thread will be used; on the plus side, asyncio has a lot of nice features which are helpful for async programming)
- Using multiple processes incurs overhead. Think about it: typically, each process needs to initialise/load everything you need to run your calculation. You need to check yourself whether the above code snippet improves your wall time. Here is another one, for which I confirmed that joblib produces better results:

import time
from joblib import Parallel, delayed

def countdown(n):
    while n>0:
        n -= 1
    return n


t = time.time()
for _ in range(20):
    print(countdown(10**7), end=" ")
print(time.time() - t)  
# takes ~10.5 seconds on medium sized Macbook Pro


t = time.time()
results = Parallel(n_jobs=2)(delayed(countdown)(10**7) for _ in range(20))
print(results)
print(time.time() - t)
# takes ~6.3 seconds on medium sized Macbook Pro