编解码器框架
什么是编解码器
- 数据在网络中传输,两端的操作系统类型是不一样的,对数据的解析方式也是不一样的,所以网络上传输的都是01串,都是字节
- 通讯两端就需要转换成01串,以及从01串转成真实数据
解码器
-
将字节解码为消息
-
将一种消息类型解码为另一种消息格式
- 比如https,需要先转换成密文,再解密成明文
-
ByteToMessageDecoder
-
decode是我们要实现的一个抽象方法
-
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
-
in是消息,out是下一个handler处理的数据
-
-
MessageToMessageDecoder
-
也是decode方法最重要
-
protected abstract void decode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;
-
msg是上一个handler传过来的数据,out是传给下一个handler的数据
-
-
TooLongFrameException
-
因为解码需要将全部的数据进行解码,没办法分开解码,所以先收到的数据需要先保存起来,如果程序没有设计好,传送的数据过多,有可能撑爆
-
package cn.enjoyedu.nettybasic; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.TooLongFrameException; import java.util.List; public class TooLongExSample extends ByteToMessageDecoder { private static final int MAX_SIZE = 1024; @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { // 当前可以读的字节数有多少 int readable = in.readableBytes(); // 如果读取的字节数超过我们定义的阈值,则关闭通讯并抛出异常 if(readable>MAX_SIZE){ ctx.close(); throw new TooLongFrameException("传入的数据太多"); } } }
-
编码器
-
将消息编码为字节
-
MessageToByteEncoder
-
将消息编码为消息
-
MessageToMessageEncoder
编解码器类
- 编码和解码是同一种算法,对称加密
- ByteToMessageCodec
- 既有编码又有解码
- 但是编码时尽量分开,代码单一职责原则
Netty内置的编解码器和ChannelHandler
写一个web服务器demo----通过SSL/TLS 保护Netty 应用程序和HTTP系列
-
实现服务器
-
浏览器访问
-
用Netty实现客户端
-
增加压缩支持
-
Https支持
-
package cn.enjoyedu.nettyhttp.server; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.SelfSignedCertificate; /** * @author Mark老师 * 往期课程和VIP课程咨询 依娜老师 QQ:2133576719 * 类说明: */ public class HttpServer { public static final int port = 6789; //设置服务端端口 private static EventLoopGroup group = new NioEventLoopGroup(); // 通过nio方式来接收连接和处理连接 private static ServerBootstrap b = new ServerBootstrap(); private static final boolean SSL = false;/*是否开启SSL模式*/ /** * Netty创建全部都是实现自AbstractBootstrap。 * 客户端的是Bootstrap,服务端的则是ServerBootstrap。 **/ public static void main(String[] args) throws Exception { final SslContext sslCtx; if(SSL){ // 签名认证实例 SelfSignedCertificate ssc = new SelfSignedCertificate(); // 构建ssl上下文 sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build(); }else{ sslCtx = null; } try { b.group(group); b.channel(NioServerSocketChannel.class); // 把ssl上下文传给handler b.childHandler(new ServerHandlerInit(sslCtx)); //设置过滤器 // 服务器绑定端口监听 ChannelFuture f = b.bind(port).sync(); System.out.println("服务端启动成功,端口是:"+port); // 监听服务器关闭监听 f.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); //关闭EventLoopGroup,释放掉所有资源包括创建的线程 } } }
-
private static final boolean SSL = false; 判断是否开启ssl模式
-
当开启ssl模式后,访问http://127.0.0.1:6789/test
-
此时会显示该页面无法正常运作------代码抛了异常,因为传输的是明文,不是密文
-
改成https://127.0.0.1:6789/test
-
提示您的连接不是私密连接------这个证书需要去第三方服务器认证
-
服务器端
HttpServer
package cn.enjoyedu.nettyhttp.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;
/**
* @author Mark老师
* 往期课程和VIP课程咨询 依娜老师 QQ:2133576719
* 类说明:
*/
public class HttpServer {
public static final int port = 6789; //设置服务端端口
private static EventLoopGroup group = new NioEventLoopGroup(); // 通过nio方式来接收连接和处理连接
private static ServerBootstrap b = new ServerBootstrap();
private static final boolean SSL = false;/*是否开启SSL模式*/
/**
* Netty创建全部都是实现自AbstractBootstrap。
* 客户端的是Bootstrap,服务端的则是ServerBootstrap。
**/
public static void main(String[] args) throws Exception {
final SslContext sslCtx;
if(SSL){
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(),
ssc.privateKey()).build();
}else{
sslCtx = null;
}
try {
b.group(group);
b.channel(NioServerSocketChannel.class);
b.childHandler(new ServerHandlerInit(sslCtx)); //设置过滤器
// 服务器绑定端口监听
ChannelFuture f = b.bind(port).sync();
System.out.println("服务端启动成功,端口是:"+port);
// 监听服务器关闭监听
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully(); //关闭EventLoopGroup,释放掉所有资源包括创建的线程
}
}
}
ServerHandlerInit
package cn.enjoyedu.nettyhttp.server;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpContentCompressor;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.ssl.SslContext;
/**
* @author Mark老师
* 往期课程和VIP课程咨询 依娜老师 QQ:2133576719
* 类说明:
*/
public class ServerHandlerInit extends ChannelInitializer<SocketChannel> {
private final SslContext sslCtx;
public ServerHandlerInit(SslContext sslCtx) {
this.sslCtx = sslCtx;
}
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline ph = ch.pipeline();
// 如果ssl上下文不为null,开启ssl认证的handler
if(sslCtx!=null){
ph.addLast(sslCtx.newHandler(ch.alloc()));
}
/*把应答报文 编码*/
// http响应编码器
ph.addLast("encoder",new HttpResponseEncoder());
/*把请求报文 解码*/
// http请求解码器
ph.addLast("decoder",new HttpRequestDecoder());
/*聚合http为一个完整的报文*/
// 因为http的消息体可能是分块发送的,chunk
// 因此需要聚合成一个完整的报文,最大10M
ph.addLast("aggregator",
new HttpObjectAggregator(10*1024*1024));
/*把应答报文 压缩,非必要*/
// http的报文压缩,减轻带宽压力
ph.addLast("compressor",new HttpContentCompressor());
// 自己的业务handler
// 一定是一个入站handler
ph.addLast(new BusiHandler());
}
}
自己的业务handler–BusiHandler
-
package cn.enjoyedu.nettyhttp.server; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.codec.http.*; import io.netty.util.CharsetUtil; /** * @author Mark老师 * 往期课程和VIP课程咨询 依娜老师 QQ:2133576719 * 类说明: */ public class BusiHandler extends ChannelInboundHandlerAdapter { /** * 发送的返回值 * @param ctx 返回 * @param context 消息 * @param status 状态 */ private void send(ChannelHandlerContext ctx, String context, HttpResponseStatus status) { // http应答报文 // HttpVersion.HTTP_1_1 http1.1 // status http状态码 FullHttpResponse response = new DefaultFullHttpResponse( HttpVersion.HTTP_1_1,status, Unpooled.copiedBuffer(context,CharsetUtil.UTF_8) ); // 消息头里面的其它定义内容 // 传输的文本类型----HttpHeaderNames.CONTENT_TYPE response.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/plain;charset=UTF-8"); // http1.1里面不开启keep-alive是需要服务器关闭连接的 // 加一个监听器ChannelFutureListener.CLOSE // 写完对端的动态执行完了,就执行监视器里面的关闭动作 ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String result=""; // 把消息体强制转型成http请求类 FullHttpRequest httpRequest = (FullHttpRequest)msg; System.out.println(httpRequest.headers()); try{ //获取路径 String path=httpRequest.uri(); //获取body String body = httpRequest.content().toString(CharsetUtil.UTF_8); //获取请求方法 HttpMethod method=httpRequest.method(); System.out.println("接收到:"+method+" 请求"); //如果不是这个路径,就直接返回错误 if(!"/test".equalsIgnoreCase(path)){ result="非法请求!"+path; // 发送错误报文 send(ctx,result,HttpResponseStatus.BAD_REQUEST); return; } // 只实现GET方法 // 如果是GET请求 if(HttpMethod.GET.equals(method)){ //接受到的消息,做业务逻辑处理... System.out.println("body:"+body); result="GET请求,应答:"+RespConstant.getNews(); send(ctx,result,HttpResponseStatus.OK); return; } //如果是其他类型请求,如post if(HttpMethod.POST.equals(method)){ //接受到的消息,做业务逻辑处理... //.... return; } }catch(Exception e){ System.out.println("处理请求失败!"); e.printStackTrace(); }finally{ //释放请求 httpRequest.release(); } } /* * 建立连接时,返回消息 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("连接的客户端地址:" + ctx.channel().remoteAddress()); } }
消息体常量内容----RespConstant
-
package cn.enjoyedu.nettyhttp.server; import java.util.Random; /** * @author Mark老师 * 往期课程和VIP课程咨询 依娜老师 QQ:2133576719 * 类说明: */ public class RespConstant { private static final String[] NEWS = { "她那时候还太年轻,不知道所有命运赠送的礼物,早已在暗中标好了价格。——斯蒂芬·茨威格《断头皇后》", "这是一个最好的时代,也是一个最坏的时代;这是一个智慧的年代,这是一个愚蠢的年代;\n" + "这是一个信任的时期,这是一个怀疑的时期;这是一个光明的季节,这是一个黑暗的季节;\n" + "这是希望之春,这是失望之冬;人们面前应有尽有,人们面前一无所有;\n" + "人们正踏上天堂之路,人们正走向地狱之门。 —— 狄更斯《双城记》", }; private static final Random R = new Random(); public static String getNews(){ return NEWS[R.nextInt(NEWS.length)]; } }
客户端
HttpClient
-
package cn.enjoyedu.nettyhttp.client; import cn.enjoyedu.nettyhttp.server.HttpServer; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.http.*; import java.net.URI; /** * @author Mark老师 * 往期课程和VIP课程咨询 依娜老师 QQ:2133576719 * 类说明: */ public class HttpClient { public static final String HOST = "127.0.0.1"; private static final boolean SSL = false; public void connect(String host, int port) throws Exception { EventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(workerGroup); b.channel(NioSocketChannel.class); b.option(ChannelOption.SO_KEEPALIVE, true); b.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { // 编解码器,只用写一个就行了,http请求编码,http响应解码 ch.pipeline().addLast(new HttpClientCodec()); /*聚合http为一个完整的报文*/ // http报文聚合 ch.pipeline().addLast("aggregator", new HttpObjectAggregator(10*1024*1024)); /*解压缩*/ // 报文解压缩 ch.pipeline().addLast("decompressor", new HttpContentDecompressor()); // 自己的业务handler,发送http请求 ch.pipeline().addLast(new HttpClientInboundHandler()); } }); // Start the client. ChannelFuture f = b.connect(host, port).sync(); f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { HttpClient client = new HttpClient(); client.connect("127.0.0.1", HttpServer.port); } }
-
HttpClientCodec是一个编解码器,只用写一个就行了,http请求编码,http响应解码
HttpClientInboundHandler
-
package cn.enjoyedu.nettyhttp.client; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.codec.http.*; import io.netty.util.CharsetUtil; import java.net.URI; /** * @author Mark老师 * 往期课程和VIP课程咨询 依娜老师 QQ:2133576719 * 类说明: */ public class HttpClientInboundHandler extends ChannelInboundHandlerAdapter { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { FullHttpResponse httpResponse = (FullHttpResponse)msg; System.out.println(httpResponse.status()); System.out.println(httpResponse.headers()); ByteBuf buf = httpResponse.content(); System.out.println(buf.toString(CharsetUtil.UTF_8)); // 自行处理了报文,必须释放buffer,不用后续handler处理了 // 为了避免内存泄漏,所以要释放buffer httpResponse.release(); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { URI uri = new URI("/test"); String msg = "Hello"; DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.toASCIIString(), Unpooled.wrappedBuffer(msg.getBytes("UTF-8"))); // 构建http请求 request.headers().set(HttpHeaderNames.HOST, HttpClient.HOST); request.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); request.headers().set(HttpHeaderNames.CONTENT_LENGTH, request.content().readableBytes()); // 发送http请求 ctx.writeAndFlush(request); } }
-
为什么是入站?
-
应为业务主要的处理内容是解析服务端传回的应答报文
netty处理空闲的连接和超时的handler
- tcp协议自己的keep-alive,时间太长了,一般是120分钟,所以往往应用层是自己建立心跳机制
- IdleStateHandler----空闲状态判断handler
- 读空闲handler — ReadTimeoutHandler
- 写空闲handler — WriteTimeoutHandler
序列化问题
为什么需要序列化
-
序列化的目的
- 将javaBean转化成二进制数据
-
Java序列化的缺点和业界主流的序列化框架:各种 Java 的序列化库的性能比较测试结果
-
为什么不用java的序列化?
1.只能在java中用
2.速度慢,字节长度长
-
测试
-
/* * Copyright 2013-2018 Lilinfeng. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package cn.enjoyedu.nettybasic.serializable.protogenesis; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectOutputStream; import java.nio.ByteBuffer; /** * @author Mark老师 * 往期课程和VIP课程咨询 依娜老师 QQ:2133576719 * 类说明:测试序列化性能差异 */ public class PerformTestUserInfo { public static void main(String[] args) throws IOException { UserInfo info = new UserInfo(); info.buildUserID(100).buildUserName("Welcome to Netty"); int loop = 1000000; ByteArrayOutputStream bos = null; ObjectOutputStream os = null; long startTime = System.currentTimeMillis(); for (int i = 0; i < loop; i++) { bos = new ByteArrayOutputStream(); os = new ObjectOutputStream(bos); os.writeObject(info); os.flush(); os.close(); byte[] b = bos.toByteArray(); bos.close(); } long endTime = System.currentTimeMillis(); System.out.println("The jdk serializable cost time is : " + (endTime - startTime) + " ms"); System.out.println("-------------------------------------"); ByteBuffer buffer = ByteBuffer.allocate(1024); startTime = System.currentTimeMillis(); for (int i = 0; i < loop; i++) { byte[] b = info.codeC(buffer); } endTime = System.currentTimeMillis(); System.out.println("The byte array serializable cost time is : " + (endTime - startTime) + " ms"); } }
-
-
很少用jdk序列化,java生态圈提供了很多序列化框架
-
https://yq.aliyun.com/articles/200607
-
例如dubbo中的hessin
-
一些序列化框架
java-built-in hessian kryo fast-serialization jboss-serialization jboss-marshalling-river protostuff msgpack-databind json/jackson/databind json/jackson/db-afterburner json/protostuff-runtime json/google-gson/databind json/svenson-databind json/flexjson/databind json/fastjson/databind smile/jackson/databind smile/jackson/db-afterburner bson/jackson/databind xml/xstream+c xml/jackson/databind-aalto
-
-
netty自己的序列化
-
内置有哪些?
1.Jboss-----Marshalling
2.google-----Protocol Buffers
下载谷歌序列化工具protoc.exe
写完实体类后,再写一个相关的描述文件,然后通过上面那个工具就可以生成一个类
// Generated by the protocol buffer compiler. DO NOT EDIT! // source: Person.proto package cn.enjoyedu.nettybasic.serializable.protobuf; public final class PersonProto { private PersonProto() { } public static void registerAllExtensions( com.google.protobuf.ExtensionRegistry registry) { } public interface PersonOrBuilder extends // @@protoc_insertion_point(interface_extends:netty.Person) com.google.protobuf.MessageOrBuilder { /** * <code>required int32 id = 1;</code> */ boolean hasId(); /** * <code>required int32 id = 1;</code> */ int getId(); /** * <code>required string name = 2;</code> */ boolean hasName(); /** * <code>required string name = 2;</code> */ String getName(); /** * <code>required string name = 2;</code> */ com.google.protobuf.ByteString getNameBytes(); /** * <code>optional string email = 3;</code> */ boolean hasEmail(); /** * <code>optional string email = 3;</code> */ String getEmail(); /** * <code>optional string email = 3;</code> */ com.google.protobuf.ByteString getEmailBytes(); } /** * Protobuf type {@code netty.Person} */ public static final class Person extends com.google.protobuf.GeneratedMessage implements // @@protoc_insertion_point(message_implements:netty.Person) PersonOrBuilder { // Use Person.newBuilder() to construct. private Person(com.google.protobuf.GeneratedMessage.Builder<?> builder) { super(builder); this.unknownFields = builder.getUnknownFields(); } private Person(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); } private static final Person defaultInstance; public static Person getDefaultInstance() { return defaultInstance; } public Person getDefaultInstanceForType() { return defaultInstance; } private final com.google.protobuf.UnknownFieldSet unknownFields; @Override public final com.google.protobuf.UnknownFieldSet getUnknownFields() { return this.unknownFields; } private Person( com.google.protobuf.CodedInputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { initFields(); int mutable_bitField0_ = 0; com.google.protobuf.UnknownFieldSet.Builder unknownFields = com.google.protobuf.UnknownFieldSet.newBuilder(); try { boolean done = false; while (!done) { int tag = input.readTag(); switch (tag) { case 0: done = true; break; default: { if (!parseUnknownField(input, unknownFields, extensionRegistry, tag)) { done = true; } break; } case 8: { bitField0_ |= 0x00000001; id_ = input.readInt32(); break; } case 18: { com.google.protobuf.ByteString bs = input.readBytes(); bitField0_ |= 0x00000002; name_ = bs; break; } case 26: { com.google.protobuf.ByteString bs = input.readBytes(); bitField0_ |= 0x00000004; email_ = bs; break; } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { throw e.setUnfinishedMessage(this); } catch (java.io.IOException e) { throw new com.google.protobuf.InvalidProtocolBufferException( e.getMessage()).setUnfinishedMessage(this); } finally { this.unknownFields = unknownFields.build(); makeExtensionsImmutable(); } } public static final com.google.protobuf.Descriptors.Descriptor getDescriptor() { return PersonProto.internal_static_netty_Person_descriptor; } protected FieldAccessorTable internalGetFieldAccessorTable() { return PersonProto.internal_static_netty_Person_fieldAccessorTable .ensureFieldAccessorsInitialized( Person.class, Builder.class); } public static com.google.protobuf.Parser<Person> PARSER = new com.google.protobuf.AbstractParser<Person>() { public Person parsePartialFrom( com.google.protobuf.CodedInputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { return new Person(input, extensionRegistry); } }; @Override public com.google.protobuf.Parser<Person> getParserForType() { return PARSER; } private int bitField0_; public static final int ID_FIELD_NUMBER = 1; private int id_; /** * <code>required int32 id = 1;</code> */ public boolean hasId() { return ((bitField0_ & 0x00000001) == 0x00000001); } /** * <code>required int32 id = 1;</code> */ public int getId() { return id_; } public static final int NAME_FIELD_NUMBER = 2; private Object name_; /** * <code>required string name = 2;</code> */ public boolean hasName() { return ((bitField0_ & 0x00000002) == 0x00000002); } /** * <code>required string name = 2;</code> */ public String getName() { Object ref = name_; if (ref instanceof String) { return (String) ref; } else { com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref; String s = bs.toStringUtf8(); if (bs.isValidUtf8()) { name_ = s; } return s; } } /** * <code>required string name = 2;</code> */ public com.google.protobuf.ByteString getNameBytes() { Object ref = name_; if (ref instanceof String) { com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (String) ref); name_ = b; return b; } else { return (com.google.protobuf.ByteString) ref; } } public static final int EMAIL_FIELD_NUMBER = 3; private Object email_; /** * <code>optional string email = 3;</code> */ public boolean hasEmail() { return ((bitField0_ & 0x00000004) == 0x00000004); } /** * <code>optional string email = 3;</code> */ public String getEmail() { Object ref = email_; if (ref instanceof String) { return (String) ref; } else { com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref; String s = bs.toStringUtf8(); if (bs.isValidUtf8()) { email_ = s; } return s; } } /** * <code>optional string email = 3;</code> */ public com.google.protobuf.ByteString getEmailBytes() { Object ref = email_; if (ref instanceof String) { com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (String) ref); email_ = b; return b; } else { return (com.google.protobuf.ByteString) ref; } } private void initFields() { id_ = 0; name_ = ""; email_ = ""; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { byte isInitialized = memoizedIsInitialized; if (isInitialized == 1) return true; if (isInitialized == 0) return false; if (!hasId()) { memoizedIsInitialized = 0; return false; } if (!hasName()) { memoizedIsInitialized = 0; return false; } memoizedIsInitialized = 1; return true; } public void writeTo(com.google.protobuf.CodedOutputStream output) throws java.io.IOException { getSerializedSize(); if (((bitField0_ & 0x00000001) == 0x00000001)) { output.writeInt32(1, id_); } if (((bitField0_ & 0x00000002) == 0x00000002)) { output.writeBytes(2, getNameBytes()); } if (((bitField0_ & 0x00000004) == 0x00000004)) { output.writeBytes(3, getEmailBytes()); } getUnknownFields().writeTo(output); } private int memoizedSerializedSize = -1; public int getSerializedSize() { int size = memoizedSerializedSize; if (size != -1) return size; size = 0; if (((bitField0_ & 0x00000001) == 0x00000001)) { size += com.google.protobuf.CodedOutputStream .computeInt32Size(1, id_); } if (((bitField0_ & 0x00000002) == 0x00000002)) { size += com.google.protobuf.CodedOutputStream .computeBytesSize(2, getNameBytes()); } if (((bitField0_ & 0x00000004) == 0x00000004)) { size += com.google.protobuf.CodedOutputStream .computeBytesSize(3, getEmailBytes()); } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; } private static final long serialVersionUID = 0L; @Override protected Object writeReplace() throws java.io.ObjectStreamException { return super.writeReplace(); } public static Person parseFrom( com.google.protobuf.ByteString data) throws com.google.protobuf.InvalidProtocolBufferException { return PARSER.parseFrom(data); } public static Person parseFrom( com.google.protobuf.ByteString data, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { return PARSER.parseFrom(data, extensionRegistry); } public static Person parseFrom(byte[] data) throws com.google.protobuf.InvalidProtocolBufferException { return PARSER.parseFrom(data); } public static Person parseFrom( byte[] data, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { return PARSER.parseFrom(data, extensionRegistry); } public static Person parseFrom(java.io.InputStream input) throws java.io.IOException { return PARSER.parseFrom(input); } public static Person parseFrom( java.io.InputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { return PARSER.parseFrom(input, extensionRegistry); } public static Person parseDelimitedFrom(java.io.InputStream input) throws java.io.IOException { return PARSER.parseDelimitedFrom(input); } public static Person parseDelimitedFrom( java.io.InputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { return PARSER.parseDelimitedFrom(input, extensionRegistry); } public static Person parseFrom( com.google.protobuf.CodedInputStream input) throws java.io.IOException { return PARSER.parseFrom(input); } public static Person parseFrom( com.google.protobuf.CodedInputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { return PARSER.parseFrom(input, extensionRegistry); } public static Builder newBuilder() { return Builder.create(); } public Builder newBuilderForType() { return newBuilder(); } public static Builder newBuilder(Person prototype) { return newBuilder().mergeFrom(prototype); } public Builder toBuilder() { return newBuilder(this); } @Override protected Builder newBuilderForType( BuilderParent parent) { Builder builder = new Builder(parent); return builder; } /** * Protobuf type {@code netty.Person} */ public static final class Builder extends com.google.protobuf.GeneratedMessage.Builder<Builder> implements // @@protoc_insertion_point(builder_implements:netty.Person) PersonOrBuilder { public static final com.google.protobuf.Descriptors.Descriptor getDescriptor() { return PersonProto.internal_static_netty_Person_descriptor; } protected FieldAccessorTable internalGetFieldAccessorTable() { return PersonProto.internal_static_netty_Person_fieldAccessorTable .ensureFieldAccessorsInitialized( Person.class, Builder.class); } // Construct using PersonProto.Person.newBuilder() private Builder() { maybeForceBuilderInitialization(); } private Builder( BuilderParent parent) { super(parent); maybeForceBuilderInitialization(); } private void maybeForceBuilderInitialization() { if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { } } private static Builder create() { return new Builder(); } public Builder clear() { super.clear(); id_ = 0; bitField0_ = (bitField0_ & ~0x00000001); name_ = ""; bitField0_ = (bitField0_ & ~0x00000002); email_ = ""; bitField0_ = (bitField0_ & ~0x00000004); return this; } public Builder clone() { return create().mergeFrom(buildPartial()); } public com.google.protobuf.Descriptors.Descriptor getDescriptorForType() { return PersonProto.internal_static_netty_Person_descriptor; } public Person getDefaultInstanceForType() { return Person.getDefaultInstance(); } public Person build() { Person result = buildPartial(); if (!result.isInitialized()) { throw newUninitializedMessageException(result); } return result; } public Person buildPartial() { Person result = new Person(this); int from_bitField0_ = bitField0_; int to_bitField0_ = 0; if (((from_bitField0_ & 0x00000001) == 0x00000001)) { to_bitField0_ |= 0x00000001; } result.id_ = id_; if (((from_bitField0_ & 0x00000002) == 0x00000002)) { to_bitField0_ |= 0x00000002; } result.name_ = name_; if (((from_bitField0_ & 0x00000004) == 0x00000004)) { to_bitField0_ |= 0x00000004; } result.email_ = email_; result.bitField0_ = to_bitField0_; onBuilt(); return result; } public Builder mergeFrom(com.google.protobuf.Message other) { if (other instanceof Person) { return mergeFrom((Person)other); } else { super.mergeFrom(other); return this; } } public Builder mergeFrom(Person other) { if (other == Person.getDefaultInstance()) return this; if (other.hasId()) { setId(other.getId()); } if (other.hasName()) { bitField0_ |= 0x00000002; name_ = other.name_; onChanged(); } if (other.hasEmail()) { bitField0_ |= 0x00000004; email_ = other.email_; onChanged(); } this.mergeUnknownFields(other.getUnknownFields()); return this; } public final boolean isInitialized() { if (!hasId()) { return false; } if (!hasName()) { return false; } return true; } public Builder mergeFrom( com.google.protobuf.CodedInputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { Person parsedMessage = null; try { parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); } catch (com.google.protobuf.InvalidProtocolBufferException e) { parsedMessage = (Person) e.getUnfinishedMessage(); throw e; } finally { if (parsedMessage != null) { mergeFrom(parsedMessage); } } return this; } private int bitField0_; private int id_ ; /** * <code>required int32 id = 1;</code> */ public boolean hasId() { return ((bitField0_ & 0x00000001) == 0x00000001); } /** * <code>required int32 id = 1;</code> */ public int getId() { return id_; } /** * <code>required int32 id = 1;</code> */ public Builder setId(int value) { bitField0_ |= 0x00000001; id_ = value; onChanged(); return this; } /** * <code>required int32 id = 1;</code> */ public Builder clearId() { bitField0_ = (bitField0_ & ~0x00000001); id_ = 0; onChanged(); return this; } private Object name_ = ""; /** * <code>required string name = 2;</code> */ public boolean hasName() { return ((bitField0_ & 0x00000002) == 0x00000002); } /** * <code>required string name = 2;</code> */ public String getName() { Object ref = name_; if (!(ref instanceof String)) { com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref; String s = bs.toStringUtf8(); if (bs.isValidUtf8()) { name_ = s; } return s; } else { return (String) ref; } } /** * <code>required string name = 2;</code> */ public com.google.protobuf.ByteString getNameBytes() { Object ref = name_; if (ref instanceof String) { com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (String) ref); name_ = b; return b; } else { return (com.google.protobuf.ByteString) ref; } } /** * <code>required string name = 2;</code> */ public Builder setName( String value) { if (value == null) { throw new NullPointerException(); } bitField0_ |= 0x00000002; name_ = value; onChanged(); return this; } /** * <code>required string name = 2;</code> */ public Builder clearName() { bitField0_ = (bitField0_ & ~0x00000002); name_ = getDefaultInstance().getName(); onChanged(); return this; } /** * <code>required string name = 2;</code> */ public Builder setNameBytes( com.google.protobuf.ByteString value) { if (value == null) { throw new NullPointerException(); } bitField0_ |= 0x00000002; name_ = value; onChanged(); return this; } private Object email_ = ""; /** * <code>optional string email = 3;</code> */ public boolean hasEmail() { return ((bitField0_ & 0x00000004) == 0x00000004); } /** * <code>optional string email = 3;</code> */ public String getEmail() { Object ref = email_; if (!(ref instanceof String)) { com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref; String s = bs.toStringUtf8(); if (bs.isValidUtf8()) { email_ = s; } return s; } else { return (String) ref; } } /** * <code>optional string email = 3;</code> */ public com.google.protobuf.ByteString getEmailBytes() { Object ref = email_; if (ref instanceof String) { com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (String) ref); email_ = b; return b; } else { return (com.google.protobuf.ByteString) ref; } } /** * <code>optional string email = 3;</code> */ public Builder setEmail( String value) { if (value == null) { throw new NullPointerException(); } bitField0_ |= 0x00000004; email_ = value; onChanged(); return this; } /** * <code>optional string email = 3;</code> */ public Builder clearEmail() { bitField0_ = (bitField0_ & ~0x00000004); email_ = getDefaultInstance().getEmail(); onChanged(); return this; } /** * <code>optional string email = 3;</code> */ public Builder setEmailBytes( com.google.protobuf.ByteString value) { if (value == null) { throw new NullPointerException(); } bitField0_ |= 0x00000004; email_ = value; onChanged(); return this; } // @@protoc_insertion_point(builder_scope:netty.Person) } static { defaultInstance = new Person(true); defaultInstance.initFields(); } // @@protoc_insertion_point(class_scope:netty.Person) } private static final com.google.protobuf.Descriptors.Descriptor internal_static_netty_Person_descriptor; private static com.google.protobuf.GeneratedMessage.FieldAccessorTable internal_static_netty_Person_fieldAccessorTable; public static com.google.protobuf.Descriptors.FileDescriptor getDescriptor() { return descriptor; } private static com.google.protobuf.Descriptors.FileDescriptor descriptor; static { String[] descriptorData = { "\n\014Person.proto\022\005netty\"1\n\006Person\022\n\n\002id\030\001 " + "\002(\005\022\014\n\004name\030\002 \002(\t\022\r\n\005email\030\003 \001(\tB:\n+cn.e" + "njoyedu.ch02.serializable.protobuf.demoB" + "\013PersonProto" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor. InternalDescriptorAssigner() { public com.google.protobuf.ExtensionRegistry assignDescriptors( com.google.protobuf.Descriptors.FileDescriptor root) { descriptor = root; return null; } }; com.google.protobuf.Descriptors.FileDescriptor .internalBuildGeneratedFileFrom(descriptorData, new com.google.protobuf.Descriptors.FileDescriptor[] { }, assigner); internal_static_netty_Person_descriptor = getDescriptor().getMessageTypes().get(0); internal_static_netty_Person_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_netty_Person_descriptor, new String[] { "Id", "Name", "Email", }); } // @@protoc_insertion_point(outer_class_scope) }
-
集成第三方MessagePack实战(LengthFieldBasedFrame详解)
-
Protobuf
添加pom引用
-
<dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>2.6.1</version> </dependency>
客户端
ProtoBufClient
-
package cn.enjoyedu.nettybasic.serializable.protobuf; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.protobuf.ProtobufEncoder; import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; /** * @author Mark老师 * 往期课程和VIP课程咨询 依娜老师 QQ:2133576719 * 类说明: */ public class ProtoBufClient { public void connect(int port, String host) throws Exception { // 配置客户端NIO线程组 EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { /*加一个消息长度,由netty自动计算*/ ch.pipeline().addLast( new ProtobufVarint32LengthFieldPrepender() ); /*负责编码,序列化*/ ch.pipeline().addLast(new ProtobufEncoder()); ch.pipeline().addLast(new ProtoBufClientHandler()); } }); ChannelFuture f = b.connect(host, port).sync(); f.channel().closeFuture().sync(); } finally { // 优雅退出,释放NIO线程组 group.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; new ProtoBufClient().connect(port, "127.0.0.1"); } }
-
netty内置的protobuf序列化协议的handler-----ProtobufEncoder
-
解决粘包半包问题
-
ProtobufVarint32LengthFieldPrepender-----加一个消息长度,由netty自动计算
第三种,加一个消息长度
-
ProtoBufClientHandler
-
package cn.enjoyedu.nettybasic.serializable.protobuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; /** * @author Mark老师 * 往期课程和VIP课程咨询 依娜老师 QQ:2133576719 * 类说明: */ public class ProtoBufClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) { System.out.println("Prepare to make data........"); // 构建业务类的序列化类 PersonProto.Person.Builder builder = PersonProto.Person.newBuilder(); builder.setName("Mark"); builder.setId(1); builder.setEmail("[email protected]"); System.out.println("send data........"); ctx.writeAndFlush(builder.build()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
服务端—反序列化
ProtoBufServer
-
package cn.enjoyedu.nettybasic.serializable.protobuf; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; /** * @author Mark老师 * 往期课程和VIP课程咨询 依娜老师 QQ:2133576719 * 类说明: */ public class ProtoBufServer { public void bind(int port) throws Exception { // 配置服务端的NIO线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { /*去除消息长度部分,同时根据这个消息长度读取实际的数据*/ // 去掉新增的消息头,方便后面正确解析消息体内容 ch.pipeline().addLast( new ProtobufVarint32FrameDecoder()); // 对Person做反序列化 ch.pipeline().addLast(new ProtobufDecoder( PersonProto.Person.getDefaultInstance() )); ch.pipeline().addLast(new ProtoBufServerHandler()); } }); // 绑定端口,同步等待成功 ChannelFuture f = b.bind(port).sync(); System.out.println("init start"); // 等待服务端监听端口关闭 f.channel().closeFuture().sync(); } finally { // 优雅退出,释放线程池资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; new ProtoBufServer().bind(port); } }
-
去掉新增的消息头,方便后面正确解析消息体内容-----ProtobufVarint32FrameDecoder
-
对Person做反序列化-----new ProtobufDecoder(
PersonProto.Person.getDefaultInstance())
ProtoBufServerHandler
-
package cn.enjoyedu.nettybasic.serializable.protobuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.io.IOException; /** * @author Mark老师 * 往期课程和VIP课程咨询 依娜老师 QQ:2133576719 * 类说明: */ public class ProtoBufServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 因为已经反序列化完毕 // 所以直接强制转型即可 PersonProto.Person req = (PersonProto.Person)msg; System.out.println("get data name = "+req.getName()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { if(cause instanceof IOException){ System.out.println("远程客户端强迫关闭了一个现有的连接。"); } ctx.close(); } }
MessagePack
添加pom引用
-
<dependency> <groupId>org.msgpack</groupId> <artifactId>msgpack</artifactId> <version>0.6.12</version> </dependency>
MsgPackEncode-----MsgPack的编码器
-
package cn.enjoyedu.nettybasic.serializable.msgpack; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; import org.msgpack.MessagePack; /*基于MessagePack的编码器,序列化*/ public class MsgPackEncode extends MessageToByteEncoder<User> { @Override protected void encode(ChannelHandlerContext ctx, User msg, ByteBuf out) throws Exception { // 定义一个MessagePack实例 MessagePack messagePack = new MessagePack(); // 消息转字节就不用那么麻烦了,直接转成字节数组 byte[] raw = messagePack.write(msg); // 直接把字节数组写到ByteBuf中去 out.writeBytes(raw); } }
MsgPackDecoder-----MsgPack的解码器
-
package cn.enjoyedu.nettybasic.serializable.msgpack; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageDecoder; import org.msgpack.MessagePack; import java.util.List; /*基于MessagePack的解码器,反序列化*/ public class MsgPackDecoder extends MessageToMessageDecoder<ByteBuf> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception { // 首先拿到消息大小 final int length = msg.readableBytes(); // 定义一个字节数组 final byte[] array = new byte[length]; // 提取消息内容到字节数组中 msg.getBytes(msg.readerIndex(),array,0,length); // 定义一个MessagePack实例 MessagePack messagePack = new MessagePack(); // messagePack.read(array,User.class)方法返回一个User的实例 out.add(messagePack.read(array,User.class)); } }
客户端
ClientMsgPackEcho
-
package cn.enjoyedu.nettybasic.serializable.msgpack; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.LineBasedFrameDecoder; import java.net.InetSocketAddress; /** * 作者:Mark/Maoke * 创建日期:2018/08/26 * 类说明: */ public class ClientMsgPackEcho { private final String host; public ClientMsgPackEcho(String host) { this.host = host; } public void start() throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup();/*线程组*/ try { final Bootstrap b = new Bootstrap();;/*客户端启动必须*/ b.group(group)/*将线程组传入*/ .channel(NioSocketChannel.class)/*指定使用NIO进行网络传输*/ /*配置要连接服务器的ip地址和端口*/ .remoteAddress( new InetSocketAddress(host, ServerMsgPackEcho.PORT)) .handler(new ChannelInitializerImp()); ChannelFuture f = b.connect().sync(); System.out.println("已连接到服务器....."); f.channel().closeFuture().sync(); } finally { group.shutdownGracefully().sync(); } } private static class ChannelInitializerImp extends ChannelInitializer<Channel> { @Override protected void initChannel(Channel ch) throws Exception { /*告诉netty,计算一下报文的长度,然后作为报文头加在前面*/ // 这里是解决发送数据的粘包半包问题 // MsgPack不想ProtoBuf有自动计算长度的handler,此处需要手动计算 // LengthFieldPrepender用于计算报文长度 // 用两个字节存放报文长度 ch.pipeline().addLast(new LengthFieldPrepender(2)); /*对服务器的应答也要解码,解决粘包半包*/ // 这里是解决收到服务器端应答数据的粘包半包问题 // 客户端针对服务端发送数据的回车换行符解码 ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); /*对我们要发送的数据做编码-序列化*/ // MsgPack的序列化编码器 ch.pipeline().addLast(new MsgPackEncode()); ch.pipeline().addLast(new MsgPackClientHandler(5)); } } public static void main(String[] args) throws InterruptedException { new ClientMsgPackEcho("127.0.0.1").start(); } }
MsgPackClientHandler
-
package cn.enjoyedu.nettybasic.serializable.msgpack; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.CharsetUtil; import java.util.concurrent.atomic.AtomicInteger; /** * 作者:Mark/Maoke * 创建日期:2018/08/26 * 类说明: */ public class MsgPackClientHandler extends SimpleChannelInboundHandler<ByteBuf> { private final int sendNumber; public MsgPackClientHandler(int sendNumber) { this.sendNumber = sendNumber; } private AtomicInteger counter = new AtomicInteger(0); /*** 客户端读取到网络数据后的处理*/ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { System.out.println("client Accept["+msg.toString(CharsetUtil.UTF_8) +"] and the counter is:"+counter.incrementAndGet()); } /*** 客户端被通知channel活跃后,做事*/ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { User[] users = makeUsers(); //发送数据 for(User user:users){ System.out.println("Send user:"+user); ctx.write(user); } ctx.flush(); } /*** 发生异常后的处理*/ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } /*生成用户实体类的数组,以供发送*/ private User[] makeUsers(){ User[] users=new User[sendNumber]; User user =null; for(int i=0;i<sendNumber;i++){ user=new User(); user.setAge(i); String userName = "ABCDEFG --->"+i; user.setUserName(userName); user.setId("No:"+(sendNumber-i)); user.setUserContact( new UserContact(userName+"@xiangxue.com","133")); users[i]=user; } return users; } }
服务器
ServerMsgPackEcho
-
package cn.enjoyedu.nettybasic.serializable.msgpack; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import java.net.InetSocketAddress; /** * 作者:Mark/Maoke * 创建日期:2018/08/25 * 类说明: */ public class ServerMsgPackEcho { public static final int PORT = 9995; public static void main(String[] args) throws InterruptedException { ServerMsgPackEcho serverMsgPackEcho = new ServerMsgPackEcho(); System.out.println("服务器即将启动"); serverMsgPackEcho.start(); } public void start() throws InterruptedException { final MsgPackServerHandler serverHandler = new MsgPackServerHandler(); EventLoopGroup group = new NioEventLoopGroup();/*线程组*/ try { ServerBootstrap b = new ServerBootstrap();/*服务端启动必须*/ b.group(group)/*将线程组传入*/ .channel(NioServerSocketChannel.class)/*指定使用NIO进行网络传输*/ .localAddress(new InetSocketAddress(PORT))/*指定服务器监听端口*/ /*服务端每接收到一个连接请求,就会新启一个socket通信,也就是channel, 所以下面这段代码的作用就是为这个子channel增加handle*/ .childHandler(new ChannelInitializerImp()); ChannelFuture f = b.bind().sync();/*异步绑定到服务器,sync()会阻塞直到完成*/ System.out.println("服务器启动完成,等待客户端的连接和数据....."); f.channel().closeFuture().sync();/*阻塞直到服务器的channel关闭*/ } finally { group.shutdownGracefully().sync();/*优雅关闭线程组*/ } } private static class ChannelInitializerImp extends ChannelInitializer<Channel> { @Override protected void initChannel(Channel ch) throws Exception { // 解决粘包半包问题 // LengthFieldBasedFrameDecoder对消息长度进行解析 // public LengthFieldBasedFrameDecoder( // int maxFrameLength, // int lengthFieldOffset, int lengthFieldLength, // int lengthAdjustment, int initialBytesToStrip) // maxFrameLength最大报文长度,超过会抛异常 // lengthFieldOffset:长度域的偏移量,在报文里面移动多少后才是保存消息长度的位置,因为一开始就是,所以是0 // lengthFieldLength:长度域的长度,此处的长度域是2位 // lengthAdjustment:长度的一个修正值,修正什么?netty读到长度是12,则后面长度为12的数据都是需要读的 // initialBytesToStrip:往后传的时候,丢弃多少字节 // 假设发送的消息一共十二个字符,如下 // Hello,_world // _表示空格 // 消息头是00 0C,两个字节 // 表示消息长度是12 // 报文总长度是2+12 = 14 // 特殊例子 // CA FE 00 00 0C H E L L O , _ w o r l d // (65535,2,3,0,0) ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535, 0,2,0, 2)); // MsgPack反序列化的解码器 ch.pipeline().addLast(new MsgPackDecoder()); ch.pipeline().addLast(new MsgPackServerHandler()); } } }
MsgPackServerHandler
-
package cn.enjoyedu.nettybasic.serializable.msgpack; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.util.concurrent.atomic.AtomicInteger; /** * 作者:Mark/Maoke * 创建日期:2018/08/25 * 类说明:自己的业务处理 */ @ChannelHandler.Sharable public class MsgPackServerHandler extends ChannelInboundHandlerAdapter { private AtomicInteger counter = new AtomicInteger(0); /*** 服务端读取到网络数据后的处理*/ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //将上一个handler生成的数据强制转型 User user = (User)msg; System.out.println("Server Accept["+user +"] and the counter is:"+counter.incrementAndGet()); //服务器的应答 String resp = "I process user :"+user.getUserName() + System.getProperty("line.separator"); ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes())); ctx.fireChannelRead(user); } /*** 发生异常后的处理*/ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }