谈论async/await和asyncio不是一回事。前者是一个基本的、低级的构造(协程),而后者是一个使用这些构造的库。相反,没有唯一的最终答案。
下面是async/await和asyncio-like库如何工作的一般描述。也就是说,上面可能还有其他技巧(有……),但它们都是无关紧要的,除非你自己创建它们。这种差异应该可以忽略不计,除非你已经知道得足够多,不必问这样的问题。
1. 简而言之,协程与子例程
就像子例程(函数,过程,…)一样,协程(生成器,…)是调用堆栈和指令指针的抽象:有一个执行代码段的堆栈,每个代码段都位于特定的指令上。
def和async def的区别仅仅是为了清晰。实际的差别是回报和收益率。因此,从单个调用到整个堆栈的差异中await或yield。
1.1. 子例程
子例程表示一个新的堆栈级别,以保存局部变量,并对其指令进行一次遍历以达到结束。考虑这样的子程序:
def subfoo(bar):
qux = 3
return qux * bar
当你运行它时,这意味着
为bar和qux分配堆栈空间
递归地执行第一个语句并跳转到下一个语句
每次返回时,将其值推入调用堆栈
清除堆栈(1.)和指令指针(2.)
值得注意的是,4。意味着子程序总是以相同的状态开始。函数本身专有的所有内容在完成时都将丢失。一个函数不能被恢复,即使在返回后有指令。
root -\
: \- subfoo --\
:/--<---return --/
|
V
1.2. 协程作为持久子例程
协程类似于子例程,但可以在不破坏其状态的情况下退出。考虑这样一个协程:
def cofoo(bar):
qux = yield bar # yield marks a break point
return qux
当你运行它时,这意味着
为bar和qux分配堆栈空间
递归地执行第一个语句并跳转到下一个语句
一旦达到yield,将其值推入调用堆栈,但存储堆栈和指令指针
一旦调用yield,恢复堆栈和指令指针,并将参数推入qux
每次返回时,将其值推入调用堆栈
清除堆栈(1.)和指令指针(2.)
注意2.1和2.2的添加—协程可以在预定义的点挂起和恢复。这与子例程在调用另一个子例程时挂起类似。不同之处在于活动协程没有严格绑定到它的调用堆栈。相反,挂起的协程是一个单独的、孤立的堆栈的一部分。
root -\
: \- cofoo --\
:/--<+--yield --/
| :
V :
这意味着挂起的协程可以在堆栈之间自由存储或移动。任何访问协程的调用栈都可以决定恢复它。
1.3. 遍历调用堆栈
到目前为止,我们的协程只在yield下到调用堆栈。子例程可以使用return和()在调用堆栈中上下移动。为了完整起见,协程还需要一种机制来上升到调用堆栈。考虑这样一个协程:
def wrap():
yield 'before'
yield from cofoo()
yield 'after'
当您运行它时,这意味着它仍然像子例程一样分配堆栈和指令指针。当它挂起时,这仍然像存储一个子程序。
然而,yield from两者都有。它挂起堆栈和wrap的指令指针,并运行cofoo。注意,在cofoo完全完成之前,wrap一直处于暂停状态。每当cofoo挂起或发送一些东西时,cofoo直接连接到调用堆栈。
1.4. 协程一直到下面
在建立时,yield from允许跨另一个中间作用域连接两个作用域。当递归应用时,这意味着堆栈的顶部可以连接到堆栈的底部。
root -\
: \-> coro_a -yield-from-> coro_b --\
:/ <-+------------------------yield ---/
| :
:\ --+-- coro_a.send----------yield ---\
: coro_b <-/
注意,root和coro_b彼此不知道。这使得协程比回调简洁得多:协程仍然像子例程一样建立在1:1的关系上。协程挂起并恢复其整个现有的执行堆栈,直到常规调用点。
值得注意的是,root可以恢复任意数量的协程。然而,它永远不能同时恢复多个。相同根的协程是并发的,但不是并行的!
1.5. Python的async和await
到目前为止,解释已经明确地使用了生成器的yield和yield from词汇表-底层功能是相同的。新的Python3.5语法async和await的存在主要是为了清晰。
def foo(): # subroutine?
return None
def foo(): # coroutine?
yield from foofoo() # generator? coroutine?
async def foo(): # coroutine!
await foofoo() # coroutine!
return None
async for和async with语句是必需的,因为你会用裸for和with语句打破yield from/await链。
2. 一个简单事件循环的剖析
就其本身而言,协程没有将控制权交给另一个协程的概念。它只能在协程堆栈的底部将控制权交给调用方。然后,调用者可以切换到另一个协程并运行它。
几个协程的根节点通常是一个事件循环:在挂起时,协程产生一个它想要恢复的事件。反过来,事件循环能够有效地等待这些事件发生。这允许它决定接下来运行哪个协程,或者在恢复之前如何等待。
这样的设计意味着循环可以理解一组预定义的事件。几个协程相互等待,直到最后等待一个事件。此事件可以通过交出控制权直接与事件循环通信。
loop -\
: \-> coroutine --await--> event --\
:/ <-+----------------------- yield --/
| :
| : # loop waits for event to happen
| :
:\ --+-- send(reply) -------- yield --\
: coroutine <--yield-- event <-/
关键是协程挂起允许事件循环和事件直接通信。中间协程栈不需要知道哪个循环在运行它,也不需要知道事件是如何工作的。
2.1.1. 时间事件
最简单的事件是到达某个时间点。这也是线程代码的一个基本块:线程重复地休眠,直到条件为真。
然而,常规睡眠本身会阻塞执行——我们希望其他协程不被阻塞。相反,我们希望告诉事件循环什么时候应该恢复当前协程堆栈。
2.1.2. 定义事件
事件只是一个我们可以识别的值——通过枚举、类型或其他标识。我们可以用一个存储目标时间的简单类来定义它。除了存储事件信息外,我们还可以允许直接等待类。
class AsyncSleep:
"""Event to sleep until a point in time"""
def __init__(self, until: float):
self.until = until
# used whenever someone ``await``s an instance of this Event
def __await__(self):
# yield this Event to the loop
yield self
def __repr__(self):
return '%s(until=%.1f)' % (self.__class__.__name__, self.until)
这个类只存储事件——它没有说明如何实际处理它。
唯一的特殊功能是__await__ -它是await关键字所寻找的。实际上,它是一个迭代器,但不适用于常规迭代机制。
2.2.1. 等待事件
现在我们有了一个事件,协程如何对它做出反应?我们应该能够通过等待事件来表达睡眠的等效内容。为了更好地了解发生了什么,有一半的时间我们会等待两次:
import time
async def asleep(duration: float):
"""await that ``duration`` seconds pass"""
await AsyncSleep(time.time() + duration / 2)
await AsyncSleep(time.time() + duration / 2)
我们可以直接实例化并运行这个协程。类似于生成器,使用协程。Send运行协程,直到产生结果。
coroutine = asleep(100)
while True:
print(coroutine.send(None))
time.sleep(0.1)
这为我们提供了两个AsyncSleep事件,然后在协程完成时提供了一个StopIteration。请注意,唯一的延迟来自时间。在循环中睡觉!每个AsyncSleep只存储当前时间的偏移量。
2.2.2. 事件+睡眠
在这一点上,我们有两个独立的机制可供我们使用:
可从协程内部产生的AsyncSleep事件
时间。可以等待而不影响协程的睡眠
值得注意的是,这两者是正交的:一个不会影响或触发另一个。因此,我们可以提出自己的睡眠策略来满足AsyncSleep的延迟。
2.3. 一个简单的事件循环
如果我们有几个协程,每个协程都可以告诉我们它什么时候想要被唤醒。然后,我们可以等待其中的第一个,然后等待下一个,依此类推。值得注意的是,在每个点上,我们只关心下一个是哪个。
这就形成了一个简单的调度:
按所需的唤醒时间对协程进行排序
选第一个想醒来的
等到这个时间点
运行这个协程
从1开始重复。
简单的实现不需要任何高级概念。列表允许按日期对协程进行排序。等待是固定的时间。运行协程就像之前使用corroutine .send一样。
def run(*coroutines):
"""Cooperatively run all ``coroutines`` until completion"""
# store wake-up-time and coroutines
waiting = [(0, coroutine) for coroutine in coroutines]
while waiting:
# 2. pick the first coroutine that wants to wake up
until, coroutine = waiting.pop(0)
# 3. wait until this point in time
time.sleep(max(0.0, until - time.time()))
# 4. run this coroutine
try:
command = coroutine.send(None)
except StopIteration:
continue
# 1. sort coroutines by their desired suspension
if isinstance(command, AsyncSleep):
waiting.append((command.until, coroutine))
waiting.sort(key=lambda item: item[0])
当然,这方面还有很大的改进空间。我们可以为等待队列使用堆,或为事件使用分派表。我们还可以从StopIteration中获取返回值并将它们分配给协程。然而,基本原则是不变的。
2.4. 合作等
AsyncSleep事件和run事件循环是计时事件的完全工作实现。
async def sleepy(identifier: str = "coroutine", count=5):
for i in range(count):
print(identifier, 'step', i + 1, 'at %.2f' % time.time())
await asleep(0.1)
run(*(sleepy("coroutine %d" % j) for j in range(5)))
这会在五个协程之间进行合作切换,每个协程暂停0.1秒。即使事件循环是同步的,它仍然在0.5秒而不是2.5秒内执行工作。每个协程保存状态并独立行动。
3.I/O事件循环
支持睡眠的事件循环适用于轮询。然而,在文件句柄上等待I/O可以更有效地完成:操作系统实现I/O,因此知道哪些句柄已经准备好了。理想情况下,事件循环应该支持显式的“ready for I/O”事件。
3.1. select调用
Python已经有一个接口来查询操作系统的读I/O句柄。当使用句柄来读取或写入时,它返回准备读取或写入的句柄:
readable, writable, _ = select.select(rlist, wlist, xlist, timeout)
例如,我们可以打开一个文件并等待它准备好:
write_target = open('/tmp/foo')
readable, writable, _ = select.select([], [write_target], [])
一旦select返回,writable包含我们打开的文件。
3.2. 基本I/O事件
类似于AsyncSleep请求,我们需要为I/O定义一个事件。使用底层的选择逻辑,事件必须引用一个可读的对象——比如一个打开的文件。此外,我们还存储要读取的数据量。
class AsyncRead:
def __init__(self, file, amount=1):
self.file = file
self.amount = amount
self._buffer = b'' if 'b' in file.mode else ''
def __await__(self):
while len(self._buffer) < self.amount:
yield self
# we only get here if ``read`` should not block
self._buffer += self.file.read(1)
return self._buffer
def __repr__(self):
return '%s(file=%s, amount=%d, progress=%d)' % (
self.__class__.__name__, self.file, self.amount, len(self._buffer)
)
与AsyncSleep一样,我们主要只存储底层系统调用所需的数据。这一次,__await__可以被多次恢复,直到读取所需的量为止。此外,我们返回I/O结果,而不是仅仅恢复。
3.3. 用读I/O增加事件循环
事件循环的基础仍然是前面定义的运行。首先,我们需要跟踪读请求。这不再是一个排序的调度,我们只将读请求映射到协程。
# new
waiting_read = {} # type: Dict[file, coroutine]
因为选择。Select有一个timeout参数,我们可以用它来代替time.sleep。
# old
time.sleep(max(0.0, until - time.time()))
# new
readable, _, _ = select.select(list(waiting_read), [], [])
这为我们提供了所有可读文件——如果有,我们运行相应的协程。如果没有,我们已经等待了足够长的时间来运行当前的协程。
# new - reschedule waiting coroutine, run readable coroutine
if readable:
waiting.append((until, coroutine))
waiting.sort()
coroutine = waiting_read[readable[0]]
最后,我们必须实际监听读请求。
# new
if isinstance(command, AsyncSleep):
...
elif isinstance(command, AsyncRead):
...
3.4. 把它们放在一起
上面的描述有点简单化。我们需要做一些切换,如果我们总能读取,就不会饿死睡眠协程。我们需要面对无书可读、无事可等的现实。但是,最终结果仍然适合30 LOC。
def run(*coroutines):
"""Cooperatively run all ``coroutines`` until completion"""
waiting_read = {} # type: Dict[file, coroutine]
waiting = [(0, coroutine) for coroutine in coroutines]
while waiting or waiting_read:
# 2. wait until the next coroutine may run or read ...
try:
until, coroutine = waiting.pop(0)
except IndexError:
until, coroutine = float('inf'), None
readable, _, _ = select.select(list(waiting_read), [], [])
else:
readable, _, _ = select.select(list(waiting_read), [], [], max(0.0, until - time.time()))
# ... and select the appropriate one
if readable and time.time() < until:
if until and coroutine:
waiting.append((until, coroutine))
waiting.sort()
coroutine = waiting_read.pop(readable[0])
# 3. run this coroutine
try:
command = coroutine.send(None)
except StopIteration:
continue
# 1. sort coroutines by their desired suspension ...
if isinstance(command, AsyncSleep):
waiting.append((command.until, coroutine))
waiting.sort(key=lambda item: item[0])
# ... or register reads
elif isinstance(command, AsyncRead):
waiting_read[command.file] = coroutine
3.5. 合作的I / O
AsyncSleep, AsyncRead和run实现现在完全可以用于睡眠和/或读取。
和困倦一样,我们可以定义一个帮助来测试阅读:
async def ready(path, amount=1024*32):
print('read', path, 'at', '%d' % time.time())
with open(path, 'rb') as file:
result = await AsyncRead(file, amount)
print('done', path, 'at', '%d' % time.time())
print('got', len(result), 'B')
run(sleepy('background', 5), ready('/dev/urandom'))
运行这个,我们可以看到我们的I/O与等待任务交织在一起:
id background round 1
read /dev/urandom at 1530721148
id background round 2
id background round 3
id background round 4
id background round 5
done /dev/urandom at 1530721148
got 1024 B
4. 非阻塞I / O
虽然文件上的I/O理解了这个概念,但它并不真正适合asyncio这样的库:select调用总是返回文件,并且open和read都可能无限期地阻塞。这会阻塞事件循环的所有协程——这很糟糕。aiofiles等库使用线程和同步来伪造文件上的非阻塞I/O和事件。
然而,套接字确实允许非阻塞I/O,它们固有的延迟使它变得更加关键。当在事件循环中使用时,等待数据和重试可以被包装而不阻塞任何东西。
4.1. 非阻塞I/O事件
类似于我们的AsyncRead,我们可以为套接字定义一个挂起并读取事件。我们不取文件,而是取套接字——它必须是非阻塞的。而且,我们的__await__使用套接字。Recv而不是file.read。
class AsyncRecv:
def __init__(self, connection, amount=1, read_buffer=1024):
assert not connection.getblocking(), 'connection must be non-blocking for async recv'
self.connection = connection
self.amount = amount
self.read_buffer = read_buffer
self._buffer = b''
def __await__(self):
while len(self._buffer) < self.amount:
try:
self._buffer += self.connection.recv(self.read_buffer)
except BlockingIOError:
yield self
return self._buffer
def __repr__(self):
return '%s(file=%s, amount=%d, progress=%d)' % (
self.__class__.__name__, self.connection, self.amount, len(self._buffer)
)
与AsyncRead相比,__await__执行真正的非阻塞I/O。当数据可用时,它总是读取数据。当没有可用数据时,它总是挂起。这意味着事件循环只在我们执行有用的工作时被阻塞。
4.2. 解除事件循环阻塞
就事件循环而言,没有什么变化。要监听的事件仍然与文件事件相同——一个由select标记为ready的文件描述符。
# old
elif isinstance(command, AsyncRead):
waiting_read[command.file] = coroutine
# new
elif isinstance(command, AsyncRead):
waiting_read[command.file] = coroutine
elif isinstance(command, AsyncRecv):
waiting_read[command.connection] = coroutine
此时,应该很明显AsyncRead和AsyncRecv是同一种事件。我们可以很容易地将它们重构为一个具有可交换I/O组件的事件。实际上,事件循环、协程和事件将调度程序、任意中间代码和实际I/O清晰地分开。
4.3. 非阻塞I/O的丑陋一面
原则上,此时您应该做的是将read的逻辑复制为AsyncRecv的recv。然而,现在这要丑陋得多——当函数在内核内阻塞时,你必须处理早期返回,但控制权交给你。例如,打开一个连接比打开一个文件要长得多:
# file
file = open(path, 'rb')
# non-blocking socket
connection = socket.socket()
connection.setblocking(False)
# open without blocking - retry on failure
try:
connection.connect((url, port))
except BlockingIOError:
pass
长话短说,剩下的就是几十行异常处理。此时事件和事件循环已经工作了。
id background round 1
read localhost:25000 at 1530783569
read /dev/urandom at 1530783569
done localhost:25000 at 1530783569 got 32768 B
id background round 2
id background round 3
id background round 4
done /dev/urandom at 1530783569 got 4096 B
id background round 5
齿顶高
github上的示例代码