很多朋友对异步编程都处于“听说很强大”的认知状态。鲜有在生产项目中使用它。而使用它的同学,则大多数都停留在知道如何使用 Tornado、Twisted、Gevent 这类异步框架上,出现各种古怪的问题难以解决。而且使用了异步框架的部分同学,由于用法不对,感觉它并没牛逼到哪里去,所以很多同学做 Web 后端服务时还是采用 Flask、Django等传统的非异步框架。
从上两届 PyCon 技术大会看来,异步编程已经成了 Python 生态下一阶段的主旋律。如新兴的 Go、Rust、Elixir 等编程语言都将其支持异步和高并发作为主要“卖点”,技术变化趋势如此。Python 生态为不落人后,从2013年起由 Python 之父 Guido 亲自操刀主持了Tulip(asyncio)项目的开发。
异步io的好处在于避免的线程的开销和切换,而且我们都知道python其实是没有多线程的,只是通过底层线层锁实现的多线程。另一个好处在于避免io操作(包含网络传输)的堵塞时间。
asyncio可以实现单线程并发IO操作。如果仅用在客户端,发挥的威力不大。如果把asyncio用在服务器端,例如Web服务器,由于HTTP连接就是IO操作,因此可以用单线程+coroutine实现多用户的高并发支持。
asyncio实现了TCP、UDP、SSL等协议,aiohttp则是基于asyncio实现的HTTP框架。
对于异步io你需要知道的重点,要注意的是,await语法只能出现在通过async修饰的函数中,否则会报SyntaxError错误。而且await后面的对象需要是一个Awaitable,或者实现了相关的协议。
注意:
所有需要异步执行的函数,都需要asyncio中的轮训器去轮训执行,如果函数阻塞,轮训器就会去执行下一个函数。所以所有需要异步执行的函数都需要加入到这个轮训器中。
例如:
import requests
import time
import asyncio
# 创建一个异步函数
async def task_func():
await asyncio.sleep(1)
resp = requests.get('http://192.168.2.177:5002/')
print('2222222',time.time(),resp.text)
async def main(loop):
loop=asyncio.get_event_loop() # 获取全局轮训器
task = loop.create_task(task_func()) # 在全局轮训器加入协成,只有加入全局轮训器才能被监督执行
await asyncio.sleep(2) # 等待两秒为了不要立即执行event_loop.close(),项目中event_loop应该是永不停歇的
print('11111111111',time.time())
event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(main(event_loop))
finally:
event_loop.close() # 当轮训器关闭以后,所有没有执行完成的协成将全部关闭
下面是aiohttp一个简单的demo。
#!/usr/bin/env python3
import argparse
from aiohttp import web
import asyncio
import base64
import logging
import uvloop
import time,datetime
import json
import requests
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
routes = web.RouteTableDef()
@routes.get('/')
async def hello(request):
return web.Response(text="Hello, world")
# 定义一个路由映射,接收网址参数,post方式
@routes.post('/demo1/{name}')
async def demo1(request): # 异步监听,只要一有握手就开始触发,此时网址参数中的name就已经知道了,但是前端可能还没有完全post完数据。
name = request.match_info.get('name', "Anonymous") # 获取name
print(datetime.datetime.now()) # 触发视图函数的时间
data = await request.post() # 等待post数据完成接收,只有接收完成才能进行后续操作.data['key']获取参数
print(datetime.datetime.now()) # 接收post数据完成的时间
logging.info('safety dect request start %s' % datetime.datetime.now())
result = {'name':name,'key':data['key']}
logging.info('safety dect request finish %s, %s' % (datetime.datetime.now(),json.dumps(result)))
return web.json_response(result)
# 定义一个路由映射,设计到io操作
@routes.post('/demo2')
async def demo2(request): # 异步监听,只要一有握手就开始触发,此时网址参数中的name就已经知道了,但是前端可能还没有完全post完数据。
data = await request.post() # 等待post数据完成接收,只有接收完成才能进行后续操作.data['key']获取参数
logging.info('safety dect request start %s' % datetime.datetime.now())
res = requests.post('http://www.baidu.com') # 网路id,会自动切换到其他协成上
logging.info('safety dect request finish %s' % res.test)
return web.Response(text="welcome")
if __name__ == '__main__':
logging.info('server start')
app = web.Application()
app.add_routes(routes)
web.run_app(app,host='0.0.0.0',port=8080)
logging.info('server close')