websocket多线程发送内容

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/woaiqianzhige/article/details/87932558

websocket多线程发送内容

1.websocketSession基类接口:

org.springframework.web.socket.WebSocketSession
其中包含 getPrincipal,getLocalAddress,getRemoteAddress,sendMessage,isOpen,close等方法定义

2.接口的实现类,实现了上面的接口,采用包装设计模式,只做转发没有处理逻辑,具体看下面这个类:

org.springframework.web.socket.handler.WebSocketSessionDecorator

3.带并发的包装类:

ConcurrentWebSocketSessionDecorator
主要关注两个方法	sendMessage 和 close,

```
public void close(CloseStatus status) throws IOException {
    this.closeLock.lock(); //获取关闭锁

    try {
        if (!this.closeInProgress) {  //如果已经关闭了,则不处理
            if (!CloseStatus.SESSION_NOT_RELIABLE.equals(status)) {
                try {
                    this.checkSessionLimits(); //检查超时时间和buffer状态
                } catch (SessionLimitExceededException var6) {
                    ;
                }

                if (this.limitExceeded) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Changing close status " + status + " to SESSION_NOT_RELIABLE.");
                    }

                    status = CloseStatus.SESSION_NOT_RELIABLE;
                }
            }

            this.closeInProgress = true; //设置关闭标志位,防止多次关闭
            super.close(status); //调用关闭方法,传递关闭状态
            return;
        }
    } finally {
        this.closeLock.unlock();
    }

}


```

close方法是用 volatile 修饰的标志位 closeInProgress和加显式锁的方式, 防止多个线程调用close方法

4.sendMessage 方法

    public void sendMessage(WebSocketMessage<?> message) throws IOException {
    if (!this.shouldNotSend()) { //检查允许发送(是否是关闭状态?,是否是超过大小限制?)
        this.buffer.add(message); //将消息添加到queue中
        this.bufferSize.addAndGet(message.getPayloadLength()); //将消息大小字段递增

        do {
            if (!this.tryFlushMessageBuffer()) { //尝试发消息,需要加锁,如果有没有获取到锁,则返回false
                if (logger.isTraceEnabled()) {
                    logger.trace(String.format("Another send already in progress: session id '%s':, \"in-progress\" send time %d (ms), buffer size %d bytes", this.getId(), this.getTimeSinceSendStarted(), this.getBufferSize()));
                }

                this.checkSessionLimits(); //检查是否超时,检查大小是否超过限制
                break;
            }
        } while(!this.buffer.isEmpty() && !this.shouldNotSend());

    }
}

真实发送消息的方法,在while方法执行时,如果其他线程也要调用发送方法,则只对buffer大小进行统计,不发送
private boolean tryFlushMessageBuffer() throws IOException {
    if (!this.flushLock.tryLock()) { //没获取到发送锁,直接返回false
        return false;
    } else {
        try {
            while(true) { //循环遍历queue,知道里面取出来的值为null
                WebSocketMessage<?> message = (WebSocketMessage)this.buffer.poll();
                if (message == null || this.shouldNotSend()) {
                    return true;
                }

                this.bufferSize.addAndGet(-message.getPayloadLength()); //减掉将要发送内容的大小
                this.sendStartTime = System.currentTimeMillis(); //记录开始时间
                this.getDelegate().sendMessage(message); //调用真实发送方法
                this.sendStartTime = 0L; //发送完成,重置发送开始时间
            }
        } finally {
            this.sendStartTime = 0L;
            this.flushLock.unlock();
        }
    }
}

spring就是由上面的包装类实现了websocket session的多线程使用,可以将websocketSession保存到全局map中,随时调用

猜你喜欢

转载自blog.csdn.net/woaiqianzhige/article/details/87932558