上一章,我了解了用netty 编写 一个简单的websocket 服务器,现在开始用 springboot 整合 netty:
将 WSServer作为一个组件,随着 springboot 的启动 完成,单独开一个线程,将 server 启动:
所以要将 WSServer 改成一个单例:
1. 创建一个 WSServer 类;
package com.imooc.netty; import org.springframework.stereotype.Component; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; @Component //以前的没有 public class WSServer { private static class SingletionWSServer { static final WSServer instance = new WSServer(); } public static WSServer getInstance() { return SingletionWSServer.instance; } private EventLoopGroup mainGroup; private EventLoopGroup subGroup; private ServerBootstrap server; private ChannelFuture future; public WSServer() { mainGroup = new NioEventLoopGroup(); subGroup = new NioEventLoopGroup(); server = new ServerBootstrap(); server.group(mainGroup, subGroup) .channel(NioServerSocketChannel.class) .childHandler(new WSServerInitialzer()); } public void start() { this.future = server.bind(8088); System.err.println("netty websocket server 启动完毕..."); } }
================================================================================================
2.创建一个 和 Application类功能类似的类 NettyBooter:(注意:要和Application类在同一目录下 )
package com.imooc; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.stereotype.Component; import com.imooc.netty.WSServer; @Component public class NettyBooter implements ApplicationListener<ContextRefreshedEvent> { @Override public void onApplicationEvent(ContextRefreshedEvent event) { if (event.getApplicationContext().getParent() == null) { try { WSServer.getInstance().start(); } catch (Exception e) { e.printStackTrace(); } } } }
================================================================================================
剩下的两个类和上一篇的一样:
package netty.websocket; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.util.concurrent.GlobalEventExecutor; import java.time.LocalDateTime; /** * 处理消息 的 handler * TextWebSocketFrame : 在 netty 中,是用于为 websocket 专门处理文本的对象,frame 是消息的载体 * */ public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { // 用于记录和管理 所有客户端的channle private static ChannelGroup clients=new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { //获取 客户端传输过来的消息 String content=msg.text(); System.out.println("接受到的数据:"+content); for (Channel channel :clients){ channel.writeAndFlush( new TextWebSocketFrame("【服务器在】:"+ LocalDateTime.now() +"接受到消息,消息为"+content)); } } /** * 当客户端连接服务端之后(打开链接) * 获取客户端的 Channel,并且放到 ChannelGroup 中去进行管理 * @param ctx * @throws Exception */ @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { clients.add(ctx.channel()); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { //当触发 handlerRemove,channelGroup 会自动移除对应的客户端的channel System.out.println("客户端断开,channel对应的长id为:"+ctx.channel().id().asLongText()); } }
=======================================================================================
package netty.websocket; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.stream.ChunkedWriteHandler; public class WSServerInitializer extends ChannelInitializer<SocketChannel> { protected void initChannel(SocketChannel channel) throws Exception { ChannelPipeline pipeline =channel.pipeline(); //websocket 基于 http 协议,所以要有 http 编解码器 pipeline.addLast(new HttpServerCodec()); //对写大数据流的支持 pipeline.addLast(new ChunkedWriteHandler()); //对httpMessage 进行聚合,聚合成 FullHttpRequest 或 FullHttpResonse pipeline.addLast(new HttpObjectAggregator(1024*64)); // ================ 以上是 用于支持 http协议=============== // websocket 服务器处理的协议,用于指定 给 客户端连接访问的路由:/ws (/ws 是可以自定义的) // 本 handler 会帮助你处理 一些 繁琐的复杂的事 // 会帮助 你处理一些 握手动作:handingshaking(close,ping,pong) ping+pong=心跳 // 对于 websocket 来讲 ,都是以 frames 进行 传输的,不同的数据类型 对应的frames 也不同 pipeline.addLast(new WebSocketServerProtocolHandler("/ws")); // ================ 以下是自定义的 handler=============== //自定义的 handler pipeline.addLast("ChatHandler", new ChatHandler()); } }