一 什么是WebSocket
1.1 首先分清几个概念
-
http协议:**超文本传输协议,属于应用层。它的任务是与服务器交换信息。至于怎么连接到服务器,怎么保证数据正确,http不管。
-
TCP协议:**传输控制协议,属于传输层。任务是保证连接的可靠,包括防止丢失,出错。所以在初次连接的时候进行3次握手,断开连接时进行4次挥手。至于连接上以后具体传送什么数据,tcp不管。
PS:别的应用层协议也能通过tcp进行,那么这协议在底层也进行三次握手。
1.2 混淆点
WebSocket:基于TCP的,运行在应用层,替代http的一个协议。
网上说的WebSocket只有一次握手,指的是:客户端发送一个http请求到服务器,服务器响应后标志这个连接建立起来。而不是指TCP的三次握手。
1.3 优点
- 节约宽带。轮询服务端数据的方式,使用的是http协议,head的信息很大,有效数据占比低,而使用WebSocket,头信息很小,有效数据占比高。
- 无浪费。轮询方法可能轮询10次,才可能碰到服务端数据更新,那么前9次数据都浪费了。而WebSocket是由服务器主动发回,来的都是新数据。
- 实时性。当服务器完成协议升级后(HTTP->Websocket),服务端可以主动向客户端推送信息,省去了客户端发起请求的步骤,同时没有间隔时间,只要服务端内容有变化,就可以告知客户端。实时性大大提高。
二 Socket和WebSocket的区别
- 本质上:
Socket本身不是一个协议,而是一个调用接口(API),它工作在OSI模型中的会话层(第5层),是对TCP/IP协议的封装。websocket运行在应用层,是http升级的一个协议。 - 连接:
Socket连接需要一对套接字,一个运行于客户端,另一个运行于服务端。连接分为三个步骤:服务器监听,客户端请求,连接确认。
websocket在客户端,发送一个http请求到服务器,当服务器响应后,完成协议升级,连接建立。
三 WebSocket服务端搭建
3.1 导入jar包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
3.2 搭建websocket服务
- WebSocketConfig
package com.wyq.websocket.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
* @ClassName WebSocketConfig
* @Description: //TODO websocket配置类
* @Author ssm
* @Date 2022/9/23 17:39
*/
@Configuration
public class WebSocketConfig {
/**
* 这个Bean会自动注册使用@ServerEndpoint注解声明的websocket endpoint
*/
@Bean
public ServerEndpointExporter serverEndpointExporter(){
return new ServerEndpointExporter();
}
}
- WebSocketServer
用于接收客户端的webSocket请求,处理主要逻辑。代码如下:
@ServerEndpoint注解中写上客户端连接的地址。
package io.freeyou.socket.web.server;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import io.freeyou.modules.sdwan.entity.WebsocketVo;
import io.freeyou.modules.sdwan.vo.SysUserVo;
import io.freeyou.modules.sys.service.SysUserTokenService;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.stream.Collectors;
/**
* @Description: WebSocket
* @Author: shishumin
* @Date: 2022/5/16 5:27 PM
* @Parameters:
* @Return
*/
@Component
@ServerEndpoint("/webSocket/{token}")
@Slf4j
@Getter
public class WebSocket {
/**
* 指定用户的Session对象
*/
private static Map<String, CopyOnWriteArraySet<WebSocket>> userSocketMap = new ConcurrentHashMap<>();
private String userId;
/**
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
*/
private Session session;
/**
* 注入静态变量,根据token获取用户信息
*/
public static SysUserTokenService sysUserTokenService;
@OnOpen
public void onOpen(Session session, @PathParam(value = "token") String token) {
SysUserVo tokenEntity = sysUserTokenService.selectByToken(token);
if (tokenEntity == null) {
return;
}
this.userId = tokenEntity.getUserId();
// 设置发送时连接突然断开后的超时时间,默认是20s
session.getUserProperties().put( "org.apache.tomcat.websocket.BLOCKING_SEND_TIMEOUT", 1000L);
this.session = session;
if (!exitUser(userId)) {
initUserInfo(userId);
} else {
CopyOnWriteArraySet<WebSocket> webSocketTestSet = getUserSocketSet(userId);
webSocketTestSet.add(this);
}
}
@OnClose
public void onClose(Session session) {
// log.debug("close session id:{},isOpen:{}",userId+":"+session.getId(),session.isOpen());
if (StringUtils.isBlank(userId)) {
return;
}
CopyOnWriteArraySet<WebSocket> webSocketTestSet = userSocketMap.get(userId);
if (CollectionUtils.isEmpty(webSocketTestSet)) {
return;
}
webSocketTestSet.remove(this);
// log.debug("remove session id:{},deleteFlag:{}",userId+":"+session.getId(),webSocketTestSet.contains(this));
}
@OnMessage
public void onMessage(String message,Session session) {
if(StringUtils.isNotBlank(message) && "ping".equals(message)){
try {
JSONObject jsonObject = new JSONObject();
jsonObject.put("data", "pong");
WebsocketVo websocketVo = new WebsocketVo();
websocketVo.setData(JSON.toJSONString(jsonObject));
session.getBasicRemote().sendText(JSON.toJSONString(websocketVo));
} catch (IOException e) {
log.warn("【WebSocket推送】下发响应心跳包出错", e);
}
}
}
/**
* 用户报错
*/
@OnError
public void onError(Session session, Throwable error){
// TODO: 2022/7/19 主动断开
try {
session.close();
} catch (IOException e) {
log.debug("session close error,sessionId:{},error:{}",userId+":"+session.getId(),e);
}
}
/**
* @Description
* @author shishumin
* @date 2022/5/6 14:26
*/
public void sendMessage(String message) {
Set<String> userList = userSocketMap.keySet();
if (CollectionUtils.isEmpty(userList)) {
return;
}
for (String userId : userList) {
sendMessage(userId, message);
}
}
/**
* @Description: 发送消息
* @Author: shishumin
* @Date: 2022/5/16 5:45 PM
* @Parameters: [item, message]
* @Return void
*/
private void sendMessage(String userId, String message) {
CopyOnWriteArraySet<WebSocket> webSocket = userSocketMap.get(userId);
if (CollectionUtils.isEmpty(webSocket)) {
return;
}
for (WebSocket item : webSocket) {
if (item.getSession() == null || !item.getSession().isOpen()) {
continue;
}
/**
* 使用WebSocket做通知服务,但是批量推送或数据量大多线程并发下的时候会报错:java.lang.IllegalStateException。
* The remote endpoint was in state [TEXT_FULL_WRITING] which is an invalid state for called method、
* 解决方案: 增加synchronized, 改为同步发送
*/
synchronized (item.getSession()) {
if (item.getSession().isOpen()) {
try {
item.getSession().getBasicRemote().sendText(message);
} catch (Exception e) {
webSocket.remove(item);
}
}
}
}
}
/**
* @Description
* @author shishumin
* @date 2022/5/12 14:26
*/
public synchronized void sendMessageByUserId(List<String> userIds, String message) {
// 根据登录的用户发送:大屏告警
List<String> userList = userIds.stream().filter(userSocketMap.keySet()::contains).collect(Collectors.toList());
if(CollectionUtils.isEmpty(userList)){
return;
}
for (String userId : userList) {
sendMessage(userId, message);
}
}
public boolean exitUser(String userId) {
return userSocketMap.containsKey(userId);
}
public CopyOnWriteArraySet<WebSocket> getUserSocketSet(String userId) {
return userSocketMap.get(userId);
}
private void initUserInfo(String userId) {
CopyOnWriteArraySet<WebSocket> webSocketTestSet = new CopyOnWriteArraySet<>();
webSocketTestSet.add(this);
userSocketMap.put(userId, webSocketTestSet);
}
}