Python 3.2 introduced concurrent.futures, which appears to be a higher-level combination of the older threading and multiprocessing modules.

What are the advantages and disadvantages of using it for CPU-bound tasks compared to the older multiprocessing module?

This article suggests they're much easier to work with - is that really the case?


Current answer

Probably for most of your parallel-processing needs, you will find that either the ProcessPoolExecutor class from the concurrent.futures module or the Pool class from the multiprocessing package provides equivalent facilities, and it ultimately comes down to a matter of personal preference. But each does offer some facilities that make certain kinds of processing more convenient. I thought I would just point out a few:

Submitting multiple tasks

Each package has analogs to the built-in map and itertools.starmap functions. If you have a worker function that takes a single argument, you can submit multiple tasks using the map method with either package:

def worker_function(x):
    # Return the square of the passed argument:
    return x ** 2

# multiprocessing.pool example:
from multiprocessing import Pool
with Pool() as pool:
    squares = pool.map(worker_function, (1, 2, 3, 4, 5, 6))

# concurrent.futures example:
from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor() as executor:
    squares = list(executor.map(worker_function, (1, 2, 3, 4, 5, 6)))

Note that the multiprocessing.pool.Pool.map method returns a list, while the concurrent.futures.ProcessPoolExecutor.map method returns an iterator, just like the built-in map function.

Both map methods take a chunksize argument, which batches submitted tasks into "chunks" pulled off the task input queue, so that a pool process processes all the tasks in a chunk before getting the next chunk from the queue. This results in fewer, but larger, writes to and reads from the input task queue. For large iterables being passed to the map method, chunking the tasks can greatly improve performance.

If not specified, the default chunksize value for concurrent.futures.ProcessPoolExecutor is 1, which means no chunking. For multiprocessing.pool.Pool the default value is None, which results in the class calculating a "suitable" chunksize based on the pool size and the number of elements in the passed iterable. At the time of this writing, the chunksize value is computed more or less as int(math.ceil(iterable_size / (4 * pool_size))). When doing multithreading with these packages (discussed briefly later), the default chunksize value for both packages is 1.
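
As a quick sketch (the iterable size and the chunksize of 1000 here are arbitrary values, just for illustration), passing an explicit chunksize looks like this:

import concurrent.futures

def worker_function(x):
    return x ** 2

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # With the default chunksize of 1, each of the 100000 tasks would be a
        # separate write to and read from the task queue; batching them into
        # chunks of 1000 greatly reduces that queue traffic:
        results = list(executor.map(worker_function, range(100000), chunksize=1000))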

If the worker function takes multiple arguments, it is a bit easier to work with the concurrent.futures package, since its map method can be passed multiple iterables:

import concurrent.futures

def worker_function(x, y):
    return x * y

x_values = (1, 2, 3)
y_values = (9, -2, -8)

with concurrent.futures.ProcessPoolExecutor() as executor:
    results = list(executor.map(worker_function, x_values, y_values))

With the multiprocessing package we must use the starmap method instead, and if there is a separate iterable for each argument, the arguments must be "zipped" together:

import multiprocessing

def worker_function(x, y):
    return x * y

x_values = (1, 2, 3)
y_values = (9, -2, -8)

with multiprocessing.Pool() as pool:
    results = pool.starmap(worker_function, zip(x_values, y_values))

You do not have to use the zip built-in function if the arguments are already grouped together as follows:

import multiprocessing

def worker_function(x, y):
    return x * y

args = (
    (1, 9), # first x, y pair of arguments
    (2, -2),
    (3, -8)
)
with multiprocessing.Pool() as pool:
    results = pool.starmap(worker_function, args)

Getting task results as soon as they are produced

When submitting a batch of tasks, you sometimes want to get the task results (i.e. return values) as soon as they become available. Both facilities provide notification that a result from a submitted task is available via callback mechanisms:

Using multiprocessing.Pool:

import multiprocessing as mp

def worker_process(i):
    return i * i # square the argument

def process_result(return_value):
    print(return_value)

def main():
    pool = mp.Pool()
    for i in range(10):
        pool.apply_async(worker_process, args=(i,), callback=process_result)
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

The same can be done, albeit more awkwardly, using a callback with concurrent.futures:

import concurrent.futures

def worker_process(i):
    return i * i # square the argument

def process_result(future):
    print(future.result())

def main():
    executor = concurrent.futures.ProcessPoolExecutor()
    futures = [executor.submit(worker_process, i) for i in range(10)]
    for future in futures:
        future.add_done_callback(process_result)
    executor.shutdown()

if __name__ == '__main__':
    main()

Here each task is submitted individually, and a Future instance is returned for it. The callback must then be added to the Future. Finally, when the callback is invoked, the argument passed is the Future instance for the task that has completed, and method result must be called on it to get the actual return value. But with the concurrent.futures module, there is actually no need to use a callback at all. You can use the as_completed function:

import concurrent.futures

def worker_process(i):
    return i * i # square the argument

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = [executor.submit(worker_process, i) for i in range(10)]
        for future in concurrent.futures.as_completed(futures):
            print(future.result())

if __name__ == '__main__':
    main()

And by using a dictionary to hold the Future instances, it is easy to tie a return value back to the argument that was originally passed to worker_process:

import concurrent.futures

def worker_process(i):
    return i * i # square the argument

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = {executor.submit(worker_process, i): i for i in range(10)}
        for future in concurrent.futures.as_completed(futures):
            i = futures[future] # retrieve the value that was squared
            print(i, future.result())

if __name__ == '__main__':
    main()

multiprocessing.Pool has methods imap and imap_unordered, the latter of which allows task results to be returned in arbitrary order, though not necessarily in completion order. These methods are considered to be lazier versions of map. With method map, if the passed iterable argument does not have a __len__ attribute, it will first be converted to a list and its length will be used to compute an effective chunksize value if None was supplied as the chunksize argument. Therefore, you cannot achieve any storage optimizations by using a generator or generator expression as the iterable. But with methods imap and imap_unordered, the iterable can be a generator or generator expression; it will be iterated as necessary to produce new tasks for submission. This does necessitate that the default chunksize parameter be 1, since the length of the iterable in general cannot be known. But that doesn't stop you from providing a reasonable value using the same algorithm that the multiprocessing.Pool class uses, if you have a good approximation of the length of the iterable (or the exact size, as in the example below):

import multiprocessing as mp

def worker_process(i):
    return i * i # square the argument

def compute_chunksize(pool_size, iterable_size):
    if iterable_size == 0:
        return 0
    chunksize, extra = divmod(iterable_size, pool_size * 4)
    if extra:
        chunksize += 1
    return chunksize

def main():
    cpu_count = mp.cpu_count()
    N = 100
    chunksize = compute_chunksize(cpu_count, N)
    with mp.Pool() as pool:
        for result in pool.imap_unordered(worker_process, range(N), chunksize=chunksize):
            print(result)

if __name__ == '__main__':
    main()

But with imap_unordered there is no way to easily tie a result to a submitted job unless the worker process returns the original call arguments along with the return value. On the other hand, the ability to specify a chunksize with imap_unordered and imap, for which the results will be in a predictable order, should make these methods more efficient than invoking the apply_async method repeatedly, which is essentially equivalent to using a chunksize of 1. But if you do need to process results in completion order, then to be sure you should use the apply_async method with a callback function. It does, however, appear based on experimentation that if you use a chunksize value of 1 with imap_unordered, the results will be returned in completion order.
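
A sketch of that workaround (not from the original answer): the worker simply returns its argument together with the result:

import multiprocessing as mp

def worker_process(i):
    # Return the original argument along with the result so the caller
    # can tell which submission each out-of-order result belongs to:
    return i, i * i

def main():
    with mp.Pool() as pool:
        for i, result in pool.imap_unordered(worker_process, range(10)):
            print(i, '->', result)

if __name__ == '__main__':
    main()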

The map method of the ProcessPoolExecutor class from the concurrent.futures package is similar in one regard to the Pool.imap method from the multiprocessing package: it will not convert passed iterable arguments that are generator expressions to lists in order to compute effective chunksize values. That is why the chunksize argument defaults to 1 and why, if you are passing large iterables, you should consider specifying an appropriate chunksize value. However, unlike with Pool.imap, it is my experience that you cannot retrieve the first result by iterating the iterable returned by map until all the iterables passed to map have been fully iterated.

Submitting a task and blocking until it is complete

The multiprocessing.Pool class has a method apply that submits a task to the pool and blocks until the result is ready. The return value is just the return value from the worker function passed to apply. For example:

import multiprocessing as mp

def worker_process(i):
    return i * i # square the argument

def main():
    with mp.Pool() as pool:
        print(pool.apply(worker_process, args=(6,)))
        print(pool.apply(worker_process, args=(4,)))

if __name__ == '__main__':
    main()

The concurrent.futures.ProcessPoolExecutor class has no such equivalent. You have to issue a submit and then a call to result against the returned Future instance. It's not a hardship to have to do this, but the Pool.apply method is more convenient for the use case where a blocking task submission is appropriate. Such a case is when you have processing that calls for threading because most of the work being done in the threads is heavily I/O-bound, except for perhaps one function that is very CPU-bound. The main program that creates the threads first creates a multiprocessing.Pool instance and passes it as an argument to all the threads. When a thread needs to call the heavily CPU-bound function, it runs the function using the Pool.apply method, thereby running the code in another process and freeing the current process to allow the other threads to run.
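
Here is a minimal sketch of that pattern (the function names and workloads are invented for illustration):

import multiprocessing
import threading

def cpu_bound(n):
    # The one heavily CPU-bound function; Pool.apply runs it in a pool process:
    return sum(i * i for i in range(n))

def io_heavy_worker(pool, n):
    # A mostly-I/O thread; the blocking pool.apply call offloads the
    # CPU-heavy work to another process while the other threads keep running:
    result = pool.apply(cpu_bound, args=(n,))
    print(n, '->', result)

def main():
    with multiprocessing.Pool() as pool:
        threads = [threading.Thread(target=io_heavy_worker, args=(pool, n))
                   for n in (100000, 200000, 300000)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

if __name__ == '__main__':
    main()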

Multiprocessing or multithreading?

One big thing the concurrent.futures module has going for it is that its two classes, ProcessPoolExecutor and ThreadPoolExecutor, have identical interfaces. That is a nice feature. But the multiprocessing module also has an undocumented ThreadPool class with the same interface as Pool:

>>> from multiprocessing.pool import Pool
>>> from multiprocessing.pool import ThreadPool
>>> dir(Pool)
['Process', '__class__', '__del__', '__delattr__', '__dict__', '__dir__', '__doc__', '__enter__', '__eq__', '__exit__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_check_running', '_get_sentinels', '_get_tasks', '_get_worker_sentinels', '_guarded_task_generation', '_handle_results', '_handle_tasks', '_handle_workers', '_help_stuff_finish', '_join_exited_workers', '_maintain_pool', '_map_async', '_repopulate_pool', '_repopulate_pool_static', '_setup_queues', '_terminate_pool', '_wait_for_updates', '_wrap_exception', 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join', 'map', 'map_async', 'starmap', 'starmap_async', 'terminate']
>>> dir(ThreadPool)
['Process', '__class__', '__del__', '__delattr__', '__dict__', '__dir__', '__doc__', '__enter__', '__eq__', '__exit__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_check_running', '_get_sentinels', '_get_tasks', '_get_worker_sentinels', '_guarded_task_generation', '_handle_results', '_handle_tasks', '_handle_workers', '_help_stuff_finish', '_join_exited_workers', '_maintain_pool', '_map_async', '_repopulate_pool', '_repopulate_pool_static', '_setup_queues', '_terminate_pool', '_wait_for_updates', '_wrap_exception', 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join', 'map', 'map_async', 'starmap', 'starmap_async', 'terminate']
>>>

And note that you can also do multithreading this way:

# This Pool is a function with the same interface as the
# multiprocessing.pool.ThreadPool.__init__ initializer and returns a
# multiprocessing.pool.ThreadPool instance:
from multiprocessing.dummy import Pool
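
For example (a quick sketch), a multithreaded version of the earlier map example just swaps in the ThreadPool class:

from multiprocessing.pool import ThreadPool

def worker_function(x):
    return x ** 2

# ThreadPool has the same interface as Pool but uses threads:
with ThreadPool(4) as pool:
    squares = pool.map(worker_function, (1, 2, 3, 4, 5, 6))
print(squares)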

Submitting individual tasks with a timeout

You can submit an individual task with either ProcessPoolExecutor.submit, which returns a Future instance, or Pool.apply_async, which returns an AsyncResult instance, and specify a timeout value for retrieving the result:

from concurrent.futures import ProcessPoolExecutor, TimeoutError
from time import sleep


def worker_1():
    while True:
        print('hanging')
        sleep(1)


def main():
    with ProcessPoolExecutor(1) as pool:
        future = pool.submit(worker_1)
        try:
            future.result(3) # kill task after 3 seconds?
        except TimeoutError:
            print('timeout')

if __name__ == '__main__':
    main()
    print("return from main()")

Prints:

hanging
hanging
hanging
timeout
hanging
hanging
hanging
hanging
hanging
hanging
hanging
etc.

When future.result(3) is called, the main process gets a TimeoutError exception after 3 seconds because the submitted task has not completed within that time period. But the task continues to run, tying up the process, and the with ProcessPoolExecutor(1) as pool: block never exits; thus the program does not terminate.

from multiprocessing import Pool, TimeoutError
from time import sleep


def worker_1():
    while True:
        print('hanging')
        sleep(1)

def main():
    with Pool(1) as pool:
        result = pool.apply_async(worker_1, args=())
        try:
            result.get(3) # kill task after 3 seconds?
        except TimeoutError:
            print('timeout')


if __name__ == '__main__':
    main()
    print("return from main()")

Prints:

hanging
hanging
hanging
timeout
return from main()

This time, however, even though the timed-out task is still continuing to run and is tying up the process, the with block is not prevented from exiting and thus the program terminates normally. The reason for this is that the context manager for the Pool instance executes a call to terminate when the block exits, and this results in the immediate termination of all processes in the pool. This is in contrast with the context manager for the ProcessPoolExecutor instance, which executes a call to shutdown(wait=True) to await the termination of all processes in the pool when the block it governs exits. The advantage would seem to go to multiprocessing.Pool if you are using context managers to handle pool termination and the possibility of a timeout exists. Update: In Python 3.9, a new argument, cancel_futures, was added to the shutdown method. Consequently, you can cancel any tasks waiting to run (already-running tasks are not interrupted) by explicitly calling shutdown(wait=False, cancel_futures=True) instead of relying on the default behavior resulting from the implicit call to shutdown when using a context manager.
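
A sketch of that explicit call (Python 3.9+; the sleep times here are arbitrary):

from concurrent.futures import ProcessPoolExecutor
import time

def worker(i):
    time.sleep(5)
    return i

if __name__ == '__main__':
    executor = ProcessPoolExecutor(max_workers=2)
    futures = [executor.submit(worker, i) for i in range(10)]
    time.sleep(1)
    # Python 3.9+: return immediately and cancel the queued tasks; note that
    # the two tasks already running are not interrupted:
    executor.shutdown(wait=False, cancel_futures=True)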

But because the context manager for multiprocessing.Pool only calls terminate, and not close followed by join, you must then ensure that all the jobs you have submitted have completed before exiting the with block, for example by submitting jobs with a blocking, synchronous call such as map, by calling get on the AsyncResult object returned by a call to apply_async, by iterating the results of a call to imap, or by explicitly calling close and join on the pool instance.
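
For example (a sketch), when submitting with apply_async you can block on each AsyncResult before the with block exits:

import multiprocessing as mp

def worker_process(i):
    return i * i # square the argument

def main():
    with mp.Pool() as pool:
        async_results = [pool.apply_async(worker_process, args=(i,)) for i in range(10)]
        # The with block calls terminate() on exit, so make sure every
        # submitted job has completed first by blocking on each result:
        for async_result in async_results:
            print(async_result.get())

if __name__ == '__main__':
    main()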

Although there is no way to exit until timed-out tasks complete when using the ProcessPoolExecutor, you can cancel the starting of submitted tasks that are not already running. In the following demo we have a pool of size 1, so that jobs can only run consecutively. We submit 3 jobs one after another, where the first two jobs take 3 seconds to run because of calls to time.sleep(3). We immediately try to cancel the first two jobs. The first attempt at cancelling fails because the first job is already running. But because the pool only has one process, the second job must wait 3 seconds for the first job to complete before it can start running, and therefore the cancel succeeds. Finally, job 3 will begin and end almost immediately after job 1 completes, which will be approximately 3 seconds after we started the job submissions:

from concurrent.futures import ProcessPoolExecutor
import time

def worker1(i):
    time.sleep(3)
    print('Done', i)

def worker2():
    print('Hello')

def main():
    with ProcessPoolExecutor(max_workers=1) as executor:
        t = time.time()
        future1 = executor.submit(worker1, 1)
        future2 = executor.submit(worker1, 2)
        future3 = executor.submit(worker2)
        # this will fail since this task is already running:
        print(future1.cancel())
        # this will succeed since this task hasn't started (it's waiting for future1 to complete):
        print(future2.cancel())
        future3.result() # wait for completion
        print(time.time() - t)

if __name__ == '__main__':
    main()

Prints:

False
True
Done 1
Hello
3.1249606609344482

Other answers

I don't think concurrent.futures is more "advanced" - it's a simpler interface that works much the same regardless of whether you use multiple threads or multiple processes as the underlying parallelization gimmick.

So, like virtually all instances of "simpler interface", much the same trade-offs are involved: it has a shallower learning curve, in large part just because there's so much less available to be learned; but, because it offers fewer options, it may eventually frustrate you in ways the richer interface won't.

So far as CPU-bound tasks go, that's way too under-specified to say much meaningful. For CPU-bound tasks under CPython, you need multiple processes rather than multiple threads to have any chance of getting a speedup. But how much (if any) of a speedup you get depends on the details of your hardware, your OS, and especially on how much inter-process communication your specific tasks require. Under the covers, all inter-process parallelization gimmicks rely on the same OS primitives - the high-level API you use to get at those isn't a primary factor in bottom-line speed.

Edit: Example

Here's the final code shown in the article you referenced, but with an import statement added that is needed to make it work:

from concurrent.futures import ProcessPoolExecutor
def pool_factorizer_map(nums, nprocs):
    # Let the executor divide the work among processes by using 'map'.
    with ProcessPoolExecutor(max_workers=nprocs) as executor:
        return {num:factors for num, factors in
                                zip(nums,
                                    executor.map(factorize_naive, nums))}

And here's exactly the same thing using multiprocessing instead:

import multiprocessing as mp
def mp_factorizer_map(nums, nprocs):
    with mp.Pool(nprocs) as pool:
        return {num:factors for num, factors in
                                zip(nums,
                                    pool.map(factorize_naive, nums))}

Note that the ability to use multiprocessing.Pool objects as context managers was added in Python 3.3.

As for which is easier to work with, they're essentially identical.

One difference is that Pool supports so many different ways of doing things that you may not realize how easy it can be until you've climbed quite a way up the learning curve.

Then again, all those different ways are both a strength and a weakness. They're a strength because the flexibility may be required in some situations. They're a weakness because of "preferably only one obvious way to do it". A project sticking exclusively (if possible) to concurrent.futures will probably be easier to maintain over the long run, due to the lack of gratuitous novelty in how its minimal API can be used.

In my experience, I faced a lot of issues with the multiprocessing module compared to concurrent.futures (but this was on the Windows operating system).

The two main differences I could see were:

1. Frequent hangs with the multiprocessing module.
2. concurrent.futures has a relatively simpler way of execution, meaning that fetching results, tracking child processes, etc. is very simple.

Example (fetching results):

import concurrent.futures

with concurrent.futures.ProcessPoolExecutor() as executor:
    f1 = executor.submit(some_function, parameter_to_be_passed) 
    print(f1.result())

So if you return any value from some_function(), you can catch/store it directly using f1.result(). The very same thing requires additional steps in the multiprocessing module, as sketched below.
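
For comparison, a sketch of what those additional steps look like (the body of some_function here is invented, just a stand-in):

import multiprocessing as mp

def some_function(parameter_to_be_passed):
    # Stand-in body for illustration:
    return parameter_to_be_passed * 2

def main():
    with mp.Pool() as pool:
        # apply_async returns an AsyncResult; a separate blocking get()
        # call is needed to retrieve the actual return value:
        async_result = pool.apply_async(some_function, args=(21,))
        print(async_result.get())

if __name__ == '__main__':
    main()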

If you are running on a Linux system, the hangs might not occur, but the execution complexity of the multiprocessing module is still higher.

Having said that, it is also important to note that my tasks were highly CPU-intensive tasks.

On a personal note, I would recommend concurrent.futures.

I like concurrent.futures, mainly because of its iterator over multiple function arguments: multiprocessing is somewhat clumsy when it comes to supplying multiple arguments to a function (there is no istarmap() equivalent of starmap()):

import multiprocessing as mp

def power_plus_one(x, y):
    return (x**y) + 1

def wrapper(t):
    return power_plus_one(*t)

with mp.Pool() as pool:
    r = list(pool.imap(wrapper, [(0, 1), (2, 2)]))

print(r)

I find imap()/imap_unordered() very helpful for progress bars like tqdm or time estimates for larger computations. In concurrent.futures, this is quite handy:

import concurrent.futures

def power_plus_one(x, y):
    return (x**y) + 1

o = dict() # dict for output

with concurrent.futures.ProcessPoolExecutor() as executor:
    futures = {executor.submit(power_plus_one, x, y): (x, y) for x, y in [(0, 1), (2, 2)]}
    for future in concurrent.futures.as_completed(futures):
        i = futures[future]
        o[i] = future.result()
print(o)

I also like the handy mapping of the results to a dict. :)

With tqdm you can easily do:

from tqdm import tqdm

for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures)):
    ...

concurrent.futures gives you more control, for example:

# Created by BaiJiFeiLong@gmail.com at 2021/10/19 10:37

import concurrent.futures
import multiprocessing.pool
import random
import threading
import time


def hello(name):
    time.sleep(random.random())
    return f"Hello {name} {threading.current_thread()} "


print("ThreadPool:")
pool = multiprocessing.pool.ThreadPool(4)
for args, result in pool.imap_unordered(lambda x: (x, hello(x)), range(10)):
    print(args, "=>", result)

print("\nThreadPoolExecutor:")
executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
futures = {executor.submit(hello, x): x for x in range(10)}
for future in concurrent.futures.as_completed(futures):
    print(futures[future], "=>", future.result())

Example output:

ThreadPool:
1 => Hello 1 <DummyProcess(Thread-2, started daemon 29700)>
0 => Hello 0 <DummyProcess(Thread-1, started daemon 29688)>
2 => Hello 2 <DummyProcess(Thread-3, started daemon 19680)>
6 => Hello 6 <DummyProcess(Thread-3, started daemon 19680)>
3 => Hello 3 <DummyProcess(Thread-4, started daemon 33028)>
4 => Hello 4 <DummyProcess(Thread-2, started daemon 29700)>
5 => Hello 5 <DummyProcess(Thread-1, started daemon 29688)>
9 => Hello 9 <DummyProcess(Thread-2, started daemon 29700)>
8 => Hello 8 <DummyProcess(Thread-4, started daemon 33028)>
7 => Hello 7 <DummyProcess(Thread-3, started daemon 19680)>

ThreadPoolExecutor:
0 => Hello 0 <Thread(ThreadPoolExecutor-0_0, started daemon 30764)>
1 => Hello 1 <Thread(ThreadPoolExecutor-0_1, started daemon 36220)>
2 => Hello 2 <Thread(ThreadPoolExecutor-0_2, started daemon 13120)>
4 => Hello 4 <Thread(ThreadPoolExecutor-0_0, started daemon 30764)>
3 => Hello 3 <Thread(ThreadPoolExecutor-0_3, started daemon 30260)>
8 => Hello 8 <Thread(ThreadPoolExecutor-0_3, started daemon 30260)>
5 => Hello 5 <Thread(ThreadPoolExecutor-0_1, started daemon 36220)>
6 => Hello 6 <Thread(ThreadPoolExecutor-0_2, started daemon 13120)>
7 => Hello 7 <Thread(ThreadPoolExecutor-0_0, started daemon 30764)>
9 => Hello 9 <Thread(ThreadPoolExecutor-0_3, started daemon 30260)> 

On top of the detailed list of differences in the other answers, I have personally run into an unfixed (as of 2022-11-20) indefinite hang with multiprocessing that can happen when one of the workers crashes in certain ways (in my case, an exception from a cython extension, although others say this can happen when a worker gets a SIGTERM, etc.). According to the documentation for ProcessPoolExecutor, it has been robust to this since Python 3.3.
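
A sketch of that documented ProcessPoolExecutor behavior (the os._exit call here simulates a worker crashing hard):

import os
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import BrokenProcessPool

def crashing_worker():
    # Simulate an abrupt worker death (e.g. a segfaulting C extension):
    os._exit(1)

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=1) as executor:
        future = executor.submit(crashing_worker)
        try:
            future.result()
        except BrokenProcessPool:
            # Rather than hanging indefinitely, the executor raises
            # BrokenProcessPool when a worker terminates unexpectedly:
            print('worker crashed; pool is broken')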