目录
概述
线程是为了更充分合理地利用计算机各种系统资源,但是线程的创建需要开辟虚拟机栈、本地方法栈、程序计数器等私有空间,线程销毁时还要进行回收,在高并发条件下,频繁地进行线程创建、销毁操作,会造成系统资源的大量浪费,增加并发风险。另外,并发量过大时,需要让新线程等待或拒绝服务,这些仅靠线程自身无法解决,因此需要线程池进行协调。
线程池的主要任务有:
- 线程池可以管理、复用线程,控制最大并发数
- 实现任务线程队列缓存策略和拒绝机制
- 实现任务的定时执行、周期执行
- 隔离线程环境,避免线程互相影响
ThreadPoolExecutor是Java中比较常用的线程池类
相关类图如下
ExecutorService继承了Executor接口,定义了管理线程任务的方法,其抽象实现AbstractExecutorService提供了部分实现,但是核心方法execute因为不同实现执行策略不同,并没有给出默认实现。
Executors的静态工厂方法,可以产生其三个子类:ForkJoinPool、ThreadPoolExecutor、ScheduledThreadPoolExecutor:
- newWorkStealingPool:创建一个足够大的线程池保持并行度(默认是CPU数量),通过使用多个队列减少竞争,通过ForkJoinPool实现
- newCachedThreadPool :创建一个高度可伸缩的线程池,最多能有Integer.MAX_VALUE个线程,此时肯定已经OOM,基于ThreadPoolExecutor实现
- newScheduledThreadPool:同上,但是支持定时任务和周期任务,也不会回收线程。使用ScheduledThreadPoolExecutor实现
- newSingleThreadScheduledExecutor:只有一个线程的线程池,基于ScheduledThreadPoolExecutor
- newFixedThreadPool:固定大小的线程池。基于ThreadPoolExecutor实现。
我使用的是JDK8,代码和书上的有点不一样,但是区别很小,设计思路和工作流程都没有变化
关键变量
// 线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 用低29位表示线程数,高3位表达状态
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 以下五个变量定义了运行状态,存储在高3位
// RUNNING:接受新任务并处理排队的任务
// SHUTDOWN:不接受新任务,但处理排队的任务
// STOP:不接受新任务、不处理排队的任务和中断正在进行的任务
// TIDYING:所有任务都已终止,线程数已经为零,线程切换到TIDYING状态并执行terminated方法
// TERMINATED:terminated方法执行完毕
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
这里的设计很巧妙
首先是将整型数字高3位用来表示线程状态,可以看到,RUNNING状态是最小的,仅次于SHUTDOWN。因此判断线程是否在运行直接靠数字比较即可:
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
线程状态间的比较也被转化成数值比较。
其次,将低29位用来表示线程池容量,表明最大线程数是
精妙之处在于,用一个int值,就表达了两个意思,可以通过位运算分别查询运行状态和线程数:
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
根据注释,线程池有以下状态转换:(自制,丑)
看到AtomicInteger,就能想到是用CAS来更新计数器的:
/**
* Attempts to CAS-increment the workerCount field of ctl.
*/
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
/**
* Attempts to CAS-decrement the workerCount field of ctl.
*/
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
/**
* Decrements the workerCount field of ctl. This is called only on
* abrupt termination of a thread (see processWorkerExit). Other
* decrements are performed within getTask.
*/
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
还有些变量会在构造方法中提到
构造方法
先贴源码:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
下面进行解释:
- corePoolSize:常驻核心线程数,如果为0,则没有请求进入时,会销毁线程,大于0则不进行销毁
- maximumPoolSize:能容纳同时执行的最大线程数,至少为1,如果和corePoolSize相同,就是个固定大小线程池
- keepAliveTime:空闲线程存活时间
- unit:这个不用解释了,时间单位
- workQueue:任务/线程队列,请求数超过maximumPoolSize时,就可以让多余的请求等待
- 如果是通过Executors.newFixedThreadPool创建,则使用LinkedBlockingQueue实现
- threadFactory:线程工厂,用来生产线程,线程池的命名也是通过工厂增加组名前缀来实现的
- 默认为Executors.defaultThreadFactory()方法提供
- handler:执行拒绝策略的执行器,超过workQueue默认是中止策略(AbortPolicy)
- 内置的策略有:
- AbortPolicy:抛出一个RejectedExecutionException异常
- CallerRunsPolicy:让产生任务的线程自己去处理,这样就能减缓新任务产生的速度
- DiscardPolicy:处理不了的任务直接丢弃(不抛出异常)
- DiscardOldestPolicy:从队列头部丢弃任务,让新任务进入队列等待
- 建议的友好策略有:
- 先持久化多余的任务,空闲时再提取出来执行
- 转向提示页面
- 打印日志
- 内置的策略有:
execute方法
还是先贴源码:
public void execute(Runnable command) {
// 空检查
if (command == null)
throw new NullPointerException();
// 返回整数,包含了当前线程数和线程池状态
int c = ctl.get();
// 如果工作线程不足,则新建线程去执行
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
// 如果创建失败,在此期间线程池信息可能发生了变化,要重新获取
c = ctl.get();
}
// 如果线程池正在运行,就添加一个任务
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 如果这时候线程池突然关了,就把刚刚添加的任务移除并拒绝服务
if (! isRunning(recheck) && remove(command))
reject(command);
// 否则再次检查工作线程数,不足就添加
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 线程池没有运行,或者队列已满,则尝试创建一个新线程,失败了就拒绝服务
else if (!addWorker(command, false))
reject(command);
}
能够执行任务的条件是:1)线程池正在运行 2)有空闲线程可以来执行任务 3)任务队列还没满
可以看到,addWorker是一个关键方法
addWorker方法
主要功能是:根据当前线程池状态,检查是否可以添加新的任务线程,是则创建并启动任务,并返回true
失败的可能原因:线程池没有在运行,或者创建线程池遇到了异常
其两个参数意义如下:
- firstTask:外部启动线程池时构造的第一个线程
- core:如果是true,新增线程时需要判断活跃工作线程是否少于corePoolSize;否则检查活跃工作线程是否少于maximumPoolSize。
private boolean addWorker(Runnable firstTask, boolean core) {
//用于快速跳出循环
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 进行线程池状态和任务队列的检查
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
// 线程池不能超过容量上限
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 活动线程数+1,如果加成功了,则退出这段循环
// 否则重新读取ctl值并重新进入循环
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 创建新线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 进行加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
// 再次检查线程池状态
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
// largeestPoolSize是最大并发数
if (s > largestPoolSize)
largestPoolSize = s;
// 设置flag
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 线程创建成功,开始执行任务
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
流程不复杂,主要工作就是:状态检查、线程创建(Worker)、执行任务(Thread),最下面finally块执行的addWorkerFailed很简单,就是如果线程启动失败,则把ctl低29位的值减一
中间创建线程和添加线程到队列的时候,使用了一个可重入锁,这个锁在worker整个生命周期中都能看到它的存在,只有加锁成功,才有权力创建、销毁线程
执行的任务是通过Worker产生的,这是一个内部类
Worker内部类
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
// -1代表该线程在runWorker执行前不会被中断
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
// runWorker是ThreadPoolExecutor的一个final方法,主要是提取firstTask然后运行
public void run() {
runWorker(this);
}
……
}
runWorker为开发者提供了一个自定义的机会,在真正开始执行任务前,有一个beforeExecute方法,默认为空实现,源码的注释里也明确写了可以在子类提供实现。
总结
使用线程池需要注意:
- 根据实际业务场景合理设置参数,例如工作线程,过大容易OOM,过小则起不到节约资源的作用
- 线程必须通过线程池创建
- 为线程和线程池创建有意义的名称(例如自定义ThreadFactory,重写线程命名方法)
不要使用Executors静态方法创建线程池,而要使用new ThreadPoolExecutor的方式创建,以便合理设置参数