Python 3.2 introduced concurrent.futures, which appears to be an advanced combination of the older threading and multiprocessing modules.
What are the advantages and disadvantages of using it for CPU-bound tasks over the older multiprocessing module?
This article suggests they are much easier to work with - is that the case?
Current answer
concurrent.futures gives you more control, for example:
# Created by BaiJiFeiLong@gmail.com at 2021/10/19 10:37
import concurrent.futures
import multiprocessing.pool
import random
import threading
import time


def hello(name):
    time.sleep(random.random())
    return f"Hello {name} {threading.current_thread()} "


# multiprocessing.pool.ThreadPool: Pool's API, backed by threads
print("ThreadPool:")
pool = multiprocessing.pool.ThreadPool(4)
for args, result in pool.imap_unordered(lambda x: (x, hello(x)), range(10)):
    print(args, "=>", result)

# concurrent.futures: submit() returns Future objects, consumed via as_completed()
print("\nThreadPoolExecutor:")
executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
futures = {executor.submit(hello, x): x for x in range(10)}
for future in concurrent.futures.as_completed(futures):
    print(futures[future], "=>", future.result())
Sample output:
ThreadPool:
1 => Hello 1 <DummyProcess(Thread-2, started daemon 29700)>
0 => Hello 0 <DummyProcess(Thread-1, started daemon 29688)>
2 => Hello 2 <DummyProcess(Thread-3, started daemon 19680)>
6 => Hello 6 <DummyProcess(Thread-3, started daemon 19680)>
3 => Hello 3 <DummyProcess(Thread-4, started daemon 33028)>
4 => Hello 4 <DummyProcess(Thread-2, started daemon 29700)>
5 => Hello 5 <DummyProcess(Thread-1, started daemon 29688)>
9 => Hello 9 <DummyProcess(Thread-2, started daemon 29700)>
8 => Hello 8 <DummyProcess(Thread-4, started daemon 33028)>
7 => Hello 7 <DummyProcess(Thread-3, started daemon 19680)>
ThreadPoolExecutor:
0 => Hello 0 <Thread(ThreadPoolExecutor-0_0, started daemon 30764)>
1 => Hello 1 <Thread(ThreadPoolExecutor-0_1, started daemon 36220)>
2 => Hello 2 <Thread(ThreadPoolExecutor-0_2, started daemon 13120)>
4 => Hello 4 <Thread(ThreadPoolExecutor-0_0, started daemon 30764)>
3 => Hello 3 <Thread(ThreadPoolExecutor-0_3, started daemon 30260)>
8 => Hello 8 <Thread(ThreadPoolExecutor-0_3, started daemon 30260)>
5 => Hello 5 <Thread(ThreadPoolExecutor-0_1, started daemon 36220)>
6 => Hello 6 <Thread(ThreadPoolExecutor-0_2, started daemon 13120)>
7 => Hello 7 <Thread(ThreadPoolExecutor-0_0, started daemon 30764)>
9 => Hello 9 <Thread(ThreadPoolExecutor-0_3, started daemon 30260)>
Other answers
In addition to the other answers' detailed lists of differences, I have personally run into an (as of 2022-11-20, still unfixed) indefinite hang that can happen with multiprocessing when one of the workers crashes in certain ways. (In my case, an exception raised from a Cython extension, although others report it can happen when a worker receives a SIGTERM, etc.) According to the documentation of ProcessPoolExecutor, it has been robust against this since Python 3.3.
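To make that robustness concrete, here is a minimal sketch of my own (not from the linked documentation): if a worker dies abruptly, ProcessPoolExecutor raises BrokenProcessPool rather than hanging forever.
import os
import concurrent.futures
from concurrent.futures.process import BrokenProcessPool

def crash(_):
    os._exit(1)  # simulate a worker process dying hard, with no Python-level cleanup

if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        future = executor.submit(crash, 0)
        try:
            future.result()
        except BrokenProcessPool as exc:
            # The pool is flagged as broken and pending futures fail fast
            # instead of waiting on the dead worker.
            print("Pool broke:", exc)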
I like concurrent.futures mainly because of its handling of multiple function arguments: multiprocessing is somewhat clumsy when it comes to passing multiple arguments to a function (there is no istarmap() equivalent of starmap()):
import multiprocessing as mp

def power_plus_one(x, y):
    return (x**y) + 1

def wrapper(t):
    return power_plus_one(*t)

with mp.Pool() as pool:
    r = list(pool.imap(wrapper, [(0, 1), (2, 2)]))

print(r)
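For comparison, a small sketch of my own showing the concurrent.futures side: Executor.map() accepts multiple iterables directly, just like the built-in map(), so no wrapper function is needed (reusing power_plus_one from above):
import concurrent.futures

def power_plus_one(x, y):
    return (x**y) + 1

if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # one element is taken from each iterable per call: f(0, 1), then f(2, 2)
        r = list(executor.map(power_plus_one, [0, 2], [1, 2]))
    print(r)  # [1, 5]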
I find imap()/imap_unordered() very useful for progress bars like tqdm, or for time estimates on larger computations. In concurrent.futures, this is super handy:
import concurrent.futures

def power_plus_one(x, y):
    return (x**y) + 1

o = dict()  # dict for output
with concurrent.futures.ProcessPoolExecutor() as executor:
    futures = {executor.submit(power_plus_one, x, y): (x, y) for x, y in [(0, 1), (2, 2)]}
    for future in concurrent.futures.as_completed(futures):
        i = futures[future]  # the (x, y) arguments this future was submitted with
        o[i] = future.result()
print(o)
I also like the convenient mapping of results as a dict. :)
With tqdm you can easily:
for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures)):
    ...
I wouldn't say concurrent.futures is more "advanced" - it's a simpler interface that works much the same regardless of whether you use multiple threads or multiple processes as the underlying parallelization gimmick.
So, like virtually every "simpler interface", much the same trade-offs are involved: it has a shallower learning curve, in large part just because there's so much less available to be learned; but, because it offers fewer options, it may eventually frustrate you in ways the richer interfaces won't.
So far as CPU-bound tasks go, that's way too under-specified to say much meaningful. For CPU-bound tasks under CPython, you need multiple processes rather than multiple threads to have any chance of getting a speedup. But how much (if any) of a speedup you get depends on the details of your hardware, your OS, and especially on how much inter-process communication your specific tasks require. Under the covers, all inter-process parallelization gimmicks rely on the same OS primitives - the high-level API you use to get at those isn't a primary factor in bottom-line speed.
Edit: example
Here's the final code shown in the article you referenced, but I'm adding an import statement needed to make it work:
from concurrent.futures import ProcessPoolExecutor

def pool_factorizer_map(nums, nprocs):
    # Let the executor divide the work among processes by using 'map'.
    with ProcessPoolExecutor(max_workers=nprocs) as executor:
        return {num: factors for num, factors in
                zip(nums,
                    executor.map(factorize_naive, nums))}
And here's exactly the same thing using multiprocessing instead:
import multiprocessing as mp

def mp_factorizer_map(nums, nprocs):
    with mp.Pool(nprocs) as pool:
        return {num: factors for num, factors in
                zip(nums,
                    pool.map(factorize_naive, nums))}
Note that the ability to use multiprocessing.Pool objects as context managers was added in Python 3.3.
As for which is easier to work with, they're essentially identical.
One difference is that Pool supports so many different ways of doing things that you may not realize how easy it can be until you've climbed quite a way up the learning curve.
Again, all those different ways are both a strength and a weakness. They're a strength because the flexibility may be required in some situations. They're a weakness because of "preferably only one obvious way to do it". A project sticking exclusively (if possible) to concurrent.futures will probably be easier to maintain over the long run, due to the lack of gratuitous novelty in how its minimal API can be used.
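As a rough sketch of that breadth (my own illustration, not from the original answer): Pool alone exposes several overlapping ways to run the same job, whereas the Executor API is essentially just submit() and map().
import multiprocessing as mp

def square(x):
    return x * x

if __name__ == "__main__":
    with mp.Pool(4) as pool:
        # Several Pool methods that all compute the squares of 0..9:
        a = pool.map(square, range(10))                     # blocking, ordered
        b = list(pool.imap(square, range(10)))              # lazy iterator, ordered
        c = sorted(pool.imap_unordered(square, range(10)))  # lazy, completion order
        d = pool.map_async(square, range(10)).get()         # asynchronous, wait via get()
        e = [pool.apply_async(square, (x,)).get() for x in range(10)]  # one call at a time
    print(a == b == c == d == e)  # True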
In my experience, I faced a lot of issues with the multiprocessing module compared to concurrent.futures (but this was on a Windows operating system).
The two main differences I can see are:
1. Frequent hangs with the multiprocessing module.
2. concurrent.futures has a relatively simpler way of executing, which means fetching results, tracking child processes, etc. is very simple.
Example (fetching results):
with concurrent.futures.ProcessPoolExecutor() as executor:
    f1 = executor.submit(some_function, parameter_to_be_passed)
    print(f1.result())
So if you return any value from some_function(), you can directly catch/store it with f1.result(). The very same thing needs additional steps in the "multiprocessing" module.
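As a rough sketch of those additional steps (my own example; some_function and parameter_to_be_passed are placeholders, as above): with a bare multiprocessing.Process there is no return value to catch, so the result has to travel back through a Queue.
import multiprocessing as mp

def worker(queue, parameter):
    # A Process target cannot return a value to the parent; push it onto a queue instead.
    queue.put(some_function(parameter))

if __name__ == "__main__":
    queue = mp.Queue()
    p = mp.Process(target=worker, args=(queue, parameter_to_be_passed))
    p.start()
    print(queue.get())  # retrieve the result that worker() put on the queue
    p.join()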
If you're running on a Linux system the hangs might not occur, but the execution complexity of the "multiprocessing" module is still higher.
Having said that, it's also important to note that my tasks were highly CPU-intensive.
On a personal note, I would recommend concurrent.futures.