Python 3.2 introduced concurrent.futures, which appears to be an advanced combination of the older threading and multiprocessing modules.

What are the advantages and disadvantages of using it for CPU-bound tasks over the older multiprocessing module?

This article suggests it is much easier to work with - is that the case?


Current answer

concurrent.futures gives you more control, for example:

# Created by BaiJiFeiLong@gmail.com at 2021/10/19 10:37

import concurrent.futures
import multiprocessing.pool
import random
import threading
import time


def hello(name):
    time.sleep(random.random())
    return f"Hello {name} {threading.current_thread()} "


print("ThreadPool:")
pool = multiprocessing.pool.ThreadPool(4)
for args, result in pool.imap_unordered(lambda x: (x, hello(x)), range(10)):
    print(args, "=>", result)

print("\nThreadPoolExecutor:")
executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
futures = {executor.submit(hello, x): x for x in range(10)}
for future in concurrent.futures.as_completed(futures):
    print(futures[future], "=>", future.result())

Sample output:

ThreadPool:
1 => Hello 1 <DummyProcess(Thread-2, started daemon 29700)>
0 => Hello 0 <DummyProcess(Thread-1, started daemon 29688)>
2 => Hello 2 <DummyProcess(Thread-3, started daemon 19680)>
6 => Hello 6 <DummyProcess(Thread-3, started daemon 19680)>
3 => Hello 3 <DummyProcess(Thread-4, started daemon 33028)>
4 => Hello 4 <DummyProcess(Thread-2, started daemon 29700)>
5 => Hello 5 <DummyProcess(Thread-1, started daemon 29688)>
9 => Hello 9 <DummyProcess(Thread-2, started daemon 29700)>
8 => Hello 8 <DummyProcess(Thread-4, started daemon 33028)>
7 => Hello 7 <DummyProcess(Thread-3, started daemon 19680)>

ThreadPoolExecutor:
0 => Hello 0 <Thread(ThreadPoolExecutor-0_0, started daemon 30764)>
1 => Hello 1 <Thread(ThreadPoolExecutor-0_1, started daemon 36220)>
2 => Hello 2 <Thread(ThreadPoolExecutor-0_2, started daemon 13120)>
4 => Hello 4 <Thread(ThreadPoolExecutor-0_0, started daemon 30764)>
3 => Hello 3 <Thread(ThreadPoolExecutor-0_3, started daemon 30260)>
8 => Hello 8 <Thread(ThreadPoolExecutor-0_3, started daemon 30260)>
5 => Hello 5 <Thread(ThreadPoolExecutor-0_1, started daemon 36220)>
6 => Hello 6 <Thread(ThreadPoolExecutor-0_2, started daemon 13120)>
7 => Hello 7 <Thread(ThreadPoolExecutor-0_0, started daemon 30764)>
9 => Hello 9 <Thread(ThreadPoolExecutor-0_3, started daemon 30260)> 

Other answers

On top of the detailed list of differences in the other answers, I've personally run into an (unfixed as of 2022-11-20) indefinite hang that can happen with multiprocessing when one of the workers crashes in certain ways (in my case, an exception from a cython extension, although others say this can happen when a worker gets a SIGTERM, etc.). According to the documentation of ProcessPoolExecutor, it has been robust to this since Python 3.3.
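As a rough illustration of that difference (the crash() helper below is hypothetical and simply simulates a worker dying abruptly), ProcessPoolExecutor fails the pending future with a BrokenProcessPool error instead of hanging forever:

import os
import concurrent.futures
from concurrent.futures.process import BrokenProcessPool


def crash():
    # Simulate a worker process terminating abruptly (segfault, SIGKILL, etc.).
    os._exit(1)


if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        future = executor.submit(crash)
        try:
            future.result()
        except BrokenProcessPool as exc:
            print("Pool is broken, no indefinite hang:", exc)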

I like concurrent.futures mainly because of how it handles multiple function arguments: multiprocessing is somewhat clunky when it comes to passing several arguments to a function (there is no istarmap() equivalent of starmap()), so you need a wrapper like the one below (a concurrent.futures sketch follows the example):

import multiprocessing as mp

def power_plus_one(x, y):
    return (x**y) + 1

def wrapper(t):
    return power_plus_one(*t)

with mp.Pool() as pool:
    r = list(pool.imap(wrapper, [(0, 1), (2, 2)]))

print(r)
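With concurrent.futures no wrapper is needed, because executor.map() accepts one iterable per argument; a minimal sketch reusing power_plus_one:

import concurrent.futures


def power_plus_one(x, y):
    return (x**y) + 1


if __name__ == "__main__":
    xs, ys = (0, 2), (1, 2)
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # map() zips the iterables, so each call receives its own (x, y) pair.
        r = list(executor.map(power_plus_one, xs, ys))
    print(r)  # [1, 5]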

I find imap()/imap_unordered() very helpful for progress bars like tqdm, or for time estimates of larger computations. In concurrent.futures, this is very handy:

import concurrent.futures

def power_plus_one(x, y):
    return (x**y) + 1

o = dict() # dict for output

with concurrent.futures.ProcessPoolExecutor() as executor:
    futures = {executor.submit(power_plus_one, x, y): (x, y) for x, y in [(0, 1), (2, 2)]}
    for future in concurrent.futures.as_completed(futures):
        i = futures[future]
        o[i] = future.result()
print(o)

I also like the handy mapping of results to a dict. :)

With tqdm you can easily do the following (a fuller sketch follows the snippet):

for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures)):
    ...
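For a self-contained sketch of that pattern (assuming tqdm is installed, and reusing the power_plus_one example from above):

import concurrent.futures
from tqdm import tqdm


def power_plus_one(x, y):
    return (x**y) + 1


if __name__ == "__main__":
    params = [(x, 2) for x in range(100)]
    results = {}
    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = {executor.submit(power_plus_one, x, y): (x, y) for x, y in params}
        # as_completed() yields futures as they finish, so tqdm shows real progress.
        for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures)):
            results[futures[future]] = future.result()
    print(len(results))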

I don't think concurrent.futures is more "advanced" - it's a simpler interface that works very much the same regardless of whether you use multiple threads or multiple processes as the underlying parallelization gimmick.

So, like virtually all instances of "simpler interface", much the same trade-offs are involved: it has a shallower learning curve, in large part just because there's so much less available to be learned; but, because it offers fewer options, it may eventually frustrate you in ways the richer interface won't.

So far as CPU-bound tasks go, that's way too under-specified to say much meaningful. For CPU-bound tasks under CPython, you need multiple processes rather than multiple threads to have any chance of getting a speedup. But how much (if any) of a speedup you get depends on the details of your hardware, your OS, and especially on how much inter-process communication your specific tasks require. Under the covers, all inter-process parallelization gimmicks rely on the same OS primitives - the high-level API you use to get at those isn't a primary factor in bottom-line speed.

EDIT: Example

Here's the final code shown in the article you referenced, but with an import statement added that's needed to make it work. (Both this version and the multiprocessing version below assume the article's factorize_naive(), sketched after the second snippet.)

from concurrent.futures import ProcessPoolExecutor
def pool_factorizer_map(nums, nprocs):
    # Let the executor divide the work among processes by using 'map'.
    with ProcessPoolExecutor(max_workers=nprocs) as executor:
        return {num:factors for num, factors in
                                zip(nums,
                                    executor.map(factorize_naive, nums))}

Here's exactly the same thing using multiprocessing instead:

import multiprocessing as mp
def mp_factorizer_map(nums, nprocs):
    with mp.Pool(nprocs) as pool:
        return {num:factors for num, factors in
                                zip(nums,
                                    pool.map(factorize_naive, nums))}
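Both snippets rely on the factorize_naive() helper defined in the referenced article; it isn't reproduced here, but a minimal trial-division version along the same lines could look like:

def factorize_naive(n):
    # Naive trial division; returns the list of prime factors of n.
    factors = []
    p = 2
    while p * p <= n:
        while n % p == 0:
            factors.append(p)
            n //= p
        p += 1
    if n > 1:
        factors.append(n)
    return factors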

Note that the ability to use multiprocessing.Pool objects as context managers was added in Python 3.3.
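On earlier versions you would have had to clean up the pool yourself; a rough equivalent without the context manager:

import multiprocessing as mp

def mp_factorizer_map(nums, nprocs):
    # Pre-3.3 style: close and join the pool manually instead of using "with".
    pool = mp.Pool(nprocs)
    try:
        return {num: factors for num, factors in
                    zip(nums, pool.map(factorize_naive, nums))}
    finally:
        pool.close()
        pool.join()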

As for which is easier to work with, they're essentially identical.

One difference is that Pool supports so many different ways of doing things that you may not realize how easy it can be until you've climbed quite a way up the learning curve.

Again, all those different ways are both a strength and a weakness. They're a strength because the flexibility may be required in some situations. They're a weakness because of "preferably only one obvious way to do it": a project sticking exclusively (insofar as possible) to concurrent.futures will probably be easier to maintain over the long run, due to the lack of gratuitous novelty in how its minimal API can be used.

From my experience, I faced a lot of issues with the multiprocessing module compared to concurrent.futures (but this was on the Windows operating system).

The two main differences I could see were:

Frequent hangs with the multiprocessing module. concurrent.futures has a comparatively simple way of execution, meaning that fetching results, tracking child processes, etc. is very straightforward.

Example (fetching a result):

with concurrent.futures.ProcessPoolExecutor() as executor:
    f1 = executor.submit(some_function, parameter_to_be_passed) 
    print(f1.result())

So if you return any value from some_function(), you can catch/store it directly using f1.result(). The very same thing needs additional steps in the multiprocessing module.
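For comparison, a rough multiprocessing equivalent of that snippet (with a placeholder some_function) goes through an explicit AsyncResult:

import multiprocessing as mp


def some_function(parameter_to_be_passed):
    # Placeholder body purely for illustration.
    return parameter_to_be_passed * 2


if __name__ == "__main__":
    with mp.Pool() as pool:
        async_result = pool.apply_async(some_function, (10,))
        # get() blocks until the worker returns, much like f1.result() above.
        print(async_result.get())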

If you are running on a Linux system the hangs might not occur, but the execution complexity of the multiprocessing module is still higher.

Having said that, it is also important to note that my tasks were highly CPU-intensive.

On a personal note, I recommend concurrent.futures.
