ThreadPoolExecutor为AbstractExecutorService的实现子类。该线程池以内部线程池的形式提供管理任务执行,线程调度,线程池管理等待。
实现过程流程图如下:
【问】为何要使用线程池?
【答】
- 在程序运行的过程中,线程的创建/销毁会伴随着系统资源的开销。而在一个多线程高并发的应用环境下,过于频繁创建和销毁会对系统的响应时间造成影响。而线程池可以指定最大和最小的线程容量,从而避免过多的线程对资源过度消耗。
- 线程池管理线程创建和销毁的整个生命周期。在线程池中提前创建好指定数量的线程,则我们可以从线程池中直接获取线程,线程用完后重写放回池中,使得线程池中的线程被重复利用。减少了由于创建/销毁线程应用程序所需等待的时间。
- 利用线程池可以对线程进行统一的管理,分配,优化和监控。
【构造方法】成员变量
- corePoolSize:核心线程池大小
- maxmaximumPoolSize:线程池最大容量
- keepAliveTime:线程最大空闲时间
- workQueue:线程存放队列
- threadFactory:线程工厂
- rejectedExecutionHandler:拒绝策略
【核心方法execute】
void execute(Runnable comman)
execute方法大体分为三步:
(1)当workCount小于corePoolSize时,增加线程
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))//增加线程
return;
c = ctl.get();
}
(2)如若workCount大于corePoolSize,则判断线程池是否处于RUNNING状态,且队列是否已满
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//再次判断线程池状态,防止状态突变
if (! isRunning(recheck) && remove(command))//如若线程池状态不为RUNNING,移出刚才加入的task
reject(command);//执行拒绝策略
else if (workerCountOf(recheck) == 0)//如若当前worker数量变为0
addWorker(null, false);//添加一个空线程,以免offer进的task没有线程执行。
}
(3)如若我们不能继续往队列中存入task,则直接新增worker,如若失败,执行拒绝策略
else if (!addWorker(command, false))
reject(command);
我们从上述代码中可以看出execute中关键的是addWorker方法。
【addWorker方法】
了解addWorker方法之前,先说明几个变量
- ctl:AtomicInteger类型,包含两个field:前3位runState(线程池状态)+后29位workCount(线程池中的线程数量)
- 线程池五种状态:RUNNING(111)、SHUTDOWN(000)、STOP(001)、TIDYING(010)、TERMINATED(110)
- RUNNING:线程池正在运行,可接受new task和执行队列中的任务
- SHUTDOWN:不可接受new task,但是可继续执行队列中的task
- STOP:即不能接受new task,也不能继续执行队列中的task
- TIDYING:所有任务结束,工作技术为零。线程池进行一些清理
- TERMINATED:terminated方法完成
- 状态转换:RUNNING--->SHUTDOWN:shutdown方法;RUNNING/SHUTDOWN---->shutdownNow方法; SHUTDOWN--->TIDYING:队列和pool均为空;STOP----->TIDYING:pool为空时;TIDYING--->TERMINATED:terminated方法完成
- 注:从shutdown到tidying的过程比较困难。由于在shutdown之后,原本queue中可能不能为空,故shutdown之后可能仍会有worker在执行。所以这里讲shutdown到tidying的状态转换用workCount变量判断。当workCount为零之后,变为TIDYING
接下来进入addWorker方法:
可以看出worker通过start方法启动线程。worker实则Runnable。
其构造方法为
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
可以看出其调用的是其自身的run方法,run方法调用runWorker
在runWorker方法中,首先会判断task是否为空,如若为空去队列中取task
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();//take阻塞线程,直至取出一个为止
if (r != null)
return r;
取出线程后调用task的run方法,随后会置空task。
接下来看shutdown和shutdownNow方法。
在上部分LZ提到了线程池的五种状态。而这两个方法便是进行其中某两个状态的转变的方法。
【shutdown】:shutdown--->tidying,中断空闲线程。
【问】何为空闲线程?
【答】在shutdown的方法中调用的interruptIdelWorkers方法可以看出,
循环workers,当worker可以获取锁时,即为空闲线程。从这里看出,阻塞在runWorker方法的getTask方法中的线程即为空闲线程。
【shutdownNow】:shutdown--->stop
这个方法会将线程状态设为STOP,随后HashSet中的worker全部中断,将pool和queue置空。这个方法会返回尚未执行的线程。
【不可转变为TIDYING的情况】
- 状态为RUNNING的情况
- SHUTDOWN,但是queue不为空
- 本身为TIDYING或者STOP
综上,ThreadPoolExecutor便简单了分析完了。准备实现一个自身的线程池