SpringBoot集成websocket(2)|(websocket服务端实现以及websocket中转实现)

SpringBoot集成websocket(2)|(websocket服务端实现以及websocket转发实现)


章节
第一章链接: SpringBoot集成websocket(1)|(websocket客户端实现)
第二章链接: SpringBoot集成websocket(2)|(websocket服务端实现以及websocket中转实现)

前言

本节主要介绍的是springboot实现websocket的客户端服务端,以及作为中转服务实现客户端长连接服务端,服务端长连接第三方websocket服务的数据传输。

一、websocket服务端依赖引入

   <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-websocket</artifactId>
   </dependency>

二、websocket服务代码实现

1.WebSocketConfig配置

springboot接入websocket需要启用对应的配置

@Configuration
@EnableWebSocket
public class WebSocketConfig {
    
    
    @Bean
    public ServerEndpointExporter serverEndpointExporter(){
    
    
        return new ServerEndpointExporter();
    }
}

2.WebSocketServer服务实现

springboot提供对外的websocket接口实现


/**
 * @author Oak
 * Created on 2023/5/15.
 * Description:
 */
@Component
@Data
@Slf4j
@ServerEndpoint(value = "/ws/{name}")
public class WebSocketServer {
    
    
	// 调用第三方接口插件
    @Autowired
    private GptRestApi gptRestApi;

    @Autowired
    public MilvusLargeService largeService;

    @Autowired
    public DocumentParagraphRepository paragraphRepository;


    /**
     * 与某个客户端的连接对话,需要通过它来给客户端发送消息
     */
    private Session session;

    /**
     * 标识当前连接客户端的用户名
     */
    private String name;

    /**
     * 用于存所有的连接服务的客户端,这个对象存储是安全的
     */
    private static ConcurrentHashMap<String, WebSocketServer> webSocketSet = new ConcurrentHashMap<>();

    /**
     * 用于存所有的连接第三方ws服务的客户端
     */
    private static ConcurrentHashMap<String, WebSocketClient> wsClientSet = new ConcurrentHashMap<>();


    @OnOpen
    public void OnOpen(Session session, @PathParam(value = "name") String name) {
    
    
        this.session = session;
        this.name = name;
        webSocketSet.put(name, this);
        WebSocketClient webSocketClient = getWebSocketClient(session);
        wsClientSet.put(name, webSocketClient);
        log.info("[web -> server] 连接成功,当前连接人数为:={}", webSocketSet.size());
    }


    @OnClose
    public void OnClose() {
    
    
        webSocketSet.remove(this.name);
        wsClientSet.remove(this.name);
        log.info("[web -> server] 退出成功,当前连接人数为:={}", webSocketSet.size());
    }

    @OnMessage
    public void OnMessage(String message) {
    
    
        log.info("[web -> server] 收到消息:{}", message);
        SparkWSClient sparkWSClient = (SparkWSClient) wsClientSet.get(name);
        // ping 消息响应 pong
        if ("ping".equals(message)) {
    
    
            try {
    
    
                session.getBasicRemote().sendText("pong");
                return;
            } catch (IOException e) {
    
    
                log.error("消息发送失败");
            }
        } else {
    
    
        // 业务消息
            if (sparkWSClient != null && sparkWSClient.isOpen()) {
    
    
                    sparkWSClient.send(message);
                } catch (Exception e) {
    
    
                    log.warn("异常:{}", e.getMessage());
                }
                return;
            } else {
    
     // 重新连接spark websocket
                WebSocketClient webSocketClient = getWebSocketClient(session);
                wsClientSet.put(name, webSocketClient);
            }
        }
    }

    /**
     * 群发
     *
     * @param message
     */
    public void GroupSending(String message) {
    
    
        for (String name : webSocketSet.keySet()) {
    
    
            try {
    
    
                webSocketSet.get(name).session.getBasicRemote().sendText(message);
            } catch (Exception e) {
    
    
                e.printStackTrace();
            }
        }
    }

// 第三方websocket接口,自己可以模拟写一个
    public WebSocketClient getWebSocketClient(Session session) {
    
    
        String chaturl = "ws://xxx/chat";
        UserSession build1 = UserSession.builder()
                .session(session)
                .name(name)
                .build();
        WebSocketClient client = new SparkWSClient(URI.create(chaturl ), build1);
        client.connect();
        return client;
    }

}

userSession代码如下

@Data
@Builder
public class UserSession {
    
    
    public  Session session;
    public String name;

    public UserSession(Session session,String name) {
    
    
        this.session = session;
        this.name = name;
    }

    public void sendUserMsg(String msg) {
    
    
        sendUserMsg(msg, false);
    }

    public void sendUserMsg(String msg, boolean close) {
    
    
        synchronized (session) {
    
    
            if (!session.isOpen()) {
    
    
                return;
            }
            try {
    
    
                session.getBasicRemote().sendText(msg);
                if (close) {
    
    
                    session.close();
                }
            } catch (IOException ex) {
    
    
                ex.printStackTrace();
            }
        }
    }
}

3.WebSocketClient连接第三方客户端实现

springboot提供对第三方websocket连接的客户端
实现代码如下

思路,两个websocket各自监听自己的数据,想要串起来可以通过消息中间件来实现,但这样无疑会增加系统的复杂性,还可以通过注入Session的方式,与客户端建立连接后,可以通过session进行数据推送,这个时候只要在 连接第三方的websocket的时候注入这个session兑现。那调用第三方的websocketClient就能通过session 将监听到的数据直接返回给客户端。

@Slf4j
public class SparkWSClient extends WebSocketClient {
    
    

    UserSession userSession;

    public SparkWSClient(URI serverUri, UserSession session) {
    
    
        super(serverUri);
        this.userSession = session;
    }

    @Override
    public void onOpen(ServerHandshake serverHandshake) {
    
    
        log.info("[server-> spark] 连接成功");
    }

    @Override
    public void onMessage(String message) {
    
    
        log.debug("[server-> spark] 收到消息={}...", message);
        //收到远程的第三方返回数据,这里通过注入的session直接返回给客户端
        userSession.sendUserMsg(message);
    }

    @Override
    public void onClose(int i, String s, boolean b) {
    
    
        log.debug("[server-> spark] 退出连接");
        String errMsg = String.format(MVS_ERR_MSG_FORMAT, userSession.getName(), "spark connection closed!");
        userSession.sendUserMsg(errMsg, true);
    }

    @Override
    public void onError(Exception e) {
    
    
        //转发给用户端
        String errMsg = String.format(MVS_ERR_MSG_FORMAT, userSession.getName(), "spark connection refused!");
        userSession.sendUserMsg(errMsg, true);
        log.error("[server-> spark] 连接错误={}", e.getMessage());
    }
}

总结

本文主要介绍websocket客户端、服务端的实现,以及客户端通过websocket连接服务端,服务端连接第三方的websocket接口,并且将第三个方数据直接返回给客户端,起一个中间websockey代理服务作用,实现数据的中转。

第一章链接: SpringBoot集成websocket(1)|(websocket客户端实现)
第二章链接: SpringBoot集成websocket(2)|(websocket服务端实现以及websocket中转实现)

猜你喜欢

转载自blog.csdn.net/Oaklkm/article/details/130724920