asyncio网络篇

协同程序以   async/await  语法进行声明。

>>> import asyncio

>>> async def main():
...     print('hello')
...     await asyncio.sleep(1)
...     print('world')

>>> asyncio.run(main())
hello
world

系统程序只能通过 asynico.run进行调用。直接调用是没有用的。协同程序。

>>> main()
<coroutine object main at 0x1053bb7c8>

 三种方法运行协同程序。

1:asyncio.run() 顶层调用  main(),如上案例

2:await一个协同程序。如下案例。await传递?

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"started at {time.strftime('%X')}")

    await say_after(1, 'hello')
    await say_after(2, 'world')

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())


started at 17:13:52
hello
world
finished at 17:13:55

3:通过asyncio.create_task()方法去调用协同程序,参照Tasks

async def main():
    task1 = asyncio.create_task(
        say_after(1, 'hello'))

    task2 = asyncio.create_task(
        say_after(2, 'world'))

    print(f"started at {time.strftime('%X')}")

    # Wait until both tasks are completed (should take
    # around 2 seconds.)
    await task1
    await task2

    print(f"finished at {time.strftime('%X')}")



started at 17:14:32
hello
world
finished at 17:14:34

两个是异步,只需要大概两秒的时间。需要看一下输出耗时。
这种比上面的那两种都要高效一些。

 所有可以通过await使用的表达式都叫做 awaitable对象。许多异步io接口都支持awaitables.
目前有三种 awaitable 对象。coroutines,Tasks,Futures.

coroutines

 python的协同程序都是awaitables,因此可以在其他的程序中进行 await操作。

import asyncio

async def nested():
    return 42

async def main():
    # Nothing happens if we just call "nested()".
    # A coroutine object is created but not awaited,
    # so it *won't run at all*.
    nested()

    # Let's do it differently now and await it:
    print(await nested())  # will print "42".

asyncio.run(main())

'''
RuntimeWarning: coroutine 'nested' was never awaited
  nested()
42
'''

上面的nested是一个await函数,但是不会调用,因为只能通过上面的三种方式进行调用。只是说创建了这么一个对象。
然后通过await传递。在run里面,会执行await标记的awaitable对象。

上面的只会在两种情况下使用:
    coroutine方法:async def
    coroutine对象:调用调用coroutine方法返回的对象。  即上面的 nested()

link

Tasks

Tasks将按照顺序异步执行这些协同程序。

当一个协同程序被asyncio.create_task()方法包装到Task中的时候。这个系统程序将会按照顺序进行异步执行。

import asyncio

async def nested():
    return 42

async def main():
    # Schedule nested() to run soon concurrently
    # with "main()".
    task = asyncio.create_task(nested())

    # "task" can now be used to cancel "nested()", or
    # can simply be awaited to wait until it is complete:
    await task

asyncio.run(main())

Futures

这是一个特殊的底层的awaitable对象,代表着异步执行的最终结果。

当一个Future对象正在await状态,那么也就是说,协同程序将会一直等到Future在某个地方执行完。

所有在异步io中的Future对象都需要通过  async/await 声明的可回调代码块。

一般来说,基本都不需要写这样的代码。

在有些时候库和异步ioapi需要扩展的时候,可以使用,比如。

async def main():
    await function_that_returns_a_future_object()

    # this is also valid:
    await asyncio.gather(
        function_that_returns_a_future_object(),
        some_python_coroutine()
    )
import asyncio
import concurrent.futures

def blocking_io():
    # File operations (such as logging) can block the
    # event loop: run them in a thread pool.
    with open('/dev/urandom', 'rb') as f:
        return f.read(100)

def cpu_bound():
    # CPU-bound operations will block the event loop:
    # in general it is preferable to run them in a
    # process pool.
    return sum(i * i for i in range(10 ** 7))

async def main():
    loop = asyncio.get_running_loop()

    ## Options:

    # 1. Run in the default loop's executor:
    result = await loop.run_in_executor(
        None, blocking_io)
    print('default thread pool', result)

    # 2. Run in a custom thread pool:
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, blocking_io)
        print('custom thread pool', result)

    # 3. Run in a custom process pool:
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, cpu_bound)
        print('custom process pool', result)

asyncio.run(main())

Running an asyncio Program

如何运行一个异步程序。

asyncio.run(coro*debug=False)

这个方法将调用传递过去的协同程序,小心的管理事件请求队列,并且异步的执行。

在当前线程执行已经有一个正在运行的情况下是不可以调用的。

如果debug参数被设定为True,那么事件请求队列将会以debug的模式运行。

这个方法会创建一个新的时间请求队列,并且在执行完毕的时候关闭。而且这个也是执行异步程序的入口。而且也只应该被调用一次。

Creating Tasks

asyncio.create_task(coro)

   将一个系统程序包装到执行列表中,并且返回Task对象。
   task将会在get_running_loop(),返回的loop队列中执行。如果当前现成中没有正在执行的loop队列,将会抛出异常。

Sleeping

coroutine asyncio.sleep(delayresult=None*loop=None)

阻塞delay秒
如果给了result参数。那么将会执行完成后将之返回给调用者。

import asyncio
import datetime

async def display_date():
    loop = asyncio.get_running_loop()
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)

asyncio.run(display_date())

协同程序只能通过await声明提到顶层,否则是不会执行的。所以如果不是用await,那么久仅仅是一个简单的创建对象。
只有通过调用,而且是通过run调用,loop才会提升到当前线程执行。

Running Tasks Concurrently

awaitable asyncio.gather(*awsloop=Nonereturn_exceptions=False)

同步执行所有aws中的awaitable对象。
如果aws里面所有都是协同程序。那么会像Task一样自动的执行。
如果所有的协同程序都成功执行,那么result就是一个所有方法的返回值。返回值得顺序对应aws中的顺序。
如果return_exceptions被设定为false(默认),那么第一个发生的一场将会传递给其他的系统程序。其他的协同
程序不会推出,仍然继续执行。
如果设置为True,异常将会被视为结果放在结果集里面。
如果gather退出,所有的系统程序都会退出。
如果Task或者Future从aws中推出,那么这两个抛出的异常将不会按照cancel信号抛出。这样是为了避免一个退出导致其他的退出 。

import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({i})...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")

async def main():
    # Schedule three calls *concurrently*:
    await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )

asyncio.run(main())


# Expected output: 期望输出
#
#     Task A: Compute factorial(2)...
#     Task B: Compute factorial(2)...
#     Task C: Compute factorial(2)...
#     Task A: factorial(2) = 2
#     Task B: Compute factorial(3)...
#     Task C: Compute factorial(3)...
#     Task B: factorial(3) = 6
#     Task C: Compute factorial(4)...
#     Task C: factorial(4) = 24

猜你喜欢

转载自blog.csdn.net/rubikchen/article/details/86299859