这个问题是由我的另一个问题:如何在cdef等待?

网络上有大量关于asyncio的文章和博客文章,但它们都非常肤浅。我找不到任何关于asyncio实际是如何实现的,以及什么使I/O异步的信息。我试图阅读源代码,但它有数千行不是最高级的C代码,其中很多处理辅助对象,但最重要的是,它很难将Python语法和它将转换成的C代码联系起来。

Asycnio自己的文档就更没有帮助了。这里没有关于它如何工作的信息,只有一些关于如何使用它的指南,这些指南有时也会误导/写得很糟糕。

我熟悉Go的协程实现,并希望Python也能做同样的事情。如果是这样的话,我在上面链接的帖子中出现的代码应该是有效的。既然它没有,我现在正试图找出原因。到目前为止,我最好的猜测如下,请纠正我的错误:

Procedure definitions of the form async def foo(): ... are actually interpreted as methods of a class inheriting coroutine. Perhaps, async def is actually split into multiple methods by await statements, where the object, on which these methods are called is able to keep track of the progress it made through the execution so far. If the above is true, then, essentially, execution of a coroutine boils down to calling methods of coroutine object by some global manager (loop?). The global manager is somehow (how?) aware of when I/O operations are performed by Python (only?) code and is able to choose one of the pending coroutine methods to execute after the current executing method relinquished control (hit on the await statement).

换句话说,这是我试图将一些asyncio语法“糖化”成更容易理解的东西:

async def coro(name):
    print('before', name)
    await asyncio.sleep()
    print('after', name)

asyncio.gather(coro('first'), coro('second'))

# translated from async def coro(name)
class Coro(coroutine):
    def before(self, name):
        print('before', name)

    def after(self, name):
        print('after', name)

    def __init__(self, name):
        self.name = name
        self.parts = self.before, self.after
        self.pos = 0

    def __call__():
        self.parts[self.pos](self.name)
        self.pos += 1

    def done(self):
        return self.pos == len(self.parts)


# translated from asyncio.gather()
class AsyncIOManager:

    def gather(*coros):
        while not every(c.done() for c in coros):
            coro = random.choice(coros)
            coro()

Should my guess prove correct: then I have a problem. How does I/O actually happen in this scenario? In a separate thread? Is the whole interpreter suspended and I/O happens outside the interpreter? What exactly is meant by I/O? If my python procedure called C open() procedure, and it in turn sent interrupt to kernel, relinquishing control to it, how does Python interpreter know about this and is able to continue running some other code, while kernel code does the actual I/O and until it wakes up the Python procedure which sent the interrupt originally? How can Python interpreter in principle, be aware of this happening?


当前回答

谈论async/await和asyncio不是一回事。前者是一个基本的、低级的构造(协程),而后者是一个使用这些构造的库。相反,没有唯一的最终答案。

下面是async/await和asyncio-like库如何工作的一般描述。也就是说,上面可能还有其他技巧(有……),但它们都是无关紧要的,除非你自己创建它们。这种差异应该可以忽略不计,除非你已经知道得足够多,不必问这样的问题。

1. 简而言之,协程与子例程

就像子例程(函数,过程,…)一样,协程(生成器,…)是调用堆栈和指令指针的抽象:有一个执行代码段的堆栈,每个代码段都位于特定的指令上。

def和async def的区别仅仅是为了清晰。实际的差别是回报和收益率。因此,从单个调用到整个堆栈的差异中await或yield。

1.1. 子例程

子例程表示一个新的堆栈级别,以保存局部变量,并对其指令进行一次遍历以达到结束。考虑这样的子程序:

def subfoo(bar):
     qux = 3
     return qux * bar

当你运行它时,这意味着

为bar和qux分配堆栈空间 递归地执行第一个语句并跳转到下一个语句 每次返回时,将其值推入调用堆栈 清除堆栈(1.)和指令指针(2.)

值得注意的是,4。意味着子程序总是以相同的状态开始。函数本身专有的所有内容在完成时都将丢失。一个函数不能被恢复,即使在返回后有指令。

root -\
  :    \- subfoo --\
  :/--<---return --/
  |
  V

1.2. 协程作为持久子例程

协程类似于子例程,但可以在不破坏其状态的情况下退出。考虑这样一个协程:

 def cofoo(bar):
      qux = yield bar  # yield marks a break point
      return qux

当你运行它时,这意味着

为bar和qux分配堆栈空间 递归地执行第一个语句并跳转到下一个语句 一旦达到yield,将其值推入调用堆栈,但存储堆栈和指令指针 一旦调用yield,恢复堆栈和指令指针,并将参数推入qux 每次返回时,将其值推入调用堆栈 清除堆栈(1.)和指令指针(2.)

注意2.1和2.2的添加—协程可以在预定义的点挂起和恢复。这与子例程在调用另一个子例程时挂起类似。不同之处在于活动协程没有严格绑定到它的调用堆栈。相反,挂起的协程是一个单独的、孤立的堆栈的一部分。

root -\
  :    \- cofoo --\
  :/--<+--yield --/
  |    :
  V    :

这意味着挂起的协程可以在堆栈之间自由存储或移动。任何访问协程的调用栈都可以决定恢复它。

1.3. 遍历调用堆栈

到目前为止,我们的协程只在yield下到调用堆栈。子例程可以使用return和()在调用堆栈中上下移动。为了完整起见,协程还需要一种机制来上升到调用堆栈。考虑这样一个协程:

def wrap():
    yield 'before'
    yield from cofoo()
    yield 'after'

当您运行它时,这意味着它仍然像子例程一样分配堆栈和指令指针。当它挂起时,这仍然像存储一个子程序。

然而,yield from两者都有。它挂起堆栈和wrap的指令指针,并运行cofoo。注意,在cofoo完全完成之前,wrap一直处于暂停状态。每当cofoo挂起或发送一些东西时,cofoo直接连接到调用堆栈。

1.4. 协程一直到下面

在建立时,yield from允许跨另一个中间作用域连接两个作用域。当递归应用时,这意味着堆栈的顶部可以连接到堆栈的底部。

root -\
  :    \-> coro_a -yield-from-> coro_b --\
  :/ <-+------------------------yield ---/
  |    :
  :\ --+-- coro_a.send----------yield ---\
  :                             coro_b <-/

注意,root和coro_b彼此不知道。这使得协程比回调简洁得多:协程仍然像子例程一样建立在1:1的关系上。协程挂起并恢复其整个现有的执行堆栈,直到常规调用点。

值得注意的是,root可以恢复任意数量的协程。然而,它永远不能同时恢复多个。相同根的协程是并发的,但不是并行的!

1.5. Python的async和await

到目前为止,解释已经明确地使用了生成器的yield和yield from词汇表-底层功能是相同的。新的Python3.5语法async和await的存在主要是为了清晰。

def foo():  # subroutine?
     return None

def foo():  # coroutine?
     yield from foofoo()  # generator? coroutine?

async def foo():  # coroutine!
     await foofoo()  # coroutine!
     return None

async for和async with语句是必需的,因为你会用裸for和with语句打破yield from/await链。

2. 一个简单事件循环的剖析

就其本身而言,协程没有将控制权交给另一个协程的概念。它只能在协程堆栈的底部将控制权交给调用方。然后,调用者可以切换到另一个协程并运行它。

几个协程的根节点通常是一个事件循环:在挂起时,协程产生一个它想要恢复的事件。反过来,事件循环能够有效地等待这些事件发生。这允许它决定接下来运行哪个协程,或者在恢复之前如何等待。

这样的设计意味着循环可以理解一组预定义的事件。几个协程相互等待,直到最后等待一个事件。此事件可以通过交出控制权直接与事件循环通信。

loop -\
  :    \-> coroutine --await--> event --\
  :/ <-+----------------------- yield --/
  |    :
  |    :  # loop waits for event to happen
  |    :
  :\ --+-- send(reply) -------- yield --\
  :        coroutine <--yield-- event <-/

关键是协程挂起允许事件循环和事件直接通信。中间协程栈不需要知道哪个循环在运行它,也不需要知道事件是如何工作的。

2.1.1. 时间事件

最简单的事件是到达某个时间点。这也是线程代码的一个基本块:线程重复地休眠,直到条件为真。 然而,常规睡眠本身会阻塞执行——我们希望其他协程不被阻塞。相反,我们希望告诉事件循环什么时候应该恢复当前协程堆栈。

2.1.2. 定义事件

事件只是一个我们可以识别的值——通过枚举、类型或其他标识。我们可以用一个存储目标时间的简单类来定义它。除了存储事件信息外,我们还可以允许直接等待类。

class AsyncSleep:
    """Event to sleep until a point in time"""
    def __init__(self, until: float):
        self.until = until

    # used whenever someone ``await``s an instance of this Event
    def __await__(self):
        # yield this Event to the loop
        yield self
    
    def __repr__(self):
        return '%s(until=%.1f)' % (self.__class__.__name__, self.until)

这个类只存储事件——它没有说明如何实际处理它。

唯一的特殊功能是__await__ -它是await关键字所寻找的。实际上,它是一个迭代器,但不适用于常规迭代机制。

2.2.1. 等待事件

现在我们有了一个事件,协程如何对它做出反应?我们应该能够通过等待事件来表达睡眠的等效内容。为了更好地了解发生了什么,有一半的时间我们会等待两次:

import time

async def asleep(duration: float):
    """await that ``duration`` seconds pass"""
    await AsyncSleep(time.time() + duration / 2)
    await AsyncSleep(time.time() + duration / 2)

我们可以直接实例化并运行这个协程。类似于生成器,使用协程。Send运行协程,直到产生结果。

coroutine = asleep(100)
while True:
    print(coroutine.send(None))
    time.sleep(0.1)

这为我们提供了两个AsyncSleep事件,然后在协程完成时提供了一个StopIteration。请注意,唯一的延迟来自时间。在循环中睡觉!每个AsyncSleep只存储当前时间的偏移量。

2.2.2. 事件+睡眠

在这一点上,我们有两个独立的机制可供我们使用:

可从协程内部产生的AsyncSleep事件 时间。可以等待而不影响协程的睡眠

值得注意的是,这两者是正交的:一个不会影响或触发另一个。因此,我们可以提出自己的睡眠策略来满足AsyncSleep的延迟。

2.3. 一个简单的事件循环

如果我们有几个协程,每个协程都可以告诉我们它什么时候想要被唤醒。然后,我们可以等待其中的第一个,然后等待下一个,依此类推。值得注意的是,在每个点上,我们只关心下一个是哪个。

这就形成了一个简单的调度:

按所需的唤醒时间对协程进行排序 选第一个想醒来的 等到这个时间点 运行这个协程 从1开始重复。

简单的实现不需要任何高级概念。列表允许按日期对协程进行排序。等待是固定的时间。运行协程就像之前使用corroutine .send一样。

def run(*coroutines):
    """Cooperatively run all ``coroutines`` until completion"""
    # store wake-up-time and coroutines
    waiting = [(0, coroutine) for coroutine in coroutines]
    while waiting:
        # 2. pick the first coroutine that wants to wake up
        until, coroutine = waiting.pop(0)
        # 3. wait until this point in time
        time.sleep(max(0.0, until - time.time()))
        # 4. run this coroutine
        try:
            command = coroutine.send(None)
        except StopIteration:
            continue
        # 1. sort coroutines by their desired suspension
        if isinstance(command, AsyncSleep):
            waiting.append((command.until, coroutine))
            waiting.sort(key=lambda item: item[0])

当然,这方面还有很大的改进空间。我们可以为等待队列使用堆,或为事件使用分派表。我们还可以从StopIteration中获取返回值并将它们分配给协程。然而,基本原则是不变的。

2.4. 合作等

AsyncSleep事件和run事件循环是计时事件的完全工作实现。

async def sleepy(identifier: str = "coroutine", count=5):
    for i in range(count):
        print(identifier, 'step', i + 1, 'at %.2f' % time.time())
        await asleep(0.1)

run(*(sleepy("coroutine %d" % j) for j in range(5)))

这会在五个协程之间进行合作切换,每个协程暂停0.1秒。即使事件循环是同步的,它仍然在0.5秒而不是2.5秒内执行工作。每个协程保存状态并独立行动。

3.I/O事件循环

支持睡眠的事件循环适用于轮询。然而,在文件句柄上等待I/O可以更有效地完成:操作系统实现I/O,因此知道哪些句柄已经准备好了。理想情况下,事件循环应该支持显式的“ready for I/O”事件。

3.1. select调用

Python已经有一个接口来查询操作系统的读I/O句柄。当使用句柄来读取或写入时,它返回准备读取或写入的句柄:

readable, writable, _ = select.select(rlist, wlist, xlist, timeout)

例如,我们可以打开一个文件并等待它准备好:

write_target = open('/tmp/foo')
readable, writable, _ = select.select([], [write_target], [])

一旦select返回,writable包含我们打开的文件。

3.2. 基本I/O事件

类似于AsyncSleep请求,我们需要为I/O定义一个事件。使用底层的选择逻辑,事件必须引用一个可读的对象——比如一个打开的文件。此外,我们还存储要读取的数据量。

class AsyncRead:
    def __init__(self, file, amount=1):
        self.file = file
        self.amount = amount
        self._buffer = b'' if 'b' in file.mode else ''

    def __await__(self):
        while len(self._buffer) < self.amount:
            yield self
            # we only get here if ``read`` should not block
            self._buffer += self.file.read(1)
        return self._buffer

    def __repr__(self):
        return '%s(file=%s, amount=%d, progress=%d)' % (
            self.__class__.__name__, self.file, self.amount, len(self._buffer)
        )

与AsyncSleep一样,我们主要只存储底层系统调用所需的数据。这一次,__await__可以被多次恢复,直到读取所需的量为止。此外,我们返回I/O结果,而不是仅仅恢复。

3.3. 用读I/O增加事件循环

事件循环的基础仍然是前面定义的运行。首先,我们需要跟踪读请求。这不再是一个排序的调度,我们只将读请求映射到协程。

# new
waiting_read = {}  # type: Dict[file, coroutine]

因为选择。Select有一个timeout参数,我们可以用它来代替time.sleep。

# old
time.sleep(max(0.0, until - time.time()))
# new
readable, _, _ = select.select(list(waiting_read), [], [])

这为我们提供了所有可读文件——如果有,我们运行相应的协程。如果没有,我们已经等待了足够长的时间来运行当前的协程。

# new - reschedule waiting coroutine, run readable coroutine
if readable:
    waiting.append((until, coroutine))
    waiting.sort()
    coroutine = waiting_read[readable[0]]

最后,我们必须实际监听读请求。

# new
if isinstance(command, AsyncSleep):
    ...
elif isinstance(command, AsyncRead):
    ...

3.4. 把它们放在一起

上面的描述有点简单化。我们需要做一些切换,如果我们总能读取,就不会饿死睡眠协程。我们需要面对无书可读、无事可等的现实。但是,最终结果仍然适合30 LOC。

def run(*coroutines):
    """Cooperatively run all ``coroutines`` until completion"""
    waiting_read = {}  # type: Dict[file, coroutine]
    waiting = [(0, coroutine) for coroutine in coroutines]
    while waiting or waiting_read:
        # 2. wait until the next coroutine may run or read ...
        try:
            until, coroutine = waiting.pop(0)
        except IndexError:
            until, coroutine = float('inf'), None
            readable, _, _ = select.select(list(waiting_read), [], [])
        else:
            readable, _, _ = select.select(list(waiting_read), [], [], max(0.0, until - time.time()))
        # ... and select the appropriate one
        if readable and time.time() < until:
            if until and coroutine:
                waiting.append((until, coroutine))
                waiting.sort()
            coroutine = waiting_read.pop(readable[0])
        # 3. run this coroutine
        try:
            command = coroutine.send(None)
        except StopIteration:
            continue
        # 1. sort coroutines by their desired suspension ...
        if isinstance(command, AsyncSleep):
            waiting.append((command.until, coroutine))
            waiting.sort(key=lambda item: item[0])
        # ... or register reads
        elif isinstance(command, AsyncRead):
            waiting_read[command.file] = coroutine

3.5. 合作的I / O

AsyncSleep, AsyncRead和run实现现在完全可以用于睡眠和/或读取。 和困倦一样,我们可以定义一个帮助来测试阅读:

async def ready(path, amount=1024*32):
    print('read', path, 'at', '%d' % time.time())
    with open(path, 'rb') as file:
        result = await AsyncRead(file, amount)
    print('done', path, 'at', '%d' % time.time())
    print('got', len(result), 'B')

run(sleepy('background', 5), ready('/dev/urandom'))

运行这个,我们可以看到我们的I/O与等待任务交织在一起:

id background round 1
read /dev/urandom at 1530721148
id background round 2
id background round 3
id background round 4
id background round 5
done /dev/urandom at 1530721148
got 1024 B

4. 非阻塞I / O

虽然文件上的I/O理解了这个概念,但它并不真正适合asyncio这样的库:select调用总是返回文件,并且open和read都可能无限期地阻塞。这会阻塞事件循环的所有协程——这很糟糕。aiofiles等库使用线程和同步来伪造文件上的非阻塞I/O和事件。

然而,套接字确实允许非阻塞I/O,它们固有的延迟使它变得更加关键。当在事件循环中使用时,等待数据和重试可以被包装而不阻塞任何东西。

4.1. 非阻塞I/O事件

类似于我们的AsyncRead,我们可以为套接字定义一个挂起并读取事件。我们不取文件,而是取套接字——它必须是非阻塞的。而且,我们的__await__使用套接字。Recv而不是file.read。

class AsyncRecv:
    def __init__(self, connection, amount=1, read_buffer=1024):
        assert not connection.getblocking(), 'connection must be non-blocking for async recv'
        self.connection = connection
        self.amount = amount
        self.read_buffer = read_buffer
        self._buffer = b''

    def __await__(self):
        while len(self._buffer) < self.amount:
            try:
                self._buffer += self.connection.recv(self.read_buffer)
            except BlockingIOError:
                yield self
        return self._buffer

    def __repr__(self):
        return '%s(file=%s, amount=%d, progress=%d)' % (
            self.__class__.__name__, self.connection, self.amount, len(self._buffer)
        )

与AsyncRead相比,__await__执行真正的非阻塞I/O。当数据可用时,它总是读取数据。当没有可用数据时,它总是挂起。这意味着事件循环只在我们执行有用的工作时被阻塞。

4.2. 解除事件循环阻塞

就事件循环而言,没有什么变化。要监听的事件仍然与文件事件相同——一个由select标记为ready的文件描述符。

# old
elif isinstance(command, AsyncRead):
    waiting_read[command.file] = coroutine
# new
elif isinstance(command, AsyncRead):
    waiting_read[command.file] = coroutine
elif isinstance(command, AsyncRecv):
    waiting_read[command.connection] = coroutine

此时,应该很明显AsyncRead和AsyncRecv是同一种事件。我们可以很容易地将它们重构为一个具有可交换I/O组件的事件。实际上,事件循环、协程和事件将调度程序、任意中间代码和实际I/O清晰地分开。

4.3. 非阻塞I/O的丑陋一面

原则上,此时您应该做的是将read的逻辑复制为AsyncRecv的recv。然而,现在这要丑陋得多——当函数在内核内阻塞时,你必须处理早期返回,但控制权交给你。例如,打开一个连接比打开一个文件要长得多:

# file
file = open(path, 'rb')
# non-blocking socket
connection = socket.socket()
connection.setblocking(False)
# open without blocking - retry on failure
try:
    connection.connect((url, port))
except BlockingIOError:
    pass

长话短说,剩下的就是几十行异常处理。此时事件和事件循环已经工作了。

id background round 1
read localhost:25000 at 1530783569
read /dev/urandom at 1530783569
done localhost:25000 at 1530783569 got 32768 B
id background round 2
id background round 3
id background round 4
done /dev/urandom at 1530783569 got 4096 B
id background round 5

齿顶高

github上的示例代码

其他回答

这一切都归结为asyncio正在解决的两个主要挑战:

如何在一个线程中执行多个I/O ? 如何实现协同多任务处理?

第一点的答案已经存在很长一段时间了,被称为选择循环。在python中,它在selectors模块中实现。

第二个问题与协程的概念有关,即可以停止执行并稍后恢复的函数。在python中,协程是使用生成器和yield from语句实现的。这就是隐藏在async/await语法背后的东西。

在这个答案中有更多的资源。


编辑:解决您对goroutines的评论:

在asyncio中最接近于goroutine的实际上不是协程,而是任务(请参阅文档中的区别)。在python中,协程(或生成器)不知道事件循环或I/O的概念。它只是一个函数,可以使用yield停止执行,同时保持当前状态,以便稍后恢复。语法的yield允许以透明的方式将它们链接起来。

现在,在一个asyncio任务中,链的最底部的协程总是最终产生一个future。然后,这个future出现在事件循环中,并集成到内部机制中。当future被其他内部回调设置为done时,事件循环可以通过将future发送回协程链来恢复任务。


编辑:解决你帖子中的一些问题:

在这种情况下,I/O实际上是如何发生的?在一个单独的线程?整个解释器挂起,I/O发生在解释器之外吗?

不,线程中没有发生任何事情。I/O总是由事件循环管理,主要是通过文件描述符。然而,这些文件描述符的注册通常被高级协程隐藏,这就为您带来了麻烦。

I/O到底是什么意思?如果我的python过程调用C open()过程,它反过来将中断发送给内核,放弃对它的控制,python解释器如何知道这一点,并能够继续运行一些其他代码,而内核代码进行实际的I/O,直到它唤醒最初发送中断的python过程?原则上,Python解释器如何意识到这种情况?

I/O是任何阻塞调用。在asyncio中,所有的I/O操作都应该经过事件循环,因为如您所说,事件循环无法意识到某些同步代码中正在执行阻塞调用。这意味着您不应该在协程的上下文中使用同步打开。相反,使用专用的库,如aiofiles,它提供了open的异步版本。

asyncio是如何工作的?

在回答这个问题之前,我们需要了解一些基本术语,如果你已经知道其中任何一个,就跳过这些。

发电机

生成器是允许我们暂停python函数执行的对象。用户管理生成器使用关键字yield实现。通过创建一个包含yield关键字的普通函数,我们将该函数转换为生成器:

>>> def test():
...     yield 1
...     yield 2
...
>>> gen = test()
>>> next(gen)
1
>>> next(gen)
2
>>> next(gen)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration

如您所见,在生成器上调用next()会导致解释器加载测试的帧,并返回产生的值。再次调用next()将导致该帧再次加载到解释器堆栈中,并继续产生另一个值。

在第三次调用next()时,生成器已经完成,并抛出StopIteration。

与发电机通信

生成器的一个鲜为人知的特性是,您可以使用两个方法与它们通信:send()和throw()。

>>> def test():
...     val = yield 1
...     print(val)
...     yield 2
...     yield 3
...
>>> gen = test()
>>> next(gen)
1
>>> gen.send("abc")
abc
2
>>> gen.throw(Exception())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 4, in test
Exception

在调用gen.send()时,该值作为yield关键字的返回值传递。

另一方面,gen.throw()允许在生成器内部抛出异常,在yield被调用的同一点引发异常。

从生成器返回值

从生成器返回一个值,导致该值被放入StopIteration异常中。我们可以稍后从异常中恢复值,并将其用于我们的需要。

>>> def test():
...     yield 1
...     return "abc"
...
>>> gen = test()
>>> next(gen)
1
>>> try:
...     next(gen)
... except StopIteration as exc:
...     print(exc.value)
...
abc

看,一个新的关键词:屈服

Python 3.4添加了一个新的关键字:yield from。该关键字允许我们做的是将任何next(), send()和throw()传递到最内部嵌套的生成器中。如果内部生成器返回一个值,它也是yield from的返回值:

>>> def inner():
...     inner_result = yield 2
...     print('inner', inner_result)
...     return 3
...
>>> def outer():
...     yield 1
...     val = yield from inner()
...     print('outer', val)
...     yield 4
...
>>> gen = outer()
>>> next(gen)
1
>>> next(gen) # Goes inside inner() automatically
2
>>> gen.send("abc")
inner abc
outer 3
4

我已经写了一篇文章来进一步阐述这个主题。

把它们放在一起

在Python 3.4中引入新的关键字yield后,我们现在能够在生成器中创建生成器,就像隧道一样,将数据从最内部的生成器来回传递到最外部的生成器。这为生成器带来了一个新的含义——协程。

协程是可以在运行时停止和恢复的函数。在Python中,它们是使用async def关键字定义的。就像发电机一样,它们也使用自己的等待产量形式。在Python 3.5引入async和await之前,我们以与生成器创建完全相同的方式创建协程(使用yield from而不是await)。

async def inner():
    return 1

async def outer():
    await inner()

就像所有迭代器和生成器实现__iter__()方法一样,所有协程都实现__await__(),这允许它们在每次await coro被调用时继续执行。

在Python文档中有一个很好的序列图,你应该去看看。

在asyncio中,除了协程函数,我们还有两个重要的对象:任务和未来。

期货

期货是实现了__await__()方法的对象,它们的工作是保存特定的状态和结果。状态可以是以下状态之一:

PENDING - future没有任何结果或异常集。 CANCELLED -使用fut.cancel()取消未来 FINISHED - future被结束,通过使用fut.set_result()的结果集或使用fut.set_exception()的异常集完成。

正如您所猜测的那样,结果可以是一个将返回的Python对象,也可以是一个可能引发的异常。

未来对象的另一个重要特性是它们包含一个名为add_done_callback()的方法。此方法允许在任务完成时立即调用函数——无论它引发异常还是完成。

任务

任务对象是特殊的期货,它围绕着协程,并与最内部和最外部的协程通信。每次协程等待future时,future就会一直传递给任务(就像yield from一样),然后任务接收它。

接下来,任务将自己绑定到未来。它通过在将来调用add_done_callback()来做到这一点。从现在开始,如果将来要完成,无论是取消,传递异常,还是传递一个Python对象,任务的回调将被调用,它将上升到存在。

Asyncio

我们必须回答的最后一个紧迫问题是——IO是如何实现的?

在asyncio的深处,我们有一个事件循环。任务的事件循环。事件循环的工作是在每次任务准备就绪时调用它们,并将所有工作协调到一个工作机器中。

事件循环的IO部分构建在一个称为select的关键函数之上。Select是一个阻塞函数,由下面的操作系统实现,允许在套接字上等待传入或传出数据。在接收到数据时,它将被唤醒,并返回接收到数据的套接字或准备写入的套接字。

当您尝试通过asyncio通过套接字接收或发送数据时,下面实际发生的情况是,首先检查套接字是否有任何可以立即读取或发送的数据。如果它的.send()缓冲区已满,或者.recv()缓冲区为空,套接字就会注册到select函数(只需将其添加到其中一个列表中,rlist用于recv, wlist用于send),相应的函数将等待一个新创建的future对象,该对象与该套接字绑定。

当所有可用任务都在等待未来时,事件循环调用select并等待。当其中一个套接字有传入数据,或者它的发送缓冲区耗尽时,asyncio检查绑定到该套接字的未来对象,并将其设置为done。

现在奇迹发生了。未来被设置为完成,之前使用add_done_callback()添加自己的任务将复活,并在协程上调用.send(),该协程将恢复最内部的协程(因为等待链),您将从它溢出到的附近缓冲区读取新接收的数据。

在recv()的情况下,再次使用方法链:

选择。选择等待。 返回一个就绪的套接字,其中包含数据。 来自套接字的数据被移动到缓冲区中。 调用Future.set_result()。 使用add_done_callback()添加自己的任务现在被唤醒。 Task在协程上调用.send(),它会一直进入最内部的协程并唤醒它。 数据从缓冲区读取并返回给我们的普通用户。

总之,asyncio使用生成器功能,允许暂停和恢复函数。它使用yield from功能,允许将数据从最内部的生成器来回传递到最外部的生成器。它使用所有这些方法是为了在等待IO完成时暂停函数执行(通过使用OS选择函数)。

最好的是什么?当一个函数暂停时,另一个函数可能会运行并与精致的结构交织,这是asyncio的。

你的coro糖化是正确的概念,但略不完整。

Await不会无条件挂起,只在遇到阻塞调用时挂起。它是如何知道呼叫被阻塞的?这是由正在等待的代码决定的。例如,socket read的可等待实现可以被糖化为:

def read(sock, n):
    # sock must be in non-blocking mode
    try:
        return sock.recv(n)
    except EWOULDBLOCK:
        event_loop.add_reader(sock.fileno, current_task())
        return SUSPEND

在实际的asyncio中,等效代码修改Future的状态,而不是返回神奇的值,但概念是相同的。当适当地适应类似生成器的对象时,可以等待上面的代码。

在调用方,当你的协程包含:

data = await read(sock, 1024)

它糖化成类似的东西:

data = read(sock, 1024)
if data is SUSPEND:
    return SUSPEND
self.pos += 1
self.parts[self.pos](...)

熟悉发电机的人倾向于描述上述方面的产量,从其中自动暂停。

挂起链一直延续到事件循环,该事件循环注意到协程被挂起,将其从可运行集中移除,并继续执行可运行的协程(如果有的话)。如果没有可运行的协程,则循环在select()中等待,直到协程感兴趣的文件描述符准备好进行IO或超时。(事件循环维护一个文件描述符到协程的映射。)

在上面的例子中,一旦select()告诉事件循环sock是可读的,它将重新将coro添加到可运行集,因此它将从挂起点继续执行。

换句话说:

默认情况下,所有事情都发生在同一个线程中。 事件循环负责调度协程,并在协程正在等待的任何事情(通常是一个通常会阻塞的IO调用或超时)准备就绪时唤醒协程。

为了深入了解协同程序驱动事件循环,我推荐Dave Beazley的演讲,他在现场观众面前演示了从头开始编写事件循环。

谈论async/await和asyncio不是一回事。前者是一个基本的、低级的构造(协程),而后者是一个使用这些构造的库。相反,没有唯一的最终答案。

下面是async/await和asyncio-like库如何工作的一般描述。也就是说,上面可能还有其他技巧(有……),但它们都是无关紧要的,除非你自己创建它们。这种差异应该可以忽略不计,除非你已经知道得足够多,不必问这样的问题。

1. 简而言之,协程与子例程

就像子例程(函数,过程,…)一样,协程(生成器,…)是调用堆栈和指令指针的抽象:有一个执行代码段的堆栈,每个代码段都位于特定的指令上。

def和async def的区别仅仅是为了清晰。实际的差别是回报和收益率。因此,从单个调用到整个堆栈的差异中await或yield。

1.1. 子例程

子例程表示一个新的堆栈级别,以保存局部变量,并对其指令进行一次遍历以达到结束。考虑这样的子程序:

def subfoo(bar):
     qux = 3
     return qux * bar

当你运行它时,这意味着

为bar和qux分配堆栈空间 递归地执行第一个语句并跳转到下一个语句 每次返回时,将其值推入调用堆栈 清除堆栈(1.)和指令指针(2.)

值得注意的是,4。意味着子程序总是以相同的状态开始。函数本身专有的所有内容在完成时都将丢失。一个函数不能被恢复,即使在返回后有指令。

root -\
  :    \- subfoo --\
  :/--<---return --/
  |
  V

1.2. 协程作为持久子例程

协程类似于子例程,但可以在不破坏其状态的情况下退出。考虑这样一个协程:

 def cofoo(bar):
      qux = yield bar  # yield marks a break point
      return qux

当你运行它时,这意味着

为bar和qux分配堆栈空间 递归地执行第一个语句并跳转到下一个语句 一旦达到yield,将其值推入调用堆栈,但存储堆栈和指令指针 一旦调用yield,恢复堆栈和指令指针,并将参数推入qux 每次返回时,将其值推入调用堆栈 清除堆栈(1.)和指令指针(2.)

注意2.1和2.2的添加—协程可以在预定义的点挂起和恢复。这与子例程在调用另一个子例程时挂起类似。不同之处在于活动协程没有严格绑定到它的调用堆栈。相反,挂起的协程是一个单独的、孤立的堆栈的一部分。

root -\
  :    \- cofoo --\
  :/--<+--yield --/
  |    :
  V    :

这意味着挂起的协程可以在堆栈之间自由存储或移动。任何访问协程的调用栈都可以决定恢复它。

1.3. 遍历调用堆栈

到目前为止,我们的协程只在yield下到调用堆栈。子例程可以使用return和()在调用堆栈中上下移动。为了完整起见,协程还需要一种机制来上升到调用堆栈。考虑这样一个协程:

def wrap():
    yield 'before'
    yield from cofoo()
    yield 'after'

当您运行它时,这意味着它仍然像子例程一样分配堆栈和指令指针。当它挂起时,这仍然像存储一个子程序。

然而,yield from两者都有。它挂起堆栈和wrap的指令指针,并运行cofoo。注意,在cofoo完全完成之前,wrap一直处于暂停状态。每当cofoo挂起或发送一些东西时,cofoo直接连接到调用堆栈。

1.4. 协程一直到下面

在建立时,yield from允许跨另一个中间作用域连接两个作用域。当递归应用时,这意味着堆栈的顶部可以连接到堆栈的底部。

root -\
  :    \-> coro_a -yield-from-> coro_b --\
  :/ <-+------------------------yield ---/
  |    :
  :\ --+-- coro_a.send----------yield ---\
  :                             coro_b <-/

注意,root和coro_b彼此不知道。这使得协程比回调简洁得多:协程仍然像子例程一样建立在1:1的关系上。协程挂起并恢复其整个现有的执行堆栈,直到常规调用点。

值得注意的是,root可以恢复任意数量的协程。然而,它永远不能同时恢复多个。相同根的协程是并发的,但不是并行的!

1.5. Python的async和await

到目前为止,解释已经明确地使用了生成器的yield和yield from词汇表-底层功能是相同的。新的Python3.5语法async和await的存在主要是为了清晰。

def foo():  # subroutine?
     return None

def foo():  # coroutine?
     yield from foofoo()  # generator? coroutine?

async def foo():  # coroutine!
     await foofoo()  # coroutine!
     return None

async for和async with语句是必需的,因为你会用裸for和with语句打破yield from/await链。

2. 一个简单事件循环的剖析

就其本身而言,协程没有将控制权交给另一个协程的概念。它只能在协程堆栈的底部将控制权交给调用方。然后,调用者可以切换到另一个协程并运行它。

几个协程的根节点通常是一个事件循环:在挂起时,协程产生一个它想要恢复的事件。反过来,事件循环能够有效地等待这些事件发生。这允许它决定接下来运行哪个协程,或者在恢复之前如何等待。

这样的设计意味着循环可以理解一组预定义的事件。几个协程相互等待,直到最后等待一个事件。此事件可以通过交出控制权直接与事件循环通信。

loop -\
  :    \-> coroutine --await--> event --\
  :/ <-+----------------------- yield --/
  |    :
  |    :  # loop waits for event to happen
  |    :
  :\ --+-- send(reply) -------- yield --\
  :        coroutine <--yield-- event <-/

关键是协程挂起允许事件循环和事件直接通信。中间协程栈不需要知道哪个循环在运行它,也不需要知道事件是如何工作的。

2.1.1. 时间事件

最简单的事件是到达某个时间点。这也是线程代码的一个基本块:线程重复地休眠,直到条件为真。 然而,常规睡眠本身会阻塞执行——我们希望其他协程不被阻塞。相反,我们希望告诉事件循环什么时候应该恢复当前协程堆栈。

2.1.2. 定义事件

事件只是一个我们可以识别的值——通过枚举、类型或其他标识。我们可以用一个存储目标时间的简单类来定义它。除了存储事件信息外,我们还可以允许直接等待类。

class AsyncSleep:
    """Event to sleep until a point in time"""
    def __init__(self, until: float):
        self.until = until

    # used whenever someone ``await``s an instance of this Event
    def __await__(self):
        # yield this Event to the loop
        yield self
    
    def __repr__(self):
        return '%s(until=%.1f)' % (self.__class__.__name__, self.until)

这个类只存储事件——它没有说明如何实际处理它。

唯一的特殊功能是__await__ -它是await关键字所寻找的。实际上,它是一个迭代器,但不适用于常规迭代机制。

2.2.1. 等待事件

现在我们有了一个事件,协程如何对它做出反应?我们应该能够通过等待事件来表达睡眠的等效内容。为了更好地了解发生了什么,有一半的时间我们会等待两次:

import time

async def asleep(duration: float):
    """await that ``duration`` seconds pass"""
    await AsyncSleep(time.time() + duration / 2)
    await AsyncSleep(time.time() + duration / 2)

我们可以直接实例化并运行这个协程。类似于生成器,使用协程。Send运行协程,直到产生结果。

coroutine = asleep(100)
while True:
    print(coroutine.send(None))
    time.sleep(0.1)

这为我们提供了两个AsyncSleep事件,然后在协程完成时提供了一个StopIteration。请注意,唯一的延迟来自时间。在循环中睡觉!每个AsyncSleep只存储当前时间的偏移量。

2.2.2. 事件+睡眠

在这一点上,我们有两个独立的机制可供我们使用:

可从协程内部产生的AsyncSleep事件 时间。可以等待而不影响协程的睡眠

值得注意的是,这两者是正交的:一个不会影响或触发另一个。因此,我们可以提出自己的睡眠策略来满足AsyncSleep的延迟。

2.3. 一个简单的事件循环

如果我们有几个协程,每个协程都可以告诉我们它什么时候想要被唤醒。然后,我们可以等待其中的第一个,然后等待下一个,依此类推。值得注意的是,在每个点上,我们只关心下一个是哪个。

这就形成了一个简单的调度:

按所需的唤醒时间对协程进行排序 选第一个想醒来的 等到这个时间点 运行这个协程 从1开始重复。

简单的实现不需要任何高级概念。列表允许按日期对协程进行排序。等待是固定的时间。运行协程就像之前使用corroutine .send一样。

def run(*coroutines):
    """Cooperatively run all ``coroutines`` until completion"""
    # store wake-up-time and coroutines
    waiting = [(0, coroutine) for coroutine in coroutines]
    while waiting:
        # 2. pick the first coroutine that wants to wake up
        until, coroutine = waiting.pop(0)
        # 3. wait until this point in time
        time.sleep(max(0.0, until - time.time()))
        # 4. run this coroutine
        try:
            command = coroutine.send(None)
        except StopIteration:
            continue
        # 1. sort coroutines by their desired suspension
        if isinstance(command, AsyncSleep):
            waiting.append((command.until, coroutine))
            waiting.sort(key=lambda item: item[0])

当然,这方面还有很大的改进空间。我们可以为等待队列使用堆,或为事件使用分派表。我们还可以从StopIteration中获取返回值并将它们分配给协程。然而,基本原则是不变的。

2.4. 合作等

AsyncSleep事件和run事件循环是计时事件的完全工作实现。

async def sleepy(identifier: str = "coroutine", count=5):
    for i in range(count):
        print(identifier, 'step', i + 1, 'at %.2f' % time.time())
        await asleep(0.1)

run(*(sleepy("coroutine %d" % j) for j in range(5)))

这会在五个协程之间进行合作切换,每个协程暂停0.1秒。即使事件循环是同步的,它仍然在0.5秒而不是2.5秒内执行工作。每个协程保存状态并独立行动。

3.I/O事件循环

支持睡眠的事件循环适用于轮询。然而,在文件句柄上等待I/O可以更有效地完成:操作系统实现I/O,因此知道哪些句柄已经准备好了。理想情况下,事件循环应该支持显式的“ready for I/O”事件。

3.1. select调用

Python已经有一个接口来查询操作系统的读I/O句柄。当使用句柄来读取或写入时,它返回准备读取或写入的句柄:

readable, writable, _ = select.select(rlist, wlist, xlist, timeout)

例如,我们可以打开一个文件并等待它准备好:

write_target = open('/tmp/foo')
readable, writable, _ = select.select([], [write_target], [])

一旦select返回,writable包含我们打开的文件。

3.2. 基本I/O事件

类似于AsyncSleep请求,我们需要为I/O定义一个事件。使用底层的选择逻辑,事件必须引用一个可读的对象——比如一个打开的文件。此外,我们还存储要读取的数据量。

class AsyncRead:
    def __init__(self, file, amount=1):
        self.file = file
        self.amount = amount
        self._buffer = b'' if 'b' in file.mode else ''

    def __await__(self):
        while len(self._buffer) < self.amount:
            yield self
            # we only get here if ``read`` should not block
            self._buffer += self.file.read(1)
        return self._buffer

    def __repr__(self):
        return '%s(file=%s, amount=%d, progress=%d)' % (
            self.__class__.__name__, self.file, self.amount, len(self._buffer)
        )

与AsyncSleep一样,我们主要只存储底层系统调用所需的数据。这一次,__await__可以被多次恢复,直到读取所需的量为止。此外,我们返回I/O结果,而不是仅仅恢复。

3.3. 用读I/O增加事件循环

事件循环的基础仍然是前面定义的运行。首先,我们需要跟踪读请求。这不再是一个排序的调度,我们只将读请求映射到协程。

# new
waiting_read = {}  # type: Dict[file, coroutine]

因为选择。Select有一个timeout参数,我们可以用它来代替time.sleep。

# old
time.sleep(max(0.0, until - time.time()))
# new
readable, _, _ = select.select(list(waiting_read), [], [])

这为我们提供了所有可读文件——如果有,我们运行相应的协程。如果没有,我们已经等待了足够长的时间来运行当前的协程。

# new - reschedule waiting coroutine, run readable coroutine
if readable:
    waiting.append((until, coroutine))
    waiting.sort()
    coroutine = waiting_read[readable[0]]

最后,我们必须实际监听读请求。

# new
if isinstance(command, AsyncSleep):
    ...
elif isinstance(command, AsyncRead):
    ...

3.4. 把它们放在一起

上面的描述有点简单化。我们需要做一些切换,如果我们总能读取,就不会饿死睡眠协程。我们需要面对无书可读、无事可等的现实。但是,最终结果仍然适合30 LOC。

def run(*coroutines):
    """Cooperatively run all ``coroutines`` until completion"""
    waiting_read = {}  # type: Dict[file, coroutine]
    waiting = [(0, coroutine) for coroutine in coroutines]
    while waiting or waiting_read:
        # 2. wait until the next coroutine may run or read ...
        try:
            until, coroutine = waiting.pop(0)
        except IndexError:
            until, coroutine = float('inf'), None
            readable, _, _ = select.select(list(waiting_read), [], [])
        else:
            readable, _, _ = select.select(list(waiting_read), [], [], max(0.0, until - time.time()))
        # ... and select the appropriate one
        if readable and time.time() < until:
            if until and coroutine:
                waiting.append((until, coroutine))
                waiting.sort()
            coroutine = waiting_read.pop(readable[0])
        # 3. run this coroutine
        try:
            command = coroutine.send(None)
        except StopIteration:
            continue
        # 1. sort coroutines by their desired suspension ...
        if isinstance(command, AsyncSleep):
            waiting.append((command.until, coroutine))
            waiting.sort(key=lambda item: item[0])
        # ... or register reads
        elif isinstance(command, AsyncRead):
            waiting_read[command.file] = coroutine

3.5. 合作的I / O

AsyncSleep, AsyncRead和run实现现在完全可以用于睡眠和/或读取。 和困倦一样,我们可以定义一个帮助来测试阅读:

async def ready(path, amount=1024*32):
    print('read', path, 'at', '%d' % time.time())
    with open(path, 'rb') as file:
        result = await AsyncRead(file, amount)
    print('done', path, 'at', '%d' % time.time())
    print('got', len(result), 'B')

run(sleepy('background', 5), ready('/dev/urandom'))

运行这个,我们可以看到我们的I/O与等待任务交织在一起:

id background round 1
read /dev/urandom at 1530721148
id background round 2
id background round 3
id background round 4
id background round 5
done /dev/urandom at 1530721148
got 1024 B

4. 非阻塞I / O

虽然文件上的I/O理解了这个概念,但它并不真正适合asyncio这样的库:select调用总是返回文件,并且open和read都可能无限期地阻塞。这会阻塞事件循环的所有协程——这很糟糕。aiofiles等库使用线程和同步来伪造文件上的非阻塞I/O和事件。

然而,套接字确实允许非阻塞I/O,它们固有的延迟使它变得更加关键。当在事件循环中使用时,等待数据和重试可以被包装而不阻塞任何东西。

4.1. 非阻塞I/O事件

类似于我们的AsyncRead,我们可以为套接字定义一个挂起并读取事件。我们不取文件,而是取套接字——它必须是非阻塞的。而且,我们的__await__使用套接字。Recv而不是file.read。

class AsyncRecv:
    def __init__(self, connection, amount=1, read_buffer=1024):
        assert not connection.getblocking(), 'connection must be non-blocking for async recv'
        self.connection = connection
        self.amount = amount
        self.read_buffer = read_buffer
        self._buffer = b''

    def __await__(self):
        while len(self._buffer) < self.amount:
            try:
                self._buffer += self.connection.recv(self.read_buffer)
            except BlockingIOError:
                yield self
        return self._buffer

    def __repr__(self):
        return '%s(file=%s, amount=%d, progress=%d)' % (
            self.__class__.__name__, self.connection, self.amount, len(self._buffer)
        )

与AsyncRead相比,__await__执行真正的非阻塞I/O。当数据可用时,它总是读取数据。当没有可用数据时,它总是挂起。这意味着事件循环只在我们执行有用的工作时被阻塞。

4.2. 解除事件循环阻塞

就事件循环而言,没有什么变化。要监听的事件仍然与文件事件相同——一个由select标记为ready的文件描述符。

# old
elif isinstance(command, AsyncRead):
    waiting_read[command.file] = coroutine
# new
elif isinstance(command, AsyncRead):
    waiting_read[command.file] = coroutine
elif isinstance(command, AsyncRecv):
    waiting_read[command.connection] = coroutine

此时,应该很明显AsyncRead和AsyncRecv是同一种事件。我们可以很容易地将它们重构为一个具有可交换I/O组件的事件。实际上,事件循环、协程和事件将调度程序、任意中间代码和实际I/O清晰地分开。

4.3. 非阻塞I/O的丑陋一面

原则上,此时您应该做的是将read的逻辑复制为AsyncRecv的recv。然而,现在这要丑陋得多——当函数在内核内阻塞时,你必须处理早期返回,但控制权交给你。例如,打开一个连接比打开一个文件要长得多:

# file
file = open(path, 'rb')
# non-blocking socket
connection = socket.socket()
connection.setblocking(False)
# open without blocking - retry on failure
try:
    connection.connect((url, port))
except BlockingIOError:
    pass

长话短说,剩下的就是几十行异常处理。此时事件和事件循环已经工作了。

id background round 1
read localhost:25000 at 1530783569
read /dev/urandom at 1530783569
done localhost:25000 at 1530783569 got 32768 B
id background round 2
id background round 3
id background round 4
done /dev/urandom at 1530783569 got 4096 B
id background round 5

齿顶高

github上的示例代码

If you picture an airport control tower, with many planes waiting to land on the same runway. The control tower can be seen as the event loop and runway as the thread. Each plane is a separate function waiting to execute. In reality only one plane can land on the runway at a time. What asyncio basically does it allows many planes to land simultaneously on the same runway by using the event loop to suspend functions and allow other functions to run when you use the await syntax it basically means that plane(function can be suspended and allow other functions to process