目录
说在前面,此篇文正有点长,希望您耐心看完后,对您有所帮助。
一、线程池的概念
线程池(Thread Pool)是一种基于池化思想管理线程的工具。类似于我们连接数据库的连接池。
线程池解决的核心问题就是资源管理问题。在并发环境下,系统不能够确定在任意时刻中,有多少任务需要执行,有多少资源需要投入。这种不确定性将带来以下若干问题:
- 创建一个线程,却需要调用操作系统内核的 API,然后操作系统要为线程分配一系列的资源,创建成本很高
- 频繁申请/销毁资源和调度资源,将带来额外的消耗,可能会非常巨大。
- 对资源无限申请缺少抑制手段,易引发系统资源耗尽的风险。
- 系统无法合理管理内部的资源分布,会降低系统的稳定性。
二、线程池的原理
如下图所示为线程池的实现原理:调用方不断地向线程池中提交任务;线程池中有一组线程,不断地从队列中取任务,这是一个典型的生产者—消费者模型。
实现这样一个线程池,有几个问题需要考虑:
(1)队列设置多长?如果是无界的,调用方不断地往队列中放任务,可能导致内存耗尽。如果是有界的,当队列满了之后,调用方如何处理?
(2)线程池中的线程个数是固定的,还是动态变化的?
(3)每次提交新任务,是放入队列?还是开新线程?
(4)当没有任务的时候,线程是睡眠一小段时间?还是进入阻塞?如果进入阻塞,如何唤醒?
带着这几个问题我们分析一下 Java 中 具体是如何实现线程池的。
三、Java 中的线程池
在正式讲解 Java 中的线程池之前,我们先看看线程池的继承体系。
在这里,有两个核心的类:ThreadPoolExector 和 ScheduledThreadPoolExecutor,后者不仅可以执行某个任务,还可以周期性地执行任务。
1、ThreadPoolExector
1.1、ThreadPoolExecutor 类中主要属性
// 状态及线程数变量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 存放任务的队列
private final BlockingQueue<Runnable> workQueue;
// 可重入锁,对线程池内部变量互斥访问
private final ReentrantLock mainLock = new ReentrantLock();
// 存放所有的工作线程
private final HashSet<Worker> workers = new HashSet<Worker>();
属性说明:
(1)、原子变量 ctl
原子变量 ctl 记录了线程数量(workerCount)和线程池状态(runState)。它是如何做到一个变量做两件事的呢?它是通过,如下所示,最高的3位存储线程池状态,其余29位存储线程个数。
状态说明:
RUNNING
:运行状态,接受新的任务并且处理队列中的任务。SHUTDOWN
:关闭状态(调用了shutdown方法)。不接受新任务,,但是要处理队列中的任务。STOP
:停止状态(调用了shutdownNow方法)。不接受新任务,也不处理队列中的任务,并且要中断正在处理的任务。TIDYING
:所有的任务都已终止了,workerCount为0,线程池进入该状态后会调 terminated() 方法进入TERMINATED 状态。TERMINATED
:终止状态,terminated() 方法调用结束后的状态。
状态之间的迁移过程,如下所示:
这里的状态迁移有一个非常关键的特征:从小到大迁移,-1,0,1,2,3,只会从小的状态值往大的状态值迁移,不会逆向迁移。
(2)、任务队列 workerQueue
workQueue 是线程池中的一个BlockingQueue阻塞队列,线程池是以生产者-消费者模式,通过这个阻塞队列来实现任务的缓冲。当线程池中运行的线程数量等于 核心线程数量(corePoolSize)时,再提交的任务就会放入到此队列缓冲器中。使用不同的BlockingQueue可以实现不一样的任务存取缓冲策略。具体的实现如下所示:
(3)、工作线程集合 workers
线程池中把每个线程都封装成了一个 Woker,Worker是ThreadPoolExector的内部类,实现了 Runnable 接口,同时还继承了 AbstractQueuedSynchronizer。 其核心代码如下所示:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
final Thread thread; // worker 封装的线程
Runnable firstTask; // worker 接收的第一个任务
volatile long completedTasks; // worker 执行完的任务数
// 构造方法,回调用线程工厂方法创建线程
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
// 执行任务,调用的是ThreadPoolExecutor 中的 runWorker 方法
public void run() {
runWorker(this);
}
}
有源码可以发现,Worker继承于AQS,也就是说Worker本身就是一把锁。这把锁有什么用处呢?在接下来分析线程池的关闭、线程执行任务的过程时会了解到。
1.2、核心配置参数解释
ThreadPoolExecutor在其构造函数中提供了几个核心配置参数,来配置不同策略的线程池。了解了清楚每个参数的含义,也就明白了线程池的各种不同策略。如下图所示,ThreadPoolExecutor 提供了四个构造方法。
接下来重点讲解一下参数最多的构造方法,其他三个只是内部提供了一下默认值,最终还是调到了第三个方法。第四个构造方法具体源码如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
}
上面各个参数的解释如下:
- corePoolSize:核心线程数,也就是在线程池中始终维护的线程个数。
- maxPoolSize:线程池中的最大线程数,也就是在corePooSize已满、队列也满的情况下,扩充线程至此值。
- keepAliveTime / TimeUnit:maxPoolSize 中的空闲线程,销毁所需要的时间,总线程数收缩回corePoolSize。
- blockingQueue:线程池所用的队列类型,用来缓存线程
- threadFactory:线程创建工厂,可以自定义,也有一个默认的。
- RejectedExecutionHandler:corePoolSize 已满,队列已满,maxPoolSize 已满,最后的拒绝策略。默认提供了四个拒绝策略,也可以自定义策略。
这6个配置参数在任务的提交过程中是怎么运作的。在每次往线程池中提交任务的时候,有如下所示的处理流程:
线程池提供的四个默认策略:
1. ThreadPoolExecutor.AbortPolicy:抛出 RejectedExecutionException 来拒绝新任务的处理。
2. ThreadPoolExecutor.CallerRunsPolicy:调用执行自己的线程运行任务,也就是直接在调用execute方法的线程中运行(run)被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。另外,这个策略喜欢增加队列容量。如果您的应用程序可以承受此延迟并且你不能任务丢弃任何一个任务请求的话,你可以选择这个策略。
3. ThreadPoolExecutor.DiscardPolicy: 不处理新任务,直接丢弃掉。
4. ThreadPoolExecutor.DiscardOldestPolicy: 此策略将丢弃最早的未处理的任务请求。
1.3、任务提交流程分析
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//当如果当前运行线程数小于核心线程数(corePoolSize)则创建新线程
if (workerCountOf(c) < corePoolSize) { //当如果当前运行线程数小于核心线程数则创建新线程
if (addWorker(command, true)) // 判断线程是否创建是否成功,如果成功直接返回
return;
c = ctl.get();
}
// 如果当前运行线程数大于等于核心线程数(corePoolSize) 则把线程加入缓冲队列
// 再加入缓冲队列时先判断线程是否处于运行状态。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次判断线程池是否在运行状态
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0) //如果当前线程池中没有运行的线程则启动一个工作线程
addWorker(null, false);
}// 放入队列失败, 则开启新的线程。
else if (!addWorker(command, false))
reject(command);
}
此方法中主要有三个分支,分别是:
- 判断当前运行线程数是否小于 核心线程线程数(即:小于corePoolSize)。如果是则直接开启新的线程,并执行任务。
- 如果当前线程数不小于核心线程数,则尝试把任务放入缓冲队列中 workQueue.offer(command)。
- 如果放入缓冲队列已满,则尝试开启新的线程执行任务。
在 execute 方法中重要调用了 addWorker() 方法,那么我们在看看 此方法。源码如下:
// 此方法用于开启一个新的线程,第二个参数 core 为 true 则用 corePoolSize 作为上界,
// 如果为 false 则用 maximumPoolSize 作为上界
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// 获取运行状态
int rs = runStateOf(c);
// 如果状态 大于或等于 SHUTDOWN,说明线程池进入了关闭状态。
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) { // 通过“自旋 + CAS”操作使线程数加1
int wc = workerCountOf(c); // 获取当前线程池中运行的线程数
// 如果线程数超过了 corePoolSize 或 maximumPoolSize
// 不开启新线程,直接返回 false.
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//通过 CAS 操作使线程数加1, 如果成功跳出外层 for 循环
if (compareAndIncrementWorkerCount(c)) //线程数加 1
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs) // 如果再次过程中状态发生了变化则继续外层for 循环。
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 线程数成功加1 则创建新的线程执行任务
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);//构造一个工作线程
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w); // 把创建的新线程加入到线程集合
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) { // 如果线程添加成功则启动线程
t.start(); // 启动线程
workerStarted = true;
}
}
} finally {
if (! workerStarted) // 如果加入失败则调用下面方法,
addWorkerFailed(w); // 此方法会使线程数减1
}
return workerStarted;
}
方法解析,此方法虽然很长但业务逻辑不复杂。主要有以下两个部分。
第一部分:两个嵌套循环,外循环主要是用来判断线程池的状态, 内循环就通过 CAS 操作对运行线程数加一,如果成功加一则退出整个循环。
第二部分:如果第一部分对线程数成功加1,则创建新的线程,并把创建的线程加入到线程集合中。如果添加成功,则启动线程,否则对线程数减1。
具体流程如下所示:
1.4、任务的执行过程分析
上面的分析任务提交过程中,可知,可能会开启一个新的 Worker,并把任务本身作为 firstTask 赋给该 Worker。但对于一个 Worker 来说,不是只执行一个任务,而是源源不断地从队列中取任务执行,这是一个不断循环的过程。
下面来看 Woker 中 run()方法的实现过程。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable{
Worker(Runnable firstTask) {
setState(-1); // 设置初始状态为 -1
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this); // 调用 ThreadPoolExecutor 的 runWorker 方法
}
}
通过源码我们发现,Worker 中的 run() 方法很简单,主要是调用了 ThreadPoolExecutor 中的 runWorker 方法。那么我们在看看 runWorker() 方法的实现。如下:
// 核心方法,ThreadPoolExecutor 中的 runWorker 方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 通过循环从队列中不停的获取任务。 注意: 此处的 getTask() 方法。
while (task != null || (task = getTask()) != null) {
w.lock(); // 关键点:在执行任务前要先加锁。这个加锁在关闭线程池时,会用到
//拿到任务后,先判断当前线程池的状态,如果发现已经开始关闭,则自己给自己发中断信号。
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); // 任务之前的钩子方法,目前为空
Throwable thrown = null;
try {
task.run(); // 执行任务
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); // 执行完任务之后的方法,目前为空
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false; // 判断这个Worker是正常退出,还是受到中断退出,或者其他异常退出
} finally {
processWorkerExit(w, completedAbruptly); // Worker退出
}
}
这个方法也很简单,关键点在代码中都标注了,不再赘述。在上面方法中我们发现 在 while 循环条件中,回调用 getTask() 方法,此方法就是调用阻塞队列获取任务,如果任务队列为空则阻塞在此方法上。代码详解如下:
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 关键点1:
// 1、如果 rs >= STOP 即调用了 shutdownNow() 此处返回null 。
// 2、如果 rs >= SHUTDOWN 即调用了 shutdown(), 并且队列为空,此处返回null 。
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null; // 此处返回null,则退出上面的 while 循环。线程死亡。
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//关键点2:
// 1、队列为空,则会阻塞此处的 poll() 或 take() 方法,前者带超时,后者不带。
// 2、一旦收到中断信号, 此处就会抛出中断异常。
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
此方法中有两个关键点:
关键点1:
1、如果 rs >= STOP 即调用了 shutdownNow() 此处返回null 。
2、如果 rs >= SHUTDOWN 即调用了 shutdown(), 并且队列为空,此处返回null 。关键点2:
1、队列为空,则会阻塞此处的 poll() 或 take() 方法,前者带超时,后者不带。
2、一旦收到中断信号, 此处就会抛出中断异常。关键点2和后面要介绍的 线程池关闭有很大关联。
1.5、线程的关闭
在本章开头讲解 “原子变量 ctl ” 时我们讲解了线程池的存在多种状态,由此可知线程池的关闭是需要一个过程的。也就是 在调用 shutdown() 或者 shutdownNow() 之后,线程池并不会立即关闭。接下来我们就分析一下线程的关闭步骤。
(1)、shutdown 和 shutdownNow 的区别
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess(); // 检查是否有关闭线程池的权限
advanceRunState(SHUTDOWN); // 把线程池的状态值为 SHUTDOWN
interruptIdleWorkers(); // 中断空闲线程
onShutdown(); // 钩子方法 是空的
} finally {
mainLock.unlock();
}
tryTerminate();
}
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP); // 把线程池设置为 STOP 状态
interruptWorkers(); // 中断所有线程
tasks = drainQueue(); //清空队列
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
有源码可知两者存在如下三点区别:
- shutdown() 把状态置为 SHUTDOWN,而 shutdownNow 把状态置为 STOP
- shutdown() 不会清空任务队列,会等所有任务执行完成,shutdownNow 会清空任务队列。
- shutdown() 只会中断空闲的线程,shutdownNow 会中断所有线程。
(2)、值中断空闲线程和 中断所有线程的区别
// 中断空闲线程
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 关键点:tryLock 成功说明线程空闲,
// 不成功说明当前线程持有锁,正在执行任务
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
// 中断所有线程
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted(); // 不管线程有没有执行任务都发出中断
} finally {
mainLock.unlock();
}
}
此处关闭空闲线程的关键点在tryLock():一个线程在执行一个任务之前,会先加锁,这意味着通过是否持有锁,可以判断出线程是否处于空闲状态。tryLock() 如果调用成功,说明线程处于空闲状态,向其发送中断信号;否则不发送。
我们在 1.4 节时,提到过在 runWorker() 方法中 worker 执行任务时 回调用 w.lock() 方法加锁,在此处关闭线城市就用到了。这也是 Worker 类为什么要基础 AQS 的原因所在。
(3)、tryTerminate() 方法
在shutdown() 和 shutdownNow() 方法最后都调用了 tryTerminate() 方法,我们在分析下这个方法的功能。
final void tryTerminate() {
for (;;) {
int c = ctl.get();
//线程状态正在运行 或者 正在 TIDYING(整理状态),
// 或者处于 SHUTDOWN 状态且还任务队列不为空。则直接返回
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 如果当前工作线程不会空,这再次关闭空闲线程,直接返回
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 把线程池设置为 整理(TIDYING) 状态
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0)); // 把状态设置为 TERMINATED状态
termination.signalAll(); // 通知
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
此方法的功能就是通过 自旋 + CAS 的方式更改线程池的状态。
1.6、线程池关闭和任务执行综合分析
(1)、shutdown() 与 任务执行综合分析
把线程池的关闭过程和1.4介绍的任务执行过程结合起来进行分析,当调用 shutdown() 的时候,可能出现以下几种场景:
场景1:当调用 shutdown() 的时候,所有线程都处于空闲状态。这意味着任务队列一定是空的。此时,所有线程都会阻塞在 getTask() 函数的地方。然后,所有线程都会收到interruptIdleWorkers() 发来的中断信号,getTask() 返回null,所有Worker都会退出while循环,之后执行processWorkerExit。
场景2:当调用 shutdown() 的时候,所有线程都处于忙碌状态。此时,队列可能是空的,也可能是非空的。interruptIdleWorkers() 内部的tryLock调用失败,什么都不会做,所有线程会继续执行自己当前的任务。之后所有线程会执行完队列中的任务,直到队列为空,getTask()才会返回 null。之后,就和场景1一样了,退出while循环。
场景3:当调用shutdown()的时候,部分线程忙碌,部分线程空闲。有部分线程空闲,说明队列一定是空的,这些线程肯定阻塞在 getTask() 函数的地方。空闲的这些线程会和 场景1 一样处理,不空闲的线程会和 场景2 一样处理。
(2)、shutdownNow 与 任务执行综合分析
和上面的 shutdown() 类似,只是多了一个环节,即清空任务队列。在第1章中已经讲到,如果一个线程正在执行某个业务代码,即使向它发送中断信号,也没有用,只能等它把代码执行完成。从这个意义上讲,中断空闲线程和中断所有线程的区别并不是很大,除非线程当前刚好阻塞在某个地方。
1.7、线程清理工作
当Worker退出执行后,回调用 processWorkerExit() 方法做一些清理工作。此方法的具体工作如下所示:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 判断线程是否为非正常退出,如果是非正常退着把运行线程数量减一。
// 比如运行的任务出现异常等就会导致非正常退出。
if (completedAbruptly) //
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w); // 从工作线程集合中移除 worker自己
} finally {
mainLock.unlock();
}
// 每个线程结束时,都会尝试看是否可以终止整个线程池
tryTerminate();
// 关键点:当线程退出时,如果线程池状态小于 STOP 并且任务队列不为空,
// 并且当前没有了工作线程数了,那么会调用 addWorker 方法再创建一个新线程,把
// 任务队列中的任务消耗完。
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
四、总结
这篇文章有点长,主要是介绍了,构造方法中的参数,线程池的工作原理就是通过 生产者-消费者模式实现,任务的提交及执行流程和线程池的关闭流程。以及 shutdown() 和 shutdownNow() 两个关闭方法的区别。
参考:
https://blog.csdn.net/weixin_38087538/article/details/107170359
《Java并发实现原理:JDK源码剖析》— 余春龙