版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/woaiqianzhige/article/details/87932558
websocket多线程发送内容
1.websocketSession基类接口:
org.springframework.web.socket.WebSocketSession
其中包含 getPrincipal,getLocalAddress,getRemoteAddress,sendMessage,isOpen,close等方法定义
2.接口的实现类,实现了上面的接口,采用包装设计模式,只做转发没有处理逻辑,具体看下面这个类:
org.springframework.web.socket.handler.WebSocketSessionDecorator
3.带并发的包装类:
ConcurrentWebSocketSessionDecorator
主要关注两个方法 sendMessage 和 close,
```
public void close(CloseStatus status) throws IOException {
this.closeLock.lock(); //获取关闭锁
try {
if (!this.closeInProgress) { //如果已经关闭了,则不处理
if (!CloseStatus.SESSION_NOT_RELIABLE.equals(status)) {
try {
this.checkSessionLimits(); //检查超时时间和buffer状态
} catch (SessionLimitExceededException var6) {
;
}
if (this.limitExceeded) {
if (logger.isDebugEnabled()) {
logger.debug("Changing close status " + status + " to SESSION_NOT_RELIABLE.");
}
status = CloseStatus.SESSION_NOT_RELIABLE;
}
}
this.closeInProgress = true; //设置关闭标志位,防止多次关闭
super.close(status); //调用关闭方法,传递关闭状态
return;
}
} finally {
this.closeLock.unlock();
}
}
```
close方法是用 volatile 修饰的标志位 closeInProgress和加显式锁的方式, 防止多个线程调用close方法
4.sendMessage 方法
public void sendMessage(WebSocketMessage<?> message) throws IOException {
if (!this.shouldNotSend()) { //检查允许发送(是否是关闭状态?,是否是超过大小限制?)
this.buffer.add(message); //将消息添加到queue中
this.bufferSize.addAndGet(message.getPayloadLength()); //将消息大小字段递增
do {
if (!this.tryFlushMessageBuffer()) { //尝试发消息,需要加锁,如果有没有获取到锁,则返回false
if (logger.isTraceEnabled()) {
logger.trace(String.format("Another send already in progress: session id '%s':, \"in-progress\" send time %d (ms), buffer size %d bytes", this.getId(), this.getTimeSinceSendStarted(), this.getBufferSize()));
}
this.checkSessionLimits(); //检查是否超时,检查大小是否超过限制
break;
}
} while(!this.buffer.isEmpty() && !this.shouldNotSend());
}
}
真实发送消息的方法,在while方法执行时,如果其他线程也要调用发送方法,则只对buffer大小进行统计,不发送
private boolean tryFlushMessageBuffer() throws IOException {
if (!this.flushLock.tryLock()) { //没获取到发送锁,直接返回false
return false;
} else {
try {
while(true) { //循环遍历queue,知道里面取出来的值为null
WebSocketMessage<?> message = (WebSocketMessage)this.buffer.poll();
if (message == null || this.shouldNotSend()) {
return true;
}
this.bufferSize.addAndGet(-message.getPayloadLength()); //减掉将要发送内容的大小
this.sendStartTime = System.currentTimeMillis(); //记录开始时间
this.getDelegate().sendMessage(message); //调用真实发送方法
this.sendStartTime = 0L; //发送完成,重置发送开始时间
}
} finally {
this.sendStartTime = 0L;
this.flushLock.unlock();
}
}
}