先看看ThreadPoolExecutor的类图:
红色为比较重要,需要用到的部分
来看看重要类的主要成员和方法
ThreadPoolExecutor
1.成员变量
// ctl是一个原子变量,用来记录线程池状态和线程池中线程的个数
// 类似于ReentrantLock
// 高3位表示线程池状态,低29位表示线程数量⚠️
// 默认是RUNNING状态,线程数量为0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 线程数的掩码位数,我的Mac上是29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程最大数量,我的Mac上是11111111111111111111111111111,29位
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 线程池运行状态,都是存在高3位的
// 11100000000000000000000000000000,32位
private static final int RUNNING = -1 << COUNT_BITS;
// 0
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 00100000000000000000000000000000,32位
private static final int STOP = 1 << COUNT_BITS;
// 01000000000000000000000000000000,32位
private static final int TIDYING = 2 << COUNT_BITS;
// 01100000000000000000000000000000,32位
private static final int TERMINATED = 3 << COUNT_BITS;
// 获取高3位,也就是运行状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获取低29位,也就是线程个数
private static int workerCountOf(int c) { return c & CAPACITY; }
// 计算ctl新值:运行状态 + 线程个数
private static int ctlOf(int rs, int wc) { return rs | wc; }
// 添加worker时需要加一个锁
private final ReentrantLock mainLock = new ReentrantLock()
// 阻塞队列,容纳排队的任务
private final BlockingQueue<Runnable> workQueue;
线程池状态如下:
- RUNNING:接受新任务并处理阻塞队列里的任务
- SHUTDOWN:拒接新任务,但处理阻塞队列里的任务
- STOP:拒接新任务,并且抛弃阻塞队列里的任务,同时会中断正在处理的任务
- TIDYING:所有任务处理完后(包含阻塞队列里的任务)后当前线程池里的active线程数量为0,将要调用terminated()方法
- TERMINATED:终止状态,调用terminated()后的状态
状态转换如下:
- RUNNING ——> SHUTDOWN:显式调用了shutdown()方法,或在finalize()方法中隐式调用了shutdown()方法
- RUNNING或SHUTDOWN ——> STOP:显式调用了shutdownNow()方法
- SHUTDOWN ——> TIDYING:线程池和任务队列都为空
- STOP ——> TIDYING:线程池为空
- TIDYING ——>TERMINATED:当terminated() hook方法结束
2.重要方法
public void execute(Runnable command)
public void shutdown()
public List<Runnable> shutdownNow()
ThreadPoolExecutor内部类Worker
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable{...}
1.成员变量
// worker承担的线程
final Thread thread;
// worker的Runnable任务
Runnable firstTask;
2.重要方法
// 作为一个Runnable,应该要有自己的run()吧
public void run()
// 作为一个AQS,要有自己的lock、unlock、tryAcquire、tryRelease方法吧
public void lock()
protected boolean tryAcquire(int unused)
protected boolean tryRelease(int unused)
Executors内部类DefaultThreadFactory
static class DefaultThreadFactory implements ThreadFactory
2.重要方法
public Thread newThread(Runnable r)
正片开始
- Test.java(测试类)
@Test
public void fixTest(){
ExecutorService executorService = Executors.newFixedThreadPool(3); < — — — — — a.进去
for (int i = 0; i<100; i++){
executorService.execute(()->{
System.out.println(Thread.currentThread().getName());
});
}
executorService.shutdown();
}
- Executors(工具类,专门生成线程池)
// 其实就是new一个ThreadPoolExecutor,填充它的参数
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()); < — — — — — b.进去
}
- ThreadPoolExecutor
// ThreadPoolExecutor构造方法,就是填充它的参数,不重点分析
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// if各种判断,对参数的合法性做一些校验
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
// 参数填充
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
- Test.java(测试类)
@Test
public void fixTest(){
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i<100; i++){
executorService.execute(()->{ < — — — — — c.进去
System.out.println(Thread.currentThread().getName());
});
}
executorService.shutdown();
}
- ThreadPoolExecutor
// 传入我们的Runnable任务
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 得到线程池状态ctl的值,ctl是一个AtomicInteger
int c = ctl.get();
// 如果worker数量小于核心线程参数(按照线程池规则,这时会启动新线程来接纳该任务)
// 进入if
if (workerCountOf(c) < corePoolSize) {
// 进入addWorker方法
if (addWorker(command, true)) < — — — — — d.进去
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
- ThreadPoolExecutor
// 参数:firstTask就是我们传入的Runnable任务对象,core = true
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
// 死循环,等待打破
for (;;) {
// 再次得到线程池状态ctl的值
int c = ctl.get();
// 得到线程池运行状态
int rs = runStateOf(c);
// 现在是默认状态RUNNING,所以rs < SHUTDOWN,跳过if
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 内层死循环,等待打破
for (;;) {
// 得到线程池状态之———— worker数量
int wc = workerCountOf(c);
// 如果wc大于了最大线程数量,此处false
// wc == 0,小于corePoolSize 和 maximumPoolSize
// 跳过if
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 利用CAS把ctl+1
if (compareAndIncrementWorkerCount(c))
// 成功就退出两层循环
break retry;
// CAS失败则再次获取线程池状态,可能刚刚有其他线程进行了CAS
c = ctl.get(); // Re-read ctl
// 如果线程池状态和刚刚不一致,进入if
// 否则,在内层循环中开启第二轮循环
if (runStateOf(c) != rs)
// 跳到外层循环重新开始第二轮大循环
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 到了这里,证明CAS成功,线程池状态ctl已经成功被原子性地+1
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 新建一个worker,把我们传入的Runnable任务对象传进去
w = new Worker(firstTask); < — — — — — e.进去
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
- ThreadPoolExecutor.Worker
// 内部类Worker的构造函数
Worker(Runnable firstTask) {
// 把它的AQS的state状态设置为-1,避免在runWorker()被中断(以后分析),接着走
setState(-1); // inhibit interrupts until runWorker
// 把我们传入的Runnable任务对象传进去赋给Worker的成员变量firstTask
this.firstTask = firstTask;
// 使用DefaultThreadFactory的newThread()方法获得一个新建线程
this.thread = getThreadFactory().newThread(this); < — — — — — f.进去
}
在调用ThreadPoolExecutor的构造方法时,就给ThreadPoolExecutor构造方法传入了Executors的defaultThreadFactory
- Executors.DefaultThreadFactory
// 获取新线程
// ⚠️注意:此处有伏笔:r是我们刚刚传入的“this”,也就是该worker本身。
// 该worker本身是一个Runnable对象,他有它的run()方法
public Thread newThread(Runnable r) {
// 新建线程,给它组、线程名
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t; < — — — — — g.返回
}
- ThreadPoolExecutor
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
// 得到该Worker的刚刚新建的thread成员对象
final Thread t = w.thread;
// 进入if
if (t != null) {
// 拿到ThreadPoolExecutor实例的重入锁,加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 再次获取线程池的运行状态
int rs = runStateOf(ctl.get());
// 如果rs是RUNNING,进入if
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// precheck that t is startable
if (t.isAlive())
throw new IllegalThreadStateException();
// HashSet<Worker>把该worker添加进去
workers.add(w);
// 得到HashSet<Worker>内worker数量
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
// 设置添加worker成功
workerAdded = true;
}
} finally {
// 解锁
mainLock.unlock();
}
// 如果刚刚添加worker标志为true
if (workerAdded) {
// 该worker的线程成员对象start(),会调用该thread包装的Runnable对象的run()方法
// 该thread包装的Runnable对象就是刚刚新建thread时,传入的该worker本身
t.start(); < — — — — — h.进入
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
- ThreadPoolExecutor.Worker
public void run() {
// 调用runWorker
runWorker(this); < — — — — — i.进入
}
- ThreadPoolExecutor
// 传入Worker自己
final void runWorker(Worker w) {
// 得到当前线程
Thread wt = Thread.currentThread();
// 得到该Worker的Runnable任务对象
Runnable task = w.firstTask;
w.firstTask = null;
// 暂未详细分析⚠️
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 如果该Worker的Runnable任务对象不为空,进入if
while (task != null || (task = getTask()) != null) {
// Worker本身是一个AQS实现的锁,加锁
w.lock();
// runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))
// 第一个为false || 第二个也为fasle,整体为false,跳过if
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 空方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行该Worker的Runnable任务对象的run()方法,也就是我们自己写的run()方法
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
// 解锁
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}