高性能异步IO Reactor模式
原始网络编程
最原始的网络编程思路就是服务器用一个while循环,不断监听端口是否有新的套接字连接,如果有,那么就调用一个处理函数处理,类似:
while(true){
//阻塞等待客户端连接
socket = accept();
handle(socket)
}
这种方法的最大问题是无法并发,效率太低,如果当前的请求没有处理完,那么后面的请求只能被阻塞,服务器的吞吐量太低。
之后,想到了使用多线程,也就是很经典的connection per thread,每一个连接用一个线程处理,类似:
while(true){
socket = accept();
new thread(socket);
}
tomcat服务器的早期版本确实是这样实现的。多线程的方式确实一定程度上极大地提高了服务器的吞吐量,因为之前的请求在read阻塞以后,不会影响到后续的请求,因为他们在不同的线程中。这也是为什么通常会讲“一个线程只能对应一个socket”的原因。最开始对这句话很不理解,线程中创建多个socket不行吗?语法上确实可以,但是实际上没有用,每一个socket都是阻塞的,所以在一个线程里只能处理一个socket,就算accept了多个也没用,前一个socket被阻塞了,后面的是无法被执行到的。
- 缺点在于资源要求太高,系统中创建线程是需要比较高的系统资源的,如果连接数太高,系统无法承受,而且,线程的反复创建-销毁也需要代价。
- 线程池本身可以缓解线程创建-销毁的代价,这样优化确实会好很多,不过还是存在一些问题的,就是线程的粒度太大。
- 每一个线程把一次交互的事情全部做了,包括读取和返回,甚至连接,表面上似乎连接不在线程里,但是如果线程不够,有了新的连接,也无法得到处理,所以,目前的方案线程里可以看成要做三件事,连接,读取和写入。
- 线程同步的粒度太大了,限制了吞吐量。应该把一次连接的操作分为更细的粒度或者过程,这些更细的粒度是更小的线程。整个线程池的数目会翻倍,但是线程更简单,任务更加单一。
- 这其实就是Reactor出现的原因,在Reactor中,这些被拆分的小线程或者子过程对应的是handler,每一种handler会出处理一种event。这里会有一个全局的管理者selector,我们需要把channel注册感兴趣的事件,那么这个selector就会不断在channel上检测是否有该类型的事件发生,如果没有,那么主线程就会被阻塞,否则就会调用相应的事件处理函数即handler来处理。
- 典型的事件有连接,读取和写入,当然我们就需要为这些事件分别提供处理器,每一个处理器可以采用线程的方式实现。一个连接来了,显示被读取线程或者handler处理了,然后再执行写入,那么之前的读取就可以被后面的请求复用,吞吐量就提高了。
I/O多路复用
I/O多路复用是指使用一个线程来检查多个文件描述符(Socket)的就绪状态,比如调用select和poll函数,传入多个文件描述符,如果有一个文件描述符就绪,则返回,否则阻塞直到超时。得到就绪状态后进行真正的操作可以在同一个线程里执行,也可以启动线程执行(比如使用线程池)。
一般情况下,I/O 复用机制需要事件分发器。 事件分发器的作用,将那些读写事件源分发给各读写事件的处理者。
涉及到事件分发器的两种模式称为:Reactor和Proactor。 Reactor模式是基于同步I/O的,而Proactor模式是和异步I/O相关的。本文主要介绍的就是 Reactor模式相关的知识。
经典的I/O服务设计 — BIO模式
执行流程
- 服务器端的Server是一个线程,线程中执行一个死循环来阻塞的监听客户端的连接请求和通信。
- 当客户端向服务器端发送一个连接请求后,服务器端的Server会接受客户端的请求,ServerSocket.accept()从阻塞中返回,返回一个与客户端连接的Socket。
- 构建一个handler,将Socket传入该handler。创建一个线程并启动该线程,在线程中执行handler,这样与客户端的所有的通信以及数据处理都在该线程中执行。当该客户端和服务器端完成通信关闭连接后,线程就会被销毁。
- 然后Server继续执行accept()操作等待新的连接请求。
每个处理程序可以在自己的线程中启动,这种模型由于IO在阻塞时会一直等待,因此在用户负载增加时,性能下降的非常快。
server导致阻塞的原因:
-
serversocket的accept方法,阻塞等待client连接,直到client连接成功。
-
线程从socket inputstream读入数据,会进入阻塞状态,直到全部数据读完。
-
线程向socket outputstream写入数据,会阻塞直到全部数据写完。
Reactor模式
Reactor模式(反应器模式)是一种处理一个或多个客户端并发交付服务请求的事件设计模式。当请求抵达后,服务处理程序使用I/O多路复用策略,然后同步地派发这些请求至相关的请求处理程序。
Reactor结构
Reactor模式的角色
Reactor模式一共有5中角色构成
- Handle(句柄或描述符,在Windows下称为句柄,在Linux下称为描述符):本质上表示一种资源(比如说文件描述符,或是针对网络编程中的socket描述符),是由操作系统提供的。该资源用于表示一个个的事件,事件既可以来自于外部,也可以来自于内部。外部事件比如说客户端的连接请求,客户端发送过来的数据等。内部事件比如说操作系统产生的定时事件等。它本质上就是一个文件描述符,Handle是事件产生的发源地。
- Synchronous Event Demultiplexer(同步事件分离器):它本身是一个系统调用,用于等待事件的发生(事件可能是一个,也可能是多个)。调用方在调用它的时候会被阻塞,一直阻塞到同步事件分离器上有事件产生为止。对于Linux来说,同步事件分离器指的就是常用的I/O多路复用机制,比如说select、poll、epoll等。在Java NIO领域中,同步事件分离器对应的组件就是Selector;对应的阻塞方法就是select方法。它会通知Handle。
- Event Handler(事件处理器):本身由多个回调方法构成,这些回调方法构成了与应用相关的对于某个事件的反馈机制。在Java NIO领域中并没有提供事件处理器机制让我们调用或去进行回调,是由我们自己编写代码完成的。Netty相比于Java NIO来说,在事件处理器这个角色上进行了一个升级,它为我们开发者提供了大量的回调方法,供我们在特定事件产生时实现相应的回调方法进行业务逻辑的处理,即,ChannelHandler。ChannelHandler中的方法对应的都是一个个事件的回调。它拥有Handle。
- Concrete Event Handler(具体事件处理器):是事件处理器的实现。它本身实现了事件处理器所提供的各种回调方法,从而实现了特定于业务的逻辑。它本质上就是我们所编写的一个个的处理器实现。
- Initiation Dispatcher(初始分发器):实际上就是Reactor角色。它本身定义了一些规范,这些规范用于控制事件的调度方式,同时又提供了应用进行事件处理器的注册、删除等设施。它本身是整个事件处理器的核心所在,Initiation Dispatcher会通过Synchronous Event Demultiplexer来等待事件的发生。一旦事件发生,Initiation Dispatcher首先会分离出每一个事件,然后调用事件处理器,最后调用相关的回调方法来处理这些事件。Netty中ChannelHandler里的一个个回调方法都是由bossGroup或workGroup中的某个EventLoop来调用的。
Reactor模式流程
- 当应用向Initiation Dispatcher注册具体的事件处理器时,应用会标识出,该事件处理器希望Initiation Dispatcher在某个事件发生时向其通知的该事件,该事件与Handle关联。事件通过Handle来标识,而Concrete Event Handler又持有该Handle。这样,事件 ——> Handle ——> Concrete Event Handler 就关联起来了。
- Initiation Dispatcher会要求每个事件处理器向其传递内部的Handle,该Handle向操作系统标识了事件处理器。
- 当所有的事件处理器注册完毕后,应用会调用handle_events(type)方法来启动Initiation Dispatcher的事件循环。这时Initiation Dispatcher会将每个注册的事件管理器的Handle合并起来,并使用同步事件分离器等待这些事件的发生。比如说,TCP协议层会使用select同步事件分离器操作来等待客户端发送的数据到达连接的socket handle上。
- 当与某个事件源对应的Handle变为ready状态时(比如说,TCP socket变为等待读状态时),同步事件分离器就会通知Initiation Dispatcher
- Initiation Dispatcher会触发事件处理器的回调方法,从而响应这个处于ready状态的Handle。当事件发生时,Initiation Dispatcher会将被事件源激活的Handle作为 [key] 来寻找并分发恰当的事件处理器回调方法。
- Initiation Dispatcher会回调事件处理器的handle_events(type)回调方法来执行特定于应用的功能(开发者自己所编写的功能),从而响应这个事件。所发生的事件类型可以做为该方法参数并被该方法内部使用来执行额外的特定于服务的分离与分发。
完整的事件处理流程
执行Initiation Dispatcher启动之后将Event Handler(Concrete Event Handler)注册到其上注册的同时注册感兴趣的事件。通过Handle来去标识的它是被Event Handler所拥有的(NIO中对应的就是SelectionKey)。当有感兴趣的事件在于Event Handler所关联的Handle上面产生的时候由Initiation Dispatcher去通知(调用)注册到其上的具体的Concrete Event Handler(注册Event Handler并标识感兴趣的事件,后续当感兴趣的事件触发时,由 Initiation Dispatcher去通知/调用具体的事件处理器)。
当Event Handler(事件处理器)注册到Initiation Dispatcher之后/注册完之后,紧接着Initiation Dispatcher就开启了它自己的事件循环,是一个死循环和Netty中的NioEventLoopGroup一样。在这个事件循环当中它会通过Synchronous Event Demultiplexer(同步事件分离器)来去等待事件的发生,当一个事件发生后同步事件分离器就会获取到这个产生的事件的集合返回给Initiation Dispatcher。然后Initiation Dispatcher会选择与这个事件对应的一系列的事件处理器,然后遍历事件处理器根据事件的类型来去调用注册到其上的Concrete Event Handler里面的handle_event()方法(这个方法本身是由Initiation Dispatcher中的handle_events()方法调用的)。至此为止整个的一个循环就结束了
Handler 注册方式和调用时机、以及被谁调用
线程循环启动之前就已经会将若干个Event Handler给注册到Initiation Dispatcher对象上去,通过register_handler(h)这方法h是携带的Handle属性。然后线程循环不断的去运行。当Synchronous Event Demultiplexer(事件分离器)
产生事件之后Initiation Dispatcher就会得到事件分离器里面的所有事件,它会根据特定的事件遍历出来针对于这个事件/对这个事件感兴趣的的handlers把它给选择出来。然后去遍历handlers(事件处理器列表)里面的每个handler(事件处理器)。然后调用handler的handle_events(type)。
Reactor模式的实现方式
单线程Reactor模式
执行流程
- 服务器端的Reactor是一个线程对象,该线程会启动事件循环,并使用Selector来实现IO的多路复用。注册一个Acceptor事件处理器到Reactor中,Acceptor事件处理器所关注的事件是ACCEPT事件,这样Reactor会监听客户端向服务器端发起的连接请求事件(ACCEPT事件)。
- 客户端向服务器端发起一个连接请求,Reactor监听到了该ACCEPT事件的发生并将该ACCEPT事件派发给相应的Acceptor处理器来进行处理。Acceptor处理器通过accept()方法得到与这个客户端对应的连接(SocketChannel),然后将该连接所关注的READ事件以及对应的READ事件处理器注册到Reactor中,这样一来Reactor就会监听该连接的READ事件了。或者当你需要向客户端发送数据时,就向Reactor注册该连接的WRITE事件和其处理器。
- 当Reactor监听到有读或者写事件发生时,将相关的事件派发给对应的处理器进行处理。比如,读处理器会通过SocketChannel的read()方法读取数据,此时read()操作可以直接读取到数据,而不会堵塞与等待可读的数据到来。
- 每当处理完所有就绪的感兴趣的I/O事件后,Reactor线程会再次执行select()阻塞等待新的事件就绪并将其分派给对应处理器进行处理。
说明
注意,Reactor的单线程模式的单线程主要是针对于I/O操作而言,也就是所有的I/O的accept()、read()、write()以及connect()操作都在一个线程上完成的。
但在目前的单线程Reactor模式中,不仅I/O操作在该Reactor线程上,连非I/O的业务操作也在该线程上进行处理了,这可能会大大延迟I/O请求的响应。所以我们应该将非s/O的业务逻辑操作从Reactor线程上卸载,以此来加速Reactor线程对I/O请求的响应。
伪代码示例
//实现Runnable,让它自己成为一个线程
class Reactor implements Runnable {
final Selector selector;
final ServerSocketChannel serverSocket;
//构造方法 参数:port 端口号
Reactor(int port) throws IOException {
//使用Selector.open()方法创建一个Selector对象
selector = Selector.open();
//创建一个ServerSocketChannel对象
serverSocket = ServerSocketChannel.open();
//将ServerSocketChannel绑定到一个特定的地址/端口上
serverSocket.socket().bind(new InetSocketAddress(port));
//配置为非阻塞的
serverSocket.configureBlocking(false);
//将channel注册到selector上,并设置感兴趣的事件为OP_ACCEPT(准备接收连接的事件)
SelectionKey sk = serverSocket.register(selector,SelectionKey.OP_ACCEPT);
//attach附加任意一个对象一次只能携带一个,后面的会覆盖前面的,当事件发生的时候可以通过attachment()获取
sk.attach(new Acceptor());
}
/*
上面的创建方式也可以使用下面这种实现:
SelectorProvider p = SelectorProvider.provider();
selector = p.openSelector();
serverSocket = p.openServerSocketChannel();
*/
//-----------------Reactor的run()方法 调度回路---------------------
// run()方法,通常这是在一个新的线程里执行的
public void run() {
try {
while (!Thread.interrupted()) {
//阻塞方法,当有事件发生的时候,返回事件的数量
selector.select();
//获取SelectionKey集合,事件集合
Set selected = selector.selectedKeys();
//获得集合迭代器
Iterator it = selected.iterator();
//遍历集合
while (it.hasNext())
//进行派发
dispatch((SelectionKey)(it.next());
//处理完之后将事件集合清空
selected.clear();
}
}catch (IOException ex){ /* ... */ }
}
//--------------------------dispatch类------------------------------
void dispatch(SelectionKey k) {
//获取之前添加的附件,然后向上类型转换
Runnable r = (Runnable)(k.attachment());
if (r != null)
//如果不为空,调用线程的run()方法
r.run();
}
//------------------------Acceptor类 适配器------------------------
class Acceptor implements Runnable {
public void run() {
try {
//accept()接受这个连接,返回SocketChannel对象
SocketChannel c = serverSocket.accept();
if (c != null)
//如果不为空,创建一个Handler并传入Selector与SocketChannel然后调用Handler,Handler就是我们真正要处理的对象
new Handler(selector, c);
}catch(IOException ex) { /* ... */ }
}
}
//----------------------------Handler类 处理程序设置-----------------
final class Handler implements Runnable {
final SocketChannel socket;
final SelectionKey sk;
ByteBuffer input = ByteBuffer.allocate(MAXIN);
ByteBuffer output = ByteBuffer.allocate(MAXOUT);
static final int READING = 0, SENDING = 1;
int state = READING;
//构造方法
Handler(Selector sel, SocketChannel c) throws IOException {
//赋值给成员变量
socket = c;
//设置为非阻塞的
c.configureBlocking(false);
// 现在可以选择先读
//将这个SocketChannel注册到Selector上,感兴趣的事件是0在后面通过interestOps()方法设置
sk = socket.register(sel, 0);
//然后将当前对象作为附件
sk.attach(this);
//设置感兴趣的事件,为读事件
sk.interestOps(SelectionKey.OP_READ);
//如果Selector的select()方法一直在阻塞的时候,让它立刻返回
sel.wakeup();
}
//--------------Handler的 run()方法 请求的处理------------------
public void run() {
try {
//这种判断的代码,类似Netty3的代码,将读/写根据一个状态值进行标识然后通过if else进行判断。
if (state == READING)
//去读
read();
else if (state == SENDING)
//去写
send();
} catch (IOException ex) { /* ... */ }
}
void read() throws IOException {
socket.read(input);
if (inputIsComplete()) {
process();
state = SENDING;
// 读完之后,将感兴趣的事件设置为写
sk.interestOps(SelectionKey.OP_WRITE);
}
}
void send() throws IOException {
socket.write(output);
if (outputIsComplete())
sk.cancel();
}
}
}
//状态处理器
//GoF状态对象模式的简单使用将适当的处理程序重新绑定为附件
class Handler {
// 初始状态是reader
public void run() {
//读取channel的数据到input
socket.read(input);
if (inputIsComplete()) {
process();
//重新绑定恰当的Handler作为attachment
sk.attach(new Sender());
//设置新的感兴趣的事件为写事件
sk.interest(SelectionKey.OP_WRITE);
sk.selector().wakeup();
}
}
class Sender implements Runnable {
public void run(){
// ...
//进行相应的写操作
socket.write(output);
if (outputIsComplete())
sk.cancel();
}
}
}
说明
Client首先连接到一个Reactor(在Netty当中是EventLoop)线程对象当中,这个线程本身不做任何事情,只接收对端的连接请求。连接请求一旦建立之后它就将相应的处理给派发(dispatch)给特定的处理器(Handler)上,由处理器根据特定的事件来进行相应的方法调用
多线程的设计,Multithreaded Designs
- 策略的增加了一些线程来实现可伸缩性
- 主要应用在多处理器当中,多核处理器
- Workder Threads(对应Netty的workderGroup)
- Reactor应该快速的触发处理器,处理器的处理速度会减慢Reactor,建议将非IO的操作移交给其它线程完成
- 多Reactor线程
- Reactor线程可以进行IO的处理,将负载转移给其它的Reactor。实现负载的平衡实现CPU和IO速率让它们匹配
Workder Threads
- 将非IO的处理移交出去,来去加快Reactor线程
- 要比重新的绑定计算处理绑定到事件驱动的形式更简单
- 依然应该是纯粹的非阻塞的计算,足够的处理来去解决这个过载的问题
- 要比IO重叠处理更加困难
- 在什么时候做,最好是把所有的输入都读到一个buffer当中之后。
- 使用线程池因此可以进行线程池的调节和控制
- 通常需要比客户端少的线程
工作线程池 改进
与单线程Reactor模式不同的是,添加了一个工作者线程池,并将非I/O操作从Reactor线程中移出转交给工作者线程池来执行。这样能够提高Reactor线程的I/O响应,不至于因为一些耗时的业务逻辑而延迟对后面I/O请求的处理。
使用线程池的优势
- 通过重用现有的线程而不是创建新线程,可以在处理多个请求时分摊在线程创建和销毁过程产生的巨大开销。
- 另一个额外的好处是,当请求到达时,工作线程通常已经存在,因此不会由于等待创建线程而延迟任务的执行,从而提高了响应性。
- 通过适当调整线程池的大小,可以创建足够多的线程以便使处理器保持忙碌状态。同时还可以防止过多线程相互竞争资源而使应用程序耗尽内存或失败。
注意
在上图的改进的版本中,所以的I/O操作依旧由一个Reactor来完成,包括I/O的accept()、read()、write()以及connect()操作。
对于一些小容量应用场景,可以使用单线程模型。但是对于高负载、大并发或大数据量的应用场景却不合适,主要原因如下:
- 一个NIO线程同时处理成百上千的链路,性能上无法支撑,即便NIO线程的CPU负荷达到100%,也无法满足海量消息的读取和发送;
- 当NIO线程负载过重之后,处理速度将变慢,这会导致大量客户端连接超时,超时之后往往会进行重发,这更加重了NIO线程的负载,最终会导致大量消息积压和处理超时,成为系统的性能瓶颈;
伪代码示例
//具有线程池的处理程序
class Handler implements Runnable{
// 创建一个线程池对象
static PooledExecutor pool = new PooledExecutor(...);
static final int PROCESSING = 3;
// 读取
synchronized void read(){
// ...
socket.read(input);
if (inputIsComplete()) {
state = PROCESSING;
//真正的读取执行扔到线程池里,在线程池里进行完成
pool.execute(new Processer());
}
}
synchronized void processAndHandOff(){
process();
//处理完之后将状态改成SENDING
state = SENDING;
//将感兴趣的事件改为写时间
sk.interest(SelectionKey.OP_WRITE);
}
class Processer implements Runnable{
public void run() {
processAndHandOff();
}
}
}
让这些任务进行一个协同
- Handoffs
- 每一个任务都可以去启用触发或者是调用下一个,通常是最快的但是这么做可能会很脆弱
- Callbacks 针对每个处理器分发器的回调
- 设置状态,attachment等等...,GoF Mediator模式的变体
- Queues
- 在各个阶段之间传递缓冲区
- Futures
- 每一个任务都会生成一个结果,它可以通过基于 join、wait/notify方式进行协同
多Reactor线程模式
Reactor线程池中的每一Reactor线程都会有自己的Selector、线程和分发的事件循环逻辑。
mainReactor可以只有一个,但subReactor一般会有多个。mainReactor线程主要负责接收客户端的连接请求,然后将接收到的SocketChannel传递给subReactor,由subReactor来完成和客户端的通信。
流程
- 注册一个Acceptor事件处理器到mainReactor中,Acceptor事件处理器所关注的事件是ACCEPT事件,这样mainReactor会监听客户端向服务器端发起的连接请求事件(ACCEPT事件)。启动mainReactor的事件循环。
- 客户端向服务器端发起一个连接请求,mainReactor监听到了该ACCEPT事件并将该ACCEPT事件派发给Acceptor处理器来进行处理。Acceptor处理器通过accept()方法得到与这个客户端对应的连接(SocketChannel),然后将这个SocketChannel传递给subReactor线程池。
- subReactor线程池分配一个subReactor线程给这个SocketChannel,即,将SocketChannel关注的READ事件以及对应的READ事件处理器注册到subReactor线程中。当然你也可以注册WRITE事件以及WRITE事件处理器到subReactor线程中以完成I/O写操作。Reactor线程池中的每一Reactor线程都会有自己的Selector、线程和分发的循环逻辑。
- 当有I/O事件就绪时,相关的subReactor就将事件派发给响应的处理器处理。注意,这里subReactor线程只负责完成I/O的read()操作,在读取到数据后将业务逻辑的处理放入到线程池中完成,若完成业务逻辑后需要返回数据给客户端,则相关的I/O的write操作还是会被提交回subReactor线程来完成。
Netty 与 Reactor模式
Netty的线程模式就是一个实现了Reactor模式的经典模式。
结构对应
NioEventLoop | Initiation Dispatcher |
---|---|
Selector | Synchronous EventDemultiplexer |
ChannelHandler | ChannelHandler |
具体的ChannelHandler的实现 | oncreteEventHandler |
模式对应
Netty服务端使用了“多Reactor线程模式”,Netty默认情况下会有多个subReactor和mainReactor
bossGroup(NioEventLoopGroup) 中的某个NioEventLoop | mainReactor |
---|---|
workerGroup(NioEventLoopGroup) 中的某个NioEventLoop | subReactor |
ServerBootstrapAcceptor | acceptor |
用户自定义线程池 | ThreadPool |
流程
- 当服务器程序启动时,会配置ChannelPipeline,ChannelPipeline中是一个ChannelHandler链(实际上是一个一个的ChannelHandlerContext),所有的事件发生时都会触发Channelhandler中的某个方法,这个事件会在ChannelPipeline中的ChannelHandler链里传播。然后,从bossGroup事件循环池中获取一个NioEventLoop来现实服务端程序绑定本地端口的操作,将对应的ServerSocketChannel注册到该NioEventLoop中的Selector上,并注册ACCEPT事件为ServerSocketChannel所感兴趣的事件。
- NioEventLoop事件循环启动,此时开始监听客户端的连接请求。
- 当有客户端向服务器端发起连接请求时,NioEventLoop的事件循环监听到该ACCEPT事件,Netty底层会接收这个连接,通过accept()方法得到与这个客户端的连接(SocketChannel),然后触发ChannelRead事件(即,ChannelHandler中的channelRead0方法会得到回调),该事件会在ChannelPipeline中的ChannelHandler链中执行、传播。
- ServerBootstrapAcceptor的readChannel方法会将该SocketChannel(客户端的连接)注册到workerGroup(NioEventLoopGroup) 中的某个NioEventLoop的Selector上,并注册READ事件为SocketChannel所感兴趣的事件。启动SocketChannel所在NioEventLoop的事件循环,接下来就可以开始客户端和服务器端的通信了。