CountDownLatch在NioEndpint中的应用

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/wojiushiwo945you/article/details/78936102

背景

Tomcat源码中多处用了java.util.concurrent包中的类,用以处理多线程环境下的流程控制。近日分析了下NioEndpoint源码,本文将以此类为背景,膜拜下Java大神们使用CountDownLatch并发控制的手法,其实也就是简单的实际应用,算不上高深。

类图框架

NIO tailored thread pool, providing the following services:
Socket acceptor thread:Acceptor
Socket poller thread:Poller
Worker threads pool:Executor

以上是该类的注释,结合源码我们知道NioEndpoint就是一个定制线程池,管理了三种线程:Acceptor、Poller、Worker。
(百来的一张很清晰的结构图如下:)
这里写图片描述

初始化

NioEndpoint类维护了一个stopLatch的变量,其类型就是CountDownLatch。它根据Poller线程的个数进行初始化的,源码如下:

public void bind() throws Exception {
    ....
    if (acceptorThreadCount == 0) {
            // FIXME: Doesn't seem to work that well with multiple accept threads
            acceptorThreadCount = 1;
        }
        if (pollerThreadCount <= 0) {
            //minimum one poller thread
            pollerThreadCount = 1;
        }
        stopLatch = new CountDownLatch(pollerThreadCount);
    ....
}

NioEndpont类初始化时指定了Poller和Accetpor线程的个数,而且从上面代码的注释信息来看,acceptorThreadCount的数值是固定的1,即Tomcat的NIO并不支持多个Accepor线程,此外也没有可以修改该属性的途径。

stopLatch控制流程

stopLatch,顾名思义,是控制Tomcat的组件停止时使用的锁,利用CountDownLatch的基本用法,主线程等待一组线程到达某个状态后,才进行后面的处理。NioEndpoint的stopInternal()方法的流程如下:

public void stopInternal() {
        releaseConnectionLatch();
        if (!paused) {
            pause();
        }
        if (running) {
            running = false;
            unlockAccept();
            for (int i=0; pollers!=null && i<pollers.length; i++) {
                if (pollers[i]==null) continue;
                pollers[i].destroy();
                pollers[i] = null;
            }
            try {
                stopLatch.await(selectorTimeout + 100, TimeUnit.MILLISECONDS);
            } catch (InterruptedException ignore) {
            }
            shutdownExecutor();
            eventCache.clear();
            nioChannels.clear();
            processorCache.clear();
        }
 }

该方法将导致所有的处理线程都停止工作,其流程为:
首先,通知Poller线程停止工作,调用其destroy,设置Poller的close标识为true。
其次,设置running为false,通知Accepotr线程终止run方法的循环处理。
第三,当前线程stopLatch.await,等待所有的Poller线程的run方法结束。Poller的run方法最后一句是stopLatch.countDown();当stopInternal的await方法被唤醒时,说明所有的Poller线程都结束了。
第四,此处调用await操作的超时时间设置为selectorTimeout,这个值也是Poller处理时的阻塞时间,也就是说:如果Poller的在轮询过程中调用了selector.select(selectorTimeout);的话,最多等待这么长时间,就能保证所有的Poller都及时结束了。

此处,之所以不用考虑Acceptor的结束问题,是因为Acceptor线程只有一个,而且它没有阻塞处理,所以一旦running标识为false,它就会立即结束的。

所有的处理线程都结束之后,shutdownExecutor()操作会关闭工作线程池的调度器,至此,所有的线程都被关闭了。

猜你喜欢

转载自blog.csdn.net/wojiushiwo945you/article/details/78936102