什么是asyncio?
Asyncio代表异步输入输出,指的是使用单个线程或事件循环实现高并发的编程范式。
异步编程是一种并行编程,允许工作单元与主应用程序线程分开运行。当工作完成时,它通知主线程工作线程的完成或失败。
让我们看看下图:
让我们用一个例子来理解asyncio:
为了理解asyncio背后的概念,让我们考虑一家只有一个服务员的餐厅。突然,三个顾客,A, B和C出现了。他们三个人从服务员那里拿到菜单后,花了不同的时间来决定吃什么。
假设A需要5分钟,B需要10分钟,C需要1分钟。如果单身服务员先从B开始,在10分钟内为B点餐,然后他为A服务,花5分钟记录他点的菜,最后花1分钟知道C想吃什么。
所以,服务员总共要花10 + 5 + 1 = 16分钟来记下他们点的菜。但是,请注意在这个事件序列中,C在服务员到达他之前等了15分钟,A等了10分钟,B等了0分钟。
现在考虑一下,如果服务员知道每位顾客做出决定所需的时间。他可以先从C开始,然后到A,最后到b。这样每个顾客的等待时间为0分钟。
尽管只有一个服务员,但却产生了三个服务员的错觉,每个顾客都有一个服务员。
最后,服务员完成三份订单所需的总时间为10分钟,远少于另一种情况下的16分钟。
让我们来看另一个例子:
假设,国际象棋大师马格努斯·卡尔森(Magnus Carlsen)主持了一场国际象棋展览,他与多名业余棋手同场竞技。他有两种方式进行展览:同步和异步。
假设:
24的对手
马格努斯·卡尔森在5秒内走完每一步棋
每个对手有55秒的时间来移动
游戏平均30对棋(总共60步)
同步:马格努斯·卡尔森一次只玩一局,从不同时玩两局,直到游戏完成。每款游戏耗时(55 + 5)* 30 == 1800秒,即30分钟。整个展览耗时24 * 30 == 720分钟,即12个小时。
异步:马格努斯·卡尔森从一张桌子移动到另一张桌子,在每张桌子上移动一次。她离开牌桌,让对手在等待时间内采取下一步行动。Judit在所有24局游戏中的一次移动需要24 * 5 == 120秒,即2分钟。整个展览缩短到120 * 30 == 3600秒,也就是1个小时
世界上只有一个马格努斯·卡尔森(Magnus Carlsen),他只有两只手,自己一次只能走一步棋。但异步游戏将展示时间从12小时缩短至1小时。
代码示例:
让我们尝试使用代码片段演示同步和异步执行时间。
异步- async_count.py
import asyncio
import time
async def count():
print("One", end=" ")
await asyncio.sleep(1)
print("Two", end=" ")
await asyncio.sleep(2)
print("Three", end=" ")
async def main():
await asyncio.gather(count(), count(), count(), count(), count())
if __name__ == "__main__":
start_time = time.perf_counter()
asyncio.run(main())
end_time = time.perf_counter()
execution_time = end_time - start_time
print(f"\nExecuting - {__file__}\nExecution Starts: {start_time}\nExecutions Ends: {end_time}\nTotals Execution Time:{execution_time:0.2f} seconds.")
异步-输出:
One One One One One Two Two Two Two Two Three Three Three Three Three
Executing - async_count.py
Execution Starts: 18453.442160108
Executions Ends: 18456.444719712
Totals Execution Time:3.00 seconds.
Synchronous - sync_count.py
import time
def count():
print("One", end=" ")
time.sleep(1)
print("Two", end=" ")
time.sleep(2)
print("Three", end=" ")
def main():
for _ in range(5):
count()
if __name__ == "__main__":
start_time = time.perf_counter()
main()
end_time = time.perf_counter()
execution_time = end_time - start_time
print(f"\nExecuting - {__file__}\nExecution Starts: {start_time}\nExecutions Ends: {end_time}\nTotals Execution Time:{execution_time:0.2f} seconds.")
同步-输出:
One Two Three One Two Three One Two Three One Two Three One Two Three
Executing - sync_count.py
Execution Starts: 18875.175965998
Executions Ends: 18890.189930292
Totals Execution Time:15.01 seconds.
为什么在Python中使用asyncio而不是多线程?
It’s very difficult to write code that is thread safe. With asynchronous code, you know exactly where the code will shift from one task to the next and race conditions are much harder to come by.
Threads consume a fair amount of data since each thread needs to have its own stack. With async code, all the code shares the same stack and the stack is kept small due to continuously unwinding the stack between tasks.
Threads are OS structures and therefore require more memory for the platform to support. There is no such problem with asynchronous tasks.
asyncio是如何工作的?
在深入讨论之前,让我们回顾一下Python Generator
Python发电机:
包含yield语句的函数被编译为生成器。在函数体中使用yield表达式会导致该函数成为生成器。这些函数返回一个支持迭代协议方法的对象。自动创建的生成器对象接收__next()__方法。回到上一节的例子,我们可以直接在生成器对象上调用__next__,而不是使用next():
def asynchronous():
yield "Educative"
if __name__ == "__main__":
gen = asynchronous()
str = gen.__next__()
print(str)
请记住以下关于生成器的内容:
Generator functions allow you to procrastinate computing expensive values. You only compute the next value when required. This makes generators memory and compute efficient; they refrain from saving long sequences in memory or doing all expensive computations upfront.
Generators, when suspended, retain the code location, which is the last yield statement executed, and their entire local scope. This allows them to resume execution from where they left off.
Generator objects are nothing more than iterators.
Remember to make a distinction between a generator function and the associated generator object which are often used interchangeably. A generator function when invoked returns a generator object and next() is invoked on the generator object to run the code within the generator function.
发电机状态:
生成器会经历以下几种状态:
当生成器函数第一次返回生成器对象并且迭代还没有开始时,返回GEN_CREATED。
在生成器对象上调用了GEN_RUNNING,并由python解释器执行。
GEN_SUSPENDED:当发电机以一定的产量暂停时
当生成器已完成执行或已关闭时,返回GEN_CLOSED。
生成器对象上的方法:
生成器对象公开了可以调用来操作生成器的不同方法。这些都是:
把()
send ()
close ()
让我们深入了解更多细节
asyncio的规则:
The syntax async def introduces either a native coroutine or an asynchronous generator. The expressions async with and async for are also valid.
The keyword await passes function control back to the event loop. (It suspends the execution of the surrounding coroutine.) If Python encounters an await f() expression in the scope of g(), this is how await tells the event loop, "Suspend execution of g() until whatever I’m waiting on—the result of f()—is returned. In the meantime, go let something else run."
在代码中,第二个要点大致如下所示:
async def g():
# Pause here and come back to g() when f() is ready
r = await f()
return r
关于何时以及如何使用async/await也有一组严格的规则。无论你是否还在学习语法,或者已经使用过async/await,这些都很方便:
A function that you introduce with async def is a coroutine. It may use await, return, or yield, but all of these are optional. Declaring async def noop(): pass is valid:
Using await and/or return creates a coroutine function. To call a coroutine function, you must await it to get its results.
It is less common to use yield in an async def block. This creates an asynchronous generator, which you iterate over with async for. Forget about async generators for the time being and focus on getting down the syntax for coroutine functions, which use await and/or return.
Anything defined with async def may not use yield from, which will raise a SyntaxError.
Just like it’s a SyntaxError to use yield outside of a def function, it is a SyntaxError to use await outside of an async def coroutine. You can only use await in the body of coroutines.
以下是一些简短的例子,旨在总结上述几条规则:
async def f(x):
y = await z(x) # OK - `await` and `return` allowed in coroutines
return y
async def g(x):
yield x # OK - this is an async generator
async def m(x):
yield from gen(x) # NO - SyntaxError
def m(x):
y = await z(x) # NO - SyntaxError (no `async def` here)
return y
基于生成器的协程
Python创建了Python生成器和用于协程的生成器之间的区别。这些协程称为基于生成器的协程,并且需要@asynio装饰器。将协程添加到函数定义中,尽管这没有严格执行。
基于生成器的协程使用yield from语法而不是yield。协程可以:
屈服于另一个协程
未来收益
返回一个表达式
提高异常
Python中的协程使得多任务合作成为可能。
协作多任务处理是指正在运行的进程主动将CPU让给其他进程的方法。当一个进程在逻辑上被阻塞时,比如在等待用户输入时,或者当它发起了一个网络请求并将空闲一段时间时,它可能会这样做。
协程可以定义为一个特殊的函数,它可以在不丢失状态的情况下将控制权交给调用者。
那么协程和生成器之间的区别是什么呢?
生成器本质上是迭代器,尽管它们看起来像函数。一般来说,生成器和协程之间的区别是:
生成器将一个值返回给调用者,而协程将控制权交还给另一个协程,并且可以从它放弃控制权开始恢复执行。
一旦启动,生成器就不能接受参数,而协程可以。
生成器主要用于简化迭代器的编写。它们是一种协程,有时也称为半协程。
基于生成器的协程示例
我们可以编写的最简单的基于生成器的协程如下所示:
@asyncio.coroutine
def do_something_important():
yield from asyncio.sleep(1)
协程休眠一秒。注意装饰器的使用和yield from。
本地基于协程示例
原生的意思是,该语言引入了语法来专门定义协程,使它们成为语言中的一等公民。本地协程可以使用async/await语法定义。
我们可以编写的最简单的基于本机的协程如下所示:
async def do_something_important():
await asyncio.sleep(1)
AsyncIO设计模式
AsyncIO有自己的一组可能的脚本设计,我们将在本节中讨论。
1. 事件循环
事件循环是一种编程构造,它等待事件发生,然后将它们分派给事件处理程序。事件可以是用户单击UI按钮,也可以是启动文件下载的进程。异步编程的核心是事件循环。
示例代码:
import asyncio
import random
import time
from threading import Thread
from threading import current_thread
# ANSI colors
colors = (
"\033[0m", # End of color
"\033[31m", # Red
"\033[32m", # Green
"\033[34m", # Blue
)
async def do_something_important(sleep_for):
print(colors[1] + f"Is event loop running in thread {current_thread().getName()} = {asyncio.get_event_loop().is_running()}" + colors[0])
await asyncio.sleep(sleep_for)
def launch_event_loops():
# get a new event loop
loop = asyncio.new_event_loop()
# set the event loop for the current thread
asyncio.set_event_loop(loop)
# run a coroutine on the event loop
loop.run_until_complete(do_something_important(random.randint(1, 5)))
# remember to close the loop
loop.close()
if __name__ == "__main__":
thread_1 = Thread(target=launch_event_loops)
thread_2 = Thread(target=launch_event_loops)
start_time = time.perf_counter()
thread_1.start()
thread_2.start()
print(colors[2] + f"Is event loop running in thread {current_thread().getName()} = {asyncio.get_event_loop().is_running()}" + colors[0])
thread_1.join()
thread_2.join()
end_time = time.perf_counter()
execution_time = end_time - start_time
print(colors[3] + f"Event Loop Start Time: {start_time}\nEvent Loop End Time: {end_time}\nEvent Loop Execution Time: {execution_time:0.2f} seconds." + colors[0])
执行命令:python async_event_loop.py
输出:
自己尝试并检查输出,您会发现每个衍生线程都在运行自己的事件循环。
事件循环的类型
有两种类型的事件循环:
SelectorEventLoop: SelectorEventLoop基于选择器模块,是所有平台上的默认循环。
ProactorEventLoop: ProactorEventLoop基于Windows的I/O完成端口,仅在Windows上支持。
2. 期货
Future表示正在进行或将在未来被调度的计算。它是一个特殊的低级可等待对象,表示异步操作的最终结果。不要混淆线程。Future和asyncio.Future。
示例代码:
import time
import asyncio
from asyncio import Future
# ANSI colors
colors = (
"\033[0m", # End of color
"\033[31m", # Red
"\033[32m", # Green
"\033[34m", # Blue
)
async def bar(future):
print(colors[1] + "bar will sleep for 3 seconds" + colors[0])
await asyncio.sleep(3)
print(colors[1] + "bar resolving the future" + colors[0])
future.done()
future.set_result("future is resolved")
async def foo(future):
print(colors[2] + "foo will await the future" + colors[0])
await future
print(colors[2] + "foo finds the future resolved" + colors[0])
async def main():
future = Future()
await asyncio.gather(foo(future), bar(future))
if __name__ == "__main__":
start_time = time.perf_counter()
asyncio.run(main())
end_time = time.perf_counter()
execution_time = end_time - start_time
print(colors[3] + f"Future Start Time: {start_time}\nFuture End Time: {end_time}\nFuture Execution Time: {execution_time:0.2f} seconds." + colors[0])
执行命令:python async_futures.py
输出:
两个协程都传递了一个future。foo()协程等待future被解析,而bar()协程则在三秒后解析future。
3.任务
任务就像未来,事实上,任务是未来的一个子类,可以使用以下方法创建:
Asyncio.create_task()接受协程并将它们包装为任务。
Loop.create_task()只接受协程。
Asyncio.ensure_future()接受未来、协程和任何可等待对象。
任务包装协程并在事件循环中运行它们。如果一个协程在等待一个Future, Task将挂起该协程的执行并等待Future完成。当Future完成时,将继续执行封装的协程。
示例代码:
import time
import asyncio
from asyncio import Future
# ANSI colors
colors = (
"\033[0m", # End of color
"\033[31m", # Red
"\033[32m", # Green
"\033[34m", # Blue
)
async def bar(future):
print(colors[1] + "bar will sleep for 3 seconds" + colors[0])
await asyncio.sleep(3)
print(colors[1] + "bar resolving the future" + colors[0])
future.done()
future.set_result("future is resolved")
async def foo(future):
print(colors[2] + "foo will await the future" + colors[0])
await future
print(colors[2] + "foo finds the future resolved" + colors[0])
async def main():
future = Future()
loop = asyncio.get_event_loop()
t1 = loop.create_task(bar(future))
t2 = loop.create_task(foo(future))
await t2, t1
if __name__ == "__main__":
start_time = time.perf_counter()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
end_time = time.perf_counter()
execution_time = end_time - start_time
print(colors[3] + f"Future Start Time: {start_time}\nFuture End Time: {end_time}\nFuture Execution Time: {execution_time:0.2f} seconds." + colors[0])
执行命令:python async_tasks.py
输出:
4. 链协同程序:
协程的一个关键特性是它们可以被链接在一起。协程对象是可等待的,因此另一个协程可以等待它。这允许你把程序分解成更小的、可管理的、可回收的协程:
示例代码:
import sys
import asyncio
import random
import time
# ANSI colors
colors = (
"\033[0m", # End of color
"\033[31m", # Red
"\033[32m", # Green
"\033[36m", # Cyan
"\033[34m", # Blue
)
async def function1(n: int) -> str:
i = random.randint(0, 10)
print(colors[1] + f"function1({n}) is sleeping for {i} seconds." + colors[0])
await asyncio.sleep(i)
result = f"result{n}-1"
print(colors[1] + f"Returning function1({n}) == {result}." + colors[0])
return result
async def function2(n: int, arg: str) -> str:
i = random.randint(0, 10)
print(colors[2] + f"function2{n, arg} is sleeping for {i} seconds." + colors[0])
await asyncio.sleep(i)
result = f"result{n}-2 derived from {arg}"
print(colors[2] + f"Returning function2{n, arg} == {result}." + colors[0])
return result
async def chain(n: int) -> None:
start = time.perf_counter()
p1 = await function1(n)
p2 = await function2(n, p1)
end = time.perf_counter() - start
print(colors[3] + f"--> Chained result{n} => {p2} (took {end:0.2f} seconds)." + colors[0])
async def main(*args):
await asyncio.gather(*(chain(n) for n in args))
if __name__ == "__main__":
random.seed(444)
args = [1, 2, 3] if len(sys.argv) == 1 else map(int, sys.argv[1:])
start_time = time.perf_counter()
asyncio.run(main(*args))
end_time = time.perf_counter()
execution_time = end_time - start_time
print(colors[4] + f"Program Start Time: {start_time}\nProgram End Time: {end_time}\nProgram Execution Time: {execution_time:0.2f} seconds." + colors[0])
请仔细注意输出,其中function1()休眠了可变的时间,function2()在结果可用时开始工作:
执行命令:python async_chained.py 11 8
输出:
5. 使用队列:
在这种设计中,没有任何个体消费者与生产者之间的链接。消费者不知道生产者的数量,甚至不知道将被添加到队列中的项目的累积数量。
单个生产者或消费者分别花费不同的时间从队列中放置和提取项目。队列作为一个吞吐量,可以与生产者和消费者进行通信,而不需要它们彼此直接通信。
示例代码:
import asyncio
import argparse
import itertools as it
import os
import random
import time
# ANSI colors
colors = (
"\033[0m", # End of color
"\033[31m", # Red
"\033[32m", # Green
"\033[36m", # Cyan
"\033[34m", # Blue
)
async def generate_item(size: int = 5) -> str:
return os.urandom(size).hex()
async def random_sleep(caller=None) -> None:
i = random.randint(0, 10)
if caller:
print(colors[1] + f"{caller} sleeping for {i} seconds." + colors[0])
await asyncio.sleep(i)
async def produce(name: int, producer_queue: asyncio.Queue) -> None:
n = random.randint(0, 10)
for _ in it.repeat(None, n): # Synchronous loop for each single producer
await random_sleep(caller=f"Producer {name}")
i = await generate_item()
t = time.perf_counter()
await producer_queue.put((i, t))
print(colors[2] + f"Producer {name} added <{i}> to queue." + colors[0])
async def consume(name: int, consumer_queue: asyncio.Queue) -> None:
while True:
await random_sleep(caller=f"Consumer {name}")
i, t = await consumer_queue.get()
now = time.perf_counter()
print(colors[3] + f"Consumer {name} got element <{i}>" f" in {now - t:0.5f} seconds." + colors[0])
consumer_queue.task_done()
async def main(no_producer: int, no_consumer: int):
q = asyncio.Queue()
producers = [asyncio.create_task(produce(n, q)) for n in range(no_producer)]
consumers = [asyncio.create_task(consume(n, q)) for n in range(no_consumer)]
await asyncio.gather(*producers)
await q.join() # Implicitly awaits consumers, too
for consumer in consumers:
consumer.cancel()
if __name__ == "__main__":
random.seed(444)
parser = argparse.ArgumentParser()
parser.add_argument("-p", "--no_producer", type=int, default=10)
parser.add_argument("-c", "--no_consumer", type=int, default=15)
ns = parser.parse_args()
start_time = time.perf_counter()
asyncio.run(main(**ns.__dict__))
end_time = time.perf_counter()
execution_time = end_time - start_time
print(colors[4] + f"Program Start Time: {start_time}\nProgram End Time: {end_time}\nProgram Execution Time: {execution_time:0.2f} seconds." + colors[0])
执行命令:python async_queue.py -p 2 -c 4 .执行以下命令
输出:
最后,让我们看一个asyncio如何减少等待时间的例子:给定一个协程generate_random_int(),它不断生成范围为[0,10]的随机整数,直到其中一个超出阈值,您希望让这个协程的多个调用不需要彼此连续等待完成。
示例代码:
import time
import asyncio
import random
# ANSI colors
colors = (
"\033[0m", # End of color
"\033[31m", # Red
"\033[32m", # Green
"\033[36m", # Cyan
"\033[35m", # Magenta
"\033[34m", # Blue
)
async def generate_random_int(indx: int, threshold: int = 5) -> int:
print(colors[indx + 1] + f"Initiated generate_random_int({indx}).")
i = random.randint(0, 10)
while i <= threshold:
print(colors[indx + 1] + f"generate_random_int({indx}) == {i} too low; retrying.")
await asyncio.sleep(indx + 1)
i = random.randint(0, 10)
print(colors[indx + 1] + f"---> Finished: generate_random_int({indx}) == {i}" + colors[0])
return i
async def main():
res = await asyncio.gather(*(generate_random_int(i, 10 - i - 1) for i in range(3)))
return res
if __name__ == "__main__":
random.seed(444)
start_time = time.perf_counter()
r1, r2, r3 = asyncio.run(main())
print(colors[4] + f"\nRandom INT 1: {r1}, Random INT 2: {r2}, Random INT 3: {r3}\n" + colors[0])
end_time = time.perf_counter()
execution_time = end_time - start_time
print(colors[5] + f"Program Start Time: {start_time}\nProgram End Time: {end_time}\nProgram Execution Time: {execution_time:0.2f} seconds." + colors[0])
执行命令:python async_random.py
输出:
注意:如果您自己编写任何代码,最好使用本机协同程序
为了明确而不是含蓄。发电机的基础
协程将在Python 3.10中被移除。
GitHub Repo: https://github.com/tssovi/asynchronous-in-python