consumers允许使用者更轻松地编写ASGI应用。主要实现如下功能:
将代码结构搭建为针对事件的函数,而不是一整套事件循环;
通过同步或异步的代码处理线程切换。
consumer必须是channels.consumer.AsyncConsumer
或channels.consumer.SyncConsumer
的子类,用于编写异步或同步的代码。
一个简单的同步consumer示例:
from channels.consumer import SyncConsumer
class EchoConsumer(SyncConsumer):
def websocket_connect(self, event):
self.send({
"type": "websocket.accept",
})
def websocket_receive(self, event):
self.send({
"type": "websocket.send",
"text": event["text"],
})
上面定义了一个WebSocket回显服务器——他会接收所有连接进来的WebSocket连接,并回复相同的内容。
consumer围绕着一系列命名的方法构建起来,这些方法与他们要接受的信息的type值相对应,这些方法名称中的’.‘由’_'替换。
ASGI WebSocket规范指导Channels的WebSocket的搭建,并通过路由检查一个websocket的scope的类型,由此确定了接受的event类型以及以及event中含有的键值参数。
self.send(event)接口用于向客户端或协议服务器返回event。
异步consumer的搭建与同步类似,时是内部的方法定义要改为异步协程:
from channels.consumer import AsyncConsumer
class EchoConsumer(AsyncConsumer):
async def websocket_connect(self, event):
await self.send({
"type": "websocket.accept",
})
async def websocket_receive(self, event):
await self.send({
"type": "websocket.send",
"text": event["text"],
})
默认推荐使用同步consumer,尤其是调用Django ORM或其他同步程序时,以保持整个consumer在单个线程中并避免ORM查询阻塞整个event。
在异步consumer中调用同步consumer需要asgiref.sync.sync_to_async
,该组件用于在线程池中运行同步consumer,将同步consumer作为异步协同程序调用。
在异步consumer中调用Django ORM,可以使用database_sync_to_async
。
关闭consumer
当连接到consumer的链接关闭时,服务器会收到一个相应的event(比如,http.disconnect或websocket.disconnect),应用接受后需要作相应的处理。处理完成后,应当触发channels.exceptions.StopConsumer
以彻底中止ASGI应用。如果不出发并任由应用运行,则服务器会在达到应用关闭时限后(Daphne默认10秒),结束应用并触发警告。
通用型consumer会主动完成以上操作,只有基于AsyncConsumer
或``SyncConsumer````自定义consumer时需要注意这些。
通信层
consumer可以通过通信层实现一对一的信息交换或通过groups广播到整个系统。cosumer会调用default
通信层,也可以通过channel_layer_alias
参数自定义通信层名称:
from channels.consumer import SyncConsumer
class EchoConsumer(SyncConsumer):
channel_layer_alias = "echo_alias"
Scope
consumer在初始化时会接受链接的scope,类似于Django中的request对象,可以通过self.scope
查看。
scope也是ASGI规范的一部分,有如下常用信息:
scope["path"]
,请求的路径;
scope["headers"]
,请求头信息,以键值对的形式返回;
scope["method"]
,请求的方法(仅HTTP可用)
通用型consumer(GENERIC CONSUMERS)
Channel将常用的函数封装为generic views,类似于Django中的generic.view
WebsocketConsumer
通过channels.generic.websocket.WebsocketConsumer
使用,WebsocketConsumer将原始的ASGI消息的收发进行封装,用户仅需处理简单的文本或二进制字符:
from channels.generic.websocket import WebsocketConsumer
class MyConsumer(WebsocketConsumer):
groups = ["broadcast"]
def connect(self):
# 在收到连接时调用
# 接受连接:
self.accept()
# 也可以在接受连接的同时指定子协议
# 通过self.scope['subprotocols']访问由客户端指定的子协议列表
self.accept("subprotocol")
# 拒绝连接则调用:
self.close()
def receive(self, text_data=None, bytes_data=None):
# 调用时需要传入文本或二进制文本
# 正常调用:
self.send(text_data="Hello world!")
# 发送二进制框架:
self.send(bytes_data="Hello world!")
# 想要强制关闭连接则调用:
self.close()
# 或添加自定义的WebSocket的错误码:
self.close(code=4123)
def disconnect(self, close_code):
# 套接字关闭时调用
可以在connect
方法中触发channels.exceptions.AcceptConnection
或channels.exceptions.DenyConnection
以接受或拒绝一个连接。
如果在类属性groups
中定义了群组,那么WebSocket的频道会被自动加入或移出这些群组。group
属性必须是可迭代对象,同时必须设置支持群组的通信层(channels.layers.InMemoryChannelLayer
或channels_redis.core.RedisChannelLayer
)作为channel backend。
AsyncWebsocketConsumer
通过channels.generic.websocket.AsyncWebsocketConsumer
,其功能和特征与WebSocketConsume完全一样,只是异步实现:
from channels.generic.websocket import AsyncWebsocketConsumer
class MyConsumer(AsyncWebsocketConsumer):
groups = ["broadcast"]
async def connect(self):
# Called on connection.
# To accept the connection call:
await self.accept()
# Or accept the connection and specify a chosen subprotocol.
# A list of subprotocols specified by the connecting client
# will be available in self.scope['subprotocols']
await self.accept("subprotocol")
# To reject the connection, call:
await self.close()
async def receive(self, text_data=None, bytes_data=None):
# Called with either text_data or bytes_data for each frame
# You can call:
await self.send(text_data="Hello world!")
# Or, to send a binary frame:
await self.send(bytes_data="Hello world!")
# Want to force-close the connection? Call:
await self.close()
# Or add a custom WebSocket error code!
await self.close(code=4123)
async def disconnect(self, close_code):
# Called when the socket closes
JsonWebsocketConsumer
通过channels.generic.websocket.JsonWebsocketConsumer
使用,这个consumer会自动对WebSocket文本框进行JSON编解码。
receive_json
方法接受唯一的参数content
,该方法会解码JSON对象;
send_json
方法同样接受唯一参数content
,用于编码JSON对象。
也可以重写encode_json
和decode_json
类方法自定义JSON的编解码过程。
AsyncJsonWebsocketConsumer
异步版本的JsonWebsocketConsumer,通过channels.generic.websocket.AsyncJsonWebsocketConsumer
使用,其中endoce_json
和decode_json
都是异步函数。
AsyncHttpConsumer
通过channels.generic.http.AsyncHttpConsumer
使用,提供了基础的HTTP处理工具:
from channels.generic.http import AsyncHttpConsumer
class BasicHttpConsumer(AsyncHttpConsumer):
async def handle(self, body):
await asyncio.sleep(10)
await self.send_response(200, b"Your response bytes", headers=[
(b"Content-Type", b"text/plain"),
])
handle
方法接受二进制形式的完整的请求体,请求头以元组组成的列表或字典形式传递。应答内容同样需要二进制形式。
也可以通过send_headers
和send_body
方法分别定义应答内容。
import json
from channels.generic.http import AsyncHttpConsumer
class LongPollConsumer(AsyncHttpConsumer):
async def handle(self, body):
await self.send_headers(headers=[
(b"Content-Type", b"application/json"),
])
# Headers are only sent after the first body event.
# Set "more_body" to tell the interface server to not
# finish the response yet:
await self.send_body(b"", more_body=True)
async def chat_message(self, event):
# Send JSON and finish the response:
await self.send_body(json.dumps(event).encode("utf-8"))
也可以通过这个类定义ServerSendEvents:
from datetime import datetime
from channels.generic.http import AsyncHttpConsumer
class ServerSentEventsConsumer(AsyncHttpConsumer):
async def handle(self, body):
await self.send_headers(headers=[
(b"Cache-Control", b"no-cache"),
(b"Content-Type", b"text/event-stream"),
(b"Transfer-Encoding", b"chunked"),
])
while True:
payload = "data: %s\n\n" % datetime.now().isoformat()
await self.send_body(payload.encode("utf-8"), more_body=True)
await asyncio.sleep(1)