我发现在Python 3.4中,有几个不同的多处理/线程库:multiprocessing vs threading vs asyncio。

但我不知道该用哪一个,或者是“推荐的”。它们做的事情是一样的,还是不同的?如果是的话,哪个是用来干什么的?我想在我的计算机上写一个使用多核的程序。但我不知道该学哪个图书馆。


当前回答

They are intended for (slightly) different purposes and/or requirements. CPython (a typical, mainline Python implementation) still has the global interpreter lock so a multi-threaded application (a standard way to implement parallel processing nowadays) is suboptimal. That's why multiprocessing may be preferred over threading. But not every problem may be effectively split into [almost independent] pieces, so there may be a need in heavy interprocess communications. That's why multiprocessing may not be preferred over threading in general.

asyncio(该技术不仅在Python中可用,其他语言和/或框架也有它,例如Boost.ASIO)是一种有效处理来自多个同步源的大量I/O操作的方法,而不需要并行代码执行。因此,它只是针对特定任务的解决方案(确实是一个不错的解决方案!),而不是用于一般的并行处理。

其他回答

许多答案建议如何只选择一个选项,但为什么不能使用所有三个选项呢?在这个回答中,我将解释如何使用asyncio来管理所有三种并发形式的组合,以及在需要时在它们之间轻松切换。

简短的回答


Many developers that are first-timers to concurrency in Python will end up using processing.Process and threading.Thread. However, these are the low-level APIs which have been merged together by the high-level API provided by the concurrent.futures module. Furthermore, spawning processes and threads has overhead, such as requiring more memory, a problem which plagued one of the examples I showed below. To an extent, concurrent.futures manages this for you so that you cannot as easily do something like spawn a thousand processes and crash your computer by only spawning a few processes and then just re-using those processes each time one finishes.

这些高级api是通过concurrent.futures提供的。然后由concurrent.future . processpoolexecutor和concurrent.future . threadpoolexecutor实现。在大多数情况下,您应该在多处理中使用这些。进程和线程。线程,因为将来当你使用并发时,从一个转换到另一个更容易。你不需要学习每种期货的详细区别。

由于它们共享统一的接口,您还会发现使用多处理或线程的代码将经常使用concurrent.futures。Asyncio也不例外,并提供了一种通过以下代码使用它的方法:

import asyncio
from concurrent.futures import Executor
from functools import partial
from typing import Any, Callable, Optional, TypeVar

T = TypeVar("T")

async def run_in_executor(
    executor: Optional[Executor],
    func: Callable[..., T],
    /,
    *args: Any,
    **kwargs: Any,
) -> T:
    """
    Run `func(*args, **kwargs)` asynchronously, using an executor.

    If the executor is None, use the default ThreadPoolExecutor.
    """
    return await asyncio.get_running_loop().run_in_executor(
        executor,
        partial(func, *args, **kwargs),
    )

# Example usage for running `print` in a thread.
async def main():
    await run_in_executor(None, print, "O" * 100_000)

asyncio.run(main())

事实上,使用线程和asyncio是如此普遍,以至于在Python 3.9中他们添加了asyncio。to_thread(func, *args, **kwargs)来缩短默认的ThreadPoolExecutor。

长话短说


这种方法有什么缺点吗?

是的。使用asyncio,最大的缺点是异步函数与同步函数不同。如果你没有从一开始就考虑到asyncio,这可能会让asyncio的新用户陷入困境,并导致大量的返工。

另一个缺点是代码的用户也将被迫使用asyncio。所有这些必要的返工通常会让首次使用asyncio的用户感到不快。

这样做有什么非性能优势吗?

Yes. Similar to how using concurrent.futures is advantageous over threading.Thread and multiprocessing.Process for its unified interface, this approach can be considered a further abstraction from an Executor to an asynchronous function. You can start off using asyncio, and if later you find a part of it you need threading or multiprocessing, you can use asyncio.to_thread or run_in_executor. Likewise, you may later discover that an asynchronous version of what you're trying to run with threading already exists, so you can easily step back from using threading and switch to asyncio instead.

这样做是否有性能优势?

是的……也没有。最终取决于任务本身。在某些情况下,它可能没有帮助(尽管它可能没有伤害),而在其他情况下,它可能有很大的帮助。这个答案的其余部分解释了为什么使用asyncio来运行Executor可能是有利的。

-结合多个执行程序和其他异步代码

Asyncio本质上为并发性提供了更多的控制,代价是你需要更多地控制并发性。如果你想使用ThreadPoolExecutor同时运行一些代码和使用ProcessPoolExecutor同时运行一些代码,那么使用同步代码管理这些代码就不那么容易了,但是使用asyncio就非常容易。

import asyncio
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

async def with_processing():
    with ProcessPoolExecutor() as executor:
        tasks = [...]
        for task in asyncio.as_completed(tasks):
            result = await task
            ...

async def with_threading():
    with ThreadPoolExecutor() as executor:
        tasks = [...]
        for task in asyncio.as_completed(tasks):
            result = await task
            ...

async def main():
    await asyncio.gather(with_processing(), with_threading())

asyncio.run(main())

How does this work? Essentially asyncio asks the executors to run their functions. Then, while an executor is running, asyncio will go run other code. For example, the ProcessPoolExecutor starts a bunch of processes, and then while waiting for those processes to finish, the ThreadPoolExecutor starts a bunch of threads. asyncio will then check in on these executors and collect their results when they are done. Furthermore, if you have other code using asyncio, you can run them while waiting for the processes and threads to finish.

-缩小代码的哪些部分需要执行器

在你的代码中有很多执行程序是不常见的,但是当人们使用线程/进程时,我看到的一个常见问题是,他们会把他们的整个代码推到一个线程/进程中,期望它能工作。例如,我曾经看到以下代码(大约):

from concurrent.futures import ThreadPoolExecutor
import requests

def get_data(url):
    return requests.get(url).json()["data"]

urls = [...]

with ThreadPoolExecutor() as executor:
    for data in executor.map(get_data, urls):
        print(data)

有趣的是,这段代码有并发时比没有并发时要慢。为什么?因为生成的json很大,许多线程消耗大量内存是灾难性的。幸运的是,解决方法很简单:

from concurrent.futures import ThreadPoolExecutor
import requests

urls = [...]

with ThreadPoolExecutor() as executor:
    for response in executor.map(requests.get, urls):
        print(response.json()["data"])

现在每次只有一个json被卸载到内存中,一切正常。

这里的教训是什么?

你不应该试图把所有的代码都放到线程/进程中,你应该关注代码的哪一部分真正需要并发性。

但是,如果get_data不是一个如此简单的函数呢?如果我们必须在函数中间的某个地方应用执行程序呢?这就是asyncio的用武之地:

import asyncio
import requests

async def get_data(url):
    # A lot of code.
    ...
    # The specific part that needs threading.
    response = await asyncio.to_thread(requests.get, url, some_other_params)
    # A lot of code.
    ...
    return data

urls = [...]

async def main():
    tasks = [get_data(url) for url in urls]
    for task in asyncio.as_completed(tasks):
        data = await task
        print(data)

asyncio.run(main())

尝试同样的并发。期货一点也不漂亮。您可以使用回调、队列等,但这将比基本的asyncio代码更难管理。

In multiprocessing you leverage multiple CPUs to distribute your calculations. Since each of the CPUs runs in parallel, you're effectively able to run multiple tasks simultaneously. You would want to use multiprocessing for CPU-bound tasks. An example would be trying to calculate a sum of all elements of a huge list. If your machine has 8 cores, you can "cut" the list into 8 smaller lists and calculate the sum of each of those lists separately on separate core and then just add up those numbers. You'll get a ~8x speedup by doing that.

In (multi)threading you don't need multiple CPUs. Imagine a program that sends lots of HTTP requests to the web. If you used a single-threaded program, it would stop the execution (block) at each request, wait for a response, and then continue once received a response. The problem here is that your CPU isn't really doing work while waiting for some external server to do the job; it could have actually done some useful work in the meantime! The fix is to use threads - you can create many of them, each responsible for requesting some content from the web. The nice thing about threads is that, even if they run on one CPU, the CPU from time to time "freezes" the execution of one thread and jumps to executing the other one (it's called context switching and it happens constantly at non-deterministic intervals). So if your task is I/O bound - use threading.

asyncio本质上是线程,而不是CPU,而是你,作为一个程序员(或者实际上是你的应用程序),决定何时何地发生上下文切换。在Python中,您使用await关键字来暂停协程的执行(使用async关键字定义)。

They are intended for (slightly) different purposes and/or requirements. CPython (a typical, mainline Python implementation) still has the global interpreter lock so a multi-threaded application (a standard way to implement parallel processing nowadays) is suboptimal. That's why multiprocessing may be preferred over threading. But not every problem may be effectively split into [almost independent] pieces, so there may be a need in heavy interprocess communications. That's why multiprocessing may not be preferred over threading in general.

asyncio(该技术不仅在Python中可用,其他语言和/或框架也有它,例如Boost.ASIO)是一种有效处理来自多个同步源的大量I/O操作的方法,而不需要并行代码执行。因此,它只是针对特定任务的解决方案(确实是一个不错的解决方案!),而不是用于一般的并行处理。

博士TL;

做出正确的选择:

我们已经介绍了最流行的并发形式。但问题依然存在——什么时候应该选择哪一个?这实际上取决于用例。根据我的经验(和阅读),我倾向于遵循以下伪代码:

if io_bound:
    if io_very_slow:
        print("Use Asyncio")
    else:
        print("Use Threads")
else:
    print("Multi Processing")

CPU绑定=>多处理 I/O绑定,快速I/O,有限数量的连接=>多线程 I/O受限,慢I/O,多连接=> Asyncio

参考


【注意】:

如果你有一个很长的调用方法(例如,一个包含睡眠时间或惰性I/O的方法),最好的选择是asyncio, Twisted或Tornado方法(协程方法),它与单个线程一起工作作为并发。 asyncio适用于Python3.4及更高版本。 Tornado和Twisted从Python2.7开始就准备好了 Uvloop是超快的asyncio事件循环(Uvloop使asyncio快2-4倍)。


(更新(2019)):

Japranto (GitHub)是一个非常快速的基于uvloop的流水线HTTP服务器。

多处理可以并行运行。 多线程和asyncio不能并行运行。

使用英特尔(R)酷睿(TM) i7-8700K CPU @ 3.70GHz和32.0 GB RAM,我用2个进程、2个线程和2个异步任务计算了2到100000之间有多少素数,如下所示。*这是CPU限制计算:

Multiprocessing Multithreading asyncio
23.87 seconds 45.24 seconds 44.77 seconds

因为多处理可以并行运行,所以如上所示,多处理比多线程和asyncio快两倍。

我使用了以下3组代码:

多处理:

# "process_test.py"

from multiprocessing import Process
import time
start_time = time.time()

def test():
    num = 100000
    primes = 0
    for i in range(2, num + 1):
        for j in range(2, i):
            if i % j == 0:
                break
        else:
            primes += 1
    print(primes)

if __name__ == "__main__": # This is needed to run processes on Windows
    process_list = []

    for _ in range(0, 2): # 2 processes
        process = Process(target=test)
        process_list.append(process)

    for process in process_list:
        process.start()

    for process in process_list:
        process.join()

    print(round((time.time() - start_time), 2), "seconds") # 23.87 seconds

结果:

...
9592
9592
23.87 seconds

多线程:

# "thread_test.py"

from threading import Thread
import time
start_time = time.time()

def test():
    num = 100000
    primes = 0
    for i in range(2, num + 1):
        for j in range(2, i):
            if i % j == 0:
                break
        else:
            primes += 1
    print(primes)

thread_list = []

for _ in range(0, 2): # 2 threads
    thread = Thread(target=test)
    thread_list.append(thread)
    
for thread in thread_list:
    thread.start()

for thread in thread_list:
    thread.join()

print(round((time.time() - start_time), 2), "seconds") # 45.24 seconds

结果:

...
9592
9592
45.24 seconds

Asyncio:

# "asyncio_test.py"

import asyncio
import time
start_time = time.time()

async def test():
    num = 100000
    primes = 0
    for i in range(2, num + 1):
        for j in range(2, i):
            if i % j == 0:
                break
        else:
            primes += 1
    print(primes)

async def call_tests():
    tasks = []

    for _ in range(0, 2): # 2 asyncio tasks
        tasks.append(test())

    await asyncio.gather(*tasks)

asyncio.run(call_tests())

print(round((time.time() - start_time), 2), "seconds") # 44.77 seconds

结果:

...
9592
9592
44.77 seconds