这个问题是由我的另一个问题:如何在cdef等待?

网络上有大量关于asyncio的文章和博客文章,但它们都非常肤浅。我找不到任何关于asyncio实际是如何实现的,以及什么使I/O异步的信息。我试图阅读源代码,但它有数千行不是最高级的C代码,其中很多处理辅助对象,但最重要的是,它很难将Python语法和它将转换成的C代码联系起来。

Asycnio自己的文档就更没有帮助了。这里没有关于它如何工作的信息,只有一些关于如何使用它的指南,这些指南有时也会误导/写得很糟糕。

我熟悉Go的协程实现,并希望Python也能做同样的事情。如果是这样的话,我在上面链接的帖子中出现的代码应该是有效的。既然它没有,我现在正试图找出原因。到目前为止,我最好的猜测如下,请纠正我的错误:

Procedure definitions of the form async def foo(): ... are actually interpreted as methods of a class inheriting coroutine. Perhaps, async def is actually split into multiple methods by await statements, where the object, on which these methods are called is able to keep track of the progress it made through the execution so far. If the above is true, then, essentially, execution of a coroutine boils down to calling methods of coroutine object by some global manager (loop?). The global manager is somehow (how?) aware of when I/O operations are performed by Python (only?) code and is able to choose one of the pending coroutine methods to execute after the current executing method relinquished control (hit on the await statement).

换句话说,这是我试图将一些asyncio语法“糖化”成更容易理解的东西:

async def coro(name):
    print('before', name)
    await asyncio.sleep()
    print('after', name)

asyncio.gather(coro('first'), coro('second'))

# translated from async def coro(name)
class Coro(coroutine):
    def before(self, name):
        print('before', name)

    def after(self, name):
        print('after', name)

    def __init__(self, name):
        self.name = name
        self.parts = self.before, self.after
        self.pos = 0

    def __call__():
        self.parts[self.pos](self.name)
        self.pos += 1

    def done(self):
        return self.pos == len(self.parts)


# translated from asyncio.gather()
class AsyncIOManager:

    def gather(*coros):
        while not every(c.done() for c in coros):
            coro = random.choice(coros)
            coro()

Should my guess prove correct: then I have a problem. How does I/O actually happen in this scenario? In a separate thread? Is the whole interpreter suspended and I/O happens outside the interpreter? What exactly is meant by I/O? If my python procedure called C open() procedure, and it in turn sent interrupt to kernel, relinquishing control to it, how does Python interpreter know about this and is able to continue running some other code, while kernel code does the actual I/O and until it wakes up the Python procedure which sent the interrupt originally? How can Python interpreter in principle, be aware of this happening?


当前回答

你的coro糖化是正确的概念,但略不完整。

Await不会无条件挂起,只在遇到阻塞调用时挂起。它是如何知道呼叫被阻塞的?这是由正在等待的代码决定的。例如,socket read的可等待实现可以被糖化为:

def read(sock, n):
    # sock must be in non-blocking mode
    try:
        return sock.recv(n)
    except EWOULDBLOCK:
        event_loop.add_reader(sock.fileno, current_task())
        return SUSPEND

在实际的asyncio中,等效代码修改Future的状态,而不是返回神奇的值,但概念是相同的。当适当地适应类似生成器的对象时,可以等待上面的代码。

在调用方,当你的协程包含:

data = await read(sock, 1024)

它糖化成类似的东西:

data = read(sock, 1024)
if data is SUSPEND:
    return SUSPEND
self.pos += 1
self.parts[self.pos](...)

熟悉发电机的人倾向于描述上述方面的产量,从其中自动暂停。

挂起链一直延续到事件循环,该事件循环注意到协程被挂起,将其从可运行集中移除,并继续执行可运行的协程(如果有的话)。如果没有可运行的协程,则循环在select()中等待,直到协程感兴趣的文件描述符准备好进行IO或超时。(事件循环维护一个文件描述符到协程的映射。)

在上面的例子中,一旦select()告诉事件循环sock是可读的,它将重新将coro添加到可运行集,因此它将从挂起点继续执行。

换句话说:

默认情况下,所有事情都发生在同一个线程中。 事件循环负责调度协程,并在协程正在等待的任何事情(通常是一个通常会阻塞的IO调用或超时)准备就绪时唤醒协程。

为了深入了解协同程序驱动事件循环,我推荐Dave Beazley的演讲,他在现场观众面前演示了从头开始编写事件循环。

其他回答

它允许您编写单线程异步代码,并在Python中实现并发性。基本上,asyncio为异步编程提供了一个事件循环。例如,如果我们需要在不阻塞主线程的情况下发出请求,我们可以使用asyncio库。

asyncio模块允许实现异步编程 使用以下元素的组合:

Event loop: The asyncio module allows an event loop per process. Coroutines: A coroutine is a generator that follows certain conventions. Its most interesting feature is that it can be suspended during execution to wait for external processing (the some routine in I/O) and return from the point it had stopped when the external processing was done. Futures: Futures represent a process that has still not finished. A future is an object that is supposed to have a result in the future and represents uncompleted tasks. Tasks: This is a subclass of asyncio.Future that encapsulates and manages coroutines. We can use the asyncio.Task object to encapsulate a coroutine.

asyncio中最重要的概念是事件循环。事件循环 允许您使用回调或协程编写异步代码。 理解asyncio的关键是协程和事件的术语 循环。协程是有状态函数,当另一个I/O操作正在执行时,可以停止其执行。事件循环用于协调协同例程的执行。

要运行任何协程函数,我们需要获得一个事件循环。我们可以这样做 与

    loop = asyncio.get_event_loop()

这为我们提供了一个BaseEventLoop对象。它有一个run_until_complete方法,该方法接受一个协程并运行它直到完成。然后,协程返回一个结果。在底层,事件循环执行BaseEventLoop.rununtilcomplete(future)方法。

这一切都归结为asyncio正在解决的两个主要挑战:

如何在一个线程中执行多个I/O ? 如何实现协同多任务处理?

第一点的答案已经存在很长一段时间了,被称为选择循环。在python中,它在selectors模块中实现。

第二个问题与协程的概念有关,即可以停止执行并稍后恢复的函数。在python中,协程是使用生成器和yield from语句实现的。这就是隐藏在async/await语法背后的东西。

在这个答案中有更多的资源。


编辑:解决您对goroutines的评论:

在asyncio中最接近于goroutine的实际上不是协程,而是任务(请参阅文档中的区别)。在python中,协程(或生成器)不知道事件循环或I/O的概念。它只是一个函数,可以使用yield停止执行,同时保持当前状态,以便稍后恢复。语法的yield允许以透明的方式将它们链接起来。

现在,在一个asyncio任务中,链的最底部的协程总是最终产生一个future。然后,这个future出现在事件循环中,并集成到内部机制中。当future被其他内部回调设置为done时,事件循环可以通过将future发送回协程链来恢复任务。


编辑:解决你帖子中的一些问题:

在这种情况下,I/O实际上是如何发生的?在一个单独的线程?整个解释器挂起,I/O发生在解释器之外吗?

不,线程中没有发生任何事情。I/O总是由事件循环管理,主要是通过文件描述符。然而,这些文件描述符的注册通常被高级协程隐藏,这就为您带来了麻烦。

I/O到底是什么意思?如果我的python过程调用C open()过程,它反过来将中断发送给内核,放弃对它的控制,python解释器如何知道这一点,并能够继续运行一些其他代码,而内核代码进行实际的I/O,直到它唤醒最初发送中断的python过程?原则上,Python解释器如何意识到这种情况?

I/O是任何阻塞调用。在asyncio中,所有的I/O操作都应该经过事件循环,因为如您所说,事件循环无法意识到某些同步代码中正在执行阻塞调用。这意味着您不应该在协程的上下文中使用同步打开。相反,使用专用的库,如aiofiles,它提供了open的异步版本。

你的coro糖化是正确的概念,但略不完整。

Await不会无条件挂起,只在遇到阻塞调用时挂起。它是如何知道呼叫被阻塞的?这是由正在等待的代码决定的。例如,socket read的可等待实现可以被糖化为:

def read(sock, n):
    # sock must be in non-blocking mode
    try:
        return sock.recv(n)
    except EWOULDBLOCK:
        event_loop.add_reader(sock.fileno, current_task())
        return SUSPEND

在实际的asyncio中,等效代码修改Future的状态,而不是返回神奇的值,但概念是相同的。当适当地适应类似生成器的对象时,可以等待上面的代码。

在调用方,当你的协程包含:

data = await read(sock, 1024)

它糖化成类似的东西:

data = read(sock, 1024)
if data is SUSPEND:
    return SUSPEND
self.pos += 1
self.parts[self.pos](...)

熟悉发电机的人倾向于描述上述方面的产量,从其中自动暂停。

挂起链一直延续到事件循环,该事件循环注意到协程被挂起,将其从可运行集中移除,并继续执行可运行的协程(如果有的话)。如果没有可运行的协程,则循环在select()中等待,直到协程感兴趣的文件描述符准备好进行IO或超时。(事件循环维护一个文件描述符到协程的映射。)

在上面的例子中,一旦select()告诉事件循环sock是可读的,它将重新将coro添加到可运行集,因此它将从挂起点继续执行。

换句话说:

默认情况下,所有事情都发生在同一个线程中。 事件循环负责调度协程,并在协程正在等待的任何事情(通常是一个通常会阻塞的IO调用或超时)准备就绪时唤醒协程。

为了深入了解协同程序驱动事件循环,我推荐Dave Beazley的演讲,他在现场观众面前演示了从头开始编写事件循环。

asyncio是如何工作的?

在回答这个问题之前,我们需要了解一些基本术语,如果你已经知道其中任何一个,就跳过这些。

发电机

生成器是允许我们暂停python函数执行的对象。用户管理生成器使用关键字yield实现。通过创建一个包含yield关键字的普通函数,我们将该函数转换为生成器:

>>> def test():
...     yield 1
...     yield 2
...
>>> gen = test()
>>> next(gen)
1
>>> next(gen)
2
>>> next(gen)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration

如您所见,在生成器上调用next()会导致解释器加载测试的帧,并返回产生的值。再次调用next()将导致该帧再次加载到解释器堆栈中,并继续产生另一个值。

在第三次调用next()时,生成器已经完成,并抛出StopIteration。

与发电机通信

生成器的一个鲜为人知的特性是,您可以使用两个方法与它们通信:send()和throw()。

>>> def test():
...     val = yield 1
...     print(val)
...     yield 2
...     yield 3
...
>>> gen = test()
>>> next(gen)
1
>>> gen.send("abc")
abc
2
>>> gen.throw(Exception())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 4, in test
Exception

在调用gen.send()时,该值作为yield关键字的返回值传递。

另一方面,gen.throw()允许在生成器内部抛出异常,在yield被调用的同一点引发异常。

从生成器返回值

从生成器返回一个值,导致该值被放入StopIteration异常中。我们可以稍后从异常中恢复值,并将其用于我们的需要。

>>> def test():
...     yield 1
...     return "abc"
...
>>> gen = test()
>>> next(gen)
1
>>> try:
...     next(gen)
... except StopIteration as exc:
...     print(exc.value)
...
abc

看,一个新的关键词:屈服

Python 3.4添加了一个新的关键字:yield from。该关键字允许我们做的是将任何next(), send()和throw()传递到最内部嵌套的生成器中。如果内部生成器返回一个值,它也是yield from的返回值:

>>> def inner():
...     inner_result = yield 2
...     print('inner', inner_result)
...     return 3
...
>>> def outer():
...     yield 1
...     val = yield from inner()
...     print('outer', val)
...     yield 4
...
>>> gen = outer()
>>> next(gen)
1
>>> next(gen) # Goes inside inner() automatically
2
>>> gen.send("abc")
inner abc
outer 3
4

我已经写了一篇文章来进一步阐述这个主题。

把它们放在一起

在Python 3.4中引入新的关键字yield后,我们现在能够在生成器中创建生成器,就像隧道一样,将数据从最内部的生成器来回传递到最外部的生成器。这为生成器带来了一个新的含义——协程。

协程是可以在运行时停止和恢复的函数。在Python中,它们是使用async def关键字定义的。就像发电机一样,它们也使用自己的等待产量形式。在Python 3.5引入async和await之前,我们以与生成器创建完全相同的方式创建协程(使用yield from而不是await)。

async def inner():
    return 1

async def outer():
    await inner()

就像所有迭代器和生成器实现__iter__()方法一样,所有协程都实现__await__(),这允许它们在每次await coro被调用时继续执行。

在Python文档中有一个很好的序列图,你应该去看看。

在asyncio中,除了协程函数,我们还有两个重要的对象:任务和未来。

期货

期货是实现了__await__()方法的对象,它们的工作是保存特定的状态和结果。状态可以是以下状态之一:

PENDING - future没有任何结果或异常集。 CANCELLED -使用fut.cancel()取消未来 FINISHED - future被结束,通过使用fut.set_result()的结果集或使用fut.set_exception()的异常集完成。

正如您所猜测的那样,结果可以是一个将返回的Python对象,也可以是一个可能引发的异常。

未来对象的另一个重要特性是它们包含一个名为add_done_callback()的方法。此方法允许在任务完成时立即调用函数——无论它引发异常还是完成。

任务

任务对象是特殊的期货,它围绕着协程,并与最内部和最外部的协程通信。每次协程等待future时,future就会一直传递给任务(就像yield from一样),然后任务接收它。

接下来,任务将自己绑定到未来。它通过在将来调用add_done_callback()来做到这一点。从现在开始,如果将来要完成,无论是取消,传递异常,还是传递一个Python对象,任务的回调将被调用,它将上升到存在。

Asyncio

我们必须回答的最后一个紧迫问题是——IO是如何实现的?

在asyncio的深处,我们有一个事件循环。任务的事件循环。事件循环的工作是在每次任务准备就绪时调用它们,并将所有工作协调到一个工作机器中。

事件循环的IO部分构建在一个称为select的关键函数之上。Select是一个阻塞函数,由下面的操作系统实现,允许在套接字上等待传入或传出数据。在接收到数据时,它将被唤醒,并返回接收到数据的套接字或准备写入的套接字。

当您尝试通过asyncio通过套接字接收或发送数据时,下面实际发生的情况是,首先检查套接字是否有任何可以立即读取或发送的数据。如果它的.send()缓冲区已满,或者.recv()缓冲区为空,套接字就会注册到select函数(只需将其添加到其中一个列表中,rlist用于recv, wlist用于send),相应的函数将等待一个新创建的future对象,该对象与该套接字绑定。

当所有可用任务都在等待未来时,事件循环调用select并等待。当其中一个套接字有传入数据,或者它的发送缓冲区耗尽时,asyncio检查绑定到该套接字的未来对象,并将其设置为done。

现在奇迹发生了。未来被设置为完成,之前使用add_done_callback()添加自己的任务将复活,并在协程上调用.send(),该协程将恢复最内部的协程(因为等待链),您将从它溢出到的附近缓冲区读取新接收的数据。

在recv()的情况下,再次使用方法链:

选择。选择等待。 返回一个就绪的套接字,其中包含数据。 来自套接字的数据被移动到缓冲区中。 调用Future.set_result()。 使用add_done_callback()添加自己的任务现在被唤醒。 Task在协程上调用.send(),它会一直进入最内部的协程并唤醒它。 数据从缓冲区读取并返回给我们的普通用户。

总之,asyncio使用生成器功能,允许暂停和恢复函数。它使用yield from功能,允许将数据从最内部的生成器来回传递到最外部的生成器。它使用所有这些方法是为了在等待IO完成时暂停函数执行(通过使用OS选择函数)。

最好的是什么?当一个函数暂停时,另一个函数可能会运行并与精致的结构交织,这是asyncio的。

If you picture an airport control tower, with many planes waiting to land on the same runway. The control tower can be seen as the event loop and runway as the thread. Each plane is a separate function waiting to execute. In reality only one plane can land on the runway at a time. What asyncio basically does it allows many planes to land simultaneously on the same runway by using the event loop to suspend functions and allow other functions to run when you use the await syntax it basically means that plane(function can be suspended and allow other functions to process