这个问题是由我的另一个问题:如何在cdef等待?

网络上有大量关于asyncio的文章和博客文章,但它们都非常肤浅。我找不到任何关于asyncio实际是如何实现的,以及什么使I/O异步的信息。我试图阅读源代码,但它有数千行不是最高级的C代码,其中很多处理辅助对象,但最重要的是,它很难将Python语法和它将转换成的C代码联系起来。

Asycnio自己的文档就更没有帮助了。这里没有关于它如何工作的信息,只有一些关于如何使用它的指南,这些指南有时也会误导/写得很糟糕。

我熟悉Go的协程实现,并希望Python也能做同样的事情。如果是这样的话,我在上面链接的帖子中出现的代码应该是有效的。既然它没有,我现在正试图找出原因。到目前为止,我最好的猜测如下,请纠正我的错误:

Procedure definitions of the form async def foo(): ... are actually interpreted as methods of a class inheriting coroutine. Perhaps, async def is actually split into multiple methods by await statements, where the object, on which these methods are called is able to keep track of the progress it made through the execution so far. If the above is true, then, essentially, execution of a coroutine boils down to calling methods of coroutine object by some global manager (loop?). The global manager is somehow (how?) aware of when I/O operations are performed by Python (only?) code and is able to choose one of the pending coroutine methods to execute after the current executing method relinquished control (hit on the await statement).

换句话说,这是我试图将一些asyncio语法“糖化”成更容易理解的东西:

async def coro(name):
    print('before', name)
    await asyncio.sleep()
    print('after', name)

asyncio.gather(coro('first'), coro('second'))

# translated from async def coro(name)
class Coro(coroutine):
    def before(self, name):
        print('before', name)

    def after(self, name):
        print('after', name)

    def __init__(self, name):
        self.name = name
        self.parts = self.before, self.after
        self.pos = 0

    def __call__():
        self.parts[self.pos](self.name)
        self.pos += 1

    def done(self):
        return self.pos == len(self.parts)


# translated from asyncio.gather()
class AsyncIOManager:

    def gather(*coros):
        while not every(c.done() for c in coros):
            coro = random.choice(coros)
            coro()

Should my guess prove correct: then I have a problem. How does I/O actually happen in this scenario? In a separate thread? Is the whole interpreter suspended and I/O happens outside the interpreter? What exactly is meant by I/O? If my python procedure called C open() procedure, and it in turn sent interrupt to kernel, relinquishing control to it, how does Python interpreter know about this and is able to continue running some other code, while kernel code does the actual I/O and until it wakes up the Python procedure which sent the interrupt originally? How can Python interpreter in principle, be aware of this happening?


当前回答

你的coro糖化是正确的概念,但略不完整。

Await不会无条件挂起,只在遇到阻塞调用时挂起。它是如何知道呼叫被阻塞的?这是由正在等待的代码决定的。例如,socket read的可等待实现可以被糖化为:

def read(sock, n):
    # sock must be in non-blocking mode
    try:
        return sock.recv(n)
    except EWOULDBLOCK:
        event_loop.add_reader(sock.fileno, current_task())
        return SUSPEND

在实际的asyncio中,等效代码修改Future的状态,而不是返回神奇的值,但概念是相同的。当适当地适应类似生成器的对象时,可以等待上面的代码。

在调用方,当你的协程包含:

data = await read(sock, 1024)

它糖化成类似的东西:

data = read(sock, 1024)
if data is SUSPEND:
    return SUSPEND
self.pos += 1
self.parts[self.pos](...)

熟悉发电机的人倾向于描述上述方面的产量,从其中自动暂停。

挂起链一直延续到事件循环,该事件循环注意到协程被挂起,将其从可运行集中移除,并继续执行可运行的协程(如果有的话)。如果没有可运行的协程,则循环在select()中等待,直到协程感兴趣的文件描述符准备好进行IO或超时。(事件循环维护一个文件描述符到协程的映射。)

在上面的例子中,一旦select()告诉事件循环sock是可读的,它将重新将coro添加到可运行集,因此它将从挂起点继续执行。

换句话说:

默认情况下,所有事情都发生在同一个线程中。 事件循环负责调度协程,并在协程正在等待的任何事情(通常是一个通常会阻塞的IO调用或超时)准备就绪时唤醒协程。

为了深入了解协同程序驱动事件循环,我推荐Dave Beazley的演讲,他在现场观众面前演示了从头开始编写事件循环。

其他回答

你的coro糖化是正确的概念,但略不完整。

Await不会无条件挂起,只在遇到阻塞调用时挂起。它是如何知道呼叫被阻塞的?这是由正在等待的代码决定的。例如,socket read的可等待实现可以被糖化为:

def read(sock, n):
    # sock must be in non-blocking mode
    try:
        return sock.recv(n)
    except EWOULDBLOCK:
        event_loop.add_reader(sock.fileno, current_task())
        return SUSPEND

在实际的asyncio中,等效代码修改Future的状态,而不是返回神奇的值,但概念是相同的。当适当地适应类似生成器的对象时,可以等待上面的代码。

在调用方,当你的协程包含:

data = await read(sock, 1024)

它糖化成类似的东西:

data = read(sock, 1024)
if data is SUSPEND:
    return SUSPEND
self.pos += 1
self.parts[self.pos](...)

熟悉发电机的人倾向于描述上述方面的产量,从其中自动暂停。

挂起链一直延续到事件循环,该事件循环注意到协程被挂起,将其从可运行集中移除,并继续执行可运行的协程(如果有的话)。如果没有可运行的协程,则循环在select()中等待,直到协程感兴趣的文件描述符准备好进行IO或超时。(事件循环维护一个文件描述符到协程的映射。)

在上面的例子中,一旦select()告诉事件循环sock是可读的,它将重新将coro添加到可运行集,因此它将从挂起点继续执行。

换句话说:

默认情况下,所有事情都发生在同一个线程中。 事件循环负责调度协程,并在协程正在等待的任何事情(通常是一个通常会阻塞的IO调用或超时)准备就绪时唤醒协程。

为了深入了解协同程序驱动事件循环,我推荐Dave Beazley的演讲,他在现场观众面前演示了从头开始编写事件循环。

这一切都归结为asyncio正在解决的两个主要挑战:

如何在一个线程中执行多个I/O ? 如何实现协同多任务处理?

第一点的答案已经存在很长一段时间了,被称为选择循环。在python中,它在selectors模块中实现。

第二个问题与协程的概念有关,即可以停止执行并稍后恢复的函数。在python中,协程是使用生成器和yield from语句实现的。这就是隐藏在async/await语法背后的东西。

在这个答案中有更多的资源。


编辑:解决您对goroutines的评论:

在asyncio中最接近于goroutine的实际上不是协程,而是任务(请参阅文档中的区别)。在python中,协程(或生成器)不知道事件循环或I/O的概念。它只是一个函数,可以使用yield停止执行,同时保持当前状态,以便稍后恢复。语法的yield允许以透明的方式将它们链接起来。

现在,在一个asyncio任务中,链的最底部的协程总是最终产生一个future。然后,这个future出现在事件循环中,并集成到内部机制中。当future被其他内部回调设置为done时,事件循环可以通过将future发送回协程链来恢复任务。


编辑:解决你帖子中的一些问题:

在这种情况下,I/O实际上是如何发生的?在一个单独的线程?整个解释器挂起,I/O发生在解释器之外吗?

不,线程中没有发生任何事情。I/O总是由事件循环管理,主要是通过文件描述符。然而,这些文件描述符的注册通常被高级协程隐藏,这就为您带来了麻烦。

I/O到底是什么意思?如果我的python过程调用C open()过程,它反过来将中断发送给内核,放弃对它的控制,python解释器如何知道这一点,并能够继续运行一些其他代码,而内核代码进行实际的I/O,直到它唤醒最初发送中断的python过程?原则上,Python解释器如何意识到这种情况?

I/O是任何阻塞调用。在asyncio中,所有的I/O操作都应该经过事件循环,因为如您所说,事件循环无法意识到某些同步代码中正在执行阻塞调用。这意味着您不应该在协程的上下文中使用同步打开。相反,使用专用的库,如aiofiles,它提供了open的异步版本。

什么是asyncio?

Asyncio代表异步输入输出,指的是使用单个线程或事件循环实现高并发的编程范式。 异步编程是一种并行编程,允许工作单元与主应用程序线程分开运行。当工作完成时,它通知主线程工作线程的完成或失败。

让我们看看下图:

让我们用一个例子来理解asyncio:

为了理解asyncio背后的概念,让我们考虑一家只有一个服务员的餐厅。突然,三个顾客,A, B和C出现了。他们三个人从服务员那里拿到菜单后,花了不同的时间来决定吃什么。

假设A需要5分钟,B需要10分钟,C需要1分钟。如果单身服务员先从B开始,在10分钟内为B点餐,然后他为A服务,花5分钟记录他点的菜,最后花1分钟知道C想吃什么。 所以,服务员总共要花10 + 5 + 1 = 16分钟来记下他们点的菜。但是,请注意在这个事件序列中,C在服务员到达他之前等了15分钟,A等了10分钟,B等了0分钟。

现在考虑一下,如果服务员知道每位顾客做出决定所需的时间。他可以先从C开始,然后到A,最后到b。这样每个顾客的等待时间为0分钟。 尽管只有一个服务员,但却产生了三个服务员的错觉,每个顾客都有一个服务员。

最后,服务员完成三份订单所需的总时间为10分钟,远少于另一种情况下的16分钟。

让我们来看另一个例子:

假设,国际象棋大师马格努斯·卡尔森(Magnus Carlsen)主持了一场国际象棋展览,他与多名业余棋手同场竞技。他有两种方式进行展览:同步和异步。

假设:

24的对手 马格努斯·卡尔森在5秒内走完每一步棋 每个对手有55秒的时间来移动 游戏平均30对棋(总共60步)

同步:马格努斯·卡尔森一次只玩一局,从不同时玩两局,直到游戏完成。每款游戏耗时(55 + 5)* 30 == 1800秒,即30分钟。整个展览耗时24 * 30 == 720分钟,即12个小时。

异步:马格努斯·卡尔森从一张桌子移动到另一张桌子,在每张桌子上移动一次。她离开牌桌,让对手在等待时间内采取下一步行动。Judit在所有24局游戏中的一次移动需要24 * 5 == 120秒,即2分钟。整个展览缩短到120 * 30 == 3600秒,也就是1个小时

世界上只有一个马格努斯·卡尔森(Magnus Carlsen),他只有两只手,自己一次只能走一步棋。但异步游戏将展示时间从12小时缩短至1小时。

代码示例:

让我们尝试使用代码片段演示同步和异步执行时间。

异步- async_count.py

import asyncio  
import time  
  
  
async def count():  
    print("One", end=" ")  
    await asyncio.sleep(1)  
    print("Two", end=" ")  
    await asyncio.sleep(2)  
    print("Three", end=" ")  
  
  
async def main():  
    await asyncio.gather(count(), count(), count(), count(), count())  
  
  
if __name__ == "__main__":  
    start_time = time.perf_counter()  
    asyncio.run(main())  
    end_time = time.perf_counter()  
    execution_time = end_time - start_time  
    print(f"\nExecuting - {__file__}\nExecution Starts: {start_time}\nExecutions Ends: {end_time}\nTotals Execution Time:{execution_time:0.2f} seconds.")

异步-输出:

One One One One One Two Two Two Two Two Three Three Three Three Three 
Executing - async_count.py
Execution Starts: 18453.442160108
Executions Ends: 18456.444719712
Totals Execution Time:3.00 seconds.

Synchronous - sync_count.py

import time  
  
  
def count():  
    print("One", end=" ")  
    time.sleep(1)  
    print("Two", end=" ")  
    time.sleep(2)  
    print("Three", end=" ")  
  
  
def main():  
    for _ in range(5):  
        count()  
  
  
if __name__ == "__main__":  
    start_time = time.perf_counter()  
    main()  
    end_time = time.perf_counter()  
    execution_time = end_time - start_time  
    print(f"\nExecuting - {__file__}\nExecution Starts: {start_time}\nExecutions Ends: {end_time}\nTotals Execution Time:{execution_time:0.2f} seconds.")

同步-输出:

One Two Three One Two Three One Two Three One Two Three One Two Three 
Executing - sync_count.py
Execution Starts: 18875.175965998
Executions Ends: 18890.189930292
Totals Execution Time:15.01 seconds.

为什么在Python中使用asyncio而不是多线程?

It’s very difficult to write code that is thread safe. With asynchronous code, you know exactly where the code will shift from one task to the next and race conditions are much harder to come by. Threads consume a fair amount of data since each thread needs to have its own stack. With async code, all the code shares the same stack and the stack is kept small due to continuously unwinding the stack between tasks. Threads are OS structures and therefore require more memory for the platform to support. There is no such problem with asynchronous tasks.

asyncio是如何工作的?

在深入讨论之前,让我们回顾一下Python Generator

Python发电机:

包含yield语句的函数被编译为生成器。在函数体中使用yield表达式会导致该函数成为生成器。这些函数返回一个支持迭代协议方法的对象。自动创建的生成器对象接收__next()__方法。回到上一节的例子,我们可以直接在生成器对象上调用__next__,而不是使用next():

def asynchronous():
    yield "Educative"


if __name__ == "__main__":
    gen = asynchronous()

    str = gen.__next__()
    print(str)

请记住以下关于生成器的内容:

Generator functions allow you to procrastinate computing expensive values. You only compute the next value when required. This makes generators memory and compute efficient; they refrain from saving long sequences in memory or doing all expensive computations upfront. Generators, when suspended, retain the code location, which is the last yield statement executed, and their entire local scope. This allows them to resume execution from where they left off. Generator objects are nothing more than iterators. Remember to make a distinction between a generator function and the associated generator object which are often used interchangeably. A generator function when invoked returns a generator object and next() is invoked on the generator object to run the code within the generator function.

发电机状态:

生成器会经历以下几种状态:

当生成器函数第一次返回生成器对象并且迭代还没有开始时,返回GEN_CREATED。 在生成器对象上调用了GEN_RUNNING,并由python解释器执行。 GEN_SUSPENDED:当发电机以一定的产量暂停时 当生成器已完成执行或已关闭时,返回GEN_CLOSED。

生成器对象上的方法:

生成器对象公开了可以调用来操作生成器的不同方法。这些都是:

把() send () close ()

让我们深入了解更多细节

asyncio的规则:

The syntax async def introduces either a native coroutine or an asynchronous generator. The expressions async with and async for are also valid. The keyword await passes function control back to the event loop. (It suspends the execution of the surrounding coroutine.) If Python encounters an await f() expression in the scope of g(), this is how await tells the event loop, "Suspend execution of g() until whatever I’m waiting on—the result of f()—is returned. In the meantime, go let something else run."

在代码中,第二个要点大致如下所示:

async def g():
    # Pause here and come back to g() when f() is ready
    r = await f()
    return r

关于何时以及如何使用async/await也有一组严格的规则。无论你是否还在学习语法,或者已经使用过async/await,这些都很方便:

A function that you introduce with async def is a coroutine. It may use await, return, or yield, but all of these are optional. Declaring async def noop(): pass is valid: Using await and/or return creates a coroutine function. To call a coroutine function, you must await it to get its results. It is less common to use yield in an async def block. This creates an asynchronous generator, which you iterate over with async for. Forget about async generators for the time being and focus on getting down the syntax for coroutine functions, which use await and/or return. Anything defined with async def may not use yield from, which will raise a SyntaxError. Just like it’s a SyntaxError to use yield outside of a def function, it is a SyntaxError to use await outside of an async def coroutine. You can only use await in the body of coroutines.

以下是一些简短的例子,旨在总结上述几条规则:

async def f(x):
    y = await z(x)     # OK - `await` and `return` allowed in coroutines
    return y

async def g(x):
    yield x            # OK - this is an async generator

async def m(x):
    yield from gen(x)  # NO - SyntaxError

def m(x):
    y = await z(x)     # NO - SyntaxError (no `async def` here)
    return y

基于生成器的协程

Python创建了Python生成器和用于协程的生成器之间的区别。这些协程称为基于生成器的协程,并且需要@asynio装饰器。将协程添加到函数定义中,尽管这没有严格执行。

基于生成器的协程使用yield from语法而不是yield。协程可以:

屈服于另一个协程 未来收益 返回一个表达式 提高异常

Python中的协程使得多任务合作成为可能。 协作多任务处理是指正在运行的进程主动将CPU让给其他进程的方法。当一个进程在逻辑上被阻塞时,比如在等待用户输入时,或者当它发起了一个网络请求并将空闲一段时间时,它可能会这样做。 协程可以定义为一个特殊的函数,它可以在不丢失状态的情况下将控制权交给调用者。

那么协程和生成器之间的区别是什么呢?

生成器本质上是迭代器,尽管它们看起来像函数。一般来说,生成器和协程之间的区别是:

生成器将一个值返回给调用者,而协程将控制权交还给另一个协程,并且可以从它放弃控制权开始恢复执行。 一旦启动,生成器就不能接受参数,而协程可以。 生成器主要用于简化迭代器的编写。它们是一种协程,有时也称为半协程。

基于生成器的协程示例

我们可以编写的最简单的基于生成器的协程如下所示:

@asyncio.coroutine
def do_something_important():
    yield from asyncio.sleep(1)

协程休眠一秒。注意装饰器的使用和yield from。

本地基于协程示例

原生的意思是,该语言引入了语法来专门定义协程,使它们成为语言中的一等公民。本地协程可以使用async/await语法定义。 我们可以编写的最简单的基于本机的协程如下所示:

async def do_something_important():
    await asyncio.sleep(1)

AsyncIO设计模式

AsyncIO有自己的一组可能的脚本设计,我们将在本节中讨论。

1. 事件循环

事件循环是一种编程构造,它等待事件发生,然后将它们分派给事件处理程序。事件可以是用户单击UI按钮,也可以是启动文件下载的进程。异步编程的核心是事件循环。

示例代码:

import asyncio  
import random  
import time  
from threading import Thread  
from threading import current_thread  
  
# ANSI colors  
colors = (  
    "\033[0m",   # End of color  
  "\033[31m",  # Red  
  "\033[32m",  # Green  
  "\033[34m",  # Blue  
)  
  
  
async def do_something_important(sleep_for):  
    print(colors[1] + f"Is event loop running in thread {current_thread().getName()} = {asyncio.get_event_loop().is_running()}" + colors[0])  
    await asyncio.sleep(sleep_for)  
  
  
def launch_event_loops():  
    # get a new event loop  
  loop = asyncio.new_event_loop()  
  
    # set the event loop for the current thread  
  asyncio.set_event_loop(loop)  
  
    # run a coroutine on the event loop  
  loop.run_until_complete(do_something_important(random.randint(1, 5)))  
  
    # remember to close the loop  
  loop.close()  
  
  
if __name__ == "__main__":  
    thread_1 = Thread(target=launch_event_loops)  
    thread_2 = Thread(target=launch_event_loops)  
  
    start_time = time.perf_counter()  
    thread_1.start()  
    thread_2.start()  
  
    print(colors[2] + f"Is event loop running in thread {current_thread().getName()} = {asyncio.get_event_loop().is_running()}" + colors[0])  
  
    thread_1.join()  
    thread_2.join()  
    end_time = time.perf_counter()  
    execution_time = end_time - start_time  
    print(colors[3] + f"Event Loop Start Time: {start_time}\nEvent Loop End Time: {end_time}\nEvent Loop Execution Time: {execution_time:0.2f} seconds." + colors[0])

执行命令:python async_event_loop.py

输出:

自己尝试并检查输出,您会发现每个衍生线程都在运行自己的事件循环。

事件循环的类型

有两种类型的事件循环:

SelectorEventLoop: SelectorEventLoop基于选择器模块,是所有平台上的默认循环。 ProactorEventLoop: ProactorEventLoop基于Windows的I/O完成端口,仅在Windows上支持。

2. 期货

Future表示正在进行或将在未来被调度的计算。它是一个特殊的低级可等待对象,表示异步操作的最终结果。不要混淆线程。Future和asyncio.Future。

示例代码:

import time  
import asyncio  
from asyncio import Future  
  
# ANSI colors  
colors = (  
    "\033[0m",   # End of color  
  "\033[31m",  # Red  
  "\033[32m",  # Green  
  "\033[34m",  # Blue  
)  
  
  
async def bar(future):  
    print(colors[1] + "bar will sleep for 3 seconds" + colors[0])  
    await asyncio.sleep(3)  
    print(colors[1] + "bar resolving the future" + colors[0])  
    future.done()  
    future.set_result("future is resolved")  
  
  
async def foo(future):  
    print(colors[2] + "foo will await the future" + colors[0])  
    await future  
  print(colors[2] + "foo finds the future resolved" + colors[0])  
  
  
async def main():  
    future = Future()  
    await asyncio.gather(foo(future), bar(future))  
  
  
if __name__ == "__main__":  
    start_time = time.perf_counter()  
    asyncio.run(main())  
    end_time = time.perf_counter()  
    execution_time = end_time - start_time  
    print(colors[3] + f"Future Start Time: {start_time}\nFuture End Time: {end_time}\nFuture Execution Time: {execution_time:0.2f} seconds." + colors[0])

执行命令:python async_futures.py

输出:

两个协程都传递了一个future。foo()协程等待future被解析,而bar()协程则在三秒后解析future。

3.任务

任务就像未来,事实上,任务是未来的一个子类,可以使用以下方法创建:

Asyncio.create_task()接受协程并将它们包装为任务。 Loop.create_task()只接受协程。 Asyncio.ensure_future()接受未来、协程和任何可等待对象。

任务包装协程并在事件循环中运行它们。如果一个协程在等待一个Future, Task将挂起该协程的执行并等待Future完成。当Future完成时,将继续执行封装的协程。

示例代码:

import time  
import asyncio  
from asyncio import Future  
  
# ANSI colors  
colors = (  
    "\033[0m",   # End of color  
  "\033[31m",  # Red  
  "\033[32m",  # Green  
  "\033[34m",  # Blue  
)  
  
  
async def bar(future):  
    print(colors[1] + "bar will sleep for 3 seconds" + colors[0])  
    await asyncio.sleep(3)  
    print(colors[1] + "bar resolving the future" + colors[0])  
    future.done()  
    future.set_result("future is resolved")  
  
  
async def foo(future):  
    print(colors[2] + "foo will await the future" + colors[0])  
    await future  
  print(colors[2] + "foo finds the future resolved" + colors[0])  
  
  
async def main():  
    future = Future()  
  
    loop = asyncio.get_event_loop()  
    t1 = loop.create_task(bar(future))  
    t2 = loop.create_task(foo(future))  
  
    await t2, t1  
  
  
if __name__ == "__main__":  
    start_time = time.perf_counter()  
    loop = asyncio.get_event_loop()  
    loop.run_until_complete(main())  
    end_time = time.perf_counter()  
    execution_time = end_time - start_time  
    print(colors[3] + f"Future Start Time: {start_time}\nFuture End Time: {end_time}\nFuture Execution Time: {execution_time:0.2f} seconds." + colors[0])

执行命令:python async_tasks.py

输出:

4. 链协同程序:

协程的一个关键特性是它们可以被链接在一起。协程对象是可等待的,因此另一个协程可以等待它。这允许你把程序分解成更小的、可管理的、可回收的协程:

示例代码:

import sys  
import asyncio  
import random  
import time  
  
# ANSI colors  
colors = (  
    "\033[0m",  # End of color  
  "\033[31m",  # Red  
  "\033[32m",  # Green  
  "\033[36m",  # Cyan  
  "\033[34m",  # Blue  
)  
  
  
async def function1(n: int) -> str:  
    i = random.randint(0, 10)  
    print(colors[1] + f"function1({n}) is sleeping for {i} seconds." + colors[0])  
    await asyncio.sleep(i)  
    result = f"result{n}-1"  
  print(colors[1] + f"Returning function1({n}) == {result}." + colors[0])  
    return result  
  
  
async def function2(n: int, arg: str) -> str:  
    i = random.randint(0, 10)  
    print(colors[2] + f"function2{n, arg} is sleeping for {i} seconds." + colors[0])  
    await asyncio.sleep(i)  
    result = f"result{n}-2 derived from {arg}"  
  print(colors[2] + f"Returning function2{n, arg} == {result}." + colors[0])  
    return result  
  
  
async def chain(n: int) -> None:  
    start = time.perf_counter()  
    p1 = await function1(n)  
    p2 = await function2(n, p1)  
    end = time.perf_counter() - start  
    print(colors[3] + f"--> Chained result{n} => {p2} (took {end:0.2f} seconds)." + colors[0])  
  
  
async def main(*args):  
    await asyncio.gather(*(chain(n) for n in args))  
  
  
if __name__ == "__main__":  
    random.seed(444)  
    args = [1, 2, 3] if len(sys.argv) == 1 else map(int, sys.argv[1:])  
    start_time = time.perf_counter()  
    asyncio.run(main(*args))  
    end_time = time.perf_counter()  
    execution_time = end_time - start_time  
    print(colors[4] + f"Program Start Time: {start_time}\nProgram End Time: {end_time}\nProgram Execution Time: {execution_time:0.2f} seconds." + colors[0])

请仔细注意输出,其中function1()休眠了可变的时间,function2()在结果可用时开始工作:

执行命令:python async_chained.py 11 8

输出:

5. 使用队列:

在这种设计中,没有任何个体消费者与生产者之间的链接。消费者不知道生产者的数量,甚至不知道将被添加到队列中的项目的累积数量。

单个生产者或消费者分别花费不同的时间从队列中放置和提取项目。队列作为一个吞吐量,可以与生产者和消费者进行通信,而不需要它们彼此直接通信。

示例代码:

import asyncio  
import argparse  
import itertools as it  
import os  
import random  
import time  
  
# ANSI colors  
colors = (  
    "\033[0m",  # End of color  
  "\033[31m",  # Red  
  "\033[32m",  # Green  
  "\033[36m",  # Cyan  
  "\033[34m",  # Blue  
)  
  
  
async def generate_item(size: int = 5) -> str:  
    return os.urandom(size).hex()  
  
  
async def random_sleep(caller=None) -> None:  
    i = random.randint(0, 10)  
    if caller:  
        print(colors[1] + f"{caller} sleeping for {i} seconds." + colors[0])  
    await asyncio.sleep(i)  
  
  
async def produce(name: int, producer_queue: asyncio.Queue) -> None:  
    n = random.randint(0, 10)  
    for _ in it.repeat(None, n):  # Synchronous loop for each single producer  
  await random_sleep(caller=f"Producer {name}")  
        i = await generate_item()  
        t = time.perf_counter()  
        await producer_queue.put((i, t))  
        print(colors[2] + f"Producer {name} added <{i}> to queue." + colors[0])  
  
  
async def consume(name: int, consumer_queue: asyncio.Queue) -> None:  
    while True:  
        await random_sleep(caller=f"Consumer {name}")  
        i, t = await consumer_queue.get()  
        now = time.perf_counter()  
        print(colors[3] + f"Consumer {name} got element <{i}>" f" in {now - t:0.5f} seconds." + colors[0])  
        consumer_queue.task_done()  
  
  
async def main(no_producer: int, no_consumer: int):  
    q = asyncio.Queue()  
    producers = [asyncio.create_task(produce(n, q)) for n in range(no_producer)]  
    consumers = [asyncio.create_task(consume(n, q)) for n in range(no_consumer)]  
    await asyncio.gather(*producers)  
    await q.join()  # Implicitly awaits consumers, too  
  for consumer in consumers:  
        consumer.cancel()  
  
  
if __name__ == "__main__":  
    random.seed(444)  
    parser = argparse.ArgumentParser()  
    parser.add_argument("-p", "--no_producer", type=int, default=10)  
    parser.add_argument("-c", "--no_consumer", type=int, default=15)  
    ns = parser.parse_args()  
    start_time = time.perf_counter()  
    asyncio.run(main(**ns.__dict__))  
    end_time = time.perf_counter()  
    execution_time = end_time - start_time  
    print(colors[4] + f"Program Start Time: {start_time}\nProgram End Time: {end_time}\nProgram Execution Time: {execution_time:0.2f} seconds." + colors[0])

执行命令:python async_queue.py -p 2 -c 4 .执行以下命令

输出:

最后,让我们看一个asyncio如何减少等待时间的例子:给定一个协程generate_random_int(),它不断生成范围为[0,10]的随机整数,直到其中一个超出阈值,您希望让这个协程的多个调用不需要彼此连续等待完成。

示例代码:

import time  
import asyncio  
import random  
  
# ANSI colors  
colors = (  
    "\033[0m",   # End of color  
  "\033[31m",  # Red  
  "\033[32m",  # Green  
  "\033[36m",  # Cyan  
  "\033[35m",  # Magenta  
  "\033[34m",  # Blue  
)  
  
  
async def generate_random_int(indx: int, threshold: int = 5) -> int:  
    print(colors[indx + 1] + f"Initiated generate_random_int({indx}).")  
    i = random.randint(0, 10)  
    while i <= threshold:  
        print(colors[indx + 1] + f"generate_random_int({indx}) == {i} too low; retrying.")  
        await asyncio.sleep(indx + 1)  
        i = random.randint(0, 10)  
    print(colors[indx + 1] + f"---> Finished: generate_random_int({indx}) == {i}" + colors[0])  
    return i  
  
  
async def main():  
    res = await asyncio.gather(*(generate_random_int(i, 10 - i - 1) for i in range(3)))  
    return res  
  
  
if __name__ == "__main__":  
    random.seed(444)  
    start_time = time.perf_counter()  
    r1, r2, r3 = asyncio.run(main())  
    print(colors[4] + f"\nRandom INT 1: {r1}, Random INT 2: {r2}, Random INT 3: {r3}\n" + colors[0])  
    end_time = time.perf_counter()  
    execution_time = end_time - start_time  
    print(colors[5] + f"Program Start Time: {start_time}\nProgram End Time: {end_time}\nProgram Execution Time: {execution_time:0.2f} seconds." + colors[0])

执行命令:python async_random.py

输出:

注意:如果您自己编写任何代码,最好使用本机协同程序 为了明确而不是含蓄。发电机的基础 协程将在Python 3.10中被移除。

GitHub Repo: https://github.com/tssovi/asynchronous-in-python

asyncio是如何工作的?

在回答这个问题之前,我们需要了解一些基本术语,如果你已经知道其中任何一个,就跳过这些。

发电机

生成器是允许我们暂停python函数执行的对象。用户管理生成器使用关键字yield实现。通过创建一个包含yield关键字的普通函数,我们将该函数转换为生成器:

>>> def test():
...     yield 1
...     yield 2
...
>>> gen = test()
>>> next(gen)
1
>>> next(gen)
2
>>> next(gen)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration

如您所见,在生成器上调用next()会导致解释器加载测试的帧,并返回产生的值。再次调用next()将导致该帧再次加载到解释器堆栈中,并继续产生另一个值。

在第三次调用next()时,生成器已经完成,并抛出StopIteration。

与发电机通信

生成器的一个鲜为人知的特性是,您可以使用两个方法与它们通信:send()和throw()。

>>> def test():
...     val = yield 1
...     print(val)
...     yield 2
...     yield 3
...
>>> gen = test()
>>> next(gen)
1
>>> gen.send("abc")
abc
2
>>> gen.throw(Exception())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 4, in test
Exception

在调用gen.send()时,该值作为yield关键字的返回值传递。

另一方面,gen.throw()允许在生成器内部抛出异常,在yield被调用的同一点引发异常。

从生成器返回值

从生成器返回一个值,导致该值被放入StopIteration异常中。我们可以稍后从异常中恢复值,并将其用于我们的需要。

>>> def test():
...     yield 1
...     return "abc"
...
>>> gen = test()
>>> next(gen)
1
>>> try:
...     next(gen)
... except StopIteration as exc:
...     print(exc.value)
...
abc

看,一个新的关键词:屈服

Python 3.4添加了一个新的关键字:yield from。该关键字允许我们做的是将任何next(), send()和throw()传递到最内部嵌套的生成器中。如果内部生成器返回一个值,它也是yield from的返回值:

>>> def inner():
...     inner_result = yield 2
...     print('inner', inner_result)
...     return 3
...
>>> def outer():
...     yield 1
...     val = yield from inner()
...     print('outer', val)
...     yield 4
...
>>> gen = outer()
>>> next(gen)
1
>>> next(gen) # Goes inside inner() automatically
2
>>> gen.send("abc")
inner abc
outer 3
4

我已经写了一篇文章来进一步阐述这个主题。

把它们放在一起

在Python 3.4中引入新的关键字yield后,我们现在能够在生成器中创建生成器,就像隧道一样,将数据从最内部的生成器来回传递到最外部的生成器。这为生成器带来了一个新的含义——协程。

协程是可以在运行时停止和恢复的函数。在Python中,它们是使用async def关键字定义的。就像发电机一样,它们也使用自己的等待产量形式。在Python 3.5引入async和await之前,我们以与生成器创建完全相同的方式创建协程(使用yield from而不是await)。

async def inner():
    return 1

async def outer():
    await inner()

就像所有迭代器和生成器实现__iter__()方法一样,所有协程都实现__await__(),这允许它们在每次await coro被调用时继续执行。

在Python文档中有一个很好的序列图,你应该去看看。

在asyncio中,除了协程函数,我们还有两个重要的对象:任务和未来。

期货

期货是实现了__await__()方法的对象,它们的工作是保存特定的状态和结果。状态可以是以下状态之一:

PENDING - future没有任何结果或异常集。 CANCELLED -使用fut.cancel()取消未来 FINISHED - future被结束,通过使用fut.set_result()的结果集或使用fut.set_exception()的异常集完成。

正如您所猜测的那样,结果可以是一个将返回的Python对象,也可以是一个可能引发的异常。

未来对象的另一个重要特性是它们包含一个名为add_done_callback()的方法。此方法允许在任务完成时立即调用函数——无论它引发异常还是完成。

任务

任务对象是特殊的期货,它围绕着协程,并与最内部和最外部的协程通信。每次协程等待future时,future就会一直传递给任务(就像yield from一样),然后任务接收它。

接下来,任务将自己绑定到未来。它通过在将来调用add_done_callback()来做到这一点。从现在开始,如果将来要完成,无论是取消,传递异常,还是传递一个Python对象,任务的回调将被调用,它将上升到存在。

Asyncio

我们必须回答的最后一个紧迫问题是——IO是如何实现的?

在asyncio的深处,我们有一个事件循环。任务的事件循环。事件循环的工作是在每次任务准备就绪时调用它们,并将所有工作协调到一个工作机器中。

事件循环的IO部分构建在一个称为select的关键函数之上。Select是一个阻塞函数,由下面的操作系统实现,允许在套接字上等待传入或传出数据。在接收到数据时,它将被唤醒,并返回接收到数据的套接字或准备写入的套接字。

当您尝试通过asyncio通过套接字接收或发送数据时,下面实际发生的情况是,首先检查套接字是否有任何可以立即读取或发送的数据。如果它的.send()缓冲区已满,或者.recv()缓冲区为空,套接字就会注册到select函数(只需将其添加到其中一个列表中,rlist用于recv, wlist用于send),相应的函数将等待一个新创建的future对象,该对象与该套接字绑定。

当所有可用任务都在等待未来时,事件循环调用select并等待。当其中一个套接字有传入数据,或者它的发送缓冲区耗尽时,asyncio检查绑定到该套接字的未来对象,并将其设置为done。

现在奇迹发生了。未来被设置为完成,之前使用add_done_callback()添加自己的任务将复活,并在协程上调用.send(),该协程将恢复最内部的协程(因为等待链),您将从它溢出到的附近缓冲区读取新接收的数据。

在recv()的情况下,再次使用方法链:

选择。选择等待。 返回一个就绪的套接字,其中包含数据。 来自套接字的数据被移动到缓冲区中。 调用Future.set_result()。 使用add_done_callback()添加自己的任务现在被唤醒。 Task在协程上调用.send(),它会一直进入最内部的协程并唤醒它。 数据从缓冲区读取并返回给我们的普通用户。

总之,asyncio使用生成器功能,允许暂停和恢复函数。它使用yield from功能,允许将数据从最内部的生成器来回传递到最外部的生成器。它使用所有这些方法是为了在等待IO完成时暂停函数执行(通过使用OS选择函数)。

最好的是什么?当一个函数暂停时,另一个函数可能会运行并与精致的结构交织,这是asyncio的。

If you picture an airport control tower, with many planes waiting to land on the same runway. The control tower can be seen as the event loop and runway as the thread. Each plane is a separate function waiting to execute. In reality only one plane can land on the runway at a time. What asyncio basically does it allows many planes to land simultaneously on the same runway by using the event loop to suspend functions and allow other functions to run when you use the await syntax it basically means that plane(function can be suspended and allow other functions to process