一. asyncio基本操作
1.1 任务状态
上一节我们提到asyncio的任务使用协程对象作为参数创建。并且任务含有多种状态。下面我们使用一个简单的例子来说明任务的各种状态。
import time
import asyncio
@asyncio.coroutine
def do_some_work():
print('Coroutine start.')
time.sleep(3)
print('Coroutine finished.')
def main():
start = time.time()
loop = asyncio.get_event_loop()
coroutine = do_some_work()
task = loop.create_task(coroutine) # 创建任务
print('task is instance of asyncio.Task?', 'yes' if isinstance(task, asyncio.Task) else 'No')
print(f'task state {task._state}')
loop.run_until_complete(task)
print(f'task state {task._state}')
end = time.time()
print(f'运行耗时: {end-start:.2f}')
if __name__ == '__main__':
main()
运行结果:事件循环的 create_task 方法可以创建任务,另外 asyncio.ensure_future 方法也可以创建任务,参数须为协程对象。task 是 asyncio.Task 类的实例,创建 task 可以方便预激协程以及处理协程运行中遇到的异常。task 对象的 _state 属性保存当前任务的运行状态,任务的运行状态有 PENDING 和 FINISHED 两种。
1.2 async / await关键字
Python3.5新增的async和await关键字可以用来定义协程函数。这两个关键字是一个组合,其作用等同于@asyncio.coroutine装饰器和yield from语句。以便将协程函数和生成器函数在语法上做出明显的区分。
1.3 绑定回调
假设协程包含一个IO操作(这几乎是肯定的),等它处理完数据后,我们希望得到通知,以便下一步数据处理。这一需求可以通过向future对象添加回调实现。那什么是future对象呢?task对象就是future对象,因为asyncio.Task是asyncio.Future的子类。因此task对象也可以添加回调函数。回调函数的最后一个参数是future或task对象,通过该对象可以获取协程返回值。如果回调需要多个参数,可以使用functools.partial偏导函数传入。
import asyncio
import time
from functools import partial
async def coro_work():
print('coro_work -> Coroutine start.')
time.sleep(3)
print('coro-work -> Coroutine finished.')
def callback(name, task):
print(f'callback -> {task._state}')
print(f'callback -> {name}')
def main():
start = time.time()
loop = asyncio.get_event_loop()
coroutine = coro_work()
task = loop.create_task(coroutine)
task.add_done_callback(partial(callback, 'Coroutine, Bye Bye~'))
loop.run_until_complete(task)
end = time.time()
print(f'运行耗时:{end - start:.2f}')
if __name__ == '__main__':
main()
运行结果:使用 async 关键字替代 asyncio.coroutine 装饰器创建协程函数。callback为回调函数,协程终止后需要运行的代码写入回调函数,回调函数的参数有要求,最后一个位置参数须为 task 对象。task 对象的 add_done_callback 方法可以添加回调函数,注意参数必须是回调函数,这个方法不能传入回调函数的参数,这一点需要通过 functools 模块的 partial 方法解决,将回调函数和其参数 name 作为 partial 方法的参数,此方法的返回值就是偏函数,偏函数可作为 task.add_done_callback 方法的参数。
二. 协程处理多任务
开始介绍asyncio模块到现在,我们还没有使用协程处理多任务。在实际项目中,往往有多个协程对象,并创建多个任务,同时在一个loop里运行。为了把多个协程交给loop,需要借助asyncio.gather方法。任务的result方法可以获得对应协程函数的return值。
import asyncio
import time
async def coro_work(name, t):
print(f'[coro_work] Coroutine {name} start.')
await asyncio.sleep(t)
print(f'[coro_work] Coroutine {name} finished.')
return f'Coroutine {name} OK.'
def main():
start = time.time()
loop = asyncio.get_event_loop()
coroutine1 = coro_work('ONE', 3)
coroutine2 = coro_work('TWO', 1)
task1 = loop.create_task(coroutine1)
task2 = asyncio.ensure_future(coroutine2)
gather = asyncio.gather(task1, task2)
loop.run_until_complete(gather)
print(f'[task1 result] {task1.result()}')
print(f'[task2 result] {task2.result()}')
end = time.time()
print(f'运行耗时:{end-start:.4f}')
if __name__ == '__main__':
main()
代码说明:
await关键字等同于python3.4中的yield from语句,后面接协程对象。asyncio.sleep方法的返回值为协程对象,此处为阻塞运行。asyncio.sleep与time.sleep是不同的,前者阻塞当前协程,即coro_work函数的运行,而time.sleep会阻塞整个线程,所以此处使用前者,阻塞当前协程,CPU可以在线程内的其它协程运行。
协程函数的return值在协程运行结束后通过调用对应task对象的result方法返回。asyncio.gather方法接收多个task作为参数,创建任务搜集器。注意,asyncio.gather方法中参数的顺序决定了协程的启动顺序。时间循环的run_until_complete方法也可接收任务搜集器作为参数,并阻塞运行,直到全部任务完成。任务结束后,事件循环终止,打印任务的result方法返回值,即协程函数的return值。
运行结果:在事件循环内部,2个协程时交替运行完成的:首先运行task1,打印“[coro_work] Coroutine ONE start.”,task1运行到asyncio.sleep阻塞,让步CPU的使用权给task2执行,打印“[coro_work] Coroutine TWO start.”,task2运行到asyncio.sleep阻塞,再次让步CPU的使用权,但此刻事件循环发现所有协程都处于阻塞状态,只能等待阻塞结束。task2的阻塞时间较短,阻塞1s后结束,打印“[coro_work] Coroutine TWO finished.”;又过了2s,阻塞3s的task1也结束了,打印“[coro_work] Coroutine ONE finished.”。至此,2个任务全部完成,事件循环停止,打印task1和task2的返回值,任务总耗时约3s,如果使用单线程同步模型则至少4s。
注意:
- 多数情况下无需调用task的add_done_callback方法,可以直接把回调函数中的代码写入await语句后面,协程是可以暂停和恢复的。
- 多数情况下同样不需要调用task的result方法获取协程函数的return值,因为事件循环的run_until_complete方法的返回值就是协程函数的返回值。
- 事件循环有一个stop方法来停止循环和一个close方法来关闭循环。以上示例均没有调用loop.close方法,似乎并没有什么问题,那调用loop.close是否是必须的呢?简言之,loop只要不关闭,就可以再次运行run_until_complete()方法,关闭后则不可运行。有人建议调用loop.close,以彻底清理loop对象防止误用,其实多数情况下并无必要。
- asyncio提供了asyncio.gather和asyncio.wait两个任务搜集方法,它们的作用相同,都是将协程任务按顺序排定,再将返回值作为参数加入到事件循环中。二者的主要区别在于,asyncio.wait可以获取任务的执行状态(PENDING & FINISHED),当有一些特殊需求,比如某些情况下取消任务,可以使用asyncio.wait搜集器。
三. 取消任务
在事件循环启动之后,停止之前,我们可以手动取消任务的执行,但注意只有PENDING状态的任务才允许取消,FINISHED状态的任务已经完成,自然无法取消。
import asyncio
async def work(id, t):
print('Working...')
await asyncio.sleep(t)
print(f'Work {id} done.')
def main():
loop = asyncio.get_event_loop()
coroutines = [work(i, i) for i in range(1, 4)]
try:
loop.run_until_complete(asyncio.gather(*coroutines))
except KeyboardInterrupt:
loop.stop() # 取消所有未完成的任务,停止事件循环
finally:
loop.close() # 关闭事件循环
if __name__ == '__main__':
main()
运行结果:程序运行过程中,按Ctrl + C会触发KeyboardInterrupt异常。捕获这个异常,将取消所有未完成的任务。
除了使用事件循环的stop方法取消所有未完成的任务,还可以直接调用任务的cancel方法,而asyncio.Task.all_tasks方法可以获得事件循环中的全部任务。让我们修改上个实例的main()函数代码:
import asyncio
async def work(id, t):
print('Working...')
await asyncio.sleep(t)
print(f'Work {id} done.')
def main():
loop = asyncio.get_event_loop()
coroutines = [work(i, i) for i in range(1, 4)]
try:
loop.run_until_complete(asyncio.gather(*coroutines))
except KeyboardInterrupt:
# loop.stop() # 取消所有未完成的任务,停止事件循环
print()
tasks = asyncio.Task.all_tasks()
for task in tasks:
print(f'正在取消任务:{task}')
print(f'任务取消:{task.cancel()}')
finally:
loop.close() # 关闭事件循环
if __name__ == '__main__':
main()
运行结果: 程序运行到work 1 done输出时,按下 Ctrl + C 会触发 KeyboardInterrupt 异常。asyncio.Task.all_tasks()可以捕获事件循环(每个线程只能有一个事件循环)中的所有任务的集合,任务状态有PENDING和FINISHED两者。任务的cancel方法可以取消未完成的任务,取消成功返回True,已完成的任务由于取消失败返回False。