协程不是计算机提供的,是人为创造的上下文切换技术,也可以被称为微线程。简而言之 其实就是在一个线程中实现代码块相互切换执行。
我们知道正常代码是从上到下依次执行,一个方法或函数操作完毕后才会进入下一个方法或函数执行。例如:
def func1(): print(1) print(2) def func2(): print(3) print(4) func1() func2()
此时代码执行逻辑一定是先执行完func1()对象里的语句再执行func2() ,这种称为同步。但是如果我们想在func1()对象中print(1)后切换到func2()该怎么做呢?
可以采用以下几种基于协程的方式:
# greenlet是第三方模块需先引入 pip3 install greenlet
# -*- coding: utf-8 -*- # author: micher.yu # Time:2022/01/08 # simple_desc : from greenlet import greenlet def func1(): print(1) # 第二步:输出1 gr2.switch() # 第三步:切换到 func2 函数 print(2) # 第六步:输出2 gr2.switch() # 第七步:切换到func2 函数(如果不切换的话句柄会继续往下执行,也就不会进入func2 输出4) def func2(): print(3) # 第四步:输出3 gr1.switch() # 第五步:切换到func1 函数 print(4) # 第八步:输出4,func2函数 执行完毕句柄继续往下执行 def func3(): print(5) # 第十步:输出5 gr1 = greenlet(func1) # 此处只是生成greenlet包装的func1对象,代码并不会实际运行 gr2 = greenlet(func2) # 此处生成greenlet包装的func2对象 gr1.switch() # 第一步:此处是正式执行func1()对象 func3() # 第九步:实例化func3 # 所以实际输出会是 1 3 2 4 5
不推荐,实际应用场景比较少。
如果对yield关键字还不太熟悉的话可以参考往期这篇文章详解python三大器——迭代器、生成器、装饰器其中生成器部分有详细讲解
def func1(): yield 1 yield from func2() # 这里其实相当于for item in func2(): yield item yield 2 def func2(): yield 3 yield 4 for item in func1(): print(item) # 输出结果将会是:1 3 4 2
在python3.4及之后的版本才可使用,这个框架使用事件循环来编排回调和异步任务。事件循环位于事件循环策略的上下文中。
下图是协程,事件循环和策略之间的相互作用
注意:asyncio
牛逼在于遇到IO阻塞
自动切换!
下面我们使用@asyncio.coroutine
装饰器(py3.10+会移除)定义了两个协程函数。(基于生成器的协程)
import asyncio @asyncio.coroutine def func1(): print(1) # 此处用asyncio.sleep(2)来模拟IO耗时(asyncio.sleep也是一个协程对象,不能用time.sleep()),asyncio定义的协程函数遇到IO操作时会自动切换到事件循环中的其他任务 yield from asyncio.sleep(2) print(2) @asyncio.coroutine def func2(): print(3) yield from asyncio.sleep(2) print(4)
PS:如果py版本高于3.8依然可以使用asyncio.coroutine
装饰器但是会有告警建议你使用async & await
关键字来定义协程函数,不会影响使用!
协程函数并不能像普通函数一样直接实例化运行,调用协程函数协程并不会开始运行,只是返回一个协程对象。
fun1() # 此处是不会有结果的
可以通过 asyncio.iscoroutine
来验证是否是协程对象
print(asyncio.iscoroutine(func1())) # True
协程对象必须在事件循环中运行,我们可以通过asyncio.get_event_loop
方法来获取当前正在运行的循环实例。如loop
对象,然后把协程对象交给 loop.run_until_complete
,协程对象随后会在 loop
里得到运行。
loop = asyncio.get_event_loop() loop.run_until_complete(func1()) # 运行结果为: # 1 # 等待2s # 2
run_until_complete
是一个阻塞(blocking)调用,直到协程运行结束,它才返回;所以他必须接受的是一个可等待对象
(协程
, 任务
和future对象)。
run_until_complete
的参数是一个 future
,但是我们这里传给它的却是协程对象,之所以能这样,是因为它在内部做了检查
要让这个协程对象转成future
的话,可以通过 asyncio.ensure_future
方法。
所以,我们可以写得更明显一些:
loop = asyncio.get_event_loop() loop.run_until_complete(asnycio.ensure_future(func1())) # 运行结果为: # 1 # 等待2s # 2
在有多个协程函数需要运行时怎么办?
我们可以将协程对象包装成future对象
后再放到一个列表中再通过asyncio.wait
运行。
asyncio.wait方法
或await关键字
只能传可等待
对象
tasks = [ asyncio.ensure_future(func1()), # 把协程对象包转成一个 future 对象 asyncio.ensure_future(func2()) ] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) # 运行结果为: # 1 # 3 # 等待2s # 2 # 4
通过asyncio.gather
可以直接将协程对象放到列表中(必须解包!也就是*[]):
loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.gather(*[func1(), func2()])) # 运行结果为: # 1 # 3 # 等待2s # 2 # 4
完整代码为:
import asyncio @asyncio.coroutine def func1(): print(1) yield from asyncio.sleep(2) # 此处用asyncio.sleep(2)来模拟IO耗时(asyncio.sleep也是一个协程对象,不能用time.sleep()),自动切换到tasks中的其他任务 print(2) @asyncio.coroutine def func2(): print(3) yield from asyncio.sleep(2) # 此处又遇到IO阻塞后,又会自动切换到tasks中其他的任务 print(4) func1() # 调用协程函数,协程并不会开始运行,只是返回一个协程对象。可以通过 asyncio.iscoroutine 来验证是否是协程对象 print(asyncio.iscoroutine(func1())) # True tasks = [ asyncio.ensure_future(func1()), # 把协程对象包转成一个 future 对象 asyncio.ensure_future(func2()) ] loop = asyncio.get_event_loop() # 方式一: loop.run_until_complete(asyncio.wait(tasks)) # 方式二: loop.run_until_complete(asyncio.gather(*[func1(), func2()]))
Task对象
:Task 对象
的作用是在运行某个任务的同时可以并发的运行其他任务。
Task 对象
可以使用 asyncio.create_task()
函数创建,也可以使用 loop.create_task()
cancel()
cancelled()
done()
result()
add_done_callback(task)
asyncio.all_tasks()
asyncio.current_task()
使用 loop
对象的 create_task
函数创建一个 Task
对象,在第一次打印 Task
对象时,状态为 pending
,完成执行函数后的状态为 finished
。
import asyncio async def do_something(): print("这是一个Task例子....") # 模拟阻塞1秒 await asyncio.sleep(1) return "Task任务完成" # 创建一个事件event_loop loop = asyncio.get_event_loop() # 创建一个task task = loop.create_task(do_something()) # 第一次打印task print(task) # 将task加入到event_loop中 loop.run_until_complete(task) # 再次打印task print(task) print(task.result()) """ 运行结果 #1 <Task pending name='Task-1' coro=<do_something() running at /Users/mac/Desktop/userspace/TestDemo/test/async_demo.py:97>> #2 这是一个Task例子.... #3 <Task finished name='Task-1' coro=<do_something() done, defined at /Users/mac/Desktop/userspace/TestDemo/test/async_demo.py:97> result='Task任务完成'> #4 Task任务完成 """
Task
对象的 result()
函数可以获取 do_something()
函数的返回值。
import asyncio async def do_something(task_id): print(f"这是一个Task例子,当前task_id:{task_id}") # 模拟阻塞1秒 await asyncio.sleep(1) return f"Task-id {task_id} 任务完成" # 任务完成后的回调函数 def callback(task): # 打印参数 print(task) # 打印返回的结果 print(task.result()) # 创建一个事件event_loop loop = asyncio.get_event_loop() # 创建一个task tasks = [] for i in range(5): name = f"task-{i}" task = loop.create_task(do_something(name), name=name) task.add_done_callback(callback) tasks.append(task) # 将task加入到event_loop中 loop.run_until_complete(asyncio.wait(tasks)) """ 输出为: 这是一个Task例子,当前task_id:task-0 这是一个Task例子,当前task_id:task-1 这是一个Task例子,当前task_id:task-2 这是一个Task例子,当前task_id:task-3 这是一个Task例子,当前task_id:task-4 <Task finished name='task-0' coro=<do_something() done, defined at /Users/mac/Desktop/userspace/TestDemo/test/async_demo.py:97> result='Task-id task-0 任务完成'> Task-id task-0 任务完成 <Task finished name='task-1' coro=<do_something() done, defined at /Users/mac/Desktop/userspace/TestDemo/test/async_demo.py:97> result='Task-id task-1 任务完成'> Task-id task-1 任务完成 <Task finished name='task-2' coro=<do_something() done, defined at /Users/mac/Desktop/userspace/TestDemo/test/async_demo.py:97> result='Task-id task-2 任务完成'> Task-id task-2 任务完成 <Task finished name='task-3' coro=<do_something() done, defined at /Users/mac/Desktop/userspace/TestDemo/test/async_demo.py:97> result='Task-id task-3 任务完成'> Task-id task-3 任务完成 <Task finished name='task-4' coro=<do_something() done, defined at /Users/mac/Desktop/userspace/TestDemo/test/async_demo.py:97> result='Task-id task-4 任务完成'> Task-id task-4 任务完成 """
使用 asyncio.wait()
函数将 Task 任务列表
添加到 event_loop
中,也可以使用 asyncio.gather()
函数。
import asyncio import functools async def do_something(t): print("暂停" + str(t) + "秒") await asyncio.sleep(t) return "暂停了" + str(t) + "秒" def callback(event_loop, gatheringFuture): print(gatheringFuture.result()) print("多个Task任务完成后的回调") loop = asyncio.get_event_loop() gather = asyncio.gather(do_something(1), do_something(3)) gather.add_done_callback(functools.partial(callback, loop)) loop.run_until_complete(gather) """ 输出为: 暂停1秒 暂停3秒 ['暂停了1秒', '暂停了3秒'] 多个Task任务完成后的回调 """