目录
什么是编解码器
- 每个网络应用程序都必须定义如何解析在两个节点之间来回传输的原始字节,以及如何将其和目标应用程序的数据格式做相互转换。这种转换逻辑由编解码器处理,编解码器由编码器和解码器组成,它们每种都可以将字节流从一种格式转换为另一种格式。那么它们的区别是什么呢?
- 如果将消息看作是对于特定的应用程序具有具体含义的结构化的字节序列-它的数据。 那么编码器是将消息转换为适合于传输的格式(最有可能的就是字节流);而对应的解码器则是将网络字节流转换回应用程序的消息格式。因此,编码器操作出站数据(业务到网络),而解码器处理入站数据(网络到业务)。解决粘包半包也是编解码器框架的一部分。
解码器
- 将字节解码为消息——ByteToMessageDecoder;
- 将一种消息类型解码为另一种——MessageToMessageDecoder。
- 因为解码器是负责将入站数据从一种格式转换到另一种格式的,所以 Netty 的解码器实现了 ChannelInboundHandler。
- 什么时候会用到解码器呢?每当需要为 ChannelPipeline 中的下一个 ChannelInboundHandler 转换入站数据时会用到。此外,得益于 ChannelPipeline 的设计,可以将多个解码器链接在一起,以实现任意复杂的转换逻辑。
将字节解码为消息
抽象类 ByteToMessageDecoder
- 将字节解码为消息(或者另一个字节序列)是一项如此常见的任务,以至于 Netty 为它提供了一个抽象的基类:ByteToMessageDecoder。由于你不可能知道远程节点是否会一次性地发送一个完整的消息,所以这个类会对入站数据进行缓冲,直到它准备好处理。
重要方法
- decode(ChannelHandlerContext ctx,ByteBuf in,List< Object> out)
- 这是你必须实现的唯一抽象方法。decode()方法被调用时将会传入一个包含了传入数据的 ByteBuf,以及一个用来添加解码消息的 List。对这个方法的调用将会重复进行,直到确定没有新的元素被添加到该 List,或者该 ByteBuf 中没有更多可读取的字节时为止。然后,如果该 List 不为空,那么它的内容将会被传递给 ChannelPipeline 中的下一个 ChannelInboundHandler。
将一种消息类型解码为另一种
MessageToMessageDecoder< T >
- T 代表源数据的类型
- 在两个消息格式之间进行转换(例如,从 String->Integer)
- decode(ChannelHandlerContext ctx,I msg,List< Object> out)
- 对于每个需要被解码为另一种格式的入站消息来说,该方法都将会被调用。解码消息随后会被传递给 ChannelPipeline 中的下一个 ChannelInboundHandler。
TooLongFrameException
- 由于 Netty 是一个异步框架,所以需要在字节可以解码之前在内存中缓冲它们。因此,不能让解码器缓冲大量的数据以至于耗尽可用的内存。为了解除这个常见的顾虑,Netty 提供了 TooLongFrameException 类,其将由解码器在帧超出指定的大小限制时抛出。
- 为了避免这种情况,你可以设置一个最大字节数的阈值,如果超出该阈值,则会导致抛出一个 TooLongFrameException(随后会被 ChannelHandler.exceptionCaught()方法捕获)。然后,如何处理该异常则完全取决于该解码器的用户。某些协议(如 HTTP)可能允许你返回 一个特殊的响应。而在其他的情况下,唯一的选择可能就是关闭对应的连接。
编码器
将消息编码为字节
- MessageToByteEncoder< I >
- encode(ChannelHandlerContext ctx,I msg,ByteBuf out)
- encode()方法是需要实现的唯一抽象方法。它被调用时将会传入要被该类编码为 ByteBuf 的出站消息(类型为 I 的)。该 ByteBuf 随后将会被转发给 ChannelPipeline 中的下一个 ChannelOutboundHandler。
将消息编码为消息
- MessageToMessageEncoder< T >,T 代表源数据的类型
- encode(ChannelHandlerContext ctx,I msg,List< Object > out)
- 这是需要实现的唯一方法。每个通过 write()方法写入的消息都将会被传递给 encode() 方法,以编码为一个或者多个出站消息。随后,这些出站消息将会被转发给 ChannelPipeline 中的下一个 ChannelOutboundHandler。
编解码器类
- 抽象类 ByteToMessageCodec
- 抽象类 MessageToMessageCodec
- 这两个类都捆绑一个解码器/编码器对。这些类同时实现了 ChannelInboundHandler 和 ChannelOutboundHandler 接口。
Netty 内置的编解码器和 ChannelHandler
- Netty 为许多通用协议提供了编解码器和处理器,几乎可以开箱即用。
通过 SSL/TLS 保护 Netty 应用程序
- SSL 和 TLS 这样的安全协议,它们层叠在其他协议之上,用以实现数据安全。我们在访问安全网站时遇到过这些协议,但是它们也可用于其他不是基于 HTTP 的应用程序,如安全 SMTP(SMTPS)邮件服务器甚至是关系型数据库系统。
- 为了支持 SSL/TLS,Java 提供了 javax.net.ssl 包,它的 SSLContext 和 SSLEngine 类使得 实现解密和加密相当简单直接。Netty 通过一个名为 SslHandler 的 ChannelHandler 实现利用了这个 API,其中 SslHandler 在内部使用 SSLEngine 来完成实际的工作。
- 在大多数情况下,SslHandler 将是 ChannelPipeline 中的第一个 ChannelHandler。
HTTP 系列
- HTTP 是基于请求/响应模式的:客户端向服务器发送一个 HTTP 请求,然后服务器将会 返回一个 HTTP 响应。Netty 提供了多种编码器和解码器以简化对这个协议的使用。
- 一个HTTP 请求/响应可能由多个数据部分组成,FullHttpRequest 和FullHttpResponse 消息是特殊的子类型,分别代表了完整的请求和响应。所有类型的 HTTP 消息(FullHttpRequest、 LastHttpContent 等等)都实现了 HttpObject 接口。
- HttpRequestEncoder 将 HttpRequest、HttpContent 和 LastHttpContent 消息编码为字节;
- HttpResponseEncoder 将 HttpResponse、HttpContent 和 LastHttpContent 消息编码为字节;
- HttpRequestDecoder 将字节解码为 HttpRequest、HttpContent 和 LastHttpContent 消息;
- HttpResponseDecoder 将字节解码为 HttpResponse、HttpContent 和 LastHttpContent 消息;
- HttpClientCodec 和 HttpServerCodec 则将请求和响应做了一个组合。
空闲的连接和超时
- 检测空闲连接以及超时对于及时释放资源来说是至关重要的。由于这是一项常见的任务, Netty 特地为它提供了几个 ChannelHandler 实现。
- IdleStateHandler 当连接空闲时间太长时,将会触发一个 IdleStateEvent 事件。然后,你可以通过在你的 ChannelInboundHandler 中重写 userEventTriggered()方法来处理该 IdleStateEvent 事件。
- ReadTimeoutHandler 如果在指定的时间间隔内没有收到任何的入站数据,则抛出一个 Read-TimeoutException 并关闭对应的 Channel。可以通过重写你的 ChannelHandler 中的 exceptionCaught()方法来检测该 Read-TimeoutException。
- WriteTimeoutHandler 如果在指定的时间间隔内没有任何出站数据写入,则抛出一个 Write-TimeoutException 并关闭对应的 Channel 。可以通过重写你的 ChannelHandler 的 exceptionCaught()方法检测该 WriteTimeout-Exception。
模拟一个web容器
- pom.xml
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.28.Final</version>
</dependency>
创建服务端HttpServer
- 编码和解码分别使用HttpResponseEncoder和HttpRequestDecoder
package org.example.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpContentCompressor;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;
public class HttpServer {
//设置服务端端口
public static final int port = 6789;
//通过nio方式来接收连接和处理连接
private static EventLoopGroup group = new NioEventLoopGroup();
//服务端必备
private static ServerBootstrap b = new ServerBootstrap();
//是否开启SSL模式(https)
private static final boolean SSL = true;
public static void main(String[] args) throws Exception {
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
//设置签名证书
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
} else {
sslCtx = null;
}
try {
b.group(group).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
if (sslCtx != null) {
pipeline.addLast(sslCtx.newHandler(socketChannel.alloc()));
}
//入站,对请求报文解码,用于后续业务处理
pipeline.addLast("decode", new HttpRequestDecoder());
//出站,对应答报文编码
pipeline.addLast("encode", new HttpResponseEncoder());
//聚合http为一个完整的报文,最大不超过10M
pipeline.addLast("aggregator", new HttpObjectAggregator(10 * 1024 * 1024));
//对应答报文压缩
pipeline.addLast("compressor", new HttpContentCompressor());
//业务处理
pipeline.addLast(new BusiHandler());
}
});
//服务器绑定端口监听
ChannelFuture future = b.bind(port).sync();
System.out.println("服务端启动成功,端口是:" + port);
//监听服务器关闭监听
future.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}
}
创建业务处理handler
package org.example.server;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;
import java.util.Date;
public class BusiHandler extends ChannelInboundHandlerAdapter {
//建立连接时,打印客户端地址
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("连接的客户端地址:" + ctx.channel().remoteAddress());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String result = "";
//拿到请求对象
FullHttpRequest httpRequest = (FullHttpRequest) msg;
try {
//获取路径
String path = httpRequest.uri();
//获取body
String body = httpRequest.content().toString(CharsetUtil.UTF_8);
//获取请求方法
HttpMethod method = httpRequest.method();
System.out.println("接收到:" + method + " 请求");
//如果不是这个路径,就直接返回错误
if (!"/test".equalsIgnoreCase(path)) {
result = "非法请求!" + path;
//返回400
send(ctx, result, HttpResponseStatus.BAD_REQUEST);
return;
}
//这里只处理GET请求
if (HttpMethod.GET.equals(method)) {
//接受到的消息,做业务逻辑处理...
System.out.println("接收到的消息体:" + body);
result = "GET请求,应答:现在时间是" + new Date();
//返回200
send(ctx, result, HttpResponseStatus.OK);
return;
}
} finally {
//释放请求
httpRequest.release();
}
}
private void send(ChannelHandlerContext ctx, String context, HttpResponseStatus status) {
//创建相应对象,设置http版本,状态码,应答内容
DefaultFullHttpResponse response = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, status, Unpooled.copiedBuffer(context, CharsetUtil.UTF_8));
//设置content-type格式
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=UTF-8");
//写完数据就关闭chanel
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
}
启动服务端
- 浏览器发起请求,这里开启了https
- 使用http会请求失败
- 使用https,因为没有证书,会提示不安全
- 因为不是/test,提示非法请求
- https://127.0.0.1:6789/test
创建客户端HttpClient
package org.example.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.codec.http.HttpObjectAggregator;
public class HttpClient {
public void connect(String host, int port) throws Exception {
EventLoopGroup workerGroup = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
try {
b.group(workerGroup).channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//编解码器
socketChannel.pipeline().addLast(new HttpClientCodec());
//聚合http为一个完整的报文
socketChannel.pipeline().addLast("aggregator",new HttpObjectAggregator(10*1024*1024));
//解压缩
socketChannel.pipeline().addLast("decompressor", new HttpContentDecompressor());
socketChannel.pipeline().addLast(new HttpClientHandler());
}
});
ChannelFuture f = b.connect(host, port).sync();
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
HttpClient client = new HttpClient();
client.connect("127.0.0.1", 6789);
}
}
创建客户端HttpClientHandler
package org.example.client;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;
import java.net.URI;
public class HttpClientHandler extends ChannelInboundHandlerAdapter {
//发送请求
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
URI uri = new URI("/test");
String msg = "Hello";
DefaultFullHttpRequest request = new DefaultFullHttpRequest(
HttpVersion.HTTP_1_1, HttpMethod.GET, uri.toASCIIString(),
Unpooled.wrappedBuffer(msg.getBytes("UTF-8")));
// 构建http请求
request.headers().set(HttpHeaderNames.HOST, "127.0.0.1");
request.headers().set(HttpHeaderNames.CONNECTION,HttpHeaderValues.KEEP_ALIVE);
request.headers().set(HttpHeaderNames.CONTENT_LENGTH, request.content().readableBytes());
// 发送http请求
ctx.writeAndFlush(request);
}
//读应答报文
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
FullHttpResponse httpResponse = (FullHttpResponse)msg;
System.out.println(httpResponse.status());
System.out.println(httpResponse.headers());
ByteBuf buf = httpResponse.content();
System.out.println(buf.toString(CharsetUtil.UTF_8));
//释放ByteBuf,防止内存泄漏
httpResponse.release();
}
}
先启动服务端,再启动客户端
- 关闭https
- 客户端打印,接收到应答报文
- 服务端打印,接收get请求,具体消息内容