协同程序以 async/await 语法进行声明。
>>> import asyncio
>>> async def main():
... print('hello')
... await asyncio.sleep(1)
... print('world')
>>> asyncio.run(main())
hello
world
系统程序只能通过 asynico.run进行调用。直接调用是没有用的。协同程序。
>>> main()
<coroutine object main at 0x1053bb7c8>
三种方法运行协同程序。
1:
asyncio.run() 顶层调用 main(),如上案例
2:await一个协同程序。如下案例。await传递?
import asyncio import time async def say_after(delay, what): await asyncio.sleep(delay) print(what) async def main(): print(f"started at {time.strftime('%X')}") await say_after(1, 'hello') await say_after(2, 'world') print(f"finished at {time.strftime('%X')}") asyncio.run(main()) started at 17:13:52 hello world finished at 17:13:55
3:通过
asyncio.create_task()
方法去调用协同程序,参照Tasksasync def main(): task1 = asyncio.create_task( say_after(1, 'hello')) task2 = asyncio.create_task( say_after(2, 'world')) print(f"started at {time.strftime('%X')}") # Wait until both tasks are completed (should take # around 2 seconds.) await task1 await task2 print(f"finished at {time.strftime('%X')}") started at 17:14:32 hello world finished at 17:14:34
两个是异步,只需要大概两秒的时间。需要看一下输出耗时。
这种比上面的那两种都要高效一些。
所有可以通过await使用的表达式都叫做 awaitable对象。许多异步io接口都支持awaitables.
目前有三种 awaitable 对象。coroutines,Tasks,Futures.
coroutines
python的协同程序都是awaitables,因此可以在其他的程序中进行 await操作。
import asyncio async def nested(): return 42 async def main(): # Nothing happens if we just call "nested()". # A coroutine object is created but not awaited, # so it *won't run at all*. nested() # Let's do it differently now and await it: print(await nested()) # will print "42". asyncio.run(main()) ''' RuntimeWarning: coroutine 'nested' was never awaited nested() 42 '''
上面的nested是一个await函数,但是不会调用,因为只能通过上面的三种方式进行调用。只是说创建了这么一个对象。
然后通过await传递。在run里面,会执行await标记的awaitable对象。上面的只会在两种情况下使用:
coroutine方法:async def
coroutine对象:调用调用coroutine方法返回的对象。 即上面的 nested()
Tasks
Tasks将按照顺序异步执行这些协同程序。
当一个协同程序被
asyncio.create_task()
方法包装到Task中的时候。这个系统程序将会按照顺序进行异步执行。import asyncio async def nested(): return 42 async def main(): # Schedule nested() to run soon concurrently # with "main()". task = asyncio.create_task(nested()) # "task" can now be used to cancel "nested()", or # can simply be awaited to wait until it is complete: await task asyncio.run(main())
Futures
这是一个特殊的底层的awaitable对象,代表着异步执行的最终结果。
当一个Future对象正在await状态,那么也就是说,协同程序将会一直等到Future在某个地方执行完。
所有在异步io中的Future对象都需要通过 async/await 声明的可回调代码块。
一般来说,基本都不需要写这样的代码。
在有些时候库和异步ioapi需要扩展的时候,可以使用,比如。
async def main(): await function_that_returns_a_future_object() # this is also valid: await asyncio.gather( function_that_returns_a_future_object(), some_python_coroutine() )
import asyncio import concurrent.futures def blocking_io(): # File operations (such as logging) can block the # event loop: run them in a thread pool. with open('/dev/urandom', 'rb') as f: return f.read(100) def cpu_bound(): # CPU-bound operations will block the event loop: # in general it is preferable to run them in a # process pool. return sum(i * i for i in range(10 ** 7)) async def main(): loop = asyncio.get_running_loop() ## Options: # 1. Run in the default loop's executor: result = await loop.run_in_executor( None, blocking_io) print('default thread pool', result) # 2. Run in a custom thread pool: with concurrent.futures.ThreadPoolExecutor() as pool: result = await loop.run_in_executor( pool, blocking_io) print('custom thread pool', result) # 3. Run in a custom process pool: with concurrent.futures.ProcessPoolExecutor() as pool: result = await loop.run_in_executor( pool, cpu_bound) print('custom process pool', result) asyncio.run(main())
Running an asyncio Program
如何运行一个异步程序。
asyncio.
run
(coro, *, debug=False)这个方法将调用传递过去的协同程序,小心的管理事件请求队列,并且异步的执行。
在当前线程执行已经有一个正在运行的情况下是不可以调用的。
如果debug参数被设定为True,那么事件请求队列将会以debug的模式运行。
这个方法会创建一个新的时间请求队列,并且在执行完毕的时候关闭。而且这个也是执行异步程序的入口。而且也只应该被调用一次。
Creating Tasks
asyncio.
create_task
(coro)将一个系统程序包装到执行列表中,并且返回Task对象。
task将会在get_running_loop()
,返回的loop队列中执行。如果当前现成中没有正在执行的loop队列,将会抛出异常。
Sleeping
coroutine
asyncio.
sleep
(delay, result=None, *, loop=None)阻塞delay秒
如果给了result参数。那么将会执行完成后将之返回给调用者。import asyncio import datetime async def display_date(): loop = asyncio.get_running_loop() end_time = loop.time() + 5.0 while True: print(datetime.datetime.now()) if (loop.time() + 1.0) >= end_time: break await asyncio.sleep(1) asyncio.run(display_date())
协同程序只能通过await声明提到顶层,否则是不会执行的。所以如果不是用await,那么久仅仅是一个简单的创建对象。
只有通过调用,而且是通过run调用,loop才会提升到当前线程执行。
Running Tasks Concurrently
awaitable
asyncio.
gather
(*aws, loop=None, return_exceptions=False)同步执行所有aws中的awaitable对象。
如果aws里面所有都是协同程序。那么会像Task一样自动的执行。
如果所有的协同程序都成功执行,那么result就是一个所有方法的返回值。返回值得顺序对应aws中的顺序。
如果return_exceptions被设定为false(默认),那么第一个发生的一场将会传递给其他的系统程序。其他的协同
程序不会推出,仍然继续执行。
如果设置为True,异常将会被视为结果放在结果集里面。
如果gather退出,所有的系统程序都会退出。
如果Task或者Future从aws中推出,那么这两个抛出的异常将不会按照cancel信号抛出。这样是为了避免一个退出导致其他的退出 。import asyncio async def factorial(name, number): f = 1 for i in range(2, number + 1): print(f"Task {name}: Compute factorial({i})...") await asyncio.sleep(1) f *= i print(f"Task {name}: factorial({number}) = {f}") async def main(): # Schedule three calls *concurrently*: await asyncio.gather( factorial("A", 2), factorial("B", 3), factorial("C", 4), ) asyncio.run(main()) # Expected output: 期望输出 # # Task A: Compute factorial(2)... # Task B: Compute factorial(2)... # Task C: Compute factorial(2)... # Task A: factorial(2) = 2 # Task B: Compute factorial(3)... # Task C: Compute factorial(3)... # Task B: factorial(3) = 6 # Task C: Compute factorial(4)... # Task C: factorial(4) = 24