Netty设计模式与源码分析
大名鼎鼎的Reactor模式
Reactor模式的组成元素
channel和selector属于 java.nio 包中的类,分别为网络通信中的通道(连接)和选择器。
Reactor和 handler 属于Reactor模型高性能编程中的应用程序角色,分别为反应器和处理器。
Reactor模式的三步曲
第一步:注册
将channel 通道的就绪事件,注册到选择器Selector。监听新的连接。
第二步:轮询
轮询的代码,是Reactor重要的一个组成部分,或者说核心的部分。轮询选择器是否有就绪事件。将就绪事件注册到子选择器上面,并注册读事件。
第三步:分发
将io读写事件,分发到事件附件的处理器handler中,由handler完成实际的处理。
Netty中的Reactor模式
模型解释:
-
Netty 抽象出两组线程池BossGroup和WorkerGroup,BossGroup专门负责接收客户端的连接,WorkerGroup专门负责网络的读写
-
BossGroup和WorkerGroup类型都是NioEventLoopGroup
-
NioEventLoopGroup 相当于一个事件循环线程组, 这个组中含有多个事件循环线程 , 每一个事件循环线程是NioEventLoop
-
每个NioEventLoop都有一个selector , 用于监听注册在其上socketChannel的网络通讯
-
每个Boss NioEventLoop线程内部循环执行的步骤有 3 步
- 处理accept事件 , 与client 建立连接
- 生成 NioSocketChannel将NioSocketChannel注册到某个worker NIOEventLoop上的selector
- 处理任务队列的任务 , 即runAllTasks
-
每个worker NIOEventLoop线程循环执行的步骤
- 轮询注册到自己selector上的所有NioSocketChannel 的read, write事件
- 处理 I/O 事件, 即read , write 事件, 在对应NioSocketChannel 处理业务
- runAllTasks处理任务队列TaskQueue的任务 ,一些耗时的业务处理一般可以放入TaskQueue中慢慢处理,这样不影响数据在 pipeline 中的流动处理
-
每个worker NIOEventLoop处理NioSocketChannel业务时,会使用 pipeline (管道),管道中维护了很多 handler 处理器用来处理 channel 中的数据
EventLoopGroup、EventLoop
我们先来分析 Netty 中的线程池。在Netty中EventLoopGroup就相当与一个线程池,里面中的EventLoop就相当于每一个线程。我们最先开始将这两段源码是如何实现的。
先来看下event的继承关系及主要属性
代码最开始
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
// 参数最全的构造方法
public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory,
final RejectedExecutionHandler rejectedExecutionHandler) {
// 调用父类的构造方法
super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler);
}
nThreads:NioEventLoopGroup中NioEventLoop 的实例数量。
executor:NioEventLoop 内部使用的线程池。NioEventLoopGroup中所有NioEventLoop公用一个NioEventLoop。
chooserFactory:我们如何选择NioEventLoopGroup中NioEventLoop,需要根据这个策略工厂生成一个具体的策略,类似一个一个排队获取。
selectorProvider:这个简单,我们需要通过它来实例化 Selector,可以看到每个线程池都持有一个 selectorProvider 实例。JDK方法。
selectStrategyFactory:这个涉及到的是线程池中线程的工作流程,在介绍 NioEventLoop 的时候会说。先预先说一下,如果队列有值直接seletorNew。
rejectedExecutionHandler:拒绝策略
跟着源码一步一步走
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
public NioEventLoopGroup(int nThreads, Executor executor) {
// nThreads 里面存的线程个数
// executor 线程池 现在传入的是null
// SelectorProvider.provider() jdk底层的提供者对象 主要用于生成selector
this(nThreads, executor, SelectorProvider.provider());
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) {
// 第四个参数为策略工厂 只有一个实例 在NioEventLoop 的run方法中会用到这个策略
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,final SelectStrategyFactory selectStrategyFactory) {
//对面满了的拒绝策略
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
//DEFAULT_EVENT_LOOP_THREADS 如果线程是为0则是cpu核心数*2
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
//DefaultEventExecutorChooserFactory.INSTANCE eventLoop 选择器
// 就是说如果 EventLoopGroup 里面有很多的 EventLoop 时,当有一个新的连接时
// 到底选择哪一个 EventLoop 呢
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
我们先看下DefaultEventExecutorChooserFactory.INSTANCE的实现
//选择下一个EventLoop
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
//线程数是否是2的倍数
//如果是2的倍数 有一套优化的策略
//如果不是2的倍数 用另一套策略
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
/**
* 用于确定数字是否是2的倍数
* @param val
* @return
*/
private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}
两个策略的区别,如果是2的n次方用效率更新的算法
PowerOfTwoEventExecutorChooser
@Override
public EventExecutor next() {
//用的位运算
return executors[idx.getAndIncrement() & executors.length - 1];
}
GenericEventExecutorChooser
@Override
public EventExecutor next() {
//用的取模运算
return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
}
继续回到主线,MultithreadEventExecutorGroup这个方法是最重要的方法
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
//如果具体执行的线程池为null 需要为EventLoopGroup 赋值一个线程池对象 用于具体的执行线程
//里面有execute方法 主要用于执行线程
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
//EventLoopGroup 中 有一个 children 属性 是一个数组 数据中存的就是EventLoop
//现在是初始化 还没有对数组进行赋值
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
//此处 开始对数据进行赋值
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
*********略************
} finally {
**********略*************
}
}
//将刚刚咱们分析的选择赋值给this属性 就是EnentLoopGroup对象
chooser = chooserFactory.newChooser(children);
**********略*************
}
这个方法中最重要的方法是
children[i] = newChild(executor, args);
最终调用NioEventLoopGroup.newChild方法
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
// this 就是 NioEventLoopGroup
// executor 具体执行的线程池,里面有个一execute(runnable)方法 具体执行线程
// (SelectorProvider) args[0] jdk中提供者对象 主要用于生成selector
// ((SelectStrategyFactory) args[1]).newSelectStrategy() 测类 主要用于用selector 还是 selectorNew
// (RejectedExecutionHandler) args[2] 拒绝策略 队列满了后的拒绝策略
// queueFactory 队列创建工厂 此时我们传的值是null
EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}
NioEventLoop的构造方法
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
// 此处有一些netty的优化逻辑 我们不去细讲 因为 主要逻辑就是为了生成jdk的selector
// 并赋值到EventLoop中的selector属性
final SelectorTuple selectorTuple = openSelector();
//生成jdk 的 selector**
this.selector = selectorTuple.selector;
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
继续调用父类的构造方法
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue,
RejectedExecutionHandler rejectedExecutionHandler) {
//进入父类
super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
//忽略他
tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
}
最终调用父类构造方法,对以下几个重要的属性进行赋值
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, Queue<Runnable> taskQueue,
RejectedExecutionHandler rejectedHandler) {
//将NioEventLoopGroup 对象赋值给parent属性上
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
//此前我们的线程池赋值给每个EventLoop对象上
//用来开启 NioEventLoop 中的线程(Thread 实例)
this.executor = ThreadExecutorMap.apply(executor, this);
//生成一个任务队列,很一些任务先存入队列中 然后再队列中取值 上面的线程执行
//提交给 NioEventLoop 的任务都会进入到这个 taskQueue 中等待被执行
//它是任务队列。我们前面说过,NioEventLoop 需要负责 IO 事件和非 IO 事件,通常它都在执行 selector
//的 select 方法或者正在处理 selectedKeys,如果我们要 submit 一个任务给它,任务就会被放到 taskQueue 中,等它来轮询
this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
//队列满的情况 拒绝策略
this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
总结
- 在 Netty 中,NioEventLoopGroup 代表线程池,NioEventLoop 就是其中的线程。
- 线程池 NioEventLoopGroup 是池中的线程 NioEventLoop 的 parent,从上面的代码中的取名可以看出。
- 每个 NioEventLoop 都有自己的 Selector。
- executor、selectStrategy 和 rejectedExecutionHandler 从 NioEventLoopGroup 中一路传到了 NioEventLoop 中。
Netty 中的 Future、Promise
Netty 中有非常多的异步调用
我们看这段代码,之后会分析里面的逻辑,我们先看返回值是一个ChannelFuture类型,关于 Future 接口,jdk已经提供了。但是有很多缺陷。例如没有回调函数、如果等待完成还是阻塞的。jdk1.8虽然已经优化,但再1.8出来之前guava已经帮我们优化了Future 接口,同样Netty中也对Future 进行了优化操作。
ChannelFuture f = b.bind(PORT).sync();
Future
Netty 中的 Future 接口继承了 JDK 中的 Future 接口,然后添加了一些方法(举例几个重要方法):
public interface Future<V> extends java.util.concurrent.Future<V> {
// 是否成功
boolean isSuccess();
// 是否可取消
boolean isCancellable();
// 如果任务执行失败,这个方法返回异常信息
Throwable cause();
// 添加 Listener 来进行回调
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// 阻塞等待任务结束,如果任务失败,将“导致失败的异常”重新抛出来
Future<V> sync() throws InterruptedException;
// 阻塞等待任务结束,和 sync() 功能是一样的,不过如果任务失败,它不会抛出执行过程中的异常
Future<V> await() throws InterruptedException;
}
所以刚刚上面提到的监听方法,他是一个异步的方法,调用sync方法阻塞等待注册成功后回调。当然你也可以自己添加回调方法。
ChannelFuture
我们再看ChannelFuture类,主要是把channel赋值进来。
public interface ChannelFuture extends Future<Void> {
// ChannelFuture 关联的 Channel
Channel channel();
// 覆写以下几个方法,使得它们返回值为 ChannelFuture 类型
@Override
ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelFuture sync() throws InterruptedException;
@Override
ChannelFuture syncUninterruptibly();
@Override
ChannelFuture await() throws InterruptedException;
}
Promise 接口
增加了几个执行成功或者失败的方法
public interface Promise<V> extends Future<V> {
// 标记该 future 成功及设置其执行结果,并且会通知所有的 listeners。
// 如果该操作失败,将抛出异常(失败指的是该 future 已经有了结果了,成功的结果,或者失败的结果)
Promise<V> setSuccess(V result);
// 和 setSuccess 方法一样,只不过如果失败,它不抛异常,返回 false
boolean trySuccess(V result);
// 标记该 future 失败,及其失败原因。
// 如果失败,将抛出异常(失败指的是已经有了结果了)
Promise<V> setFailure(Throwable cause);
// 标记该 future 失败,及其失败原因。
// 如果已经有结果,返回 false,不抛出异常
boolean tryFailure(Throwable cause);
// 标记该 future 不可以被取消
boolean setUncancellable();
// 这里和 ChannelFuture 一样,对这几个方法进行覆写,目的是为了返回 Promise 类型的实例
@Override
Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
@Override
Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
@Override
Promise<V> await() throws InterruptedException;
@Override
Promise<V> sync() throws InterruptedException;
}
ChannelPromise
ChannelPromise 接口在 Netty 中使用得比较多,因为它综合了 ChannelFuture 和 Promise 两个接口
综合了 ChannelFuture 和 Promise 中的方法,只不过通过覆写将返回值都变为 ChannelPromise 了而已,没有增加什么新的功能
总结下各个接口都实现了什么逻辑
以下两步都是异步调用