本文大纲翻译自 medium.com/@superfastp…

Python Asyncio 让咱们能够进行基于协程(coroutine)的异步编程。但从python3.4(2014)开端引进,到python3.7的成熟,asyncio其完成已推出许多年了。尽管如此,异步编程仍然是python中最吸引人但又最令人懊丧的领域。为什么呢?因为它是在是太难上手了,尤其对没有js等自带async的编程语言开发经历的人来说。关于python开发者来说,asyncio不是一个添加到python的特性,而是一个完全新的编程范式(paradigm),这需求咱们从头架构咱们的程序。正是这种不同的思维让开端学习asyncio变的极度懊丧,乃至憎恶asyncio。本文将带你快速上手,趟过这条河。

基本概念

协程

前面咱们提到了Asyncio基于协程,咱们先简略了解一下协程(Coroutine)的概念,协程往往被称为轻量级线程、纤程,许多编程语言都具有这个概念,例如go、js包括比较新的java21。咱们知道线程之间切换是比较耗时的,需求从用户态切换到内核态,而协程并不涉线程切换,自己维护上下文,一个线程能够维护多个协程。

python asyncio 概念解说与快速上手
可是协程虽好,可是单线程内仍只要一个协程能够获取履行权,关于cpu密集型使命,它乃至可能会更慢,所以协程首要是解决的是io相关的问题。

事情循环 event loop

asyncio 运用了单线程的事情循环行列来履行协程。整个流程便是,将使命提交到事情行列中,会有一个履行线程不停的从行列中获取并履行使命,当遇到需求io的操作时,会指派给操作体系监控状况,然后继续获取使命,直到一切使命都履行完结。当然,履行使命期间,也会发生新的使命,包括io完结也是使命。

python asyncio 概念解说与快速上手
事情循环行列中最重要的是封装了一个对操作体系供给的io多路复用器,不同操作体系完结不同,linux的epoll,windows的iocp等等。简略来说便是,当咱们遇到io时(拜访文件体系、网络恳求等),咱们能够把等候io完结的使命交给操作体系,而等候io这期间,咱们能够不让线程挂起,去干更多的事情,这关于有着紧箍咒(大局解说锁GIL)的python来说,对错常有用的。

1.怎么界说、创立、运转、切换协程

咱们运用 async def来界说一个协程,这儿能够把async def看作是def的扩张,专门用来界说协程。 一般的def界说的function运转之后会发生一个对应的回来值,而async def运转之后,会回来一个coroutine。 coroutine必须在event loop 内运转,咱们能够经过asyncio.run()运转coroutine。

# define a coroutine  
async def custom_coro():  
 print('Hello there')
# create a coroutine  
coro = custom_coro()
# 运转custom_coro()之后回来的是一个coroutine
type(coro)
# run a coroutine in the event loop  
asyncio.run(coro)

咱们怎么从一个协程切换到另一个协程呢?咱们能够运用await关键字。

async def custom_coro():
	# 中止当前协程,然后运转other_coro 
	await other_coro()

当协程遇到await 润饰的函数时,会中止,直到await的协程履行完毕,然后会到当前状况继续继续履行。

2.task与coroutine

task是coroutine的一个封装,相关于coroutine,task具有检查状况、暂停使命等更丰厚的功能。通常来说,咱们最好将协程转成task,许多当地现在支撑coroutine,实践上是python悄悄帮你转为task,将来的某个版别可能不再支撑。

# create and schedule a task  
task = asyncio.create_task(other_coro())

在运用asyncio.create_task()后,会将协程封装成一个task,而且会组织它进入事情循环,而且只要机会,就让它立刻履行。 这个调用也回来了一个task object,经过这个回来值,咱们能够回去回来值成果、检查使命状况等等。 咱们能够运用await来暂停当前线程获取成果,也可根据task的方法来获取成果。

 # 等候,直到task履行完,并获取回来值
 value=await task
 # check task,假如task没有完结直接获取result会抛反常
 if task.done():
	 value=task.result()

下面给一个完整比方:运用coroutine创立一个task,然后过一段时刻后再获取成果,我为大家标出了履行次序,以便更好的了解。

# example of scheduling an async task  
import asyncio  
# coroutine to perform some useful task  
async def task_coro():  
#3. report a message  
	print('The task is running...')  
#4. suspend and sleep for a moment  
	await asyncio.sleep(1)  
#6. report a message  
	print('The task done')  
#7. return a result  
return 'The answer is 100'  
# main coroutine  
async def main():  
#1. run a task independently  
	task = asyncio.create_task(task_coro())  
#2. suspend a moment, allow the scheduled task to run  
	await asyncio.sleep(0)  
#5. wait for the async task to complete  
	await task  
#8. report the return value  
	print(f'Got: {task.result()}')  
# create the coroutine and run it in the event loop  
asyncio.run(main())

这儿又个需求注意的点:从1.2.3步咱们能够看到task创立以后并没有直接履行(默认情况下),而是在await asyncio.sleep(0) 后main协程中止之后,task_coro才开端履行。

3.多个协程怎么并行

咱们能够在aysncio程序内并行的运转多个程序,这关于并行下载多个文件或许高并发场景下都很有用。

咱们能够运用asyncio.gather()来运转协程,它能够承受多个coroutine或许task,然后回来一个asyncio.Future。Future是Task的父类,代表着未来将会有一个成果,Task是一个为了包裹协程的Future。

future = asyncio.gather(coro1(), coro2() coro3())

Future 与Task Coroutine相同,他们的目标都能够被await润饰的目标,都是完结了__await__。在这儿咱们能够await future 来完结等候一切协程或许使命,最终会回来一个迭代器,获取每个协程的运转成果,次序与添加的次序相同。

下面的比方中,准备了100个协程,每个协程随机sleep 0-1秒,一切协程并发履行。

# example of running many coroutines concurrently
import random
import asyncio
import time
# coroutine to perform some useful task
async def task_coro(arg) -> str:
    # generate a random value between 0 and 1
    value = random.random()
    # suspend and sleep for a moment
    await asyncio.sleep(value)
    # report the argument and value
    return f'Task {arg} done after {value} seconds'
# main coroutine
async def main():
    # create many coroutines
    coros = [task_coro(i) for i in range(100)]
    # suspend and run all coroutines
    now=time.time()
    results=await asyncio.gather(*coros)
    print(f"total taken:{time.time()-now}")
    for er in results:
        print(er)
# create the coroutine and run it in the event loop
asyncio.run(main())

能够看到,总共花费不到1秒就都履行完毕,然后循环输出了,每个协程的回来值。

total taken:0.9943249225616455
Task 0 done after 0.2671076110393307 seconds
Task 1 done after 0.514333336910391 seconds
...

4.怎么等候多个tasks

假如咱们不是仅仅想等候一切task都完结,而是有更杂乱的需求,第一个出现反常就暂停等等操作,咱们能够运用asyncio.wait()。 经过函数的界说,咱们能够看到咱们能够设置超时时刻和何时回来,默认时全部完结时ALL_COMPLETED回来,也能够是第一个完结FIRST_COMPLETED,或许是第一个反常回来FIRST_EXCEPTION。

(function) def wait(
fs: Iterable[Awaitable[_T@wait]],  
*,  
timeout: float | None = None,  
return_when: str = "ALL_COMPLETED")

继续运用上一个的场景进行多个task的等候,只不过这次咱们咱们运用asyncio.wait(),获取第一个完结的成果。

# SuperFastPython.com  
# example of waiting for a collection of tasks  
import random  
import asyncio  
# coroutine to perform some useful task  
async def task_coro(arg):  
# generate a random value between 0 and 1  
value = random.random()  
# suspend and sleep for a moment  
await asyncio.sleep(value)  
# return a value unique for this task  
return arg * value  
# main coroutine  
async def main():  
# create and schedule many independent tasks  
tasks = [asyncio.create_task(task_coro(i)) for i in range(100)]  
# suspend and wait for the first task to complete  
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)  
# report the result from the first task  
task = done.pop()  
print(f'First task got: {task.result()}')  
# create the coroutine and run it in the event loop  
asyncio.run(main())

5. 按使命完结次序处理成果

当使命完结时,咱们能够处理或许提交成果。与前面按照提交使命次序回来成果不同,当某些使命很快,某些使命完结很慢,假如等候一切成果,将会浪费更多的时刻。咱们能够经过asyncio.as_complete()按照使命完结次序来处理成果。

asyncio.as_complete()承受一个awaitable集合,比方一个List[Task],回来一个迭代器,每次迭代将会yield 一个task,当运用await时,等一个现已完结的task后回来成果。

# get a generator that yields awaitables in completion order then iterate
for task in asyncio.as_completed(tasks):  
	# 获取第一个现已完结的使命
	result = await task

6. 怎么完结协程之间同享数据

协程之间数据同享咱们能够经过asyncio.Queue,它是一个先进先出、协程安全的行列,咱们无需忧虑Queue的添加、获取的竞争问题。

创立一个行列,能够经过添加maxsize参数设置上限
queue = asyncio.Queue()

咱们能够经过put()来添加数据到行列中,同时会回来一个协程。事实上,咱们put时,必定要await润饰,因为假如Queue达到上限的时分会阻塞住。相同的获取数据时也需求运用await润饰get()。

# add and retrieve
await queue.put(item)
item = await queue.get()

下面咱们运用asyncio.Queue 来写一个经典的生产者顾客程序。生产者往行列里放十个元素,每放进去一个,顾客就获取一个打印出来,直到最终消费到None为止。

from random import random
import asyncio
# coroutine to generate work
async def producer(queue):
    print('Producer: Running')
    # generate work
    for i in range(10):
        value = random()
        await asyncio.sleep(value)
        await queue.put(value)
    await queue.put(None)
    print('Producer: Done')
async def main():
    # create the shared queue
    queue = asyncio.Queue()
    # run the consumer as an independent task
    asyncio.create_task(producer(queue))
    # consume items from the queue until a None is seen
    while value:=await queue.get():
        # report the value
        print(f'Got: {value}')
asyncio.run(main())

7. 怎么写NIO程序

咱们能够经过asyncio.open_connection()来创立一个TCP client。调用之后会回来一个协程,等候协程完结之后将会回来一个读流StreamReader和一个写流StreamWriter。

# 假如需求ssl,则需求设置ssl=True
reader, writer = await asyncio.open_connection('www.baidu.com', 443, ssl=True)

咱们能够运用encode()将string转为bytes,然后运用将数据写入socket,然后运用await writer.drain()来等候一切的数据现已发送出去。

# encode string data to byte data  
byte_data = string_data.encode()  
# write byte data  
writer.write(bytes_data)
# wait for data to be transmitted  
await writer.drain()

read是与write相反的操作。

# read byte data  
byte_data = await reader.read()  
# decode bytes to strings  
string_data = byte_data.decode()

下面咱们进行一下实战,以NIO的方式拜访百度主页。

import asyncio
async def fetch_baidu_homepage():
    host = 'www.baidu.com'
    port = 80
    # 构建HTTP恳求
    request = (
        f"GET / HTTP/1.1rn"
        f"Host: {host}rn"
        "Connection: closern"
        "rn"
    )
    # 运用asyncio.open_connection树立TCP衔接
    reader, writer = await asyncio.open_connection(host, port)
    # 发送HTTP恳求
    writer.write(request.encode())
    # 读取响应
    response = b""
    while True:
        data = await reader.read(1024)
        if not data:
            break
        response += data
    # 关闭衔接
    writer.close()
    print(response.decode())
if __name__ == '__main__':
    asyncio.run(fetch_baidu_homepage())

最终

这儿简略介绍了asyncio首要的一些操作和概念,实践过程中往往更加杂乱,或许会有一些结构对其进行了封装。总之,经过这些比方,咱们能对python的异步编程有必定的了解,开端向异步编程的国际迈出开端的第一步。