1.线程模型基本介绍
不同的线程模型,对程序的性能有很大影响,为了搞清Netty线程模型,我们系统讲解下各个线程模型,最后看看Netty线程模型有什么优越性。
目前存在的线程模型有:传统阻塞I/O服务模型、Reactor模型
Reactor模型根据Reactor的数量和处理资源池线程的数量不同,有3种典型的实现
-
单Reactor单线程
-
单Reactor多线程
-
主从Reactor多线程
Netty线程模型(Netty主要基于主从Reactor多线程模型做了一定的改进,其中主从Reactor多线程模型中有多个Reactor)
2.传统阻塞I/O服务模型
工作原理图
黄色的框表示对象、蓝色的框表示线程、白色的框表示方法
模型特点
1)采用阻塞IO模型获取输入的数据
2)每个连接都需要独立的线程完成数据的输入、业务处理、数据返回
问题分析
1)当并发数很大,就会创建大量的线程,占用大量的系统资源
2)连接创建后,如果当前线程暂时没有数据可读,该线程会阻塞在read操作,造成线程资源浪费
3.Reactor模型
针对传统阻塞I/O服务模型的2个缺点,解决方案:
1)基于I/O复用模型:多个连接共用一个阻塞对象,应用程序只需要在一个阻塞对象等待,无需阻塞等待所有连接。当某个连接有新的数据可以处理时,操作系统通知应用程序,线程从阻塞状态返回,开始进行业务处理
2)基于线程池复用线程资源:不必再为每个连接创建线程,将连接完成后的业务处理任务分配给线程进行处理,一个线程可以处理多个连接的业务。
基本设计思想
I/O复用结合线程池,就是Reactor模式基本设计思想。如下图:
Reactor 模式,是指通过一个或多个输入同时传递给服务处理器的服务请求的事件驱动处理模式。
服务端程序处理传入多路请求,并将它们同步分派给请求对应的处理线程,Reactor 模式也叫 Dispatcher 模式。
即 I/O 多路复用统一监听事件,收到事件后分发(Dispatch给某进程)。
核心组成
Rector模式中核心组成:
Reactor:Reactor在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序来对IO事件作出反应。它就像公司的电话接线员,它接听来自客户的电话并将线路转移给使用的联系人
Handler:处理程序执行IO事件实际处理器,类似于客户想要与之交谈的公司中的实际官员。Reactor通过调度适当的处理程序来响应I/O事件,处理程序执行非阻塞操作
Reactor模式分类
Reactor模式分类:
根据Reactor的数量和处理资源池线程的数量不同,有3种实现
1)单Reactor单线程
2)单Reactor多线程
3)主从Reactor多线程
4.单Reactor单线程
工作原理示意图
方案说明
1)Reactor 对象通过 Select 监控客户端请求事件,收到事件后通过 Dispatch 进行分发;
2)如果是建立连接请求事件,则由 Acceptor 通过 Accept 处理连接请求,然后创建一个 Handler 对象处理连接完成后的后续业务处理;
3)如果不是建立连接事件,则 Reactor 会分发调用连接对应的 Handler 来响应;
4)Handler 会完成 Read→业务处理→Send 的完整业务流程。
优缺点分析
服务器端用一个线程通过多路复用搞定了所有的IO操作(包括连接,读、写等),编码简单、清晰明了。但是如果客户端连接数量较多,将无法支撑。
优点:模型简单,没有多线程、进程通信、竞争的问题,全部都在一个线程中完成。
缺点:性能问题,只有一个线程,无法完全发挥多核 CPU 的性能。Handler 在处理某个连接上的业务时,整个进程无法处理其他连接事件,很容易导致性能瓶颈。
可靠性问题,线程意外跑飞,或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障。
使用场景:客户端的数量有限,业务处理非常快速,比如 Redis,业务处理的时间复杂度 O(1)。
5.单Reactor多线程
工作原理示意图
方案说明
1)Reactor 对象通过 Select 监控客户端请求事件,收到事件后通过 Dispatch 进行分发;
2)如果是建立连接请求事件,则由 Acceptor 通过 Accept 处理连接请求,然后创建一个 Handler 对象处理连接完成后续的各种事件;
3)如果不是建立连接事件,则 Reactor 会分发调用连接对应的 Handler 来响应;
4)Handler 只负责响应事件,不做具体业务处理,通过 Read 读取数据后,会分发给后面的 Worker 线程池进行业务处理;
5)Worker 线程池会分配独立的线程完成真正的业务处理,如何将响应结果发给 Handler 进行处理;
6)Handler 收到响应结果后通过 Send 将响应结果返回给 Client。
优缺点分析
优点:可以充分利用多核 CPU 的处理能力。
缺点:多线程数据共享和访问比较复杂;Reactor 承担所有事件的监听和响应,在单线程中运行,高并发场景下容易成为性能瓶颈。
6.主从Reactor多线程
工作原理示意图
针对单 Reactor 多线程模型中,Reactor 在单线程中运行,高并发场景下容易成为性能瓶颈,可以让 Reactor 在多线程中运行。
方案说明
1)Reactor 主线程 MainReactor 对象通过 Select 监控建立连接事件,收到事件后通过 Acceptor 接收,处理建立连接事件;
2)Acceptor 处理建立连接事件后,MainReactor 将连接分配 Reactor 子线程给 SubReactor 进行处理;
3)SubReactor 将连接加入连接队列进行监听,并创建一个 Handler 用于处理各种连接事件;
4)当有新的事件发生时,SubReactor 会调用连接对应的 Handler 进行响应;
5)Handler 通过 Read 读取数据后,会分发给后面的 Worker 线程池进行业务处理;
6)Worker 线程池会分配独立的线程完成真正的业务处理,如何将响应结果发给 Handler 进行处理;
7)Handler 收到响应结果后通过 Send 将响应结果返回给 Client。
优缺点分析
优点:父线程与子线程的数据交互简单职责明确,父线程只需要接收新连接,子线程完成后续的业务处理。
父线程与子线程的数据交互简单,Reactor 主线程只需要把新连接传给子线程,子线程无需返回数据。
这种模型在许多项目中广泛使用,包括 Nginx 主从 Reactor 多进程模型,Memcached 主从多线程,Netty 主从多线程模型的支持。
缺点:编程复杂度较高
7.Reactor模式小结
3 种模式可以用个比喻来理解:(餐厅常常雇佣接待员负责迎接顾客,当顾客入坐后,侍应生专门为这张桌子服务)
1)单 Reactor 单线程,接待员和侍应生是同一个人,全程为顾客服务;
2)单 Reactor 多线程,1 个接待员,多个侍应生,接待员只负责接待;
3)主从 Reactor 多线程,多个接待员,多个侍应生。
Reactor 模式具有如下的优点:
1)响应快,不必为单个同步时间所阻塞,虽然 Reactor 本身依然是同步的;
2)编程相对简单,可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销;
3)可扩展性,可以方便的通过增加 Reactor 实例个数来充分利用 CPU 资源;
4)可复用性,Reactor 模型本身与具体事件处理逻辑无关,具有很高的复用性。
8.Netty模型
工作原理示意图1-简单版
Netty主要基于主从Reactor多线程模型做了一定的改进,其中主从Reactor多线程模型中有多个Reactor
工作原理示意图2-进阶版
https://www.jianshu.com/p/38b56531565d(这篇文章写的非常好!)
工作原理示意图3-详细版
Netty抽象出两组线程池,BossGroup专门负责客户端的连接,workGroup专门负责网络的读写。
BossGroup和WorkerGroup类型都是NIOEventLoopGroup。
NioEventLoopGroup相当于1个事件循环组,这个组里包含多个事件循环NioEventLoop。
NioEventLoop表示一个不断循环的执行处理任务的线程,每个NioEventLoop包含1个selector,用于监听绑定在其上的SocketChannel的事件。
每个Boss NioEventLoop循环执行的任务包含3步:
1)轮询accept事件
2)处理accept I/O事件,与Client建立连接,生成NioSocketChannel,并将NioSocketChannel注册到某个Worker NioEventLoop的Selector上
3)处理任务队列中的任务,runAllTasks。
任务队列中的任务包括用户调用eventloop.execute或schedule执行的任务,或者其它线程提交到该eventloop的任务。
每个Worker NioEventLoop循环执行的任务包含3步:
1)轮询read、write事件;
2)处I/O事件,即read、write事件,在NioSocketChannel可读、可写事件发生时进行处理
3)处理任务队列中的任务,runAllTasks。
Netty快速入门实例——TCP服务
要求:
Netty服务器在6668端口监听,客户端发送消息给服务器“helllo,服务器”,服务器
可以回复消息给客户端“hello,客户端”
目的:对Netty线程模型有一个初步认识,便于理解Netty模型理论
服务器端
package com.thelight1.netty.simple; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; public class NettyServer { public static void main(String[] args) throws Exception { /* 1.创建两个线程组bossGroup,workerGroup 2.bossGroup只处理连接请求,真正和和客户端进行业务处理,交由workder只处理 3.两个都是无限循环 4.默认含有的NioEventLoop个数为cup核数*2 */ NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) //设置线程队列等待连接的个数 .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<SocketChannel>() { //想pipeline添加一个处理器 @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new NettyServerHandler()); } }); System.out.println("服务器 is ready"); //绑定一个端口并且同步,生成一个channelFuture对象 //启动服务器,并绑定端口 ChannelFuture channelFuture = serverBootstrap.bind(6668).sync(); //对关闭通道进行监听 channelFuture.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
package com.thelight1.netty.simple; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; public class NettyServerHandler extends ChannelInboundHandlerAdapter { //读取数据事件(这里我们可以读取客户端发送的消息) /* 1.ChannelHandlerContext ctx:上下文对象 2.Object msg:就是客户端发送的数据,默认Object */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("服务器读取线程:" + Thread.currentThread().getName()); System.out.println("server ctx: " + ctx); ByteBuf buf = (ByteBuf) msg; System.out.println("客户端发送的消息是:" + buf.toString(CharsetUtil.UTF_8)); System.out.println("客户端地址是:" + ctx.channel().remoteAddress()); } //数据读取完毕 @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~", CharsetUtil.UTF_8)); } //处理异常,一般需要关闭通道 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.channel().close(); } }
客户端
package com.thelight1.netty.simple; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class NettyClient { public static void main(String[] args) throws InterruptedException { NioEventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new NettyClientHandler()); } }); System.out.println("客户端 is ok"); ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync(); //对关闭通道进行监听 channelFuture.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); } } }
package com.thelight1.netty.simple; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; public class NettyClientHandler extends ChannelInboundHandlerAdapter { //当通道就绪就会触发 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("client ctx:" + ctx); ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server~喵", CharsetUtil.UTF_8)); } //当通道有读取事件时,会触发 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf byteBuf = (ByteBuf) msg; System.out.println("服务器回复的消息:" + byteBuf.toString(CharsetUtil.UTF_8)); System.out.println("服务器的地址:" + ctx.channel().remoteAddress()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.channel().close(); } }
几个问题
问题1:服务端启动时,NioEventLoopGroup中会有多少个EventLoop
答案:
当不传参数时,默认会有cpu核数*2个,如下workerGroup有16个EventLoop
问题2:当客户端建立连接时,是如何分配到workerGroup中的EventLoop的
默认是轮训的,我们把workderGroup的构建参数改为4后(NioEventLoopGroup workerGroup = new NioEventLoopGroup(4))
启动7个客户端后,可以看到处理线程依次为
nioEventLoopGroup-3-1
nioEventLoopGroup-3-2
nioEventLoopGroup-3-3
nioEventLoopGroup-3-4
nioEventLoopGroup-3-1
nioEventLoopGroup-3-2
nioEventLoopGroup-3-3
任务队列中的Task有3种典型使用场景
1)用户程序自定义的普通任务
2)用户自定义定时任务
3)非当前Reactor线程调用Channel的各种方法(无代码示例)
例如在推送系统的业务线程里面,根据用户的标识,找到对应的Channel引用,然后调用write类方法向该用户推送消息,就会进入到这种场景,最终的write会提交到任务队列中被异步消费
public class NettyServerHandler extends ChannelInboundHandlerAdapter { //读取数据事件(这里我们可以读取客户端发送的消息) /* 1.ChannelHandlerContext ctx:上下文对象 2.Object msg:就是客户端发送的数据,默认Object */ @Override public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { //用户自定义普通任务 ctx.channel().eventLoop().execute(new Runnable() { @Override public void run() { try { Thread.sleep(10 * 1000); System.out.println("当前异步运行任务线程2:" + Thread.currentThread().getName()); ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端2~", CharsetUtil.UTF_8)); } catch (InterruptedException e) { e.printStackTrace(); } } }); ctx.channel().eventLoop().execute(new Runnable() { @Override public void run() { try { Thread.sleep(20 * 1000); System.out.println("当前异步运行任务线程3:" + Thread.currentThread().getName()); ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端3~", CharsetUtil.UTF_8)); } catch (InterruptedException e) { e.printStackTrace(); } } }); //用户自定义定时任务 ctx.channel().eventLoop().schedule(new Runnable() { @Override public void run() { try { Thread.sleep(5 * 1000); System.out.println("当前异步运行任务线程4:" + Thread.currentThread().getName()); ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端4~", CharsetUtil.UTF_8)); } catch (InterruptedException e) { e.printStackTrace(); } } }, 5, TimeUnit.SECONDS); System.out.println("服务器读取线程:" + Thread.currentThread().getName()); System.out.println("server ctx: " + ctx); ByteBuf buf = (ByteBuf) msg; System.out.println("客户端发送的消息是:" + buf.toString(CharsetUtil.UTF_8)); System.out.println("客户端地址是:" + ctx.channel().remoteAddress()); } //数据读取完毕 @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("当前线程:" + Thread.currentThread().getName()); ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~", CharsetUtil.UTF_8)); } //处理异常,一般需要关闭通道 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.channel().close(); } }
可以看到虽然任务被加到任务队列中,但是运行时并不是异步执行的,
任务运行时,仍然是在原来的nioEventLoopGroup-3-1线程中运行的,且多个任务之间仍旧是串行执行的。
方案再说明
1)Netty抽象出两组线程池,bossGroup专门负责接收客户端连接,workerGroup专门负责网络读写操作
2)NioEventLoop表示一个不断循环执行处理任务的线程,每个NioEventLoop都有一个selector,用于监听绑定在其上的socket网络通道
3)NioEventLoop内部采用串行化设计,从消息的读取—>解码—>处理—>编码—>发送,始终由IO线程NioEventLoop负责
-
NioEventLoopGroup下包含多个NioEventLoop
-
每个NioEventLoop中包含一个Selector,一个taskQueue
-
每个NioEventLoop的Selector上可以注册监听多个NioChannel
-
每个NioChannel只会绑定在唯一的NioEventLoop上
-
每个NioChannel都绑定有一个自己的ChannelPipeline
9.异步模型
基本介绍
异步的概念和同步相对,当一个异步过程调用发出后,调用者不能立刻得到结果,实际处理这个调用的组件在完成后,通过回调来通知调用者。
Netty中的I/O操作是异步的,包括bind、write、connect等操作会简单的返回一个ChannelFuture调用者并不能立刻获得结果,而是通过Future-Listener机制,用户可以方便地主动获取或者通过通知机制获得IO操作结果。
Netty的异步模型是建立在future和callback之上的。future的核心思想是:假设一个方法fun,计算过程可能非常耗时,等待fun返回显然不合适,那么可以在调用fun的时候,立马返回一个future,后续可以通过future去监控fun的处理过程(即future-listener机制)
Future说明
表示异步的执行结果,可以通过它提供的方法来检测执行是否完成。
ChannelFuture是一个接口,我们可以添加监听器,当监听的事件发生时,就会通知到监听器
工作原理示意图
在使用Netty进行编程时,拦截操作和转换出入站数据只需要您提供callback或利用future即可,这使得链式操作简单、高效,并有利于编写可重用的代码。
Netty框架的目的就是让你的业务逻辑从网络基础应用编码中分离、解脱出来
Future-Listener机制
举例说明:
ChannelFuture channelFuture = serverBootstrap.bind(6668).sync(); channelFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { System.out.println("服务器绑定端口 6668 成功"); } else { System.out.println("服务器绑定端口 6668 失败"); } } });
当Future对象刚刚创建时,处于非完成状态,调用者可以通过返回的ChannelFuture来获取操作执行的状态,注册监听函数来执行完成后的操作。
10.快速入门实例——HTTP服务
要求:Netty服务器在6888端口监听,浏览器发出请求“http://localhost:6668/"。服务器可以回复消息给客户端“hello,我是服务器”,并对特定请求资源进行过滤。
目的:Netty可以做http服务开发,并且理解Handler实例和客户端及其请求的关系。
package com.thelight1.netty.http; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; public class TestServer { public static void main(String[] args) throws Exception { NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new TestServerInitializer()); ChannelFuture channelFuture = serverBootstrap.bind(16668).sync(); channelFuture.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
package com.thelight1.netty.http; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpServerCodec; public class TestServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("myHttpServerCodec",new HttpServerCodec()); pipeline.addLast("myTestHttpServerHandler", new TestHttpServerHandler()); } }
package com.thelight1.netty.http; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.*; import io.netty.util.CharsetUtil; import java.net.URI; public class TestHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> { @Override protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception { if (msg instanceof HttpRequest) { System.out.println("msg类型:" + msg.getClass()); System.out.println("客户端地址:" + ctx.channel().remoteAddress()); HttpRequest httpRequest = (HttpRequest) msg; URI uri = new URI(httpRequest.uri()); if ("/favicon.ico".equals(uri.getPath())) { System.out.println("请求了/favicon.ico,不做响应"); return; } ByteBuf content = Unpooled.copiedBuffer("hello,我是服务器", CharsetUtil.UTF_8); FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content); response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain"); response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes()); ctx.writeAndFlush(response); } } }