网络编程8

编解码器框架

什么是编解码器

  • 数据在网络中传输,两端的操作系统类型是不一样的,对数据的解析方式也是不一样的,所以网络上传输的都是01串,都是字节
  • 通讯两端就需要转换成01串,以及从01串转成真实数据

解码器

  • 将字节解码为消息

  • 将一种消息类型解码为另一种消息格式

    • 比如https,需要先转换成密文,再解密成明文
  • ByteToMessageDecoder

    • decode是我们要实现的一个抽象方法

    • protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
      
    • in是消息,out是下一个handler处理的数据

  • MessageToMessageDecoder

    • 也是decode方法最重要

    • protected abstract void decode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;
      
    • msg是上一个handler传过来的数据,out是传给下一个handler的数据

  • TooLongFrameException

    • 因为解码需要将全部的数据进行解码,没办法分开解码,所以先收到的数据需要先保存起来,如果程序没有设计好,传送的数据过多,有可能撑爆

    • package cn.enjoyedu.nettybasic;
      
      import io.netty.buffer.ByteBuf;
      import io.netty.channel.ChannelHandlerContext;
      import io.netty.handler.codec.ByteToMessageDecoder;
      import io.netty.handler.codec.TooLongFrameException;
      
      import java.util.List;
      
      public class TooLongExSample  extends ByteToMessageDecoder {
              
              
      
          private static final int MAX_SIZE = 1024;
      
          @Override
          protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
                  throws Exception {
              
              
              // 当前可以读的字节数有多少
              int readable = in.readableBytes();
              // 如果读取的字节数超过我们定义的阈值,则关闭通讯并抛出异常
              if(readable>MAX_SIZE){
              
              
                  ctx.close();
                  throw new TooLongFrameException("传入的数据太多");
              }
      
          }
      }
      
      

编码器

  • 将消息编码为字节

  • MessageToByteEncoder

  • 将消息编码为消息

  • MessageToMessageEncoder

编解码器类

  • 编码和解码是同一种算法,对称加密
  • ByteToMessageCodec
    • 既有编码又有解码
  • 但是编码时尽量分开,代码单一职责原则

Netty内置的编解码器和ChannelHandler

写一个web服务器demo----通过SSL/TLS 保护Netty 应用程序和HTTP系列

  • 实现服务器

  • 浏览器访问

  • 用Netty实现客户端

  • 增加压缩支持

  • Https支持

    • package cn.enjoyedu.nettyhttp.server;
      
      import io.netty.bootstrap.ServerBootstrap;
      import io.netty.channel.ChannelFuture;
      import io.netty.channel.EventLoopGroup;
      import io.netty.channel.nio.NioEventLoopGroup;
      import io.netty.channel.socket.nio.NioServerSocketChannel;
      import io.netty.handler.ssl.SslContext;
      import io.netty.handler.ssl.SslContextBuilder;
      import io.netty.handler.ssl.util.SelfSignedCertificate;
      
      /**
       * @author Mark老师   
       * 往期课程和VIP课程咨询 依娜老师  QQ:2133576719
       * 类说明:
       */
      public class HttpServer {
              
              
          public static final int port = 6789; //设置服务端端口
          private static EventLoopGroup group = new NioEventLoopGroup();   // 通过nio方式来接收连接和处理连接
          private static ServerBootstrap b = new ServerBootstrap();
          private static final boolean SSL = false;/*是否开启SSL模式*/
      
          /**
           * Netty创建全部都是实现自AbstractBootstrap。
           * 客户端的是Bootstrap,服务端的则是ServerBootstrap。
           **/
          public static void main(String[] args) throws Exception {
              
              
              final SslContext sslCtx;
              if(SSL){
              
              
                  // 签名认证实例
                  SelfSignedCertificate ssc = new SelfSignedCertificate();
                  // 构建ssl上下文
                  sslCtx = SslContextBuilder.forServer(ssc.certificate(),
                          ssc.privateKey()).build();
      
              }else{
              
              
                  sslCtx = null;
              }
              try {
              
              
                  b.group(group);
                  b.channel(NioServerSocketChannel.class);
                  // 把ssl上下文传给handler
                  b.childHandler(new ServerHandlerInit(sslCtx)); //设置过滤器
                  // 服务器绑定端口监听
                  ChannelFuture f = b.bind(port).sync();
                  System.out.println("服务端启动成功,端口是:"+port);
                  // 监听服务器关闭监听
                  f.channel().closeFuture().sync();
              } finally {
              
              
                  group.shutdownGracefully(); //关闭EventLoopGroup,释放掉所有资源包括创建的线程
              }
          }
      }
      
      
    • private static final boolean SSL = false; 判断是否开启ssl模式

    • 当开启ssl模式后,访问http://127.0.0.1:6789/test

    • 此时会显示该页面无法正常运作------代码抛了异常,因为传输的是明文,不是密文

    • 改成https://127.0.0.1:6789/test

    • 提示您的连接不是私密连接------这个证书需要去第三方服务器认证

服务器端

HttpServer

package cn.enjoyedu.nettyhttp.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;

/**
 * @author Mark老师   
 * 往期课程和VIP课程咨询 依娜老师  QQ:2133576719
 * 类说明:
 */
public class HttpServer {
    
    
    public static final int port = 6789; //设置服务端端口
    private static EventLoopGroup group = new NioEventLoopGroup();   // 通过nio方式来接收连接和处理连接
    private static ServerBootstrap b = new ServerBootstrap();
    private static final boolean SSL = false;/*是否开启SSL模式*/

    /**
     * Netty创建全部都是实现自AbstractBootstrap。
     * 客户端的是Bootstrap,服务端的则是ServerBootstrap。
     **/
    public static void main(String[] args) throws Exception {
    
    
        final SslContext sslCtx;
        if(SSL){
    
    
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(),
                    ssc.privateKey()).build();

        }else{
    
    
            sslCtx = null;
        }
        try {
    
    
            b.group(group);
            b.channel(NioServerSocketChannel.class);
            b.childHandler(new ServerHandlerInit(sslCtx)); //设置过滤器
            // 服务器绑定端口监听
            ChannelFuture f = b.bind(port).sync();
            System.out.println("服务端启动成功,端口是:"+port);
            // 监听服务器关闭监听
            f.channel().closeFuture().sync();
        } finally {
    
    
            group.shutdownGracefully(); //关闭EventLoopGroup,释放掉所有资源包括创建的线程
        }
    }
}

ServerHandlerInit

package cn.enjoyedu.nettyhttp.server;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpContentCompressor;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.ssl.SslContext;

/**
 * @author Mark老师   
 * 往期课程和VIP课程咨询 依娜老师  QQ:2133576719
 * 类说明:
 */
public class ServerHandlerInit extends ChannelInitializer<SocketChannel> {
    
    

    private final SslContext sslCtx;

    public ServerHandlerInit(SslContext sslCtx) {
    
    
        this.sslCtx = sslCtx;
    }

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
    
    
        ChannelPipeline ph = ch.pipeline();
        // 如果ssl上下文不为null,开启ssl认证的handler
        if(sslCtx!=null){
    
    
            ph.addLast(sslCtx.newHandler(ch.alloc()));
        }
        /*把应答报文 编码*/
        // http响应编码器
        ph.addLast("encoder",new HttpResponseEncoder());
        /*把请求报文 解码*/
        // http请求解码器
        ph.addLast("decoder",new HttpRequestDecoder());


        /*聚合http为一个完整的报文*/
        // 因为http的消息体可能是分块发送的,chunk
        // 因此需要聚合成一个完整的报文,最大10M
        ph.addLast("aggregator",
                new HttpObjectAggregator(10*1024*1024));
        /*把应答报文 压缩,非必要*/
        // http的报文压缩,减轻带宽压力
        ph.addLast("compressor",new HttpContentCompressor());
        // 自己的业务handler
        // 一定是一个入站handler
        ph.addLast(new BusiHandler());


    }
}

自己的业务handler–BusiHandler

  • package cn.enjoyedu.nettyhttp.server;
    
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.handler.codec.http.*;
    import io.netty.util.CharsetUtil;
    
    /**
     * @author Mark老师   
     * 往期课程和VIP课程咨询 依娜老师  QQ:2133576719
     * 类说明:
     */
    public class BusiHandler extends ChannelInboundHandlerAdapter {
          
          
    
        /**
         * 发送的返回值
         * @param ctx     返回
         * @param context 消息
         * @param status 状态
         */
        private void send(ChannelHandlerContext ctx, String context,
                          HttpResponseStatus status) {
          
          
            // http应答报文
            // HttpVersion.HTTP_1_1   http1.1
            // status   http状态码
            FullHttpResponse response = new DefaultFullHttpResponse(
                    HttpVersion.HTTP_1_1,status,
                    Unpooled.copiedBuffer(context,CharsetUtil.UTF_8)
            );
           
     		// 消息头里面的其它定义内容
            // 传输的文本类型----HttpHeaderNames.CONTENT_TYPE
            response.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/plain;charset=UTF-8");
            // http1.1里面不开启keep-alive是需要服务器关闭连接的
            // 加一个监听器ChannelFutureListener.CLOSE
            // 写完对端的动态执行完了,就执行监视器里面的关闭动作
            ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
          
          
            String result="";
            // 把消息体强制转型成http请求类
            FullHttpRequest httpRequest = (FullHttpRequest)msg;
            System.out.println(httpRequest.headers());
            try{
          
          
                //获取路径
                String path=httpRequest.uri();
                //获取body
                String body = httpRequest.content().toString(CharsetUtil.UTF_8);
                //获取请求方法
                HttpMethod method=httpRequest.method();
                System.out.println("接收到:"+method+" 请求");
                //如果不是这个路径,就直接返回错误
                if(!"/test".equalsIgnoreCase(path)){
          
          
                    result="非法请求!"+path;
                    // 发送错误报文
                    send(ctx,result,HttpResponseStatus.BAD_REQUEST);
                    return;
                }
    
                // 只实现GET方法
                // 如果是GET请求
                if(HttpMethod.GET.equals(method)){
          
          
                    //接受到的消息,做业务逻辑处理...
                    System.out.println("body:"+body);
                    result="GET请求,应答:"+RespConstant.getNews();
                    send(ctx,result,HttpResponseStatus.OK);
                    return;
                }
                //如果是其他类型请求,如post
                if(HttpMethod.POST.equals(method)){
          
          
                    //接受到的消息,做业务逻辑处理...
                    //....
                    return;
                }
    
            }catch(Exception e){
          
          
                System.out.println("处理请求失败!");
                e.printStackTrace();
            }finally{
          
          
                //释放请求
                httpRequest.release();
            }
        }
    
        /*
         * 建立连接时,返回消息
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
          
          
            System.out.println("连接的客户端地址:" + ctx.channel().remoteAddress());
        }
    }
    
    

消息体常量内容----RespConstant

  • package cn.enjoyedu.nettyhttp.server;
    
    import java.util.Random;
    
    /**
     * @author Mark老师   
     * 往期课程和VIP课程咨询 依娜老师  QQ:2133576719
     * 类说明:
     */
    public class RespConstant {
          
          
        private static final String[] NEWS = {
          
          
                "她那时候还太年轻,不知道所有命运赠送的礼物,早已在暗中标好了价格。——斯蒂芬·茨威格《断头皇后》",
                "这是一个最好的时代,也是一个最坏的时代;这是一个智慧的年代,这是一个愚蠢的年代;\n" +
                        "这是一个信任的时期,这是一个怀疑的时期;这是一个光明的季节,这是一个黑暗的季节;\n" +
                        "这是希望之春,这是失望之冬;人们面前应有尽有,人们面前一无所有;\n" +
                        "人们正踏上天堂之路,人们正走向地狱之门。 —— 狄更斯《双城记》",
                };
    
        private static final Random R = new Random();
    
        public static String getNews(){
          
          
            return NEWS[R.nextInt(NEWS.length)];
        }
    
    }
    
    

客户端

HttpClient

  • package cn.enjoyedu.nettyhttp.client;
    
    import cn.enjoyedu.nettyhttp.server.HttpServer;
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.http.*;
    
    import java.net.URI;
    
    /**
     * @author Mark老师  
     * 往期课程和VIP课程咨询 依娜老师  QQ:2133576719
     * 类说明:
     */
    public class HttpClient {
          
          
        public static final String HOST  = "127.0.0.1";
        private static final boolean SSL = false;
        public void connect(String host, int port) throws Exception {
          
          
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
          
          
                Bootstrap b = new Bootstrap();
                b.group(workerGroup);
                b.channel(NioSocketChannel.class);
                b.option(ChannelOption.SO_KEEPALIVE, true);
                b.handler(new ChannelInitializer<SocketChannel>() {
          
          
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
          
          
                        // 编解码器,只用写一个就行了,http请求编码,http响应解码
                        ch.pipeline().addLast(new HttpClientCodec());
                        /*聚合http为一个完整的报文*/
                        // http报文聚合
                        ch.pipeline().addLast("aggregator",
                                new HttpObjectAggregator(10*1024*1024));
                        /*解压缩*/
                        // 报文解压缩
                        ch.pipeline().addLast("decompressor",
                                new HttpContentDecompressor());
                        // 自己的业务handler,发送http请求
                        ch.pipeline().addLast(new HttpClientInboundHandler());
                    }
                });
    
                // Start the client.
                ChannelFuture f = b.connect(host, port).sync();
                f.channel().closeFuture().sync();
            } finally {
          
          
                workerGroup.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws Exception {
          
          
            HttpClient client = new HttpClient();
            client.connect("127.0.0.1", HttpServer.port);
        }
    }
    
    
  • HttpClientCodec是一个编解码器,只用写一个就行了,http请求编码,http响应解码

HttpClientInboundHandler

  • package cn.enjoyedu.nettyhttp.client;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.handler.codec.http.*;
    import io.netty.util.CharsetUtil;
    
    import java.net.URI;
    
    /**
     * @author Mark老师   
     * 往期课程和VIP课程咨询 依娜老师  QQ:2133576719
     * 类说明:
     */
    public class HttpClientInboundHandler extends ChannelInboundHandlerAdapter {
          
          
    
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
          
          
            FullHttpResponse httpResponse = (FullHttpResponse)msg;
            System.out.println(httpResponse.status());
            System.out.println(httpResponse.headers());
            ByteBuf buf = httpResponse.content();
            System.out.println(buf.toString(CharsetUtil.UTF_8));
            // 自行处理了报文,必须释放buffer,不用后续handler处理了
            // 为了避免内存泄漏,所以要释放buffer
            httpResponse.release();
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
          
          
            URI uri = new URI("/test");
            String msg = "Hello";
            DefaultFullHttpRequest request =
                    new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,
                            HttpMethod.GET,
                            uri.toASCIIString(),
                            Unpooled.wrappedBuffer(msg.getBytes("UTF-8")));
    
            // 构建http请求
            request.headers().set(HttpHeaderNames.HOST, HttpClient.HOST);
            request.headers().set(HttpHeaderNames.CONNECTION,
                    HttpHeaderValues.KEEP_ALIVE);
            request.headers().set(HttpHeaderNames.CONTENT_LENGTH,
                    request.content().readableBytes());
            // 发送http请求
            ctx.writeAndFlush(request);
        }
    }
    
    
  • 为什么是入站?

  • 应为业务主要的处理内容是解析服务端传回的应答报文

netty处理空闲的连接和超时的handler

  • tcp协议自己的keep-alive,时间太长了,一般是120分钟,所以往往应用层是自己建立心跳机制
  • IdleStateHandler----空闲状态判断handler
    • 读空闲handler — ReadTimeoutHandler
    • 写空闲handler — WriteTimeoutHandler

序列化问题

为什么需要序列化

  • 序列化的目的

    • 将javaBean转化成二进制数据
  • Java序列化的缺点和业界主流的序列化框架:各种 Java 的序列化库的性能比较测试结果

    • 为什么不用java的序列化?

      1.只能在java中用

      2.速度慢,字节长度长

    • 测试

    • /*
       * Copyright 2013-2018 Lilinfeng.
       *  
       * Licensed under the Apache License, Version 2.0 (the "License");
       * you may not use this file except in compliance with the License.
       * You may obtain a copy of the License at
       *  
       *      http://www.apache.org/licenses/LICENSE-2.0
       *  
       * Unless required by applicable law or agreed to in writing, software
       * distributed under the License is distributed on an "AS IS" BASIS,
       * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
       * See the License for the specific language governing permissions and
       * limitations under the License.
       */
      package cn.enjoyedu.nettybasic.serializable.protogenesis;
      
      import java.io.ByteArrayOutputStream;
      import java.io.IOException;
      import java.io.ObjectOutputStream;
      import java.nio.ByteBuffer;
      
      /**
       * @author Mark老师 
       * 往期课程和VIP课程咨询 依娜老师  QQ:2133576719
       * 类说明:测试序列化性能差异
       */
      public class PerformTestUserInfo {
              
              
      
          public static void main(String[] args) throws IOException {
              
              
      	UserInfo info = new UserInfo();
      	info.buildUserID(100).buildUserName("Welcome to Netty");
      	int loop = 1000000;
      	ByteArrayOutputStream bos = null;
      	ObjectOutputStream os = null;
      	long startTime = System.currentTimeMillis();
      	for (int i = 0; i < loop; i++) {
              
              
      	    bos = new ByteArrayOutputStream();
      	    os = new ObjectOutputStream(bos);
      	    os.writeObject(info);
      	    os.flush();
      	    os.close();
      	    byte[] b = bos.toByteArray();
      	    bos.close();
      	}
      	long endTime = System.currentTimeMillis();
      	System.out.println("The jdk serializable cost time is  : "
      		+ (endTime - startTime) + " ms");
      
      	System.out.println("-------------------------------------");
      
      	ByteBuffer buffer = ByteBuffer.allocate(1024);
      	startTime = System.currentTimeMillis();
      	for (int i = 0; i < loop; i++) {
              
              
      	    byte[] b = info.codeC(buffer);
      	}
      	endTime = System.currentTimeMillis();
      	System.out.println("The byte array serializable cost time is : "
      		+ (endTime - startTime) + " ms");
      
          }
      
      }
      
      
  • 很少用jdk序列化,java生态圈提供了很多序列化框架

    • https://yq.aliyun.com/articles/200607

    • 例如dubbo中的hessin

    • 一些序列化框架

      java-built-in hessian kryo fast-serialization jboss-serialization jboss-marshalling-river protostuff msgpack-databind json/jackson/databind json/jackson/db-afterburner json/protostuff-runtime json/google-gson/databind json/svenson-databind json/flexjson/databind json/fastjson/databind smile/jackson/databind smile/jackson/db-afterburner bson/jackson/databind xml/xstream+c xml/jackson/databind-aalto

  • netty自己的序列化

    • 内置有哪些?

      1.Jboss-----Marshalling

      2.google-----Protocol Buffers

      下载谷歌序列化工具protoc.exe

      写完实体类后,再写一个相关的描述文件,然后通过上面那个工具就可以生成一个类

      // Generated by the protocol buffer compiler.  DO NOT EDIT!
      // source: Person.proto
      
      package cn.enjoyedu.nettybasic.serializable.protobuf;
      
      public final class PersonProto {
              
              
        private PersonProto() {
              
              }
        public static void registerAllExtensions(
            com.google.protobuf.ExtensionRegistry registry) {
              
              
        }
        public interface PersonOrBuilder extends
            // @@protoc_insertion_point(interface_extends:netty.Person)
            com.google.protobuf.MessageOrBuilder {
              
              
      
          /**
           * <code>required int32 id = 1;</code>
           */
          boolean hasId();
          /**
           * <code>required int32 id = 1;</code>
           */
          int getId();
      
          /**
           * <code>required string name = 2;</code>
           */
          boolean hasName();
          /**
           * <code>required string name = 2;</code>
           */
          String getName();
          /**
           * <code>required string name = 2;</code>
           */
          com.google.protobuf.ByteString
              getNameBytes();
      
          /**
           * <code>optional string email = 3;</code>
           */
          boolean hasEmail();
          /**
           * <code>optional string email = 3;</code>
           */
          String getEmail();
          /**
           * <code>optional string email = 3;</code>
           */
          com.google.protobuf.ByteString
              getEmailBytes();
        }
        /**
         * Protobuf type {@code netty.Person}
         */
        public static final class Person extends
            com.google.protobuf.GeneratedMessage implements
            // @@protoc_insertion_point(message_implements:netty.Person)
            PersonOrBuilder {
              
              
          // Use Person.newBuilder() to construct.
          private Person(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
              
              
            super(builder);
            this.unknownFields = builder.getUnknownFields();
          }
          private Person(boolean noInit) {
              
               this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
      
          private static final Person defaultInstance;
          public static Person getDefaultInstance() {
              
              
            return defaultInstance;
          }
      
          public Person getDefaultInstanceForType() {
              
              
            return defaultInstance;
          }
      
          private final com.google.protobuf.UnknownFieldSet unknownFields;
          @Override
          public final com.google.protobuf.UnknownFieldSet
              getUnknownFields() {
              
              
            return this.unknownFields;
          }
          private Person(
              com.google.protobuf.CodedInputStream input,
              com.google.protobuf.ExtensionRegistryLite extensionRegistry)
              throws com.google.protobuf.InvalidProtocolBufferException {
              
              
            initFields();
            int mutable_bitField0_ = 0;
            com.google.protobuf.UnknownFieldSet.Builder unknownFields =
                com.google.protobuf.UnknownFieldSet.newBuilder();
            try {
              
              
              boolean done = false;
              while (!done) {
              
              
                int tag = input.readTag();
                switch (tag) {
              
              
                  case 0:
                    done = true;
                    break;
                  default: {
              
              
                    if (!parseUnknownField(input, unknownFields,
                                           extensionRegistry, tag)) {
              
              
                      done = true;
                    }
                    break;
                  }
                  case 8: {
              
              
                    bitField0_ |= 0x00000001;
                    id_ = input.readInt32();
                    break;
                  }
                  case 18: {
              
              
                    com.google.protobuf.ByteString bs = input.readBytes();
                    bitField0_ |= 0x00000002;
                    name_ = bs;
                    break;
                  }
                  case 26: {
              
              
                    com.google.protobuf.ByteString bs = input.readBytes();
                    bitField0_ |= 0x00000004;
                    email_ = bs;
                    break;
                  }
                }
              }
            } catch (com.google.protobuf.InvalidProtocolBufferException e) {
              
              
              throw e.setUnfinishedMessage(this);
            } catch (java.io.IOException e) {
              
              
              throw new com.google.protobuf.InvalidProtocolBufferException(
                  e.getMessage()).setUnfinishedMessage(this);
            } finally {
              
              
              this.unknownFields = unknownFields.build();
              makeExtensionsImmutable();
            }
          }
          public static final com.google.protobuf.Descriptors.Descriptor
              getDescriptor() {
              
              
            return PersonProto.internal_static_netty_Person_descriptor;
          }
      
          protected FieldAccessorTable
              internalGetFieldAccessorTable() {
              
              
            return PersonProto.internal_static_netty_Person_fieldAccessorTable
                .ensureFieldAccessorsInitialized(
                    Person.class, Builder.class);
          }
      
          public static com.google.protobuf.Parser<Person> PARSER =
              new com.google.protobuf.AbstractParser<Person>() {
              
              
            public Person parsePartialFrom(
                com.google.protobuf.CodedInputStream input,
                com.google.protobuf.ExtensionRegistryLite extensionRegistry)
                throws com.google.protobuf.InvalidProtocolBufferException {
              
              
              return new Person(input, extensionRegistry);
            }
          };
      
          @Override
          public com.google.protobuf.Parser<Person> getParserForType() {
              
              
            return PARSER;
          }
      
          private int bitField0_;
          public static final int ID_FIELD_NUMBER = 1;
          private int id_;
          /**
           * <code>required int32 id = 1;</code>
           */
          public boolean hasId() {
              
              
            return ((bitField0_ & 0x00000001) == 0x00000001);
          }
          /**
           * <code>required int32 id = 1;</code>
           */
          public int getId() {
              
              
            return id_;
          }
      
          public static final int NAME_FIELD_NUMBER = 2;
          private Object name_;
          /**
           * <code>required string name = 2;</code>
           */
          public boolean hasName() {
              
              
            return ((bitField0_ & 0x00000002) == 0x00000002);
          }
          /**
           * <code>required string name = 2;</code>
           */
          public String getName() {
              
              
            Object ref = name_;
            if (ref instanceof String) {
              
              
              return (String) ref;
            } else {
              
              
              com.google.protobuf.ByteString bs = 
                  (com.google.protobuf.ByteString) ref;
              String s = bs.toStringUtf8();
              if (bs.isValidUtf8()) {
              
              
                name_ = s;
              }
              return s;
            }
          }
          /**
           * <code>required string name = 2;</code>
           */
          public com.google.protobuf.ByteString
              getNameBytes() {
              
              
            Object ref = name_;
            if (ref instanceof String) {
              
              
              com.google.protobuf.ByteString b = 
                  com.google.protobuf.ByteString.copyFromUtf8(
                      (String) ref);
              name_ = b;
              return b;
            } else {
              
              
              return (com.google.protobuf.ByteString) ref;
            }
          }
      
          public static final int EMAIL_FIELD_NUMBER = 3;
          private Object email_;
          /**
           * <code>optional string email = 3;</code>
           */
          public boolean hasEmail() {
              
              
            return ((bitField0_ & 0x00000004) == 0x00000004);
          }
          /**
           * <code>optional string email = 3;</code>
           */
          public String getEmail() {
              
              
            Object ref = email_;
            if (ref instanceof String) {
              
              
              return (String) ref;
            } else {
              
              
              com.google.protobuf.ByteString bs = 
                  (com.google.protobuf.ByteString) ref;
              String s = bs.toStringUtf8();
              if (bs.isValidUtf8()) {
              
              
                email_ = s;
              }
              return s;
            }
          }
          /**
           * <code>optional string email = 3;</code>
           */
          public com.google.protobuf.ByteString
              getEmailBytes() {
              
              
            Object ref = email_;
            if (ref instanceof String) {
              
              
              com.google.protobuf.ByteString b = 
                  com.google.protobuf.ByteString.copyFromUtf8(
                      (String) ref);
              email_ = b;
              return b;
            } else {
              
              
              return (com.google.protobuf.ByteString) ref;
            }
          }
      
          private void initFields() {
              
              
            id_ = 0;
            name_ = "";
            email_ = "";
          }
          private byte memoizedIsInitialized = -1;
          public final boolean isInitialized() {
              
              
            byte isInitialized = memoizedIsInitialized;
            if (isInitialized == 1) return true;
            if (isInitialized == 0) return false;
      
            if (!hasId()) {
              
              
              memoizedIsInitialized = 0;
              return false;
            }
            if (!hasName()) {
              
              
              memoizedIsInitialized = 0;
              return false;
            }
            memoizedIsInitialized = 1;
            return true;
          }
      
          public void writeTo(com.google.protobuf.CodedOutputStream output)
                              throws java.io.IOException {
              
              
            getSerializedSize();
            if (((bitField0_ & 0x00000001) == 0x00000001)) {
              
              
              output.writeInt32(1, id_);
            }
            if (((bitField0_ & 0x00000002) == 0x00000002)) {
              
              
              output.writeBytes(2, getNameBytes());
            }
            if (((bitField0_ & 0x00000004) == 0x00000004)) {
              
              
              output.writeBytes(3, getEmailBytes());
            }
            getUnknownFields().writeTo(output);
          }
      
          private int memoizedSerializedSize = -1;
          public int getSerializedSize() {
              
              
            int size = memoizedSerializedSize;
            if (size != -1) return size;
      
            size = 0;
            if (((bitField0_ & 0x00000001) == 0x00000001)) {
              
              
              size += com.google.protobuf.CodedOutputStream
                .computeInt32Size(1, id_);
            }
            if (((bitField0_ & 0x00000002) == 0x00000002)) {
              
              
              size += com.google.protobuf.CodedOutputStream
                .computeBytesSize(2, getNameBytes());
            }
            if (((bitField0_ & 0x00000004) == 0x00000004)) {
              
              
              size += com.google.protobuf.CodedOutputStream
                .computeBytesSize(3, getEmailBytes());
            }
            size += getUnknownFields().getSerializedSize();
            memoizedSerializedSize = size;
            return size;
          }
      
          private static final long serialVersionUID = 0L;
          @Override
          protected Object writeReplace()
              throws java.io.ObjectStreamException {
              
              
            return super.writeReplace();
          }
      
          public static Person parseFrom(
              com.google.protobuf.ByteString data)
              throws com.google.protobuf.InvalidProtocolBufferException {
              
              
            return PARSER.parseFrom(data);
          }
          public static Person parseFrom(
              com.google.protobuf.ByteString data,
              com.google.protobuf.ExtensionRegistryLite extensionRegistry)
              throws com.google.protobuf.InvalidProtocolBufferException {
              
              
            return PARSER.parseFrom(data, extensionRegistry);
          }
          public static Person parseFrom(byte[] data)
              throws com.google.protobuf.InvalidProtocolBufferException {
              
              
            return PARSER.parseFrom(data);
          }
          public static Person parseFrom(
              byte[] data,
              com.google.protobuf.ExtensionRegistryLite extensionRegistry)
              throws com.google.protobuf.InvalidProtocolBufferException {
              
              
            return PARSER.parseFrom(data, extensionRegistry);
          }
          public static Person parseFrom(java.io.InputStream input)
              throws java.io.IOException {
              
              
            return PARSER.parseFrom(input);
          }
          public static Person parseFrom(
              java.io.InputStream input,
              com.google.protobuf.ExtensionRegistryLite extensionRegistry)
              throws java.io.IOException {
              
              
            return PARSER.parseFrom(input, extensionRegistry);
          }
          public static Person parseDelimitedFrom(java.io.InputStream input)
              throws java.io.IOException {
              
              
            return PARSER.parseDelimitedFrom(input);
          }
          public static Person parseDelimitedFrom(
              java.io.InputStream input,
              com.google.protobuf.ExtensionRegistryLite extensionRegistry)
              throws java.io.IOException {
              
              
            return PARSER.parseDelimitedFrom(input, extensionRegistry);
          }
          public static Person parseFrom(
              com.google.protobuf.CodedInputStream input)
              throws java.io.IOException {
              
              
            return PARSER.parseFrom(input);
          }
          public static Person parseFrom(
              com.google.protobuf.CodedInputStream input,
              com.google.protobuf.ExtensionRegistryLite extensionRegistry)
              throws java.io.IOException {
              
              
            return PARSER.parseFrom(input, extensionRegistry);
          }
      
          public static Builder newBuilder() {
              
               return Builder.create(); }
          public Builder newBuilderForType() {
              
               return newBuilder(); }
          public static Builder newBuilder(Person prototype) {
              
              
            return newBuilder().mergeFrom(prototype);
          }
          public Builder toBuilder() {
              
               return newBuilder(this); }
      
          @Override
          protected Builder newBuilderForType(
              BuilderParent parent) {
              
              
            Builder builder = new Builder(parent);
            return builder;
          }
          /**
           * Protobuf type {@code netty.Person}
           */
          public static final class Builder extends
              com.google.protobuf.GeneratedMessage.Builder<Builder> implements
              // @@protoc_insertion_point(builder_implements:netty.Person)
              PersonOrBuilder {
              
              
            public static final com.google.protobuf.Descriptors.Descriptor
                getDescriptor() {
              
              
              return PersonProto.internal_static_netty_Person_descriptor;
            }
      
            protected FieldAccessorTable
                internalGetFieldAccessorTable() {
              
              
              return PersonProto.internal_static_netty_Person_fieldAccessorTable
                  .ensureFieldAccessorsInitialized(
                      Person.class, Builder.class);
            }
      
            // Construct using PersonProto.Person.newBuilder()
            private Builder() {
              
              
              maybeForceBuilderInitialization();
            }
      
            private Builder(
                BuilderParent parent) {
              
              
              super(parent);
              maybeForceBuilderInitialization();
            }
            private void maybeForceBuilderInitialization() {
              
              
              if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
              
              
              }
            }
            private static Builder create() {
              
              
              return new Builder();
            }
      
            public Builder clear() {
              
              
              super.clear();
              id_ = 0;
              bitField0_ = (bitField0_ & ~0x00000001);
              name_ = "";
              bitField0_ = (bitField0_ & ~0x00000002);
              email_ = "";
              bitField0_ = (bitField0_ & ~0x00000004);
              return this;
            }
      
            public Builder clone() {
              
              
              return create().mergeFrom(buildPartial());
            }
      
            public com.google.protobuf.Descriptors.Descriptor
                getDescriptorForType() {
              
              
              return PersonProto.internal_static_netty_Person_descriptor;
            }
      
            public Person getDefaultInstanceForType() {
              
              
              return Person.getDefaultInstance();
            }
      
            public Person build() {
              
              
              Person result = buildPartial();
              if (!result.isInitialized()) {
              
              
                throw newUninitializedMessageException(result);
              }
              return result;
            }
      
            public Person buildPartial() {
              
              
              Person result = new Person(this);
              int from_bitField0_ = bitField0_;
              int to_bitField0_ = 0;
              if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
              
              
                to_bitField0_ |= 0x00000001;
              }
              result.id_ = id_;
              if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
              
              
                to_bitField0_ |= 0x00000002;
              }
              result.name_ = name_;
              if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
              
              
                to_bitField0_ |= 0x00000004;
              }
              result.email_ = email_;
              result.bitField0_ = to_bitField0_;
              onBuilt();
              return result;
            }
      
            public Builder mergeFrom(com.google.protobuf.Message other) {
              
              
              if (other instanceof Person) {
              
              
                return mergeFrom((Person)other);
              } else {
              
              
                super.mergeFrom(other);
                return this;
              }
            }
      
            public Builder mergeFrom(Person other) {
              
              
              if (other == Person.getDefaultInstance()) return this;
              if (other.hasId()) {
              
              
                setId(other.getId());
              }
              if (other.hasName()) {
              
              
                bitField0_ |= 0x00000002;
                name_ = other.name_;
                onChanged();
              }
              if (other.hasEmail()) {
              
              
                bitField0_ |= 0x00000004;
                email_ = other.email_;
                onChanged();
              }
              this.mergeUnknownFields(other.getUnknownFields());
              return this;
            }
      
            public final boolean isInitialized() {
              
              
              if (!hasId()) {
              
              
                
                return false;
              }
              if (!hasName()) {
              
              
                
                return false;
              }
              return true;
            }
      
            public Builder mergeFrom(
                com.google.protobuf.CodedInputStream input,
                com.google.protobuf.ExtensionRegistryLite extensionRegistry)
                throws java.io.IOException {
              
              
              Person parsedMessage = null;
              try {
              
              
                parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
              } catch (com.google.protobuf.InvalidProtocolBufferException e) {
              
              
                parsedMessage = (Person) e.getUnfinishedMessage();
                throw e;
              } finally {
              
              
                if (parsedMessage != null) {
              
              
                  mergeFrom(parsedMessage);
                }
              }
              return this;
            }
            private int bitField0_;
      
            private int id_ ;
            /**
             * <code>required int32 id = 1;</code>
             */
            public boolean hasId() {
              
              
              return ((bitField0_ & 0x00000001) == 0x00000001);
            }
            /**
             * <code>required int32 id = 1;</code>
             */
            public int getId() {
              
              
              return id_;
            }
            /**
             * <code>required int32 id = 1;</code>
             */
            public Builder setId(int value) {
              
              
              bitField0_ |= 0x00000001;
              id_ = value;
              onChanged();
              return this;
            }
            /**
             * <code>required int32 id = 1;</code>
             */
            public Builder clearId() {
              
              
              bitField0_ = (bitField0_ & ~0x00000001);
              id_ = 0;
              onChanged();
              return this;
            }
      
            private Object name_ = "";
            /**
             * <code>required string name = 2;</code>
             */
            public boolean hasName() {
              
              
              return ((bitField0_ & 0x00000002) == 0x00000002);
            }
            /**
             * <code>required string name = 2;</code>
             */
            public String getName() {
              
              
              Object ref = name_;
              if (!(ref instanceof String)) {
              
              
                com.google.protobuf.ByteString bs =
                    (com.google.protobuf.ByteString) ref;
                String s = bs.toStringUtf8();
                if (bs.isValidUtf8()) {
              
              
                  name_ = s;
                }
                return s;
              } else {
              
              
                return (String) ref;
              }
            }
            /**
             * <code>required string name = 2;</code>
             */
            public com.google.protobuf.ByteString
                getNameBytes() {
              
              
              Object ref = name_;
              if (ref instanceof String) {
              
              
                com.google.protobuf.ByteString b = 
                    com.google.protobuf.ByteString.copyFromUtf8(
                        (String) ref);
                name_ = b;
                return b;
              } else {
              
              
                return (com.google.protobuf.ByteString) ref;
              }
            }
            /**
             * <code>required string name = 2;</code>
             */
            public Builder setName(
                String value) {
              
              
              if (value == null) {
              
              
          throw new NullPointerException();
        }
        bitField0_ |= 0x00000002;
              name_ = value;
              onChanged();
              return this;
            }
            /**
             * <code>required string name = 2;</code>
             */
            public Builder clearName() {
              
              
              bitField0_ = (bitField0_ & ~0x00000002);
              name_ = getDefaultInstance().getName();
              onChanged();
              return this;
            }
            /**
             * <code>required string name = 2;</code>
             */
            public Builder setNameBytes(
                com.google.protobuf.ByteString value) {
              
              
              if (value == null) {
              
              
          throw new NullPointerException();
        }
        bitField0_ |= 0x00000002;
              name_ = value;
              onChanged();
              return this;
            }
      
            private Object email_ = "";
            /**
             * <code>optional string email = 3;</code>
             */
            public boolean hasEmail() {
              
              
              return ((bitField0_ & 0x00000004) == 0x00000004);
            }
            /**
             * <code>optional string email = 3;</code>
             */
            public String getEmail() {
              
              
              Object ref = email_;
              if (!(ref instanceof String)) {
              
              
                com.google.protobuf.ByteString bs =
                    (com.google.protobuf.ByteString) ref;
                String s = bs.toStringUtf8();
                if (bs.isValidUtf8()) {
              
              
                  email_ = s;
                }
                return s;
              } else {
              
              
                return (String) ref;
              }
            }
            /**
             * <code>optional string email = 3;</code>
             */
            public com.google.protobuf.ByteString
                getEmailBytes() {
              
              
              Object ref = email_;
              if (ref instanceof String) {
              
              
                com.google.protobuf.ByteString b = 
                    com.google.protobuf.ByteString.copyFromUtf8(
                        (String) ref);
                email_ = b;
                return b;
              } else {
              
              
                return (com.google.protobuf.ByteString) ref;
              }
            }
            /**
             * <code>optional string email = 3;</code>
             */
            public Builder setEmail(
                String value) {
              
              
              if (value == null) {
              
              
          throw new NullPointerException();
        }
        bitField0_ |= 0x00000004;
              email_ = value;
              onChanged();
              return this;
            }
            /**
             * <code>optional string email = 3;</code>
             */
            public Builder clearEmail() {
              
              
              bitField0_ = (bitField0_ & ~0x00000004);
              email_ = getDefaultInstance().getEmail();
              onChanged();
              return this;
            }
            /**
             * <code>optional string email = 3;</code>
             */
            public Builder setEmailBytes(
                com.google.protobuf.ByteString value) {
              
              
              if (value == null) {
              
              
          throw new NullPointerException();
        }
        bitField0_ |= 0x00000004;
              email_ = value;
              onChanged();
              return this;
            }
      
            // @@protoc_insertion_point(builder_scope:netty.Person)
          }
      
          static {
              
              
            defaultInstance = new Person(true);
            defaultInstance.initFields();
          }
      
          // @@protoc_insertion_point(class_scope:netty.Person)
        }
      
        private static final com.google.protobuf.Descriptors.Descriptor
          internal_static_netty_Person_descriptor;
        private static
          com.google.protobuf.GeneratedMessage.FieldAccessorTable
            internal_static_netty_Person_fieldAccessorTable;
      
        public static com.google.protobuf.Descriptors.FileDescriptor
            getDescriptor() {
              
              
          return descriptor;
        }
        private static com.google.protobuf.Descriptors.FileDescriptor
            descriptor;
        static {
              
              
          String[] descriptorData = {
              
              
            "\n\014Person.proto\022\005netty\"1\n\006Person\022\n\n\002id\030\001 " +
            "\002(\005\022\014\n\004name\030\002 \002(\t\022\r\n\005email\030\003 \001(\tB:\n+cn.e" +
            "njoyedu.ch02.serializable.protobuf.demoB" +
            "\013PersonProto"
          };
          com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
              new com.google.protobuf.Descriptors.FileDescriptor.    InternalDescriptorAssigner() {
              
              
                public com.google.protobuf.ExtensionRegistry assignDescriptors(
                    com.google.protobuf.Descriptors.FileDescriptor root) {
              
              
                  descriptor = root;
                  return null;
                }
              };
          com.google.protobuf.Descriptors.FileDescriptor
            .internalBuildGeneratedFileFrom(descriptorData,
              new com.google.protobuf.Descriptors.FileDescriptor[] {
              
              
              }, assigner);
          internal_static_netty_Person_descriptor =
            getDescriptor().getMessageTypes().get(0);
          internal_static_netty_Person_fieldAccessorTable = new
            com.google.protobuf.GeneratedMessage.FieldAccessorTable(
              internal_static_netty_Person_descriptor,
              new String[] {
              
               "Id", "Name", "Email", });
        }
      
        // @@protoc_insertion_point(outer_class_scope)
      }
      
      
    • 集成第三方MessagePack实战(LengthFieldBasedFrame详解)

Protobuf

添加pom引用

  • <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-java</artifactId>
        <version>2.6.1</version>
    </dependency>
    

客户端

ProtoBufClient

  • package cn.enjoyedu.nettybasic.serializable.protobuf;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.protobuf.ProtobufEncoder;
    import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
    
    /**
     * @author Mark老师  
     * 往期课程和VIP课程咨询 依娜老师  QQ:2133576719
     * 类说明:
     */
    public class ProtoBufClient {
          
          
        public void connect(int port, String host) throws Exception {
          
          
            // 配置客户端NIO线程组
            EventLoopGroup group = new NioEventLoopGroup();
            try {
          
          
                Bootstrap b = new Bootstrap();
                b.group(group)
                        .channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
          
          
                            @Override
                            public void initChannel(SocketChannel ch)
                                    throws Exception {
          
          
                                /*加一个消息长度,由netty自动计算*/
                                ch.pipeline().addLast(
                                        new ProtobufVarint32LengthFieldPrepender()
                                );
                                /*负责编码,序列化*/
                                ch.pipeline().addLast(new ProtobufEncoder());
                                ch.pipeline().addLast(new ProtoBufClientHandler());
                            }
                        });
    
                ChannelFuture f = b.connect(host, port).sync();
                f.channel().closeFuture().sync();
            } finally {
          
          
                // 优雅退出,释放NIO线程组
                group.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws Exception {
          
          
            int port = 8080;
            new ProtoBufClient().connect(port, "127.0.0.1");
        }
    }
    
    
  • netty内置的protobuf序列化协议的handler-----ProtobufEncoder

  • 解决粘包半包问题

    • ProtobufVarint32LengthFieldPrepender-----加一个消息长度,由netty自动计算

      第三种,加一个消息长度

ProtoBufClientHandler

  • package cn.enjoyedu.nettybasic.serializable.protobuf;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    
    /**
     * @author Mark老师   
     * 往期课程和VIP课程咨询 依娜老师  QQ:2133576719
     * 类说明:
     */
    public class ProtoBufClientHandler extends ChannelInboundHandlerAdapter {
          
          
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
          
          
           System.out.println("Prepare to make data........");
            // 构建业务类的序列化类
           PersonProto.Person.Builder builder = PersonProto.Person.newBuilder();
            builder.setName("Mark");
            builder.setId(1);
            builder.setEmail("[email protected]");
            System.out.println("send data........");
            ctx.writeAndFlush(builder.build());
    
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
          
          
            cause.printStackTrace();
            ctx.close();
        }
    }
    
    

服务端—反序列化

ProtoBufServer

  • package cn.enjoyedu.nettybasic.serializable.protobuf;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.protobuf.ProtobufDecoder;
    import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
    
    /**
     * @author Mark老师   
     * 往期课程和VIP课程咨询 依娜老师  QQ:2133576719
     * 类说明:
     */
    public class ProtoBufServer {
          
          
        public void bind(int port) throws Exception {
          
          
            // 配置服务端的NIO线程组
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
          
          
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 100)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
          
          
                            @Override
                            public void initChannel(SocketChannel ch) {
          
          
                                /*去除消息长度部分,同时根据这个消息长度读取实际的数据*/
                                // 去掉新增的消息头,方便后面正确解析消息体内容
                                ch.pipeline().addLast(
                                        new ProtobufVarint32FrameDecoder());
                                // 对Person做反序列化
                                ch.pipeline().addLast(new ProtobufDecoder(
                                        PersonProto.Person.getDefaultInstance()
                                ));
                                ch.pipeline().addLast(new ProtoBufServerHandler());
                    }
                });
    
                // 绑定端口,同步等待成功
                ChannelFuture f = b.bind(port).sync();
    
                System.out.println("init start");
                // 等待服务端监听端口关闭
                f.channel().closeFuture().sync();
            } finally {
          
          
                // 优雅退出,释放线程池资源
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws Exception {
          
          
            int port = 8080;
            new ProtoBufServer().bind(port);
        }
    }
    
    
  • 去掉新增的消息头,方便后面正确解析消息体内容-----ProtobufVarint32FrameDecoder

  • 对Person做反序列化-----new ProtobufDecoder(
    PersonProto.Person.getDefaultInstance())

ProtoBufServerHandler

  • package cn.enjoyedu.nettybasic.serializable.protobuf;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    
    import java.io.IOException;
    
    /**
     * @author Mark老师  
     * 往期课程和VIP课程咨询 依娜老师  QQ:2133576719
     * 类说明:
     */
    public class ProtoBufServerHandler  extends ChannelInboundHandlerAdapter {
          
          
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg)
                throws Exception {
          
          
            // 因为已经反序列化完毕
            // 所以直接强制转型即可
            PersonProto.Person req = (PersonProto.Person)msg;
            System.out.println("get data name = "+req.getName());
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
          
          
            if(cause instanceof IOException){
          
          
                System.out.println("远程客户端强迫关闭了一个现有的连接。");
            }
            ctx.close();
        }
    }
    
    

MessagePack

添加pom引用

  • <dependency>
        <groupId>org.msgpack</groupId>
        <artifactId>msgpack</artifactId>
        <version>0.6.12</version>
    </dependency>
    

MsgPackEncode-----MsgPack的编码器

  • package cn.enjoyedu.nettybasic.serializable.msgpack;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToByteEncoder;
    import org.msgpack.MessagePack;
    
    /*基于MessagePack的编码器,序列化*/
    public class MsgPackEncode extends MessageToByteEncoder<User> {
          
          
        @Override
        protected void encode(ChannelHandlerContext ctx, User msg, ByteBuf out)
                throws Exception {
          
          
            // 定义一个MessagePack实例
            MessagePack messagePack = new MessagePack();
            // 消息转字节就不用那么麻烦了,直接转成字节数组
            byte[] raw = messagePack.write(msg);
            // 直接把字节数组写到ByteBuf中去
            out.writeBytes(raw);
    
        }
    }
    
    

MsgPackDecoder-----MsgPack的解码器

  • package cn.enjoyedu.nettybasic.serializable.msgpack;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToMessageDecoder;
    import org.msgpack.MessagePack;
    
    import java.util.List;
    
    /*基于MessagePack的解码器,反序列化*/
    public class MsgPackDecoder extends MessageToMessageDecoder<ByteBuf> {
          
          
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out)
                throws Exception {
          
          
            // 首先拿到消息大小
            final int length = msg.readableBytes();
            // 定义一个字节数组
            final byte[] array = new byte[length];
            // 提取消息内容到字节数组中
            msg.getBytes(msg.readerIndex(),array,0,length);
            // 定义一个MessagePack实例
            MessagePack messagePack = new MessagePack();
            // messagePack.read(array,User.class)方法返回一个User的实例
            out.add(messagePack.read(array,User.class));
        }
    }
    
    

客户端

ClientMsgPackEcho

  • package cn.enjoyedu.nettybasic.serializable.msgpack;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.LengthFieldPrepender;
    import io.netty.handler.codec.LineBasedFrameDecoder;
    
    import java.net.InetSocketAddress;
    
    /**
     * 作者:Mark/Maoke
     * 创建日期:2018/08/26
     * 类说明:
     */
    public class ClientMsgPackEcho {
          
          
    
        private final String host;
    
        public ClientMsgPackEcho(String host) {
          
          
            this.host = host;
        }
    
        public void start() throws InterruptedException {
          
          
            EventLoopGroup group = new NioEventLoopGroup();/*线程组*/
            try {
          
          
                final Bootstrap b = new Bootstrap();;/*客户端启动必须*/
                b.group(group)/*将线程组传入*/
                        .channel(NioSocketChannel.class)/*指定使用NIO进行网络传输*/
                        /*配置要连接服务器的ip地址和端口*/
                        .remoteAddress(
                                new InetSocketAddress(host, ServerMsgPackEcho.PORT))
                        .handler(new ChannelInitializerImp());
                ChannelFuture f = b.connect().sync();
                System.out.println("已连接到服务器.....");
                f.channel().closeFuture().sync();
            } finally {
          
          
                group.shutdownGracefully().sync();
            }
        }
    
        private static class ChannelInitializerImp extends ChannelInitializer<Channel> {
          
          
    
            @Override
            protected void initChannel(Channel ch) throws Exception {
          
          
                /*告诉netty,计算一下报文的长度,然后作为报文头加在前面*/
                // 这里是解决发送数据的粘包半包问题
                // MsgPack不想ProtoBuf有自动计算长度的handler,此处需要手动计算
                // LengthFieldPrepender用于计算报文长度
                // 用两个字节存放报文长度
                ch.pipeline().addLast(new LengthFieldPrepender(2));
                /*对服务器的应答也要解码,解决粘包半包*/
                // 这里是解决收到服务器端应答数据的粘包半包问题
                // 客户端针对服务端发送数据的回车换行符解码
                ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
    
                /*对我们要发送的数据做编码-序列化*/
                // MsgPack的序列化编码器
               ch.pipeline().addLast(new MsgPackEncode());
               ch.pipeline().addLast(new MsgPackClientHandler(5));
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
          
          
            new ClientMsgPackEcho("127.0.0.1").start();
        }
    }
    
    

MsgPackClientHandler

  • package cn.enjoyedu.nettybasic.serializable.msgpack;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.util.CharsetUtil;
    
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * 作者:Mark/Maoke
     * 创建日期:2018/08/26
     * 类说明:
     */
    public class MsgPackClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
          
          
        private final int sendNumber;
    
        public MsgPackClientHandler(int sendNumber) {
          
          
            this.sendNumber = sendNumber;
        }
    
        private AtomicInteger counter = new AtomicInteger(0);
    
        /*** 客户端读取到网络数据后的处理*/
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
          
          
            System.out.println("client Accept["+msg.toString(CharsetUtil.UTF_8)
                    +"] and the counter is:"+counter.incrementAndGet());
        }
    
        /*** 客户端被通知channel活跃后,做事*/
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
          
          
            User[] users = makeUsers();
            //发送数据
            for(User user:users){
          
          
                System.out.println("Send user:"+user);
                ctx.write(user);
            }
            ctx.flush();
        }
    
        /*** 发生异常后的处理*/
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
          
          
            cause.printStackTrace();
            ctx.close();
        }
    
        /*生成用户实体类的数组,以供发送*/
        private User[] makeUsers(){
          
          
            User[] users=new User[sendNumber];
            User user =null;
            for(int i=0;i<sendNumber;i++){
          
          
                user=new User();
                user.setAge(i);
                String userName = "ABCDEFG --->"+i;
                user.setUserName(userName);
                user.setId("No:"+(sendNumber-i));
                user.setUserContact(
                        new UserContact(userName+"@xiangxue.com","133"));
                users[i]=user;
            }
            return users;
        }
    }
    
    

服务器

ServerMsgPackEcho

  • package cn.enjoyedu.nettybasic.serializable.msgpack;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
    
    import java.net.InetSocketAddress;
    
    /**
     * 作者:Mark/Maoke
     * 创建日期:2018/08/25
     * 类说明:
     */
    public class ServerMsgPackEcho {
          
          
    
        public static final int PORT = 9995;
    
        public static void main(String[] args) throws InterruptedException {
          
          
            ServerMsgPackEcho serverMsgPackEcho = new ServerMsgPackEcho();
            System.out.println("服务器即将启动");
            serverMsgPackEcho.start();
        }
    
        public void start() throws InterruptedException {
          
          
            final MsgPackServerHandler serverHandler = new MsgPackServerHandler();
            EventLoopGroup group = new NioEventLoopGroup();/*线程组*/
    
            try {
          
          
                ServerBootstrap b = new ServerBootstrap();/*服务端启动必须*/
                b.group(group)/*将线程组传入*/
                    .channel(NioServerSocketChannel.class)/*指定使用NIO进行网络传输*/
                    .localAddress(new InetSocketAddress(PORT))/*指定服务器监听端口*/
                    /*服务端每接收到一个连接请求,就会新启一个socket通信,也就是channel,
                    所以下面这段代码的作用就是为这个子channel增加handle*/
                    .childHandler(new ChannelInitializerImp());
                ChannelFuture f = b.bind().sync();/*异步绑定到服务器,sync()会阻塞直到完成*/
                System.out.println("服务器启动完成,等待客户端的连接和数据.....");
                f.channel().closeFuture().sync();/*阻塞直到服务器的channel关闭*/
            } finally {
          
          
                group.shutdownGracefully().sync();/*优雅关闭线程组*/
            }
        }
    
        private static class ChannelInitializerImp extends ChannelInitializer<Channel> {
          
          
    
            @Override
            protected void initChannel(Channel ch) throws Exception {
          
          
                // 解决粘包半包问题
                // LengthFieldBasedFrameDecoder对消息长度进行解析
                // public LengthFieldBasedFrameDecoder(
                // int maxFrameLength,
                // int lengthFieldOffset, int lengthFieldLength,
                // int lengthAdjustment, int initialBytesToStrip)
                // maxFrameLength最大报文长度,超过会抛异常
                // lengthFieldOffset:长度域的偏移量,在报文里面移动多少后才是保存消息长度的位置,因为一开始就是,所以是0
                // lengthFieldLength:长度域的长度,此处的长度域是2位
                // lengthAdjustment:长度的一个修正值,修正什么?netty读到长度是12,则后面长度为12的数据都是需要读的
                // initialBytesToStrip:往后传的时候,丢弃多少字节
                
                // 假设发送的消息一共十二个字符,如下
                // Hello,_world
                // _表示空格
                // 消息头是00 0C,两个字节
                // 表示消息长度是12
                // 报文总长度是2+12 = 14
                
                
                // 特殊例子
                // CA FE 00 00 0C H E L L O , _ w o r l d 
                // (65535,2,3,0,0)
                ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535,
                        0,2,0,
                        2));
                // MsgPack反序列化的解码器
                ch.pipeline().addLast(new MsgPackDecoder());
                ch.pipeline().addLast(new MsgPackServerHandler());
            }
        }
    
    }
    
    

MsgPackServerHandler

  • package cn.enjoyedu.nettybasic.serializable.msgpack;
    
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * 作者:Mark/Maoke
     * 创建日期:2018/08/25
     * 类说明:自己的业务处理
     */
    @ChannelHandler.Sharable
    public class MsgPackServerHandler extends ChannelInboundHandlerAdapter {
          
          
    
        private AtomicInteger counter = new AtomicInteger(0);
    
        /*** 服务端读取到网络数据后的处理*/
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
          
          
            //将上一个handler生成的数据强制转型
            User user = (User)msg;
            System.out.println("Server Accept["+user
                    +"] and the counter is:"+counter.incrementAndGet());
            //服务器的应答
            String resp = "I process user :"+user.getUserName()
                    + System.getProperty("line.separator");
            ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes()));
            ctx.fireChannelRead(user);
        }
    
        /*** 发生异常后的处理*/
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
          
          
            cause.printStackTrace();
            ctx.close();
        }
    }
    
    

猜你喜欢

转载自blog.csdn.net/Markland_l/article/details/113856342