

  • 1.1请求流媒体


from sanic import Sanic
from sanic.views import CompositionView  # 合成视图
from sanic.views import HTTPMethodView  # http协议方法视图
from sanic.views import stream as stream_decorator  # 流媒体装饰
from sanic.blueprints import Blueprint  # 蓝图
from sanic.response import stream, text

bq = Blueprint("blueprint_request_stream")  # 实例化蓝图对象(把蓝图请求流传到蓝图对象中)
# 实例化一个Sanic对象
app = Sanic("request_stream")  # 把请求媒体传到Sanic对象

class SimpleView(HTTPMethodView):  # 简单的视图,继承HTTPMethodView这个类
    # 流媒体装饰器
    # 异步提交强求的处理函数
    async def post(self, request):
        # 当以一个空的result
        result = ""
        while 1:
            # 从请求的流媒体中读取请求体
            body = await # 判断读取的请求体是空(无值) if body is None: break # 停止循环 print(body, type(body)) result += body.decode("utf-8") # 读取到的body是一个bytes"/stream", stream=True) # 提交请求的路由 async def handler(request): async def streaming(response): while 1: # 从请求的stream中读取请求体 body = await # 如果body是空值 if body is None: break # 停止循环 body = body.decode("utf-8").replace("1", "A") # 把body中的字符串1,替换成A await response.write(body) # 阻塞的把请求体写入到响应的response中 return stream(streaming) # 调用streaming异步函数 # put请求,修改数据 @app.put("/bq_stream", stream=True) async def bq_put_handler(request): result = "" while 1: body = await if body is None: break print(body, type(body)) result += body.decode("utf-8").replace("1", "A") return text("result") # 你可以在流媒体参数中使用bq_add_route()方法 async def bq_post_handler(request): result = "" # 定义一个空的result while 1: body = await if body is None: break result += body.decode("utf-8").replace("1", "A") return text(result) # 返回请求体中获取到的内容  bq.add_route(bq_post_handler, "/bq_stream", methods=["POST"], stream=True) # 我自己测试的add_route没有stream=None这个关键字 # bp.add_route(bp_post_handler, '/bp_stream', methods=['POST'], stream=True) # 异步处理post请求的方法 async def post_handler(request): result = "" while 1: body = await if not body: break result += body.decode("utf-8") return text(result) # 此时的result是个字符串  app.blueprint(bq) # 把实例化的蓝图注册进来 app.add_route(SimpleView.as_view(), "/method_view") view = CompositionView() # 实例化合成视图对象 view.add(["POST"], post_handler, stream=True) app.add_route(view, "/composition_view") if __name__ == '__main__':"", port=8000, debug=True)

  • 1.2响应流媒体


from sanic import Sanic
from sanic.response import stream  # 这个流媒体存在于sanic的响应中

# 实例化一个Sanic对象
app = Sanic(__name__)

async def test(request):
    async def sample_streaming_fn(response):
        await response.write("foo, await response.write("bar") return stream(sample_streaming_fn, content_type="text/csv")


from sanic import Sanic
from sanic.response import stream
import asyncpg

# 实例化一个Sanic对象
app = Sanic()

async def index(request): async def stream_from_db(response): # 创建一个连接异步的数据库的对象 conn = await asyncpg.connect(user="root", password=123456, database="xuexue") # conn事物的处理业务逻辑  async with conn.transaction(): # 异步的执行命令 async for record in conn.cursor("SELECT generate_series(0, 10)"): # 非阻塞的记录  await response.write(record[0]) return stream(stream_from_db)





  • 2.1定义视图

基于类的视图需要继承子类HTTPMethodView,你可以实施类方法为你想要支持的每一个HTTP请求的类型,如果后端接收到了一个没有定义方法的请求,后端将会返回一个405:Method not allowed.



from sanic import Sanic
from sanic.views import HTTPMethodView
from sanic.response import text
# 实例化一个Sanix对象
app = Sanic("some_name")

class SimpleView(HTTPMethodView):
    # get请求的视图方法
    def get(self, request): # 接收到请求先校验 # 从数据库中获取数据 # 将获取到的数据展示出来 return text(" i am get method") # post请求提交数据的方法 def post(self, request): # 接收到请求先校验 # 把提交的数据跟新到数据库和中 # 将提交后的数据展示出来 return text("i am post method") # put更新的方法(更新全部资源) def put(self, request): # # 接收到请求先校验 # # 从数据库中更新数据 # # 将跟新后的数据展示出来 return text("i am put method") # patch更新的方法(局部资源) def patch(self, request): # 接收到请求先校验 # 从数据库中更新数据 # 将跟新后的数据展示出来 return text("i am patch method") # delete删除的方法 def delete(self, request): # 接收到请求可以进行一系列的操作 return text("i am delete method")


from sanic import Sanic
from sanic.views import HTTPMethodView
from sanic.response import text

# 实例化一个Sanic对象
app = Sanic("some_name")

class SimpleView(HTTPMethodView):
    async def get(self, request): # 拿到请求后,进行一系列操作 return text("i am async get method") # 注册类视图 app.add_route(SimpleView.as_view(), "/") # 要写路由"/"
  • 2.2,URL参数


from sanic.response import text
from sanic.views import HTTPMethodView
from sanic import Sanic

# 实例化一个Sanic对象
app = Sanic("some_name")

# 基于类的视图
class NameView(HTTPMethodView):
    # 处理get请求的方法
    def get(self, request, name): # 拿到请求的参数可以做一些列的操作 return text("hello {}".format(name)) # 注册类视图 app.add_route(NameView.as_view(), "/<name>")
  • 2.3装饰器


from sanic import Sanic
from sanic.response import text
from sanic.views import HTTPMethodView

# 实例化一个Sanic对象
app = Sanic("some_name")

class ViewWithDecorator(HTTPMethodView):
    # 把所有的装饰器写在一个列表中
    decorators = ['some_decorator_here'] # 处理get请求 def get(self, request, name): return text("Hello I have a decorator") # 处理post请求到的方法 def post(self, request, name): return text(" Hello I also have a decorator") # 注册类视图 app.add_route(ViewWithDecorator.as_view(), "/url")


from sanic import Sanic
from sanic.views import HTTPMethodView
from sanic.response import text

# 实例化一个Sanic对象
app = Sanic("some_name")

class ViewWithSomeDecorator(HTTPMethodView):
    @staticmethod  # 静态方法的装饰器
    @some_decorator_here  # 其他的装饰器
    def get(request, name): return text(" Hello I have a decorator") def post(self, request, name): return text("Hello I don`t have any decorators")
  • 2.4网址构建

如果你希望为 HTTPMethodView构建一个网址,切记,你将会通过url_for传递类名的端点.

from sanic import Sanic
from sanic.response import text, redirect
from sanic.views import HTTPMethodView

# 实例化一个Sanic对象
app = Sanic("some_name")

def index(request): # 构架url地址 url = app.url_for("SpecialClassView") return redirect(url) # 当访问"/"这个路由的时候,跳转到url构建的路径去 class SpecialClassView(HTTPMethodView): def get(self, request): return text(" Hello from the Special Class View!") # 注册类视图 app.add_route(SpecialClassView.as_view(), "/special_class_view")
  • 2.5使用合成视图



from sanic import Sanic
from sanic.views import CompositionView  # 合成视图
from sanic.response import text

# 实例化一个Sanic对象
app = Sanic("some_name")

def get_handler(request):
    return text("I am a get method")

view = CompositionView()  # 实例化一个合成视图的对象
view.add(["GET"], get_handler)  # 调用CompositionView中的add方法,传入请求的方式,和要执行的视图函数
view.add(["POST", "PUT"], lambda request: text("I am post/put method"))  # 增加post和put处理的方法
# 使用新的视图去处理基本路由的请求
app.add_route(view, "/")

