package netty1; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; public class Server { public static void main(String[] args) throws Exception { //1 第一个线程组 是用于接收Client端连接的 EventLoopGroup bossGroup = new NioEventLoopGroup(); //2 第二个线程组 是用于实际的业务处理操作的 EventLoopGroup workerGroup = new NioEventLoopGroup(); //3 创建一个辅助类Bootstrap,就是对我们的Server进行一系列的配置 ServerBootstrap b = new ServerBootstrap(); //把俩个工作线程组加入进来 b.group(bossGroup, workerGroup) //我要指定使用NioServerSocketChannel这种类型的通道 .channel(NioServerSocketChannel.class) //一定要使用 childHandler 去绑定具体的 事件处理器 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(new ServerHandler()); } }); //绑定指定的端口 进行监听 ChannelFuture f = b.bind(8765).sync(); f.channel().closeFuture().sync();//表示关闭前一直堵塞。相当于Thread.sleep(1000000); bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }
package netty1; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.util.ReferenceCountUtil; public class ServerHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //do something msg ByteBuf buf = (ByteBuf)msg; byte[] data = new byte[buf.readableBytes()]; buf.readBytes(data); String request = new String(data, "utf-8"); System.out.println("Server: " + request); //写给客户端 String response = "我是反馈的信息"; ctx.writeAndFlush(Unpooled.copiedBuffer("888".getBytes())); //.addListener(ChannelFutureListener.CLOSE); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
package netty1; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class Client { public static void main(String[] args) throws Exception { EventLoopGroup workgroup = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(workgroup) .channel(NioSocketChannel.class)//TCP客户端 用NioSocketChannel通道 .handler(new ChannelInitializer<SocketChannel>() {//handler有别与服务端 @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(new ClientHandler()); } }); ChannelFuture cf1 = b.connect("127.0.0.1", 8765).sync(); //buf cf1.channel().writeAndFlush(Unpooled.copiedBuffer("777".getBytes())); cf1.channel().closeFuture().sync();//表示关闭前一直堵塞。 workgroup.shutdownGracefully(); } }
package netty1; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.util.ReferenceCountUtil; public class ClientHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { //do something msg ByteBuf buf = (ByteBuf)msg; byte[] data = new byte[buf.readableBytes()]; buf.readBytes(data); String request = new String(data, "utf-8"); System.out.println("Client: " + request); } finally { ReferenceCountUtil.release(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }