协程是一种多方协同的工作方式。当前执行者在某个时刻主动让出(yield)控制流,并记住自身当前的状态,以便在控制流返回时能从上次让出的位置恢复(resume)执行。
简而言之,协程的核心思想就在于执行者对控制流的 “主动让出” 和 “恢复”。相对于,线程此类的 “抢占式调度” 而言,协程是一种 “协作式调度” 方式。
在 I/O 密集型场景中,抢占式调度的解决方案是 “异步 + 回调” 机制。
其存在的问题是,在某些场景中会使得整个程序的可读性非常差。以图片下载为例,图片服务中台提供了异步接口,发起者请求之后立即返回,图片服务此时给了发起者一个唯一标识 ID,等图片服务完成下载后把结果放到一个消息队列,此时需要发起者不断消费这个 MQ 才能拿到下载是否完成的结果。
可见,整体的逻辑被拆分为了好几个部分,各个子部分都会存在状态的迁移,日后必然是 BUG 的高发地。
而随着网络技术的发展和高并发要求,协程所能够提供的用户态协同调度机制的优势,在网络操作、文件操作、数据库操作、消息队列操作等重 I/O 操作场景中逐渐被挖掘。
协程将 I/O 的处理权从内核态的操作系统交还给用户态的程序自身。用户态程序在执行 I/O 时,主动的通过 yield(让出)CPU 的执行权给其他协程,多个协程之间处于平等、对称、合作的关系。
首先需要注意的是,协程自身无法利用多核,需要配合进程来使用才可以在多核平台上发挥作用。
Python 对协程的支持经历了多个版本:
async/await 的示例程序:
import asyncio from pathlib import Path import logging from urllib.request import urlopen, Request import os from time import time import aiohttp logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) CODEFLEX_IMAGES_URLS = ['https://codeflex.co/wp-content/uploads/2021/01/pandas-dataframe-python-1024x512.png', 'https://codeflex.co/wp-content/uploads/2021/02/github-actions-deployment-to-eks-with-kustomize-1024x536.jpg', 'https://codeflex.co/wp-content/uploads/2021/02/boto3-s3-multipart-upload-1024x536.jpg', 'https://codeflex.co/wp-content/uploads/2018/02/kafka-cluster-architecture.jpg', 'https://codeflex.co/wp-content/uploads/2016/09/redis-cluster-topology.png'] async def download_image_async(session, dir, img_url): download_path = dir / os.path.basename(img_url) async with session.get(img_url) as response: with download_path.open('wb') as f: while True: # 在 async 函数中使用 await 关键字表示等待 task 执行完成,也就是等待 yeild 让出控制权。 # 同时,asyncio 使用事件循环 event_loop 来实现整个过程。 chunk = await response.content.read(512) if not chunk: break f.write(chunk) logger.info('Downloaded: ' + img_url) # 使用 async 关键字声明一个异步/协程函数。 # 调用该函数时,并不会立即运行,而是返回一个协程对象,后续在 event_loop 中执行。 async def main(): images_dir = Path("codeflex_images") Path("codeflex_images").mkdir(parents=False, exist_ok=True) async with aiohttp.ClientSession() as session: tasks = [(download_image_async(session, images_dir, img_url)) for img_url in CODEFLEX_IMAGES_URLS] await asyncio.gather(*tasks, return_exceptions=True) if __name__ == '__main__': start = time() # event_loop 事件循环充当管理者的角色,将控制权在几个协程函数之间切换。 event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(main()) finally: event_loop.close() logger.info('Download time: %s seconds', time() - start)
https://mp.weixin.qq.com/s/LItTjy2uN6iJvN2MqNPQ7Q