我先列举几个线程池有关的题目,读者可以做个自我检测:
- 线程池初始化时,需要设置哪些参数
- 线程池的默认线程工厂,创建线程是如何命名的
- 线程池的拒绝策略,JDK 提供了哪些
- 线程池的核心线程是否会死亡
- 提交(submit)任务时有什么返回值
- Future 是怎么 get 出线程执行结果的
- 线程池如何记录当前状态,以及工作线程数量
- Executors 构建的线程池有哪些问题
- 封装线程的类(Worker)有什么特点
- 工作线程如何执行任务
- 线程到指定时间后是如何死亡的
- 线程池是如何终止的
线程池是一个我们 Java 程序员经常接触的一个高效并且优秀的并发工具类。但是实际上在使用的过程,很多程序员会忽略一些细小的地方,导致程序的执行不够友好。
所以,要真正达到一个优秀的开发程序员,我们需要去对线程池的大体和部分细节,做一个深入的了解和研究,这样,在我们编码过程中,才能够游刃有余。
学习忌浮躁
在我这不要怕源码,毕竟 AQS 我都带你看过来了,线程池就是小意思。
而且我的注释和分析已经不能再详细了。。
文章目录
线程池理念
首先有几个接口要了解:
第一个是 Executor,第二是 ExecutorService,再后面才是线程池的一个使用 ThreadPoolExecutor。
Executor:执行者
所以它有一个方法叫 execute:执行。而执行的任务就叫 Runnable。
它是一个接口,所以可以有很多实现,不像我们之前,我们要执行一个 Runnable 任务的时候,必须手动 new 一个线程,然后调用它的 start 方法来启动执行。
而通过这个 executor 接口,我们可以将任务和执行从此划分开来。以前,我们 new Thread,通过 start 执行 run 方法是写死固定的,但是通过这个接口,则可以有各种实现。
我们只需要去关心创建任务,然后 提交执行,而不必去关心具体的执行过程。
public interface Executor {
void execute(Runnable command);
}
ExecutorService 继承于 Executor
它不仅拥有 execute 执行任务,还包括了一整套的任务执行的生命周期。
其中,主要还增多了 任务提交(submit),不仅可以提交 Runnable,还可以提交 Callable。
提交意味着,会有任务执行的结果,所以提交之后会返回一个对象,叫将来,意味着在将来任务执行完毕后,你可以获取到这个执行结果。
所以,这里又涉及到了 Future、FutureTask 等等的类
public interface ExecutorService extends Executor {
void shutdown(); // 结束
List<Runnable> shutdownNow(); // 立即结束
boolean isShutdown(); // 是否结束
boolean isTerminated(); // 是否完全终止
// 等着结束,要是时间到了还没结束就返回 false
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
}
Callable 与 Runnable 类似,它们被设计出来都是想让一个线程去运行它。
那有了 Runnable 为什么还要有 Callable,原因很简单,我们看了它的接口设计就会发现,Callable 的方法有一个可以指定类型的返回值。也就是说,Callable 执行的任务可以返回一个结果。比如说我们需要一个异步计算,就必须把这个计算的结果返还给我们,而用 Callable 就可以很容易地做到。
public interface Callable<V> {
V call() throws Exception;
}
而 Callable 的结果应该怎么返回给我们呢,毕竟线程的运行结束时间是不受我们控制的。
所以我们又想到了一个接口:Future
这个 Future 则意味着,在将来的某个时间,如果 Callable 的方法执行结束了,那么返回的值就会保存在这个 Future 里,等到我们需要时,就可以从这个 Future 中取到。
我们在用 submit 提交任务的时候,就会返回一个 Future,来保存我们的任务执行的结果。
线程池基础
JDK 默认提供了一些线程池的实现,它们都是 ExecutorService 的实现类。
Executors 则是对线程执行的工具类,可以看做是线程池的工厂,是用来生产各种各样的线程池的。
首先线程池有 7 个参数大家必须掌握:
- corePollSize:核心线程数
- maximumPS:最大线程数
- keepAliveTime:生存时间(线程多久不执行任务就死亡)
- TimeUnit:生存时间的单位
- BlockingQueue:任务阻塞队列
- ThreadFactory:线程工厂
- RejectStrategy:拒绝策略
再来看线程池的任务执行过程:
- 在一个线程池中,在有任务到来时,如果核心线程数没有达到,就会新建线程来执行;
- 等到核心线程数满了,那么多余的任务就会放入阻塞队列;
- 如果阻塞队列满了,那么再多余的任务,就会创建非核心线程来执行;
- 如果一段时间没有了任务,那么在指定时间过后,非核心线程就会退休死亡;
(线程池也可以设置核心线程超时死亡) - 如果最大线程数也到达了,那么再提交任务,除非队列还有空位,否则就将执行拒绝策略。
- (其中,线程的创建都是由线程工厂创建的。)
JDK 默认提供了一些拒绝策略,常见的 4 个:
- Abort:抛异常
- Discard:(悄悄地)丢弃任务,不抛异常
- DiscardOldest:扔掉排队时间最久的
(比如时时更新服务,当最新的一条更新来时,最老的更新实际就是没有意义的) - CallerRuns:让提交任务的线程自己处理该任务
常用变量
我们主要关注的线程池的内部原理,首先就是一个 ctl(AtomicInteger)
我们平时运用线程池的时候,都知道线程池需要记录当前的状态,是 Running、ShutDown… 等等,
还要记录当前线程数,有多少工作线程啊,要是达到上限了,存入队列等等。
所以我们需要保存两个状态。
但是,对于两个数值的操作,如果不加锁,将无法是原子性的。
因此,线程池内部采用了一个数分别两部分表示两个数:
前 3 位表示线程池状态,后 29位 表示 worker 线程数 !!!
然后通过 CAS,来保证操作的原子性。
(这一点,我们之前在看 AQS 读写锁源码的时候,发现也是这样一个技巧)
我们在分别需要两个数表示的值时,则用 位运算,取出对应的值。
我对源码做了详细的注释(如果你对原子类、位运算还不了解,那就得抓紧补补基础了)
// ctl 一个 AtomicInteger 通过Atomic原子类可以保证线程安全
// 这个ctl数值 分为两部分(一共 32位,前3位、后29位)
// 前3位用来表示线程池的状态,后29位表示线程池中线程(Worker数量)
// (ReentrantLock也是对一个整数,分成两部分分别表示)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Integer.Size=32, 32-3 就是29,用来分割整数
private static final int COUNT_BITS = Integer.SIZE - 3;
// 因为只有29位表示worker数量,所以最大worker数就是 2^29-1
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
// 线程池的5种状态
// 因为是前3位表示,所以要左移29位,移到最前面
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// 获取线程池状态,通过按位与操作,将低29位全部变成0
private static int runStateOf(int c) { return c & ~COUNT_MASK; }‘
// 获取worker线程数,通过按位与操作,将前3位全部变成0
private static int workerCountOf(int c) { return c & COUNT_MASK; }
// 根据线程池状态和worker数量,生成ctl值
// ctl 就是上面的 那个 AtomicInteger
private static int ctlOf(int rs, int wc) { return rs | wc; }
// 判断线程状态是否小于 xxx某一个状态
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
// 判断线程状态是否大于 xxx某一个状态
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
// 是否是 Running 状态
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
// CAS 增加 worker 数量
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
// CAS 减少 worker 数量
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
submit 提交方法
看源码发现,submit 实际上仍然是执行一个 Runnable 任务
它仅仅只是将提交的任务用一个 RunnableFuture 进行一个封装,然后就可以用 execute 方法,将任务交由线程池执行管理。
然后,就可以将其作为一个 Future 返回给提交任务者
所以,提交任务的方法,我们要主要研究 execute
public <T> Future<T> submit(Callable<T> task) {
// 不允许提交空任务,否则抛出异常
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
// 提交 Runnable 任务本质上是一样的
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
而我们的 RunnableFuture,实际上就是一个 FutureTask
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
// 直接返回一个 FutureTask
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
FutrueTask 获取线程池执行完的结果
这里不可能去很详细地去描述完整的 FutureTask 的源码,我们要抓重点,只关心与线程池有关的:
FutureTask 如何 get 出线程池执行完的结果。
我们看一下类的继承关系:
FutureTask 实现了 RunnableFuture 接口
而 RunnableFuture 实际上就是一个 Runnable 和 一个 Future
也就是说,FutureTask 既是一个任务(Runnable),可以被线程执行;
又是一个任务结果的包装(Future),可以在将来取出任务的结果
public class FutureTask<V> implements RunnableFuture<V>
public interface RunnableFuture<V> extends Runnable, Future<V>
理解了 FutureTask 的概念,我们很容易就能看懂源码中的部分实现
在这个构造方法中,我们就可以发现,实际上就是做了一个赋值:
将任务保存到了 FutureTask 中,
然后把状态设置为 NEW,表示新建,可以被执行。
public FutureTask(Callable<V> callable) {
// 空指针判断
if (callable == null)
throw new NullPointerException();
// 保存任务
this.callable = callable;
this.state = NEW; // 设置新建(NEW)状态
}
开头是状态检查,过滤掉一些其他(不该执行的)状态
重点 try 中的方法:
1、正常执行的话,ran 就是 true,就会 将结果保存
2、如果出现异常,ran 就是 false,就不会保存结果,就是 null;不过会保存 发生的异常
所以 FutureTask 就是将结果保存,以后就可以从中将结果取出
public void run() {
// 状态检查(不必细究)
if (state != NEW ||
!RUNNER.compareAndSet(this, null, Thread.currentThread()))
return;
// 执行过程:重点
try {
// 用局部变量承装任务
Callable<V> c = callable;
// 执行前检查
// 检查是否为空,状态是否为NEW
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 正常执行
// result 存放执行结果
// ran设置为true(表示正常执行)
result = c.call();
ran = true;
} catch (Throwable ex) {
// 如果异常
// 结果设置为 null
result = null;
// ran设置false(表示异常)
ran = false;
// 将异常保存起来
setException(ex);
}
// 如果成功执行了,就把结果存起来
if (ran)
set(result);
}
} finally {
// ...省略无关代码
}
}
execute 执行方法
既然线程池的超级父接口是我们的 Executor,那我们必然要好好看一下它的实现。
关键有 3 步
- 首先是判断核心线程数,没有满,去创建我们的核心线程,执行任务
- 如果在创建核心线程前,满了,那么就会去下一步
判断是否还在 Running,然后尝试入队
(这里有个小细节)
1、如果入队了,但是线程池关了,那这个任务放进去还有什么用???
所以它会重新再判断一遍,如果线程池关了,就把任务再移除掉
2、如果核心线程为 0(为什么要判断这个??????)
那么把它放到队列里,就会 一直存着,但是没有线程去执行 !!!
所以,如果是这样,说明是一个缓存线程池,就要创建非核心线程去队列里取任务执行 - 如果线程池关了,或者队列满了,就会进到下一个方法
如果是队列满了,那么就会尝试去创建非核心线程执行任务
要是非核心也满了,或者线程池关了,那只好执行 拒绝策略 了
放心,我的注释很详细:
public void execute(Runnable command) {
// 这个没啥好说的,条件判断,空任务抛异常
if (command == null)
throw new NullPointerException();
// 我们先把 ctl 的 int值 取出来
int c = ctl.get();
// 然后位运算取出后29位,和核心线程数比较
// 如果没有达到核心线程数,那我们就知道应该创建新线程去执行
if (workerCountOf(c) < corePoolSize) {
// 所以进入添加新线程执行的方法
if (addWorker(command, true))
return;
// 但是可能存在很多线程同时提交了任务去给线程池执行
// 所以在核心线程数不够的情况下,就有一部分任务会创建新线程失败
// 所以就得开始往队列里去了
// (添加失败一定是因为被其他线程抢先了,所以此时的ctl一定也被修改)
// (所以重新获取一次)
c = ctl.get();
}
/**
* 上面我们经过了分析
* 一种是核心线满了,来到这里
* 还有一种是本来没满,但是创建的时候被其他线程抢先了,然后又满了
* 所以本质上都是核心线程满了,来到这里
*/
// 这时候要进入队列
// 首先看一下线程池是不是正 running
// 然后就尝试入队
// 如果这两步有一步失败了,就去 else
if (isRunning(c) && workQueue.offer(command)) {
// 按道理,放进队列就该完事了
// 但是又拿 ctl 来重新检查一下
// 这么做是为什么?????????????????
int recheck = ctl.get();
// 还是比较容易理解的,因为如果入队后,线程池已经被关了
// 那么那个任务就无法被执行,所以再把它重新移除掉
if (! isRunning(recheck) && remove(command))
reject(command);
// 为什么要判断一下是不是 0 ???????
// 因为核心线程数允许为 0,那么 !!!
// 这就是一个 缓存线程池
// 所以立即创建一个线程,去队列里拿任务
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
/**
* 上面说了,有两种情况
* 一是线程池不在 Running 状态了(说明已经结束提交任务了)
* 还有就是入队失败,那就是队列满了
*/
// 这时候我们就进行一次非核心线程的创建执行任务
//(等等,队列满了还好说,为啥不是 running 了还能提交?????)
// 其实不是 running状态 一定是会失败的,它只是把
// 判断的过程放在了 addworker 方法中
else if (!addWorker(command, false))
// 不是 running、或者连非核心线程也满了
// 那最后我们就得执行我们的拒绝策略
reject(command);
}
addWorker 创建线程
注:我的源码是 11 版本的,你看 1.8 可能发现有一点小不一样,但实际上过程都是一致的,只是代码的表现形式,稍有不同。
里面的逻辑有些复杂,直接看的话会有点麻烦,我先梳理一下流程
- 首先有两层循环
- 1、第一层用于判断线程池状态是否符合要求
- 2、第二层循环是 CAS 自旋,将线程数 +1
、如果线程池的状态不符合要求,那么就会 return false 返回并结束方法
、如果线程池一直在运行,那么就会进内层循环去增加 worker 数量
、、如果线程数量没有满,那么就不断 CAS 去增加线程数。
、、如果线程数量满了,那么就返回 false
上面的循环如果成功增加了线程数,那么才会来到后面,去创建新的 worker 线程,去执行任务
- 先创建线程
- 把线程加入集合中
- 更新历史最大线程数
- 启动线程
当然其中还有很多细节,要穿断是否线程池被关了(到处都在判断)
还要判断线程是否被启动了
如果添加失败了,说明线程池被关了。。。那么又要后续操作
private boolean addWorker(Runnable firstTask, boolean core) {
// 基础不好的可能都不知道这是啥。。。。
// 其实呢,是后面有两层循环,在最外层放一个 retty 作为标志
// 就能够通过它直接 break 跳出两层循环
retry:
// 外层死循环
// 获取 ctl值
for (int c = ctl.get();;) {
// 首先判断 线程池状态 是否 >= ShutDown
// running是可提交,>= shutdown 那就不可提交了
// 所以只要是 running,这里就会是 false,因为跟着 && 运算
// 后面的整个括号的内容就不用看了,直接到后面开始循环
if (runStateAtLeast(c, SHUTDOWN)
// 如果状态已经不是 running 了
// 这里有个大括号一直到后面,
// 如果 >= STOP,说明任务都不允许再执行了
&& (runStateAtLeast(c, STOP)
// 或者传入的任务是空(说明是一个缓存池)
|| firstTask != null
// 或者队列中已经空了
|| workQueue.isEmpty()))
return false;
// 内层死循环
// 这一层循环是作为 CAS 的自旋使用的
// 可能增加线程操作会被多线程竞争,一次添加失败,就自旋一次
for (;;) {
// 首先判断,线程数有没有到上限
// 这里用了三元运算符,如果添加的是核心线程,就和最大核心线程数比
// 如果不是,那就和 最大的线程数比较
// (小细节,它最后和COUT_MASK做了一个与运算,保证前3位都是0)
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
// 如果到上限了,就直接返回false,无法再添加执行线程了
return false;
// 上面的判断都通过,此时可以 CAS 添加新线程执行任务
// 如果 CAS 成功了,那就跳出循环
// (这里是跳出两层循环)
if (compareAndIncrementWorkerCount(c))
break retry;
// 重新获取 ctl,判断线程池状态
// (因为线程池可以被关闭,所以我们发现在源码中总是疯狂判断)
c = ctl.get();
// 这时如果发现大于等于 SHUTDOWN 了
// 那就不是 Running 了,退回外层循环
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
}
}
// 能来到这里,说明刚才的 CAS 自旋已经成功了
// 给出两个初始变量,如果线程成功启动了,就把workerStarted 改为true
boolean workerStarted = false;
// 如果线程成功添加了,就把workerAdded 改为true
boolean workerAdded = false;
Worker w = null;
try {
// 这时候便开始创建工作线程worker
// 把提交的任务给worker第一个执行
w = new Worker(firstTask);
// 获取worker对应的那个线程
final Thread t = w.thread;
if (t != null) {
// 这里要加锁(ReentrantLock还不懂的去看我的博客)
// 停止线程池的时候,也会加锁
// 这样,就可以避免线程安全的问题
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 获取 ctl
// 紧接着又是一堆线程池状态判断
int c = ctl.get();
// 只要是 running 就进 if
if (isRunning(c) ||
// 如果线程池被停了。。。
// 看看是不是 ShutDown或者Stop,
// 并且任务为空(说明是缓存池)
(runStateLessThan(c, STOP) && firstTask == null)) {
// 这一步按道理不会发生
// 判断是不是线程已经启动了(因为还没start它呢,
// 它要是自己启动了,那肯定得抛异常)
if (t.isAlive())
throw new IllegalThreadStateException();
// 把worker放入集合中管理
workers.add(w);
// 然后更新历史上出现过的最大worker数
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
// 加入成功,那么然后就会去启动它了
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// worker被成功加入集合中管理,启动线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 如果worker没有启动,那么说明线程池状态发生了变化(被关了)
// 那么就需要进行其他相关操作
if (! workerStarted)
addWorkerFailed(w);
}
// 返回结果
return workerStarted;
}
Worker 工作线程
我们知道线程池是用来管理线程的,所以我们必要对一个 Thread 进行一个封装。
Worker 类内部包含一个线程,包含一个任务,并且记录了这个 worker 做过了多少任务。
还有,Worker 继承了 AQS ???
很神奇(如果不懂 AQS 的可以跳过 AQS 有关代码,专注于线程池的重点实现)
(当然,线程池学地差不多了,回过头学学 AQS 也挺好)
全网最权威讲解:AQS互斥锁实现
Worker 既然继承了 AQS,并且实现了 互斥锁。
但是,你想,它为什么要这样做???
每一个线程都是运行的自己的任务,又没有线程会去争抢别的线程的任务,那为什么需要加锁??
实际上,这个锁是在线程池关闭的时候用的。(我在后文再继续讲解)
private final class Worker
// 看到没?? 这是一个 AQS!!!
// 想不到吧
extends AbstractQueuedSynchronizer
// 同时是一个 Runnable,因为作为工作线程
implements Runnable
{
// 包含的线程
final Thread thread;
// 当前的任务
Runnable firstTask;
// 完成过多少任务
volatile long completedTasks;
/**
* 这个构造方法是我们之前阅读时看到过的
* 它传入了一个 Runnable 任务
* 创建一个 worker 线程,来执行这个任务
*/
Worker(Runnable firstTask) {
// setState 方法是 AQS 里的,这里设置为 -1
// 原注释翻译:在运行前禁止中断
// (看后文的 runWorker 方法就能明白了)
setState(-1);
// 赋予执行任务
this.firstTask = firstTask;
// 通过线程工厂创建线程(传入的是this:当前worker)
this.thread = getThreadFactory().newThread(this);
}
// 重写的run方法,写在了外部类中,直接调用
public void run() {
runWorker(this);
}
// !!!!!!!!!!!!!
// 后面的内容就有关于 AQS,如果你对AQS不了解,那有可能会看不懂
// 选择性跳过(如果你懂AQS,那就最好看明白)
// (不然,可以去看我写的AQS源码博客)
// 看到后面重写的 tryAcquire 和 tryRelease 方法,
// 就能知道 worker 是互斥锁(不懂AQS你就不知道。。。)
//
// 0 表示 没有加锁
// 1 表示 加锁了
protected boolean isHeldExclusively() {
// 返回是否被加了互斥锁
return getState() != 0;
}
// 重写AQS的 tryAcquire方 法
protected boolean tryAcquire(int unused) {
// 如果 CAS 加锁成功,就把持有锁线程设置为当前线程
// (要是不明白,去学 AQS 。。。)
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
// 加锁失败返回 false
return false;
}
// 重写AQS的 tryRelease 方法
protected boolean tryRelease(int unused) {
// 把持有锁线程设置为 null
setExclusiveOwnerThread(null);
// 把 state 值改为 0
setState(0);
return true;
}
// 封装 AQS 的加解锁方法(没啥好说的,懂的都懂)
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
/**
* 记得上面有一行注释:在运行前禁止中断
*
* 这里我还没看调用这段的代码的地方,但是我应该能猜对用途
* 我直接分析一下:
* 运行前不能被打断,所以应该类似于AQS的操作
* 先获取并抹去 interrupt 打断状态
* 然后最后在这里将其 重新interrupt加上打断状态
* (看过我AQS源码的应该知道我在说啥。。)
*/
void interruptIfStarted() {
Thread t;
// 如果没有打断标记(说明之前被抹去了)
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
// 调用interrupt方法,给线程赋予打断标记
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
runWorker 线程执行核心方法
上面我们看了,Worker 对象的 run 方法,就是执行的 这个 runWorker 方法,其中把自己作为参数传过来
- 首先就是 获取线程的引用
- 提取出任务,将 worker 的 task任务 置空(表示任务被提取出开始执行了)
- unlock() 这里呼应了我们上面看的 Worker 类
(Worker 初始化时 state 设置为 -1,表示不可被打断,此时把它 unlock 调,state 变为 0,就可以被打断了) - 然后在 try 方法中 while 循环,反复从队列取出任务去执行
在 while 循环中,每一次循环都会先获取 task,如果能获取到:
- 先加锁,
然后惯例检查线程池状态。。 - 任务执行前代理,执行任务,任务执行后代理
(这里的前后方法都是空壳方法,可以由子类去具体实现) - 任务执行结束,
将当前任务置空,
执行任务数量 +1,
解锁
final void runWorker(Worker w) {
// 取出worker对应的线程
Thread wt = Thread.currentThread();
// 提取出 worker 包含的 task 任务
// 然后就把 worker的task任务置空,表示任务已经被取出开始执行了
Runnable task = w.firstTask;
w.firstTask = null;
// 解锁(呼应上面的 Worker 类代码)
// 构造方法 state=-1,表示不能被打断
// 此时调用 unlock 方法,表示可以被打断了
w.unlock();
// (记录非正常情况,不作探讨)
boolean completedAbruptly = true;
try {
// 首先如果 task任务 不为空
// 如果为空没关系,getTask 拿一个任务不为空就行
// (注意:获取任务是从阻塞队列里,如果当前没有会先阻塞住)
// (要是实在没有,就退出循环)
while (task != null || (task = getTask()) != null) {
// 对 worker 进行加锁
// (上面说过了 worker继承AQS,实现了一把互斥锁)
w.lock();
/**
* 如果线程池停止,则确保当前线程中断
* 如果没有,就确保没有中断
*/
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
/**
* 这个try里面开始执行任务
* 注意:里面执行任务前后,
* 有一个beforeExecute、afterExecute
* 就是在执行前后进行代理操作
* (这就涉及到代理模式)
* 此外,在这个类中,这两个是空方法,需要子类按需实现
* (所以又涉及模板方法模式)
* (设计模式在此不详细讲)
*/
try {
beforeExecute(wt, task); // 空方法
try {
task.run(); // 真正执行任务
afterExecute(task, null); // 空方法
} catch (Throwable ex) {
afterExecute(task, ex); // 空方法
throw ex;
}
} finally {
// 当前任务置空(就可以去下一次循环领取任务)
task = null;
// 执行过的方法 +1
w.completedTasks++;
w.unlock(); // 解锁
}
}
completedAbruptly = false;
} finally {
// 工作线程退休方法
processWorkerExit(w, completedAbruptly);
}
}
getTask 获取任务(以及线程死亡)
getTask 是用来从队列里获取任务的,同时也是剔除多余线程的方法
里面用了 timed 表示是否可以剔除线程:
1、如果设置了核心线程可以被剔除,给出了剔除时间的话
2、或者,线程池里面有非核心线程,那超时后,得剔除多余线程
然后,在从队列取任务时,会带有超时的方法,如果超出时间了,还没有任务,
那么,这个工作线程,就可以退休了,
在这个方法中就会把 worker 工作线程数量 -1,然后返回 空的任务
(返回空任务之后,线程就会退休了)
private Runnable getTask() {
// 表示已经到达了该移除多余线程的时间
boolean timedOut = false;
// 死循环
for (;;) {
// 获取 ctl值,没啥好说的
int c = ctl.get();
// 如果线程池已经 STOP 了(那么此时队列一定空了)
// 或者线程池被 SHUTDOWN,并且队列也空了
// (这个时候,工作线程就应该结束退出了)
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
// 就把 ctl 的值 -1(表示worker数量-1,
// 因为worker数量是 ctl的 后29位,可以直接减)
decrementWorkerCount();
// 然后方法返回null,表示没有获取到任务
return null;
}
// 获取工作线程worker的数量
int wc = workerCountOf(c);
// 判断是否可以移除线程
// 1、因为线程池可以设定核心线程可以在一定时间的空闲后移除
// 2、非核心线程一定会设有时间,在间隔时间后将其移除
// 先判断 核心线程能否移除 如果不能,再看是否有非核心线程
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果可以移除线程,并且超时了(正常不会wc>maximumPoolSize)
// 还有并且,当前线程数 >1,或者队列空了
// 才 CAS 减去线程数量,表示该线程要结束了
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
// try 里面获取任务
try {
// 如果可以移除线程
// 那么就执行 带超时的 poll方法
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
// 否则执行不限制时间的 take方法
workQueue.take();
// 获取到了,就可以把任务返回
if (r != null)
return r;
// 否则,将timeOut设置为true
// 表示超时间了,可以将线程移除
timedOut = true;
} catch (InterruptedException retry) {
// 如果被打断了,就进下一次循环
timedOut = false;
}
}
}
processWorkerExit(线程死亡)
工作线程的退休方法
1、退休时也需要先加 总锁(这时就无法新增线程、关闭线程池等)
2、先将这个 worker 执行过的所有任务数量,添加到线程池的 总执行任务数
3、然后,无情地将其 从集合中移除
4、解锁
(这些是主要方法)
后面的又是做一些判断
1、终止线程池(会在方法内部看看线程池有没有被关闭)
2、检查线程池状态(线程池中,到处有检查状态的地方)
3、检查线程数量的合理性
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果突然来到这个方法,意味着worker数量还没有减
// (非正常情况)
if (completedAbruptly)
decrementWorkerCount();
// 退休时也要加锁,这时就无法新增线程、关闭线程池等
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 将这个worker完成的任务,记录到线程池完成的总任务数中
completedTaskCount += w.completedTasks;
// 无情地移除worker。。。
workers.remove(w);
} finally {
mainLock.unlock();
}
// 尝试终止线程池
// (主要是看看是否关闭了线程池,以及是否worker都结束等等条件,判断是否可以终止了)
tryTerminate();
// 又是判断线程池的状态
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
// 获取最小线程数
// 如果核心线程可以结束,那一定是 0
// 否则,那最小就是核心线程数量
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果为0,也就是核心线程可以结束,并且队列不为空,那么至少有线程执行任务
// 所以改为 1
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 如果当前数量大于最小数量,就没问题,直接方法返回了
if (workerCountOf(c) >= min)
return;
} // 否则,说明有线程意外死亡了
// 所以得新增线程
addWorker(null, false);
}
}
shutDown 关闭线程池
关闭线程池的方法,有些不用细究
首先是加 主锁(保证关闭线程池时,不能够做其他工作)
首先是一些常规检查操作(我们的关注点不在这)
然后是用 CAS 自旋,将线程池状态设置为关闭
重点是 interruptIdleWorkers 方法,我们随后来看其源码
然后 onShutdown 是个空方法,可以子类自己去实现。
然后尝试一下终止线程池(因为可能队列中还有任务没有处理完,或者线程手头还有任务没有处理完,所以这时终止线程池不一定成功)
(但是我们在看之前的 processWorkerExit 线程退休的方法的时候,里面就有 尝试终止线程池的方法,所以即使线程池不能立即终止,随着剩余任务的执行结束,线程池也会被最后一个退出的线程终止)
public void shutdown() {
// 加主锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 检查操作
checkShutdownAccess();
// CAS自旋将线程池状态修改
advanceRunState(SHUTDOWN);
// 重点是这个方法,我们点进去看
interruptIdleWorkers();
// 钩子方法,可以由用户自己按需去实现
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 尝试终止线程
//(可能还有任务没有处理完,所以这时终止线程池不一定成功)
// 但是随着线程一个个退出,最后会有线程将其终止
tryTerminate();
}
interruptIdleWorkers 打断所有线程
此时,线程池关闭了,因此需要停止掉其他工作线程
首先是要对 mainLock 主锁加锁,此时对线程池的一切操作都会阻塞
然后是对所有的线程去 interrupt 打断。
这时,你就会发现有用到 worker 的加锁方法了
(所以说 worker 的锁的用途是用来避免在 任务执行过程中被打断)
但是,为什么 interrupt 所有的线程就能使线程池最终终止呢???
我们都知道,interrupt 只是一个标记,它是如何做到的???
我们继续分析:
首先,根据我们之前看过的代码,我们知道,一个线程会不断地去队列中取任务,然后执行。
所以,加上一个 interrupt 标记,在此时任务还没执行完时起不到作用。
只有等到队列中为空,工作线程完成了手头的任务,此时再来获取的话:
就会因为 interrupt 的存在:
无法阻塞 !!!直接返回空的方法 !!!
所以这时,工作线程由于取出空任务了,就会让自己退休。
所以线程池的所有线程都会这样销毁。
线程池最终才能终止。
private void interruptIdleWorkers(boolean onlyOne) {
// 加 主锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 遍历每一个 worker
for (Worker w : workers) {
Thread t = w.thread;
// 要是没被打断,就尝试加锁将其打断
if (!t.isInterrupted() && w.tryLock()) {
try {
// 打断
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
// 如果只有一个线程
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
我们知道,还有一个方法,叫 shutdownNow()
它可以让工作线程不再执行任务,而是能很快速地终止掉线程。
这时如何实现的呢??
实际上很简单,只是对 shutDown 的方法,添加一点小操作:
直接将队列中所有没有执行的任务直接取出返回。
这时,队列中没有任务了,
那么,线程池内的一个个线程,就会按照我之前描述的那样,一个个最终销毁。
JDK 提供的四种常用线程池
SingleThreadExecutor:单线程的线程池
看名字就知道这个线程池里只有一个线程,这一个线程可以保证扔进去的任务是顺序执行的。
但是肯定会有人有人有这么一个问题,为什么要有单线程的线程池?
第一、线程池是有任务队列的,它可以将各个线程提交的任务按顺序排好依次执行;
第二、线程池提供了生命周期管理,不用再手动去管理线程。
我们看一下源码,就能很容易理解它的实现:
- 核心线程数和最大线程数都是 1,所以线程被限定死了只有 1 个;
- 线程消亡时间实际无所谓,因为核心线程默认不会死亡
- 默认创建了一个链表阻塞队列(也就是存放的任务最大数量为 Integer.MAX_VALUE,几乎可以说是无限的)
- 线程工厂、拒绝策略没有指定(就使用默认线程工厂、默认拒绝策略:抛异常)
// 单线程线程池
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1, // 线程数设置为 1
0L, TimeUnit.MILLISECONDS, // 时间无所谓,不会消亡
new LinkedBlockingQueue<Runnable>())); // 阻塞队列
// 默认线程工厂、拒绝策略:抛异常
}
CachedPool:缓存型线程池
就是来一个任务,如果有现有的线程,那就用现有的这个线程去执行任务;
如果现有的线程都在忙着,那么就启动一个新的线程去执行任务;
如果有线程空闲了 60 秒,那就可以退休了。
可见就是依据现有的任务量来决定线程数的:
在流量高峰期,那么线程就会很多;
而在没什么任务时,就只有很少,或者没有线程。
我们看一下源码,就能很容易理解它的实现:
- 核心线程数是 0,最大线程数为 Integer.MAX_VALUE(就是几乎没有上限)
(所以线程都是非核心的,都是可以超时死亡的) - 超时时间被设置成了 60 秒,也就是 一分钟 没有任务,那这个线程就可以退休了
- 阻塞队列采用 SynchronousQueue:
(这时一个特殊的阻塞队列,它可以让提交任务的线程在提交时阻塞,必须等到任务消费者取出任务时才能继续往后执行:就相当于一个手递手的过程) - 线程工厂、拒绝策略没有指定(就使用默认线程工厂、默认拒绝策略:抛异常)
public static ExecutorService newCachedThreadPool() {
// 只有非核心线程,而且几乎无上限
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
// 一分钟超时
60L, TimeUnit.SECONDS,
// 一方提交,必须等到另一方获取才能继续
// 相当于一个手递手过程
// 而线程池如果线程不够,就会立即创建新线程将任务取走
new SynchronousQueue<Runnable>());
// 默认线程工厂、拒绝策略:抛异常
}
FixedThreadPool:固定线程池
见名知意,fixed 是固定的意思,就是固定的一个线程数,FixedThreadPool 指定一个参数,到底有多少的线程,核心线程和最大线程都是相同的并且固定的,所以这个线程池就不会回收线程,所以超时时间就可以设置为 0。
用一个固定的线程池有什么好处呢?(也许固定线程池是目前来说用的最多的)
就是你可以并行计算。
并行和并发有什么区别,就是并发是多个任务同时过来,看起来是在一起执行的,但是线程池内部到底怎么样,谁也不清除。
但是并行,是需要有多核 CPU,你有和 CPU 核数相同的线程,那么这些线程才能并行,分别在不同的线程上执行任务。
固定线程数量还有一个好处,就是资源的消耗是不会有太大波动的(没有线程不断创建、不断销毁的资源消耗)。
(不像缓存型线程池,线程可以很多,又可以很少,对机器性能的影响也是忽高忽低)
但是,这就需要人去预估业务量,就比较考验人预估的准确性。符合了预期的业务量,才能发挥出好的性能。
我们看一下源码,就能很容易理解它的实现:
- 核心、最大线程线程数相等
(也就是没有非核心线程,不会有线程销毁) - 不会有线程会销毁,超时时间被设置成 0 秒
- 阻塞队列采用 LinkedBlockingQueue,链表阻塞队列(最大长度 Integer.MAX_VALUE 几乎无界)
- 线程工厂、拒绝策略没有指定(就使用默认线程工厂、默认拒绝策略:抛异常)
public static ExecutorService newFixedThreadPool(int nThreads) {
// 核心、最大线程数相等,也就是没有非核心线程,不会有线程销毁
return new ThreadPoolExecutor(nThreads, nThreads,
// 反正不会销毁、时间设为0
0L, TimeUnit.MILLISECONDS,
// 链表阻塞队列(Integer.MAX_VALUE 几乎无界)
new LinkedBlockingQueue<Runnable>());
// 默认线程工厂、拒绝策略:抛异常
}
ScheduledPool:定时任务线程池
ScheduledPool 定时任务线程池,就相当于我们原来学过的一个定时器任务,每隔一段时间之后,这个任务就会去执行。
我们 new 出一个定时任务线程池的时候,它返回的是一个 ScheduledThreadPoolExecutor,然后在里面调用了一个 super,而它的父类,本质上还是一个 ThreadPoolExecutor。
看源码,只手动指定了 corePoolSize;
直接调用了父类 ThreadPoolExecutor 构造方法。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
我们看一下源码,就能很容易理解它的实现:
- 核心线程数由程序员手动指定,最大线程数为 Integer.MAX_VALUE(几乎没有上限)
- 超时时间被设置成默认时间:10 毫秒
- 阻塞队列采用 DelayedWorkQueue
(任务队列会根据任务延时时间的不同进行排序,延时时间越短地就排在队列的前面,先被获取执行) - 线程工厂、拒绝策略没有指定(就使用默认线程工厂、默认拒绝策略:抛异常)
public ScheduledThreadPoolExecutor(int corePoolSize) {
// 核心线程由程序员指定,最大线程数为 Integer.MAX_VALUE(几乎没有上限)
super(corePoolSize, Integer.MAX_VALUE,
// 默认时间:10 毫秒
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
// 阻塞队列采用 DelayedWorkQueue
// 任务队列会根据任务延时时间的不同进行排序,延时时间越短地就排在队列的前面,先被获取执行
new DelayedWorkQueue());
// 默认线程工厂、拒绝策略:抛异常
}
可以用 scheduleAtFixedRate 方法提交定时任务
scheduledThreadPoolExecutor.scheduleAtFixedRate(() -> {
// 定时任务代码
},第一个任务执行结束后推多久, 第二个任务间隔多久, 时间单位);
JDK 默认线程池的问题
在阿里的员工手册中是明确指出的,ThreadPoolExecutor 线程池必须由程序员亲自手动创建,而不允许使用 Executors 线程池工厂创建默认的线程池。
这是为什么呢??
下面我来一一列举这些问题
首先,线程工厂都是默认的,这样,创建的线程都是程序里默认的定义;
这样,就很难追溯到到底是谁写的线程池,出现了问题,也不知道该从何处排查纠错。
我们看一下默认线程工厂是如何创建线程的:
- 线程的组由系统中获取(这个不必细究)
- 重点在线程名字!!!
前缀:线程池编号:就是在一个 JVM 中,每创建一个线程工厂,编号自增 1(这样的名字,如何定位排查???)
当前线程编号:就是在每一个线程工厂中,每创建一个线程,线程的编号 自增 1。。。。。。 - 线程必须是非守护(后台)线程
- 线程必须是默认优先级
所以线程工厂是不允许用默认的:
而必须自定义,名称必须是自己和小组有关的名字,能够让人识别
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
// 如果不是,就设置为 非后台线程
if (t.isDaemon())
t.setDaemon(false);
// 如果不是,就设置为 默认线程优先级
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
// 线程工厂构造方法
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
// 线程组是从系统获取的
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
// 重点看线程的名字前缀:包含了当前线程池的编号
// 线程池的编号就是当前JVM上,每创建一个线程工厂,编号就+1
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
第二点,如果是单线程线程池(SingleThreadExecutor)、固定线程池(FixedThreadPool),它们的通病就是:
LinkedBlockingQueue !!!!
可能有些人会疑惑,这有什么问题?
实际上,问题就在队列的长度: Integer.MAX_VALUE
这几乎是一个无界的队列,所以队列长度如果过长,就很容易产生堆内存占用多大导致 OOM。
所以,在真实的生产环境中,使用的队列,一定是有合适的界限的。
第三点,缓存线程池:
由于缓存线程池的最大线程数量可以到达 Integer.MAX_VALUE,几乎是无界的
所以在并发量一大,任务数量一多,就很容易造成线程数爆满
1、可能服务器性能拖垮
2、可能直接 OOM,程序崩溃终止
第四点,拒绝策略:
- 第一种,在真实的生产环境中,我们肯定不能使用抛出异常这样的策略,这样,程序就会直接被终止;
服务崩溃,可能会导致连环效应,或者丢失数据。 - 第二种,也不可能采取静悄悄地丢弃策略(你见过谁家服务器主动丢数据了)
- 第三种,丢弃最老的任务,应用场景也不广泛
- 第四种,让提交的线程去直接执行(看起来似乎问题不大),
但是,发生拒绝策略,一定意味着服务器资源已经处理不过来这些过于多的任务了,
如果此时,还让提交任务的线程去执行任务,那么意味着提交任务的线程也被耗尽,就没有线程可以去接收任务,
导致整个服务不可用
所以,在真实生产环境,一般不用默认拒绝策略,而是自定义拒绝策略。
比如多出的任务保存到 Kafka,或者数据库,之后等资源有空闲,立刻再去消费
总结
- 线程池初始化时,需要设置:
核心线程数、最大线程数、线程死亡时间、时间单位、阻塞队列、线程工厂、拒绝策略 - 线程池的默认线程工厂,每一个线程工厂创建,都会有一个自增的 整型 编号,
线程工厂每创建一个线程,都会带上工厂编号,并且线程的编号 自增 - 线程池的拒绝策略,常用的有:
抛异常、悄悄地丢掉、丢弃队列中最老的、让提交的线程执行 - 线程池的核心线程也可以设置死亡,只要调用 allowCoreThreadTimeOut 方法设置即可
- 提交(submit)任务时会返回 FutureTask
- FutureTask 既是一个任务,又是一个将来的任务结果。
线程池执行这个 FutureTask,并且提交者可以从返回的这个 FutureTask 中获取结果 - 线程池会用一个 AtomicInteger(ctl)来记录线程池的状态和工作线程的数量
ctl 的前 3 位记录线程池状态,后 29 位记录 worker 数量 - Worker 继承了 AQS,本身就是一把锁
- Worker 在还没有 run 之前,由于 state 初始化为 -1,无法被打断,只有在 run 方法 unlock 一下才能被打断。
- 工作线程在没有任务时,会去阻塞队列中自己取任务
- 线程数量如果大于核心线程数,或者核心线程也可以销毁,
那么取任务时会遵循超时时间,取不出任务会返回 null,然后自己将退休 - 线程池关闭的实质,是 interrupt 打断所有工作线程
这样,在队列中没有任务时,线程取任务将无法阻塞,直接返回空,因而线程可以全部退休 - 默认线程工厂创建的线程名字不方便排查,最好用户自定义线程工厂
- 阻塞队列不能不设置大小
- 线程池最大线程也不能不设置大小
- 拒绝策略要根据需求自定义