I found that in Python 3.4 there are a few different libraries for multiprocessing/threading: multiprocessing vs threading vs asyncio.

But I don't know which one to use, or which one is the "recommended" one. Do they do the same thing, or are they different? If so, which one is used for what? I want to write a program that uses multiple cores on my computer, but I don't know which library to learn.


Current answer

Many answers suggest how to choose only one option, but why not be able to use all three? In this answer I explain how you can use asyncio to manage combining all three forms of concurrency, as well as easily swap between them later if needed.

The short answer


Many developers that are first-timers to concurrency in Python will end up using multiprocessing.Process and threading.Thread. However, these are the low-level APIs which have been merged together by the high-level API provided by the concurrent.futures module. Furthermore, spawning processes and threads has overhead, such as requiring more memory, a problem which plagues one of the examples I show below. To an extent, concurrent.futures manages this for you: by spawning only a few processes and re-using them each time one finishes, it keeps you from, say, spawning a thousand processes at once and crashing your computer.

These high-level APIs are provided through concurrent.futures.Executor, which is then implemented by concurrent.futures.ProcessPoolExecutor and concurrent.futures.ThreadPoolExecutor. In most cases you should use these over multiprocessing.Process and threading.Thread, because it is easier to switch from one to the other in the future when you use concurrent.futures, and you don't have to learn the detailed differences of each.
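For example, here is a minimal sketch of that unified interface: the same submit/result code drives either pool type. The cpu_heavy function and the worker counts are only illustrative assumptions, not part of the original example.

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def cpu_heavy(n):
    # Illustrative stand-in for real work.
    return sum(i * i for i in range(n))

def run(executor_cls):
    # The exact same submit/result code works with either executor class.
    with executor_cls(max_workers=4) as executor:
        futures = [executor.submit(cpu_heavy, 10_000) for _ in range(8)]
        return [future.result() for future in futures]

if __name__ == "__main__":  # Needed for ProcessPoolExecutor on Windows
    print(run(ThreadPoolExecutor))
    print(run(ProcessPoolExecutor))  # swapping executors requires no other changes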

Since they share a unified interface, you will also find that code using multiprocessing or threading will often use concurrent.futures. asyncio is no exception to this, and provides a way to use it via the following code:

import asyncio
from concurrent.futures import Executor
from functools import partial
from typing import Any, Callable, Optional, TypeVar

T = TypeVar("T")

async def run_in_executor(
    executor: Optional[Executor],
    func: Callable[..., T],
    /,
    *args: Any,
    **kwargs: Any,
) -> T:
    """
    Run `func(*args, **kwargs)` asynchronously, using an executor.

    If the executor is None, use the default ThreadPoolExecutor.
    """
    return await asyncio.get_running_loop().run_in_executor(
        executor,
        partial(func, *args, **kwargs),
    )

# Example usage for running `print` in a thread.
async def main():
    await run_in_executor(None, print, "O" * 100_000)

asyncio.run(main())

In fact, using threading with asyncio was so common that in Python 3.9 they added asyncio.to_thread(func, *args, **kwargs) as a shortcut for running code in the default ThreadPoolExecutor.
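For instance, here is a minimal sketch of asyncio.to_thread on Python 3.9+; blocking_io is just a made-up stand-in for any blocking call.

import asyncio
import time

def blocking_io():
    time.sleep(1)  # stand-in for any blocking call
    return "done"

async def main():
    # Runs blocking_io in the default ThreadPoolExecutor without blocking the event loop.
    result = await asyncio.to_thread(blocking_io)
    print(result)

asyncio.run(main())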

The long answer


Are there any downsides to this approach?

Yes. With asyncio, the biggest downside is that asynchronous functions are not the same as synchronous functions. This can trip up new users of asyncio and cause a lot of rework if you didn't design with asyncio in mind from the beginning.

Another downside is that users of your code will also be forced to use asyncio. All of this necessary rework often leaves first-time asyncio users with a sour taste in their mouth.

Are there any non-performance advantages to this approach?

Yes. Similar to how using concurrent.futures is advantageous over threading.Thread and multiprocessing.Process for its unified interface, this approach can be considered a further abstraction from an Executor to an asynchronous function. You can start off using asyncio, and if you later find a part of it that needs threading or multiprocessing, you can use asyncio.to_thread or run_in_executor. Likewise, you may later discover that an asynchronous version of what you're trying to run with threading already exists, so you can easily step back from using threading and switch to asyncio instead.

Are there any performance advantages to this approach?

Yes... and no. Ultimately it depends on the task. In some cases it may not help (though it likely does not hurt), while in other cases it may help a lot. The rest of this answer explains why using asyncio to run an Executor may be advantageous.

- Combining multiple executors and other asynchronous code

asyncio essentially provides significantly more control over concurrency, at the cost of you needing to take control of the concurrency more. If you want to simultaneously run some code using a ThreadPoolExecutor alongside some other code using a ProcessPoolExecutor, managing this with synchronous code is not so easy, but it is very easy with asyncio.

import asyncio
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

async def with_processing():
    with ProcessPoolExecutor() as executor:
        tasks = [...]
        for task in asyncio.as_completed(tasks):
            result = await task
            ...

async def with_threading():
    with ThreadPoolExecutor() as executor:
        tasks = [...]
        for task in asyncio.as_completed(tasks):
            result = await task
            ...

async def main():
    await asyncio.gather(with_processing(), with_threading())

asyncio.run(main())

How does this work? Essentially asyncio asks the executors to run their functions. Then, while an executor is running, asyncio will go run other code. For example, the ProcessPoolExecutor starts a bunch of processes, and then while waiting for those processes to finish, the ThreadPoolExecutor starts a bunch of threads. asyncio will then check in on these executors and collect their results when they are done. Furthermore, if you have other code using asyncio, you can run it while waiting for the processes and threads to finish.
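To make that concrete, here is one hedged way the tasks = [...] placeholders above might be filled in via loop.run_in_executor; cpu_work and io_work are hypothetical stand-ins, not part of the original example.

import asyncio
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def cpu_work(n):
    # Hypothetical CPU-bound work for the process pool.
    return sum(i * i for i in range(n))

def io_work(delay):
    # Hypothetical blocking I/O for the thread pool.
    time.sleep(delay)
    return delay

async def with_processing():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as executor:
        tasks = [loop.run_in_executor(executor, cpu_work, 100_000) for _ in range(4)]
        for task in asyncio.as_completed(tasks):
            print("process result:", await task)

async def with_threading():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as executor:
        tasks = [loop.run_in_executor(executor, io_work, 0.5) for _ in range(4)]
        for task in asyncio.as_completed(tasks):
            print("thread result:", await task)

async def main():
    await asyncio.gather(with_processing(), with_threading())

if __name__ == "__main__":  # Needed for ProcessPoolExecutor on Windows
    asyncio.run(main())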

- Narrowing down which sections of code need an executor

It is not common to have many executors in your code, but a common problem I have seen when people use threads/processes is that they shove the entirety of their code into a thread/process and expect it to work. For example, I once saw (approximately) the following code:

from concurrent.futures import ThreadPoolExecutor
import requests

def get_data(url):
    return requests.get(url).json()["data"]

urls = [...]

with ThreadPoolExecutor() as executor:
    for data in executor.map(get_data, urls):
        print(data)

The funny thing about this code is that it was slower with concurrency than without. Why? Because the resulting json was large, and having many threads hold a huge amount of memory at once was disastrous. Luckily the solution was simple:

from concurrent.futures import ThreadPoolExecutor
import requests

urls = [...]

with ThreadPoolExecutor() as executor:
    for response in executor.map(requests.get, urls):
        print(response.json()["data"])

Now only one json is loaded into memory at a time, and everything works fine.

The lesson here?

You shouldn't try to shove all of your code into threads/processes; instead, you should focus on which part of the code actually needs concurrency.

But what if get_data were not such a simple function? What if we had to apply the executor somewhere deep in the middle of the function? This is where asyncio comes in:

import asyncio
import requests

async def get_data(url):
    # A lot of code.
    ...
    # The specific part that needs threading.
    response = await asyncio.to_thread(requests.get, url, some_other_params)
    # A lot of code.
    ...
    return data

urls = [...]

async def main():
    tasks = [get_data(url) for url in urls]
    for task in asyncio.as_completed(tasks):
        data = await task
        print(data)

asyncio.run(main())

Trying to do the same thing with concurrent.futures isn't pretty at all. You could use things such as callbacks, queues, etc., but it would be significantly harder to manage than basic asyncio code.
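For comparison, here is a hedged sketch of what such a callback-based concurrent.futures version might look like; the queue-draining approach is just one possible way to do it, and urls is the same placeholder as in the examples above.

from concurrent.futures import ThreadPoolExecutor
import queue
import requests

urls = [...]
results = queue.Queue()

def on_done(future):
    # Runs in the worker thread once the request has finished.
    results.put(future.result().json()["data"])

with ThreadPoolExecutor() as executor:
    for url in urls:
        executor.submit(requests.get, url).add_done_callback(on_done)

# The with-block waits for all futures, so the queue is complete here.
while not results.empty():
    print(results.get())

Even in this small case the control flow is split between the submitting code and the callback, which is exactly the bookkeeping asyncio removes.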

Other answers

This is the basic idea:

Is it IO-BOUND?  -----------> USE asyncio
Is it CPU-HEAVY? -----------> USE multiprocessing
ELSE?            -----------> USE threading

So basically stick to threading unless you have IO/CPU problems.

TL;DR

Making the Right Choice:

We have walked through the most popular forms of concurrency. But the question remains - when should you choose which one? It really depends on the use case. From my experience (and reading), I tend to follow this pseudocode:

if io_bound:
    if io_very_slow:
        print("Use Asyncio")
    else:
        print("Use Threads")
else:
    print("Multi Processing")

CPU Bound => Multi Processing
I/O Bound, Fast I/O, Limited Number of Connections => Multi Threading
I/O Bound, Slow I/O, Many connections => Asyncio


[NOTE]:

If you have a long call method (e.g. a method containing a sleep time or lazy I/O), the best choice is the asyncio, Twisted or Tornado approach (coroutine methods), which works with a single thread as concurrency.
asyncio works on Python 3.4 and later.
Tornado and Twisted have been ready since Python 2.7.
uvloop is an ultra fast asyncio event loop (uvloop makes asyncio 2-4x faster); a minimal usage sketch follows below.
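A minimal sketch of enabling uvloop. Assumptions: uvloop is a third-party package (pip install uvloop) and, as far as I know, it is not available on Windows.

import asyncio
import uvloop

async def main():
    await asyncio.sleep(0.1)
    print("running on uvloop")

uvloop.install()  # make uvloop the default event loop policy
asyncio.run(main())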


[UPDATE (2019)]:

Japranto (GitHub) is a very fast pipelining HTTP server based on uvloop.

Multiprocessing
Every process has its own Python interpreter and can run on a separate core of the processor. Python multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers true parallelism, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads.

Use multiprocessing when you have CPU intensive tasks.

Multithreading
Python multithreading allows you to spawn multiple threads within a process. These threads can share the same memory and resources of the process. In CPython, due to the Global Interpreter Lock, only one thread can run at any given time, so you cannot utilize multiple cores. Multithreading in Python does not offer true parallelism due to the GIL limitation.

Asyncio
asyncio works on co-operative multitasking concepts. asyncio tasks run on the same thread, so there is no parallelism, but it gives better control to the developer instead of the OS, which is the case in multithreading.
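A small sketch of what co-operative multitasking looks like in practice: each task voluntarily yields control at an await, so the two coroutines below interleave on a single thread (the worker function is just an illustration).

import asyncio

async def worker(name, delay):
    for i in range(3):
        print(name, "step", i)
        await asyncio.sleep(delay)  # yields control back to the event loop here

async def main():
    await asyncio.gather(worker("A", 0.1), worker("B", 0.1))

asyncio.run(main())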

There is a nice discussion on this link regarding the advantages of asyncio over threads.

Multiprocessing VS Threading VS AsyncIO in Python Summary

Multiprocessing can run in parallel.
Multithreading and asyncio cannot run in parallel.

With an Intel(R) Core(TM) i7-8700K CPU @ 3.70GHz and 32.0 GB RAM, I counted how many prime numbers there are between 2 and 100000 using 2 processes, 2 threads and 2 asyncio tasks as shown below. *This is a CPU bound calculation:

Multiprocessing    Multithreading    asyncio
23.87 seconds      45.24 seconds     44.77 seconds

Because multiprocessing can run in parallel, it was about twice as fast as multithreading and asyncio, as shown above.

I used these 3 sets of code:

Multiprocessing:

# "process_test.py"

from multiprocessing import Process
import time
start_time = time.time()

def test():
    num = 100000
    primes = 0
    for i in range(2, num + 1):
        for j in range(2, i):
            if i % j == 0:
                break
        else:
            primes += 1
    print(primes)

if __name__ == "__main__": # This is needed to run processes on Windows
    process_list = []

    for _ in range(0, 2): # 2 processes
        process = Process(target=test)
        process_list.append(process)

    for process in process_list:
        process.start()

    for process in process_list:
        process.join()

    print(round((time.time() - start_time), 2), "seconds") # 23.87 seconds

Result:

...
9592
9592
23.87 seconds

Multithreading:

# "thread_test.py"

from threading import Thread
import time
start_time = time.time()

def test():
    num = 100000
    primes = 0
    for i in range(2, num + 1):
        for j in range(2, i):
            if i % j == 0:
                break
        else:
            primes += 1
    print(primes)

thread_list = []

for _ in range(0, 2): # 2 threads
    thread = Thread(target=test)
    thread_list.append(thread)
    
for thread in thread_list:
    thread.start()

for thread in thread_list:
    thread.join()

print(round((time.time() - start_time), 2), "seconds") # 45.24 seconds

Result:

...
9592
9592
45.24 seconds

Asyncio:

# "asyncio_test.py"

import asyncio
import time
start_time = time.time()

async def test():
    num = 100000
    primes = 0
    for i in range(2, num + 1):
        for j in range(2, i):
            if i % j == 0:
                break
        else:
            primes += 1
    print(primes)

async def call_tests():
    tasks = []

    for _ in range(0, 2): # 2 asyncio tasks
        tasks.append(test())

    await asyncio.gather(*tasks)

asyncio.run(call_tests())

print(round((time.time() - start_time), 2), "seconds") # 44.77 seconds

Result:

...
9592
9592
44.77 seconds
