Netty
一、Netty是什么?NIO同步非阻塞模型
Netty官网:https://netty.io
- Netty是由JBOSS提供的一个Java开源框架,现为Github上的独立项目;
- Netty是一个
异步
、基于事件驱动
的网络应用框架,用以快速开发高性能、高可靠性的网络IO程序; - Netty主要针对在TCP协议下,面向Clients端的高并发应用;
- Netty本质是一个NIO框架,适用于服务器通讯相关的多种应用场景;要透彻理解Netty,就要知道什么是NIO;
Netty作为异步高性能的通信框架,往往作为基础通信组件被RPC框架使用。(远程过程调用(RPC,Remote Procedure Call):RPC是一种进程间通信方式。目前业界主流的RPC框架包括阿里巴巴的Dubbo、谷歌开源的gRPC、Apache(前身Facebook)的Thrift、新浪微博的Montan等)
上一篇介绍了了NIO,那么为什么要出现Netty呢?也就是NIO的缺点
原生NIO存在的问题
- NIO的类库和API繁杂,使用麻烦:需要熟练掌握Selector、 ServerSocketChannel、SocketChannel、ByteBuffer 等。
- 需要具备其他的额外技能:要熟悉Java 多线程编程,因为NIO编程涉及到Reactor
模式,你必须对多线程和网络编程非常熟悉,才能编写出高质量的NIO程序。 - 开发工作量和难度都非常大:例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常流的处理等等。
- JDK NIO的Bug:例如臭名昭著的Epoll Bug,它会导致Selector 空轮询,最终导致CPU 100%。直到JDK 1.7版本该问题仍旧存在,没有被根本解决。
Netty官网说明:
- Netty 是由JBOSS提供的一个Java 开源框架。Netty 提供异步的、基于事件驱动的网络应用程序框架,用以快速开发高性能、高可靠性的网络I0程序
- Netty 可以帮助你快速、简单的开发出一个网络应用,相当于简化和流程化了NIO 的开发过程
- Netty 是目前最流行的戏行业、通信行业等获得了广泛的应用,知名的Elasticsearch、 Dubbo 框架内部都采用了Netty
Netty的优点
Netty对JDK自带的NIO的API进行了封装,解决了上述问题。
1)设计优雅:适用于各种传输类型的统一API 阻塞和非阻塞Socket;基于灵活且可扩展的事件模型,可以清晰地分离关注点;高度可定制的线程模型- 单线程,一个或多个线程池
2)使用方便:详细记录的Javadoc, 用户指南和示例;没有其他依赖项,JDK 5 (Netty3.x)或6 (Netty4.x) 就足够了。
3) 高性能、吞吐量更高:延迟更低;减少资源消耗;最小化不必要的内存复制。
4)安全:完整的SSL/TLS和StartTLS支持。
5)社区活跃、不断更新:社区活跃,版本迭代周期短,发现的Bug 可以被及时修复,同时,更多的新功能会被加入
二、线程模型基本介绍
目前存在的线程模型有:
- 传统阻塞IO服务模型
- Reactor模型
根据Reactor的数量和处理资源池线程的数量不同,有3中典型的实现: - 单Reactor单线程
- 单Reactor多线程
- 主从Reactor多线程
Netty线程模式主要基于主从Reactor多线程模型做了一定改进,其中主从Reactor多线程模型有多个Reactor
.Reactor模型
针对传统IO模型的两个缺点的解决办法:
1)基于I/O复用模型:多个连接共用一个阻塞对象。应用程序只需要在一个阻塞对象等待,无需阻塞等待所有连接,当某个连接有新的数据可以处理时,操作系统通知应用程序,线程从阻塞状态返回,开始进行业务处理。
2) 基于线程池复用线程资源:不必再为每个连接创建线程,将连接完成后的业务处理任务分配给线程池处理,一个线程可以处理多个连接的业务。
Reactor对应叫法:反应器模式、分发者模式、统治者模式
…Reactor模式的核心组成
Reactor:在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序来对IO事件作出反应,它就像公司的电话接线员,他接听来自客户的电话并将路线转移到适当的联系人
Handlers:处理程序执行I/O事件要完成的实际事件,类似客户想要与之交谈的公司中的实际官员,Reactor筒通过调度适当的处理程序来响应I/O事件,处理程序执行非阻塞操作。
.单Reactor单线程
- 优点:模型简单,没有多线程、进程通信、竞争的问题,全部都在1个线程中完成
- 缺点:性能问题,只有一个线程,无法完全发挥多核CPU的性能。Handler 在处理某个连接.上的业务时,整个进程无法处理其他连接事件,很容易导致性能瓶颈
- 缺点:可靠性问题,线程意外终止,或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障
- 使用场景:客户端的数量有限,业务处理非常快速,比如Redis在业务处理的时间复杂度0(1)的情况
.Reactor多线程
.Netty模型
Netty主要基于主从Reactors多线程模型做了一定的改进。
三、Netty快速入门实例
服务器端
package com.lsh.netty.simple;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* @author :LiuShihao
* @date :Created in 2020/9/21 4:46 下午
* @desc :
*/
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
//创建BossGroup 和 WorkerGroup
//创建两个线程组
//BossGroup只处理连接请求,真正的和客户端业务处理,会交给workerGroup处理 两个都是无限循环
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
//创建服务器端的启动对象,配置参数
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)//使用NioSocketChannel作为服务器的通道实现
.option(ChannelOption.SO_BACKLOG, 128)//设置线程队列得到连接个数
.childOption(ChannelOption.SO_KEEPALIVE, true)//设置保持活动连接的状态
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
//创建一个通道测试对象 匿名对象
ch.pipeline().addLast(new NettyServerHandler());//给pipeline设置处理器
}
});//给workerGroup 的EventLoop 对应的g管道设置处理器
System.out.println("服务器 is Ok~");
ChannelFuture cf = serverBootstrap.bind(6666).sync();//绑定一个端口并且同步处理
//对 关闭通道 进行监听
cf.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
服务器端处理器
package com.lsh.netty.simple;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* @author :LiuShihao
* @date :Created in 2020/9/21 5:08 下午
* @desc :
*
* 自定义一个Handler需要继承Netty规定的ChannelInboundHandlerAdapter
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 可以读取客户端发送的消息
* @param ctx 上下文对象: 管道pipeline,通道channel,地址
* @param msg 客户端发送的数据:默认是Object
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("server ctx:"+ctx);
//将msg 转成一个 ByteBuf (Netty提供的,不是NIO的ByteBuffer)
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("客户端发送的消息时:"+byteBuf.toString(CharsetUtil.UTF_8));
System.out.println("客户端地址:"+ctx.channel().remoteAddress());
}
/**
* 数据读取完毕
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//write和flish 将数据写入缓冲 并刷新
//一般来讲 我们对发送的数据进行编码
ctx.writeAndFlush(Unpooled.copiedBuffer("hello!客户端~",CharsetUtil.UTF_8));
}
/**
* 发生异常 一般是需要关闭通道
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
客户端
package com.lsh.netty.simple;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* @author :LiuShihao
* @date :Created in 2020/9/22 9:35 上午
* @desc : 客户端需要一个事件循环组
*/
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
//客户端需要一个事件循环组
NioEventLoopGroup group = new NioEventLoopGroup();
try {
//创建客户端启动对象 注意 创建的不是ServerBootstrap 而是BootStrap netty包下的
Bootstrap bootstrap = new Bootstrap();
//设置相关参数
bootstrap.group(group)//设置线程组
.channel(NioSocketChannel.class)//设置客户端通道实现类
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyClinetHandler());
}
});
System.out.println("客户端 is OK~");
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();
channelFuture.channel().closeFuture().sync();
}finally {
group.shutdownGracefully();
}
}
}
客户端处理器
package com.lsh.netty.simple;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* @author :LiuShihao
* @date :Created in 2020/9/22 9:50 上午
* @desc :
*/
public class NettyClinetHandler extends ChannelInboundHandlerAdapter {
/**
* 当通道就绪就会触发该方法
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Client:"+ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello Server~", CharsetUtil.UTF_8));
}
/**
* 当通道有读取事件 发生时 就会触发
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("服务器回复的消息:"+byteBuf.toString(CharsetUtil.UTF_8));
System.out.println("服务器的地址:"+ctx.channel().remoteAddress());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}