并行化这段代码最简单的方法是什么?
使用concurrent.futures中的PoolExecutor。将原始代码与此代码并排比较。首先,最简洁的方法是使用executor.map:
...
with ProcessPoolExecutor() as executor:
for out1, out2, out3 in executor.map(calc_stuff, parameters):
...
或者通过单独提交每个电话来分解:
...
with ThreadPoolExecutor() as executor:
futures = []
for parameter in parameters:
futures.append(executor.submit(calc_stuff, parameter))
for future in futures:
out1, out2, out3 = future.result() # this will block
...
离开上下文表示执行程序释放资源
您可以使用线程或进程,并使用完全相同的接口。
一个工作示例
下面是工作示例代码,将演示的价值:
把它放在一个文件futuretest.py中:
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from time import time
from http.client import HTTPSConnection
def processor_intensive(arg):
def fib(n): # recursive, processor intensive calculation (avoid n > 36)
return fib(n-1) + fib(n-2) if n > 1 else n
start = time()
result = fib(arg)
return time() - start, result
def io_bound(arg):
start = time()
con = HTTPSConnection(arg)
con.request('GET', '/')
result = con.getresponse().getcode()
return time() - start, result
def manager(PoolExecutor, calc_stuff):
if calc_stuff is io_bound:
inputs = ('python.org', 'stackoverflow.com', 'stackexchange.com',
'noaa.gov', 'parler.com', 'aaronhall.dev')
else:
inputs = range(25, 32)
timings, results = list(), list()
start = time()
with PoolExecutor() as executor:
for timing, result in executor.map(calc_stuff, inputs):
# put results into correct output list:
timings.append(timing), results.append(result)
finish = time()
print(f'{calc_stuff.__name__}, {PoolExecutor.__name__}')
print(f'wall time to execute: {finish-start}')
print(f'total of timings for each call: {sum(timings)}')
print(f'time saved by parallelizing: {sum(timings) - (finish-start)}')
print(dict(zip(inputs, results)), end = '\n\n')
def main():
for computation in (processor_intensive, io_bound):
for pool_executor in (ProcessPoolExecutor, ThreadPoolExecutor):
manager(pool_executor, calc_stuff=computation)
if __name__ == '__main__':
main()
下面是python -m futuretest一次运行的输出:
processor_intensive, ProcessPoolExecutor
wall time to execute: 0.7326343059539795
total of timings for each call: 1.8033506870269775
time saved by parallelizing: 1.070716381072998
{25: 75025, 26: 121393, 27: 196418, 28: 317811, 29: 514229, 30: 832040, 31: 1346269}
processor_intensive, ThreadPoolExecutor
wall time to execute: 1.190223217010498
total of timings for each call: 3.3561410903930664
time saved by parallelizing: 2.1659178733825684
{25: 75025, 26: 121393, 27: 196418, 28: 317811, 29: 514229, 30: 832040, 31: 1346269}
io_bound, ProcessPoolExecutor
wall time to execute: 0.533886194229126
total of timings for each call: 1.2977914810180664
time saved by parallelizing: 0.7639052867889404
{'python.org': 301, 'stackoverflow.com': 200, 'stackexchange.com': 200, 'noaa.gov': 301, 'parler.com': 200, 'aaronhall.dev': 200}
io_bound, ThreadPoolExecutor
wall time to execute: 0.38941240310668945
total of timings for each call: 1.6049387454986572
time saved by parallelizing: 1.2155263423919678
{'python.org': 301, 'stackoverflow.com': 200, 'stackexchange.com': 200, 'noaa.gov': 301, 'parler.com': 200, 'aaronhall.dev': 200}
处理器密集型的分析
在Python中执行处理器密集型计算时,期望ProcessPoolExecutor比ThreadPoolExecutor性能更好。
由于全局解释器锁(又名GIL)的存在,线程不能使用多个处理器,因此每次计算的时间和壁时间(实际运行的时间)会更大。
IO-bound分析
另一方面,当执行IO绑定操作时,期望ThreadPoolExecutor比ProcessPoolExecutor性能更好。
Python的线程是真实的,OS,线程。操作系统可以让它们进入睡眠状态,并在信息到达时将它们重新唤醒。
最终的想法
我怀疑在Windows上多处理会更慢,因为Windows不支持分叉,所以每个新进程都要花时间启动。
您可以在多个进程中嵌套多个线程,但建议不要使用多个线程来派生多个进程。
如果在Python中面临一个繁重的处理问题,您可以简单地使用额外的进程来扩展—但不能使用线程。