目录
WebsocketHandler
- 一个用来处理Websocket Messages和生命周期事件的处理程序。
在Spring中,如果我们仅仅使用 Websocket 而非 STOMP,正如官方文档所说:
“创建WebSocket服务器就像实现WebSocketHandler一样简单,或者更可能继承TextWebSocketHandler或BinaryWebSocketHandler”
只需要去实现并配置自己的 WebsocketHandler,如下所示:
TextWebSocketHandler实现类:
public class MyHandler extends TextWebSocketHandler {
@Override
public void handleTextMessage(WebSocketSession session, TextMessage message) {
// ...
}
}
SubProtocolWebSocketHandler 也是 WebSocketHandler 的一种实现,就像它的名字那样,是一种专门处理子协议(内容交换协议)的 WebSocketHandler 实现。
通过这个类,Spring 将STOMP over Websocket完美的封装起来,以至于我们完全不用去理会如何处理Websocket子协议?
,仅仅要做的就是启用消息代理,然后实现自己的业务逻辑。
SubProtocolWebSocketHandler
- 首先它是WebSocketHandler的一种实现,它将传入的WebSocketMessage(已经被成功升级成Websocket的消息)委托给SubProtocolHandler以及clientInboundChannel,SubProtocolHandler可以将Upgrade后的消息从WebSocket客户端发送到应用程序;
- 另外它也是MessageHandler的一种实现,它找到与Message关联的WebSocketSession,并将该session与Message一起传递给SubProtocolHandler,用来将Message从应用程序发送回客户端。
持有四个重要成员变量:
- Map<String, SubProtocolHandler> protocolHandlerLookup =
new TreeMap<>(String.CASE_INSENSITIVE_ORDER); - Map<String, WebSocketSessionHolder> sessions = new ConcurrentHashMap<>();
- MessageChannel clientInboundChannel;
- SubscribableChannel clientOutboundChannel;
四个重要成员变量
protocolHandlerLookup
“协议处理程序查找” —— 顾名思义,这个变量可以持有不止一个SubProtocolHandler实例,通过它我们能够查找到相应子协议的处理程序。该变量的实际类型是new TreeMap(),为了保证元素顺序。key —— 为子协议的名称,value —— 为 SubProtocolHandler 实例。
下面这段代码,可以看到如何向protocolHandlerLookup添加SubProtocolHandler实例。
SubProtocolWebSocketHandler:
public class SubProtocolWebSocketHandler
implements WebSocketHandler, SubProtocolCapable, MessageHandler, SmartLifecycle {
/**
* Register a sub-protocol handler.
*/
public void addProtocolHandler(SubProtocolHandler handler) {
List<String> protocols = handler.getSupportedProtocols();
if (CollectionUtils.isEmpty(protocols)) {
if (logger.isErrorEnabled()) {
logger.error("No sub-protocols for " + handler);
}
return;
}
for (String protocol : protocols) {
SubProtocolHandler replaced = this.protocolHandlerLookup.put(protocol, handler);
if (replaced != null && replaced != handler) {
throw new IllegalStateException("Cannot map " + handler +
" to protocol '" + protocol + "': already mapped to " + replaced + ".");
}
}
this.protocolHandlers.add(handler);
}
}
子协议
- 处理WebSocket消息的约定,作为更高级别协议的一部分,在WebSocket RFC规范中称为“子协议”
SubProtocolHandler
- 实际上SubProtocolWebSocketHandler所做的工作,最终都是通过SubProtocolHandler来完成,可以说后者就是前者的支撑实例。
- 处理来自客户端的WebSocketMessages以及要发送给客户端的Message。
- 可以在SubProtocolWebSocketHandler上配置此接口的实现,SubProtocolWebSocketHandler根据客户端传递的Sec-WebSocket-Protocol请求标头,来选择子协议处理程序以委派消息。
通过名字和注释我们很容易理解各个方法的作用,不过还是要强调两个:
1. handleMessageFromClient(WebSocketSession session, WebSocketMessage<?> message, MessageChannel outputChannel)
将WebsocketMessage从客户端发送到应用程序。不过这里需要强调两点:
- 是WebsocketMessage。也就是说 HandeShake 握手请求成功,请求消息升级成WebsocketMessage后,该处理程序才会被调用,同样其他方法中所强调的连接,此时即是Websocket连接。
- 发送到应用程序。所以这个方法的注释和参数名,其实是错的。这里参数MessageChannel,应该是
inboundChannel
——a inbound channel to send messages to application
,是一个发送消息到应用程序的入栈通道。
2. handleMessageToClient(WebSocketSession session, Message<?> message)
将消息从应用程序发送到客户端。
SubProtocolHandler :
public interface SubProtocolHandler {
/**
* Return the list of sub-protocols supported by this handler (never {@code null}).
*/
List<String> getSupportedProtocols();
/**
* Handle the given {@link WebSocketMessage} received from a client.
* @param session the client session
* @param message the client message
* @param outputChannel an output channel to send messages to
*/
void handleMessageFromClient(WebSocketSession session, WebSocketMessage<?> message, MessageChannel outputChannel)
throws Exception;
/**
* Handle the given {@link Message} to the client associated with the given WebSocket session.
* @param session the client session
* @param message the client message
*/
void handleMessageToClient(WebSocketSession session, Message<?> message) throws Exception;
/**
* Resolve the session id from the given message or return {@code null}.
* @param message the message to resolve the session id from
*/
@Nullable
String resolveSessionId(Message<?> message);
/**
* Invoked after a {@link WebSocketSession} has started.
* @param session the client session
* @param outputChannel a channel
*/
void afterSessionStarted(WebSocketSession session, MessageChannel outputChannel) throws Exception;
/**
* Invoked after a {@link WebSocketSession} has ended.
* @param session the client session
* @param closeStatus the reason why the session was closed
* @param outputChannel a channel
*/
void afterSessionEnded(WebSocketSession session, CloseStatus closeStatus, MessageChannel outputChannel)
throws Exception;
}
在Spring中(截止当前Spring Framework 5.1.2
)StompSubProtocolHandler—— 处理STOMP子协议
目前是SubProtocolHandler 的唯一实现,但是该接口为以后子协议的扩展提供了钩子(简直惊叹啊,这是编程者值得学习的地方)。
sessions
“会话集合” —— 维持着与该应用程序建立Websocket连接的所有WebsoketSession。这里实际类型是new ConcurrentHashMap<>(),满足应用程序的并发量,key —— WebsocketSession id,value —— WebSocketSessionHolder实例。
afterConnectionEstablished(WebSocketSession session):
在Websocket 连接建立之后,将WebSocketSession 存入sessions。我们可以通过复写该方法来实现session存储的自定义。
当然Spring早叫为我们想到了,所以它提供了WebSocketHandlerDecorator 与WebSocketHandlerDecoratorFactory,作为一个钩子,允许我们不改变代码的情况下,为SubProtocolWebSocketHandler增加功能,比如说“实现分布式WebsocketSession”
,以此来满足我们构建分布式应用程序的需求。
SubProtocolWebSocketHandler:
public class SubProtocolWebSocketHandler
implements WebSocketHandler, SubProtocolCapable, MessageHandler, SmartLifecycle {
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
// WebSocketHandlerDecorator could close the session
if (!session.isOpen()) {
return;
}
this.stats.incrementSessionCount(session);
session = decorateSession(session);
this.sessions.put(session.getId(), new WebSocketSessionHolder(session));
findProtocolHandler(session).afterSessionStarted(session, this.clientInboundChannel);
}
}
clientInboundChannel
“入站通道” —— SubProtocolWebSocketHandler实现了WebSocketHandler的handleMessage(WebSocketSession session, WebSocketMessage<?> message) 方法,处理升级成功WebsocketMessage。
先通过sessionId从sessions中取出对应的WebSocketSessionHolder,再通过protocol子协议类型从protocolHandlerLookup取出对应的SubProtocolHandler ,再由SubProtocolHandler通过StompDecoder——STOMP消息解码器,将ByteBuffer解码成STOMP格式的消息内容,然后对该内容增加更多的信息,最后由clientInboundChannel发送到应用程序。
SubProtocolWebSocketHandler:
public class SubProtocolWebSocketHandler
implements WebSocketHandler, SubProtocolCapable, MessageHandler, SmartLifecycle {
/**
* Handle an inbound message from a WebSocket client.
*/
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
WebSocketSessionHolder holder = this.sessions.get(session.getId());
if (holder != null) {
session = holder.getSession();
}
SubProtocolHandler protocolHandler = findProtocolHandler(session);
protocolHandler.handleMessageFromClient(session, message, this.clientInboundChannel);
if (holder != null) {
holder.setHasHandledMessages();
}
checkSessions();
}
}
clientOutboundChannel
“出站通道” —— SubProtocolWebSocketHandler的bean在生命周期开始时,订阅了它的成员变量clientOutboundChannel,并且SubProtocolWebSocketHandler是MessageHandler的实现,以至于通过clientOutboundChannel发送的信息,会被该处理程序接收,通过handleMessage(Message<?> message)方法进行处理。
先通过sessionId获得对应的WebSocketSession,再由对应的SubProtocolHandler调用handleMessageToClient(WebSocketSession session, Message<?> message)对Message进行附加内容,之后通过StompEcoder——STOMP消息编码器,将STOMP格式的消息内容编码成字节数组,最后由WebSocketSession发送到对应客户端。
SubProtocolWebSocketHandler:
public class SubProtocolWebSocketHandler
implements WebSocketHandler, SubProtocolCapable, MessageHandler, SmartLifecycle {
@Override
public final void start() {
Assert.isTrue(this.defaultProtocolHandler != null || !this.protocolHandlers.isEmpty(), "No handlers");
synchronized (this.lifecycleMonitor) {
this.clientOutboundChannel.subscribe(this);
this.running = true;
}
}
/**
* Handle an outbound Spring Message to a WebSocket client.
*/
@Override
public void handleMessage(Message<?> message) throws MessagingException {
String sessionId = resolveSessionId(message);
if (sessionId == null) {
if (logger.isErrorEnabled()) {
logger.error("Could not find session id in " + message);
}
return;
}
WebSocketSessionHolder holder = this.sessions.get(sessionId);
if (holder == null) {
if (logger.isDebugEnabled()) {
// The broker may not have removed the session yet
logger.debug("No session for " + message);
}
return;
}
WebSocketSession session = holder.getSession();
try {
findProtocolHandler(session).handleMessageToClient(session, message);
}
catch (SessionLimitExceededException ex) {
try {
if (logger.isDebugEnabled()) {
logger.debug("Terminating '" + session + "'", ex);
}
this.stats.incrementLimitExceededCount();
clearSession(session, ex.getStatus()); // clear first, session may be unresponsive
session.close(ex.getStatus());
}
catch (Exception secondException) {
logger.debug("Failure while closing session " + sessionId + ".", secondException);
}
}
catch (Exception ex) {
// Could be part of normal workflow (e.g. browser tab closed)
if (logger.isDebugEnabled()) {
logger.debug("Failed to send message to client in " + session + ": " + message, ex);
}
}
}
}