为了更好的演示,我准备了三个函数,一个同步的函数,两个异步的函数
# 定义阻塞的函数 def ping(url): print("阻塞函数开始运行") time.sleep(2) os.system("ping %s"%url) print("阻塞函数运行结束") # 定义两个异步函数 async def asyncfunc1(): print("Suspending func1") await asyncio.sleep(1) print("func func1 ", threading.current_thread()) print('Resuming func1') return "func1" async def asyncfunc2(): print("Suspending func2") await asyncio.sleep(1) print("func func2 ", threading.current_thread()) print('Resuming func2') return "func2"
协程中控制任务
单个协程任务的运行
上面的函数,比如说我只想将asyncfunc1() 函数运行并且得结果,可以使用 loop.create_task()
方法创建一个task对象,task是Futures的子类,当调用 loop.run_until_complete()
以后,协程跑完以后,通过 task.result()
获取协程函数的返回结果。
async def asyncfunc1(): print("Suspending func1") await asyncio.sleep(1) print("func func1 ", threading.current_thread()) print('Resuming func1') return "func1" if __name__=="__main__": print("In main thread ",threading.current_thread()) loop = asyncio.get_event_loop() task = loop.create_task(asyncfunc1()) loop.run_until_complete(task) print("task result is ",task.result())
输出结果为
In main thread <_mainthread(mainthread, started="" 6140)=""> Suspending func1 func func1 <_mainthread(mainthread, started="" 6140)=""> Resuming func1 task result is func1
主线程和跑的协程函数是在同一个线程中。
也可以给task对象添加一个回调方法
#coding:gbk import asyncio import time,sys async def asyncfunc1(): print("Suspending func1") await asyncio.sleep(1) print("func func1 ", threading.current_thread()) print('Resuming func1') return "func1" # 定义一个回调函数 def callbackfunc(task): print("task 运行结束,它的结果是:",task.result()) if __name__=="__main__": print("In main thread ",threading.current_thread()) loop = asyncio.get_event_loop() task = loop.create_task(asyncfunc1()) task.add_done_callback(callbackfunc) loop.run_until_complete(task)
输出结果为
In main thread<_mainthread(mainthread, started="" 11248)="">
Suspending func1
func func1<_mainthread(mainthread, started="" 11248)="">
Resuming func1
task 运行结束,它的结果是: func1
loop.run_until_complete
是一个阻塞方法,只有当它里面的协程运行结束以后这个方法才结束,才会运行之后的代码。
其实也可以不调用 loop.run_until_complete
方法,创建一个task以后,其实就已经在跑协程函数了,只不过当事件循环如果准备开始运行了,此时的task状态是 pending
,如果不调用事件循环的话,则不会运行协程函数,由于主线程跑完了,子线程也就被销毁了,如代码写成这样:
if __name__=="__main__": print("In main thread ",threading.current_thread()) loop = asyncio.get_event_loop() task = loop.create_task(asyncfunc1()) time.sleep(3)
得到的输出是
In main thread<_mainthread(mainthread, started="" 6056)="">
Task was destroyed but it is pending!
task:
cb=[callbackfunc() at test.py:39]>
sys:1: RuntimeWarning: coroutine ‘asyncfunc1’ was never awaited
所以想要使得协程函数得到执行,需要调用事件循环来执行任务,上面的 loop.run_until_complete
就是使循环开始跑了,其实也可以使用 loop.run_forever()
,这个函数就像它的名字一样,会一直跑。只有事件循环跑起来了,那么使用该循环注册的协程才会得到执行,但是如果使用 loop.run_forever()
则会阻塞在这里,事件循环还有一个 stop
方法,可以结束循环,我们可以在task对象上添加一个回调方法,当协程执行结束以后,调用事件循环的 stop
方法来结束整个循环
#coding:gbk import asyncio import time,sys async def asyncfunc1(): print("Suspending func1") await asyncio.sleep(1) print("func func1 ", threading.current_thread()) print('Resuming func1') return "func1" # 定义一个回调函数 def callbackfunc(task): print("task 运行结束,它的结果是:",task.result()) loop.stop() if __name__=="__main__": print("In main thread ",threading.current_thread()) loop = asyncio.get_event_loop() task = loop.create_task(asyncfunc1()) task.add_done_callback(callbackfunc) loop.run_forever()
除了使用 loop.run_until_complete
方法,还可以使用 asyncio.ensure_future()
方法来运行协程,将上面代码中的 task = loop.create_task(asyncfunc1())
改为 task = asyncio.ensure_future(asyncfunc1())
会得到相同的结果,它的参数是协程对象或者futures,也可以传task对象,因为task是futures的子类,当传入的是一个协程对象时,返回一个task对象,传入一个futures的时候,直接返回futures对象,也就是说,在调用 asyncio.ensure_future()
以后,都会返回一个task对象,都可以为它添加一个回调方法,并且可以调用task.result()方法得到结果(注意如果task没有执行结束就调用result方法,则会抛异常)。
多个协程任务的并行
最上面我准备了两个异步的函数asyncfunc1和asyncfunc2,如果我想要这两个函数同时执行,并且得到它们的返回值该怎么操作呢?
有了上面单协程的经验,我们也可以使用事件循环创建两个task,然后在run_forever()来执行,可以对task添加回调,将结果输出。
#coding:gbk import asyncio # 定义两个异步函数 async def asyncfunc1(): print("Suspending func1") await asyncio.sleep(1) print("func func1 ", threading.current_thread()) print('Resuming func1') return "func1" async def asyncfunc2(): print("Suspending func2") await asyncio.sleep(1) print("func func2 ", threading.current_thread()) print('Resuming func2') return "func2" # 定义一个回调函数 def callbackfunc(task): print("task 运行结束,它的结果是:",task.result()) if __name__=="__main__": print("In main thread ",threading.current_thread()) loop = asyncio.get_event_loop() task1 = loop.create_task(asyncfunc1()) task1.add_done_callback(callbackfunc) task2 = loop.create_task(asyncfunc2()) task2.add_done_callback(callbackfunc) loop.run_forever()
输出结果是
In main thread<_mainthread(mainthread, started="" 8040)="">
Suspending func1
Suspending func2
func func1<_mainthread(mainthread, started="" 8040)="">
Resuming func1
func func2<_mainthread(mainthread, started="" 8040)="">
Resuming func2
task 运行结束,它的结果是: func1
task 运行结束,它的结果是: func2
此时由于loop调用了run_forever方法,且没有方法调用stop方法,所以程序会一直卡着。
这样是可以将多个协程跑起来,但这样的处理一是繁琐,二是不方便结果的回收。
asyncio有一个gather方法,可以传入多个任务对象,当调用await asyncio.gather(*) 时,它会将结果全部返回
由于await 只能写在async def 函数中,所以这里还需要新创建一个函数
async def main(): task1 = loop.create_task(asyncfunc1()) task1.add_done_callback(callbackfunc) task2 = loop.create_task(asyncfunc2()) task2.add_done_callback(callbackfunc) result = await asyncio.gather(task1,task2) print(result) async def mian2(): result = await asyncio.gather(asyncfunc1(),asyncfunc2()) print(result)
两种定义方式都可以,一个是向gather函数传的是协程对象,一个是传的task对象。之后在调用
if __name__=="__main__": print("In main thread ",threading.current_thread()) loop = asyncio.get_event_loop() loop.run_until_complete(main()) # or main2()
得到的输出为
In main thread<_mainthread(mainthread, started="" 7016)="">
Suspending func1
Suspending func2
func func1<_mainthread(mainthread, started="" 7016)="">
Resuming func1
func func2<_mainthread(mainthread, started="" 7016)="">
Resuming func2
task 运行结束,它的结果是: func1
task 运行结束,它的结果是: func2
[‘func1’, ‘func2’]
这样就达到的协程的并行与结果的回收。
这篇文章先简单介绍到这里,之后会继续分析同步代码的执行。