一、什么是Netty?
Netty是一个提供异步事件驱动(asynchronous event-driven)的网络应用框架,是一个用于快速开发高性能、可扩展协议的服务器和客户端。本质上,Netty就是一个NIO客户端服务器框架,我们可以用它快速简单地开发网络应用程序。Netty大大简化了网络程序开发过程。
二、如何使用Netty?
(一)、建立一个Maven项目
建议使用JDK1.6以上版本,这个步骤我就不举例了。假设我创建的项目名称是Netty。
(二)、导入maven依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.6.Final</version>
</dependency>
(三)、编写我们的第一个Discard Server服务器
世上最简单的协议不是’Hello, World!’ 而是 DISCARD(抛弃服务)。这个协议将会抛弃任何收到 的数据,而不响应。 为了实现 DISCARD 协议,你只需忽略所有收到的数据。让我们从 handler(处理器)的实现开始,handler 是由 Netty 生成用来处理 I/O 事件的。
代码如下:
package com.netty;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* Class DiscardServerHandler ...
*
* @author LiJun
* Created on 2018/8/15
* 处理服务端 channel
*/
public class DiscardServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 丢弃收到的数据
((ByteBuf)msg).release();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 出现异常就关闭连接
cause.printStackTrace();
ctx.close();
}
}
步骤如下:
创建一个类继承
ChannelInboundHandlerAdapter
,在这个例子上我们创建了一个DiscardServerHandler
继承ChannelInboundHandlerAdapter
类,ChannelInboundHandlerAdapter
这个类实现了ChannelInboundHandler
接口(这个接口中提供了多种事件处理的接口方法)。我们覆盖了
channelRead
方法,每当从客户端接收到新的数据的时候,这个方法就会被调用。实现DISCARD协议,处理器需要忽略掉收到的信息。ByteBuf 是一个引用计数 对象,这个对象必须显示地调用 release() 方法来释放。
来一个
channelRead()
方法的模板吧:@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { try { // Do something with msg } finally { ReferenceCountUtil.release(msg); } }
exceptionCaught()
事件处理方法是当出现Throwable
对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时。在大部分情况下,捕获的异常应该被记录下来 并且把关联的 channel 给关闭掉。然而这个方法的处理方式会在遇到不同异常的情况下有不 同的实现,比如你可能想在关闭连接之前发送一个错误码的响应消息。
注:处理器的职责是释放所有传递到处理器的引用计数对象。
(四)、启动服务端的DiscardServerHandler
package com.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* Class DiscardServer ...
*
* @author LiJun
* Created on 2018/8/15
*/
public class DiscardServer {
private int port;
public DiscardServer() {
}
public DiscardServer(int port) {
this.port = port;
}
public void run() throws Exception{
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) {
socketChannel.pipeline().addLast(new DiscardServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
// 绑定端口,并开始接收进来的连接
ChannelFuture channelFuture = b.bind(port).sync();
// 等待服务器,socket关闭
// 在这里,不会发生什么
channelFuture.channel().closeFuture().sync();
}finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port;
if(args.length > 0){
port = Integer.parseInt(args[0]);
}else {
port = 8080;
}
new DiscardServer(port).run();
}
}
解释:
NioEventLoopGroup
是用来处理I/O操作的多线程事件循环器,Netty 提供了许多不同的EventLoopGroup
的实现用来处理不同的传输。在这个例子中我们实现了一个服务端的应用, 因此会有2个NioEventLoopGroup
会被使用。第一个经常被叫做‘boss’
,用来接收进来的连 接。第二个经常被叫做‘worker’
,用来处理已经被接收的连接,一旦‘boss’接收到连接,就会 把连接信息注册到‘worker’
上。如何知道多少个线程已经被使用,如何映射到已经创建的Channel
上都需要依赖于EventLoopGroup
的实现,并且可以通过构造函数来配置他们的关系。ServerBootstrap
是一个启动 NIO 服务的辅助启动类。你可以在这个服务中直接使用Channel
,但是这会是一个复杂的处理过程,在很多情况下你并不需要这样做。这里我们指定使用
NioServerSocketChannel
类来举例说明一个新的Channel
如何接收进来的连接。这里的事件处理类经常会被用来处理一个最近的已经接收的
Channel
。ChannelInitializer
是 一个特殊的处理类,他的目的是帮助使用者配置一个新的Channel
。也许你想通过增加一些 处理类比如DiscardServerHandler
来配置一个新的Channel
或者其对应的ChannelPipeline
来实现你的网络程序。当你的程序变的复杂时,可能你会增加更多的处理类到pipeline
上,然后提取这些匿名类到最顶层的类上。你可以设置这里指定的
Channel
实现的配置参数。我们正在写一个TCP/IP 的服务端,因此 我们被允许设置socket
的参数选项比如tcpNoDelay
和keepAlive
。你关注过
option()
和childOption()
吗?option()
是提供给NioServerSocketChannel
用来接 收进来的连接。childOption()
是提供给由父管道ServerChannel
接收到的连接,在这个例子 中也是NioServerSocketChannel
。我们继续,剩下的就是绑定端口然后启动服务。这里我们在机器上绑定了机器所有网卡上的 8080 端口。当然现在你可以多次调用 bind() 方法(基于不同绑定地址) .
(五)、测试
先运行main方法哦
为了我们便于观察到结果,
我们将channelRead()
方法内容改一下
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf)msg;
try{
while(in.isReadable()){
System.out.print((char) in.readByte());
System.out.flush();
}
}finally {
ReferenceCountUtil.release(msg);
}
}
1. 介绍下telnet
Telnet协议是TCP/IP协议族中的一员,是Internet远程登陆服务的标准协议和主要方式。它为用户提供了在本地计算机上完成远程主机工作的能力。在终端使用者的电脑上使用telnet程序,用它连接到服务器。终端使用者可以在telnet程序中输入命令,这些命令会在服务器上运行,就像直接在服务器的控制台上输入一样。可以在本地就能控制服务器。要开始一个telnet会话,必须输入用户名和密码来登录服务器。Telnet是常用的远程控制Web服务器的方法。
2. win10如何开启telnet
控制面板 -> 程序 -> 启用或关闭Windows功能 -> 找到Telnet客户端勾选上,然后点确定。
打开cmd,输入Telnet
我们可以输入?/help
查看帮助
3.连接
输入 open localhost 8080
显示正在连接localhost…就表示连接上了
当我们在telnet输入内容的时候就能在idea的控制台山看到响应的信息了。
三、利用Netty编写一个Echo(应答)服务器
把channelRead()
方法内容改成如下:
ctx.writeAndFlush(msg);
这个表示把传入的数据传回去,并显示。
writeAndFlush(msg);
等于 ctx.write(msg)+ctx.flush()
调用wirte
方法不会使消息写入到通道上,它被缓冲在了内部,我们必须要调用ctx.flush()
方法把缓冲区的数据强行输出。
四、利用Netty编写一个Time(时间) 服务器
TimeServerHandler类
如下:
package com.netty;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* Class TimeServerHandler ...
*
* @author LiJun
* Created on 2018/8/15
*/
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
final ByteBuf time = ctx.alloc().buffer(4);
time.writeInt((int)(System.currentTimeMillis()/1000L+220898800L));
final ChannelFuture f = ctx.writeAndFlush(time);
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
assert f == channelFuture;
ctx.close();
}
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 出现异常就关闭连接
cause.printStackTrace();
ctx.close();
}
}
解释:
我们表示的是在连接被创建的时候发送一个消息,所以使用的是
channelActive()
方法channelActive()
方法将会在连接被建立并且准备进行通信时被调用。因此让我们在这个 法里完成一个代表当前时间的32位整数消息的构建工作。为了发送一个新的消息,我们需要分配一个包含这个消息的新的缓冲。因为我们需要写入一 个32位的整数,因此我们需要一个至少有4个字节的 ByteBuf。通过
ChannelHandlerContext.alloc()
得到一个当前的ByteBufAllocator
,然后分配一个新的缓冲。在这我们不需要
flip()
方法了,ByteBuf中没有这个方法,它含有两个指针,一个对应读操作,一个对应写操作。当我们想ByteBuf中写入数据的时候,写指针的索引就会增加,读指针不变,读的时候读指针变化,写指针不变。所以读指针索引和写指针所以分别代表了消息的开始和消息和结束。注意:
ChannelHandlerContext.write()
和writeAndFlush()
方法会返回一 个ChannelFuture
对象,一个ChannelFuture
代表了一个还没有发生的 I/O 操作。这意味着 任何一个请求操作都不会马上被执行,因为在 Netty 里所有的操作都是异步的。所以我们需要在write方法
返回的ChannelFuture
完成后调用close方法
,然后当他的写操 作已经完成他会通知他的监听者。请注意,close()
方法也可能不会立马关闭,他也会返回一个ChannelFuture
。当一个写请求已经完成是如何通知到我们?这个只需要简单地在返回的
ChannelFuture
上 增加一个ChannelFutureListener
。这里我们构建了一个匿名的ChannelFutureListener
类用来 在操作完成时关闭Channel
。 或者,你可以使用简单的预定义监听器代码:f.addListener(ChannelFutureListener.CLOSE);
为了测试我们的time服务如我们期望的一样工作,你可以使用 UNIX 的 rdate 命令
$ rdate -o <port> -p <host>
Port 是你在main()函数中指定的端口,host 使用 localhost 就可以了。
五、时间客户端
在Netty中,编写服务端和客户端唯一也是最大的不同就是使用了不同的BootStrap
和Channel
的实现。
先看一下代码吧:
TimeClient
package com.netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* Class TimeClient ...
*
* @author LiJun
* Created on 2018/8/27
*/
public class TimeClient {
public static void main(String[] args) {
//String host = args[0];
//int port = Integer.parseInt(args[1]);
String host = "localhost";
int port = 8080;
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup);
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
}
});
//启动客户端
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
//等待关闭
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
}
}
}
解释下:
BootStrap
和ServerBootStrap
类似,但是它针对的是非服务端的channel
,例如客户端或者无连接传输模式的channel
。- 指定了
EventLoopGroup
,那么这既是一个boss group
,也是worker group
,尽管客户端不需要boss worker
NioSocketChannel
这个类在客户端channel
被创建时使用。- 不需要像使用
ServerBootStrap
一样使用childOption()
方法,因为客户端的SocketChannel
没有父亲。 我们用
connect()
方法代替了bind()方法。最后我们需要一个
TimeClientHandler
类来处理信息,翻译时间。
TimeClientHandler
package com.netty;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.Date;
/**
* Class TimeClientHandler ...
*
* @author LiJun
* Created on 2018/8/27
*/
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf)msg;
try {
long currentTimeMillis = (byteBuf.readUnsignedInt() - 2208988800L)*1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
}finally {
byteBuf.release();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
解释下:
在TCP/IP中,Netty会把读到的数据放到ByteBuf这个数据结构中。
启动时间服务器:
TimeServer
package com.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* Class TimeServer ...
*
* @author LiJun
* Created on 2018/8/15
*/
public class TimeServer{
private int port;
public TimeServer() {
}
public TimeServer(int port) {
this.port = port;
}
public void run() throws Exception{
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) {
socketChannel.pipeline().addLast(new DiscardServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
// 绑定端口,并开始接收进来的连接
ChannelFuture channelFuture = b.bind(port).sync();
// 等待服务器,socket关闭
// 在这里,不会发生什么
channelFuture.channel().closeFuture().sync();
}finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port;
if(args.length > 0){
port = Integer.parseInt(args[0]);
}else {
port = 8080;
}
new TimeServer(port).run();
}
}
六、处理一个基于流的传输
注意事项:基于流的传输(例如TCP/IP)接收到的数据是存在于socket接收的buffer中的,而且这个基于流的传输不是数据包队列,而是字节队列,所以不能保证远程写入数据的准确读取。由于这个原因,接收部分需要将接收的数据进行碎片整理,整理成一个或多个可以被应用逻辑轻松识别的有意义框架。
例如:
- 远程传输的数据为 ABC DEF GHI
- 没处理接收到的数据就可能为 AB CDEFG H I 因为是字节,数据错乱了
- 我们需要将数据处理成 ABC DEF GHI 这种能被识别的数据。
由此:
(一)、解决方法一
以TIME客户端为例。32位整型是非常小的数据,可能会被拆分到不同的数据段内,并且,碎片化的可能性会随着流量的增加而增加。所以我们可以构造一个内部的可积累的缓冲,知道4个字节全部接收到了内部缓冲区。
修改后的TimeClientHandler:
package com.netty;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.Date;
/**
* Class TimeClientHandler2 ...
*
* @author LiJun
* Created on 2018/8/27
*/
public class TimeClientHandler2 extends ChannelInboundHandlerAdapter {
private ByteBuf byteBuf;
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
byteBuf = ctx.alloc().buffer(4);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
byteBuf.release();
byteBuf = null;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf m = (ByteBuf)msg;
byteBuf.writeBytes(m);
m.release();
if(byteBuf.readableBytes() >= 4){
long currentTimeMills = (byteBuf.readUnsignedInt() - 2208988800L)*1000L;
System.out.println(new Date(currentTimeMills));
ctx.close()
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
解释下:
ChannelHandler
有两个生命周期的监听方法:handlerAdded()和handlerRemoved()
。我们可以完成任意初始化。- 所有接收的数据都被积累在了
byteBuf
变量中。 - 处理器需要检查byteBuf照片那个是否有足够数据,如果有,则进行逻辑处理,否则就重复调用
channelRead()
方法获得更多数据,知道数据够了为止。
(二)、解决方法二
上面的方法维护起来麻烦。我们可以吧ChannelHandler
拆分成多个模块以减少应用的复杂度。
我们可以把上面的TimeClientHandler
拆分成;两个处理器:
TimeDecoder
处理数据拆分的问题TimeClientHandler
原始版本的实现。
这就要用到Netty提供的可扩展类了:ByteToMessageDecoder
TimeDecoder
package com.netty;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
/**
* Class TimeDecoder ...
*
* @author LiJun
* Created on 2018/8/27
*/
public class TimeDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
if(byteBuf.readableBytes() < 4){
return;
}
list.add(byteBuf.readBytes(4));
}
}
解释下:
- ByteToMessageDecoder是
ChannelInboundHandler
的· 一个实现可,可以吧处理数据拆分的问题变得简单。 - 每当有新数据接收的时候,ByteToMessageDecoder都会调用
decode()
方法来处理内部的累积缓冲byteBuf
。 decode()
方法可以决定累积缓冲里有没有足够数据可以往out对象里放任意数据。当有更多的数据被接收了,ByteToMessageDecoder会再次调用decode()方法。- 如果在
decode()
方法里增加了一个对象到 out 对象里,这意味着解码器解码消息成功。 ByteToMessageDecoder 将会丢弃在累积缓冲里已经被读过的数据。我们不需要对多条消息调用 decode(),ByteToMessageDecoder 会持续调用 decode() 直到不放任何数据到 out 里。
修改TImeClient的ChannelInitializer实现:
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
}
});
这还有一个解码器:ReplayingDecoder,需要去了解API获取更多信息。
package com.netty;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import java.util.List;
/**
* Class TimeDecoder2 ...
*
* @author LiJun
* Created on 2018/8/27
*/
public class TimeDecoder2 extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
list.add(byteBuf.readBytes(4));
}
}
Netty还提供了不少开箱即用的解码器
io.netty.example.factorial
基于二进制协议io.netty.example.telnet
基于文本协议
七、用POJO代替ByteBuf
在此之前我们都是使用ByteBu作为协议消息的数据结构。我们接下来就要使用POJO代替ByteBuf。
这样做的好处:
- 通过从ChannelHandler中提取出ByteBuf的代码,使ChannelHandler的实现变得更加可维护和可重用。
我们改善我们的时间客户端:
首先我们定义一个新的类:UnixTime
package com.netty;
import java.util.Date;
/**
* Class UnixTime ...
*
* @author LiJun
* Created on 2018/8/27
*/
public class UnixTime {
private final long value;
public UnixTime() {
this(System.currentTimeMillis()/1000L + 2208988800L);
}
public UnixTime(long value) {
this.value = value;
}
public long getValue() {
return value;
}
@Override
public String toString() {
return new Date((getValue() - 2208988800L)*1000L).toString();
}
}
修改TimeDecoder类,返回UnixTime:
package com.netty;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
/**
* Class TimeDecoder3 ...
*
* @author LiJun
* Created on 2018/8/27
*/
public class TimeDecoder3 extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
if(byteBuf.readableBytes() < 4){
return;
}
list.add(new UnixTime(byteBuf.readUnsignedInt()));
}
}
修改TimeClientHandler继承的channelRead()方法:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
UnixTime unixTime = (UnixTime)msg;
System.out.println(unixTime);
ctx.close();
}
同样我们服务端也可以改:
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ChannelFuture future = ctx.writeAndFlush(new UnixTime());
future.addListener(ChannelFutureListener.CLOSE);
}
现在我们就编写一个编码器实现ChannelOutboundHandler,, 用来将UnixTime对象转化为一个ByteBuf:
package com.netty;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
/**
* Class TimeEncoder ...
*
* @author LiJun
* Created on 2018/8/27
*/
public class TimeEncoder extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
UnixTime unixTime = (UnixTime)msg;
ByteBuf encoded = ctx.alloc().buffer(4);
encoded.writeInt((int)unixTime.getValue());
ctx.write(encoded, promise);
}
}
注意下:
- 通过ChannelPromise,当编码后的数据被写到通道上Netty可以通过这个对象标记成功还是失败。
- 我们不需要调用ctx.flush(),因为处理器单独分离除了一个void flush()方法了。
如果要进一步简化代码的话:
package com.netty;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.MessageToByteEncoder;
/**
* Class TimeEncoder ...
*
* @author LiJun
* Created on 2018/8/27
*/
public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, UnixTime unixTime, ByteBuf byteBuf) throws Exception {
byteBuf.writeInt((int)unixTime.getValue());
}
}
最后,我们需要在TimeServerHandler之前把TimeEncoder插入到ChannelPipeline。
八、关闭我们的应用
关闭一个 Netty 应用往往只需要简单地通过 shutdownGracefully() 方法来关闭你构建的所有的EventLoopGroup。当EventLoopGroup 被完全地终止,并且对应的所有 channel 都已经被关闭时,Netty 会返回一个Future对象来通知你。