1.base 功能:
NettyBytebuf::
public class NettyByteBuf { public static void main(String[] args) { // 创建byteBuf对象,该对象内部包含一个字节数组byte[10] // 通过readerindex和writerIndex和capacity,将buffer分成三个区域 // 已经读取的区域:[0,readerindex) // 可读取的区域:[readerindex,writerIndex) // 可写的区域: [writerIndex,capacity) ByteBuf byteBuf = Unpooled.buffer(10); System.out.println("byteBuf=" + byteBuf); for (int i = 0; i < 8; i++) { byteBuf.writeByte(i); } System.out.println("byteBuf=" + byteBuf); for (int i = 0; i < 5; i++) { System.out.println(byteBuf.getByte(i)); } System.out.println("byteBuf=" + byteBuf); for (int i = 0; i < 5; i++) { System.out.println(byteBuf.readByte()); } System.out.println("byteBuf=" + byteBuf); //用Unpooled工具类创建ByteBuf ByteBuf byteBuf2 = Unpooled.copiedBuffer("hello,zhuge!", CharsetUtil.UTF_8); //使用相关的方法 if (byteBuf2.hasArray()) { byte[] content = byteBuf2.array(); //将 content 转成字符串 System.out.println(new String(content, CharsetUtil.UTF_8)); System.out.println("byteBuf=" + byteBuf2); System.out.println(byteBuf2.readerIndex()); // 0 System.out.println(byteBuf2.writerIndex()); // 12 System.out.println(byteBuf2.capacity()); // 36 System.out.println(byteBuf2.getByte(0)); // 获取数组0这个位置的字符h的ascii码,h=104 int len = byteBuf2.readableBytes(); //可读的字节数 12 System.out.println("len=" + len); //使用for取出各个字节 for (int i = 0; i < len; i++) { System.out.println((char) byteBuf2.getByte(i)); } //范围读取 System.out.println(byteBuf2.getCharSequence(0, 6, CharsetUtil.UTF_8)); System.out.println(byteBuf2.getCharSequence(6, 6, CharsetUtil.UTF_8)); } } }
2.Nettyclient:
public class NettyClient { public static void main(String[] args) throws Exception { //客户端需要一个事件循环组 EventLoopGroup group = new NioEventLoopGroup(); try { //创建客户端启动对象 //注意客户端使用的不是ServerBootstrap而是Bootstrap Bootstrap bootstrap = new Bootstrap(); //设置相关参数 bootstrap.group(group) //设置线程组 .channel(NioSocketChannel.class) // 使用NioSocketChannel作为客户端的通道实现 .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { //加入处理器 ch.pipeline().addLast(new NettyClientHandler()); } }); System.out.println("netty client start。。"); //启动客户端去连接服务器端 ChannelFuture cf = bootstrap.connect("127.0.0.1", 9000).sync(); //对通道关闭进行监听 cf.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); } } }
3.客户端handeler:
package com.tuling.netty.base; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; public class NettyClientHandler extends ChannelInboundHandlerAdapter { /** * 当客户端连接服务器完成就会触发该方法 * * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ByteBuf buf = Unpooled.copiedBuffer("HelloServer".getBytes(CharsetUtil.UTF_8)); ctx.writeAndFlush(buf); } //当通道有读取事件时会触发,即服务端发送数据给客户端 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println("收到服务端的消息:" + buf.toString(CharsetUtil.UTF_8)); System.out.println("服务端的地址: " + ctx.channel().remoteAddress()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
4.netty server信息:
public class NettyServer { public static void main(String[] args) throws Exception { //创建两个线程组bossGroup和workerGroup, 含有的子线程NioEventLoop的个数默认为cpu核数的两倍 // bossGroup只是处理连接请求 ,真正的和客户端业务处理,会交给workerGroup完成 EventLoopGroup bossGroup = new NioEventLoopGroup(10); EventLoopGroup workerGroup = new NioEventLoopGroup(100000); try { //创建服务器端的启动对象 ServerBootstrap bootstrap = new ServerBootstrap(); //使用链式编程来配置参数 bootstrap.group(bossGroup, workerGroup) //设置两个线程组 .channel(NioServerSocketChannel.class) //使用NioServerSocketChannel作为服务器的通道实现 // 初始化服务器连接队列大小,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。 // 多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理 .option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ChannelInitializer<SocketChannel>() {//创建通道初始化对象,设置初始化参数 @Override protected void initChannel(SocketChannel ch) throws Exception { //对workerGroup的SocketChannel设置处理器 ch.pipeline().addLast(new NettyServerHandler()); } }); System.out.println("netty server start。。"); //绑定一个端口并且同步, 生成了一个ChannelFuture异步对象,通过isDone()等方法可以判断异步事件的执行情况 //启动服务器(并绑定端口),bind是异步操作,sync方法是等待异步操作执行完毕 ChannelFuture cf = bootstrap.bind(9000).sync(); //给cf注册监听器,监听我们关心的事件 /*cf.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (cf.isSuccess()) { System.out.println("监听端口9000成功"); } else { System.out.println("监听端口9000失败"); } } });*/ //对通道关闭进行监听,closeFuture是异步操作,监听通道关闭 // 通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成 cf.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
5.
public class NettyServerHandler extends ChannelInboundHandlerAdapter { /** * 读取客户端发送的数据 * * @param ctx 上下文对象, 含有通道channel,管道pipeline * @param msg 就是客户端发送的数据 * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("服务器读取线程 " + Thread.currentThread().getName()); //Channel channel = ctx.channel(); //ChannelPipeline pipeline = ctx.pipeline(); //本质是一个双向链接, 出站入站 //将 msg 转成一个 ByteBuf,类似NIO 的 ByteBuffer ByteBuf buf = (ByteBuf) msg; System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8)); } /** * 数据读取完毕处理方法 * * @param ctx * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ByteBuf buf = Unpooled.copiedBuffer("HelloClient".getBytes(CharsetUtil.UTF_8)); ctx.writeAndFlush(buf); } /** * 处理异常, 一般是需要关闭通道 * * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }