目录
主从Reactor原理
1.单Reactor缺点
1.单Reactor需要响应连接和读写事件,单线程处理任务较多
2.单Reactor连接和读写事件放在一块处理,会互相影响,而且本身读写事件是一个比较
耗时的操作,当一个读写事件处理事件太长,那么势必会影响下一个连接事件的处理,影响
用户连接,这是个非常不友好的事情,很影响体验
2.主从Reactor原理
1.主从Reactor将会使用两个Reactor,主Reactor响应连接事件,从Reactor响应读写事件
2.主Reactor的selector只关注连接事件,接受到一个连接之后,将通道注册到从Reactor
的selector上去,这样连接和读写事件互不影响
3.这里会涉及到两个selector, 主Reactor线程扫描serverSocketChannel的连接事件,
接收到后交给Acceptor处理,Acceptor会得到一个socketChannel,然后注册到从
Reactor的selector上,从Reactor线程扫描处理所有socketChannel的读写事件
3.主从Reactor原理图
主从Reactor实现
代码实现演示
package com.example.nio.netty.mainsub;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import java.util.Set;
/**
* 主从单Reactor单线程模型
*/
class MainSubReactorModel {
private MainReactor mainReactor;
private SubReactor subReactor;
private Integer port;
public MainSubReactorModel(Integer port) throws IOException {
this.port = port;
this.subReactor=new SubReactor();
this.mainReactor=new MainReactor(this.port,this.subReactor);
}
public void start(){
//启动主Reactor
Thread thread = new Thread(this.mainReactor);
thread.start();
//启动从Reactor
Thread thread1 = new Thread(this.subReactor);
thread1.start();
}
}
/**
* 组件主Reactor(单独一个Selector处理连接事件)
*/
class MainReactor implements Runnable{
/**
* 端口
*/
private Integer prot;
/**
* 用于接受连接的ServerChannel
*/
private ServerSocketChannel serverSocketChannel;
/**
* 组合子Reactor
*/
private SubReactor subReactor;
private Selector selector;
public MainReactor(Integer prot,SubReactor subReactor) throws IOException {
this.prot = prot;
SelectorProvider provider = SelectorProvider.provider();
this.subReactor=subReactor;
this.subReactor.setSelector(provider.openSelector());
this.selector=provider.openSelector();
this.serverSocketChannel = provider.openServerSocketChannel();
//绑定端口
this.serverSocketChannel.bind(new InetSocketAddress(this.prot));
//设置为非阻塞
this.serverSocketChannel.configureBlocking(false);
//注册到选择器上面
this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}
@Override
public void run() {
while (true){
try {
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
if(selectionKey.isAcceptable()) {
//处理连接
new Acceptor(serverSocketChannel, subReactor.getSelector()).run();
}
iterator.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/**
* 组件从Reactor(单独一个Selector处理读写事件)
*/
class SubReactor implements Runnable {
/**
* 管理连接的选择器
*/
private Selector selector;
public Selector getSelector() {
return selector;
}
public void setSelector(Selector selector) {
this.selector = selector;
}
@Override
public void run() {
while (true) {
try {
selector.selectNow();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
//Handler处理读写事件
new Handler(selectionKey).run();
iterator.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/**
* accepor(处理读写)
*/
class Acceptor implements Runnable{
/**
* 用于接受连接的ServerChannel
*/
private ServerSocketChannel serverSocketChannel;
/**
* 管理新连接的channel
*/
private Selector selector;
public Acceptor(ServerSocketChannel serverSocketChannel, Selector selector) {
this.serverSocketChannel = serverSocketChannel;
this.selector = selector;
}
@Override
public void run() {
SocketChannel accept = null;
try {
accept = this.serverSocketChannel.accept();
//设置为非阻塞
accept.configureBlocking(false);
//注册到selector, 注册读写事件
accept.register(this.selector,SelectionKey.OP_READ | SelectionKey.OP_WRITE);
System.out.println("新连接:"+accept.getRemoteAddress());
} catch (IOException e) {
e.printStackTrace();
}
}
}
class Handler implements Runnable{
private SelectionKey selectionKey;
public Handler(SelectionKey selectionKey) {
this.selectionKey = selectionKey;
}
@Override
public void run() {
try {
if(selectionKey.isReadable()){
//读事件处理
this.read();
}else {
//写事件处理
this.write();
}
}catch (Exception e){
}
}
/**
* 处理读事件
* @throws IOException
*/
private void read() throws IOException {
SocketChannel channel = (SocketChannel) selectionKey.channel();
ByteBuffer allocate = ByteBuffer.allocate(1024);
int read = channel.read(allocate);
if(read>0) {
System.out.println("接收到消息:" + new String(allocate.array(), 0,read));
}
}
/**
* 处理写事件
* @throws IOException
*/
private void write() throws IOException {
}
}