1 Executor
它用于执行指定的任务,把任务提交与任务执行分离,程序员不需要关注线程的管理,以及任务的执行。 ExecutorService 接口对 Executor 接口提供更多的扩展,ThreadPoolExecutor 类提供 可以扩展的线程池实现,而 Executors 只是对这些 Executor 提供方便的工厂方法。
1.1 类图
2 Future
2.1 类图
2.2 FutureTask
类结构:
class FutureTask {
// 任务运行状态
/*
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
// 底层运行的 callable
private Callable<V> callable;
// 任务结果,非 volatile,因为受 state 读写保护
private Object outcome;
// 运行 callable 任务的线程
private volatile Thread runner;
private volatile WaitNode waiters;
// 在 Treiber stack 中记录等待中的线程
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
}
复制代码
两个重要的构造函数:
// 包装 Callable,state = NEW
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW;
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
复制代码
第二个构造器使用了 Executors 工具类的 callable() 方法来把 Runnable 适配成 Callable。典型的 适配器模式。
public static <T> Callable<T> callable(Runnable task, T result) {
return new RunnableAdapter<T>(task, result);
}
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
复制代码
3 ThreadPoolExecutor
3.1 类结构
class ThreadPoolExecutor {
// 状态控制,包含两个信息:
// workerCount: 工作线程数
// runState: 表明线程池运行状态
// RUNNING: 可以接受新的 task,且能处理队列中的 task
// SHUTDOWN: 不接受新的 task,但能处理队列中的 task
// STOP: 不接受新的 task,不处理队列中的 task,并且中断运行中的 task
// TIDYING: 所有任务都被终止,workerCount = 0,线程状态为 TIDYING
// TERMINATED: terminated() 方法执行完成
// RUNNING
// shutdownNow()| \ shutdown()
// |/ \/
// STOP---- SHUTDOWN
// 线程池为空| / 队列和线程池都为空
// |/ \/ \
// TIDYING------------- TERMINATED
// terminated()
// 初始化:ctl = RUNNING
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 11101
private static final int COUNT_BITS = Integer.SIZE - 3;
// 00011111111111111111111111111111 536870911
// worker 线程最大数量
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState 存储在高位
// 111 开头 -536870912
private static final int RUNNING = -1 << COUNT_BITS;
// 000 开头 0
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 001 开头 536870912
private static final int STOP = 1 << COUNT_BITS;
// 010 开头 1073741824
private static final int TIDYING = 2 << COUNT_BITS;
// 011 开头 1610612736
private static final int TERMINATED = 3 << COUNT_BITS;
// ~CAPACITY = 11100000000000000000000000000000
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
}
复制代码
3.2 内部类 Worker
private final class Worker extends AbstractQueuedSynchronizer
implements Runnable {
// 执行任务线程
final Thread thread;
// 待执行任务
Runnable firstTask;
// 每个线程完成任务数
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // 设置AQS的同步状态为-1,禁止中断,直到调用 runWorker
this.firstTask = firstTask;
// 使用 ThreadFactory 创建线程
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
// 核心
runWorker(this);
}
}
复制代码
3.3 任务执行(execute)
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 初始时,ctl = RUNNING = 11100000000000000000000000000000
int c = ctl.get();
// wokerCountOf(c) => c & 00011111111111111111111111111111
// 线程数小于 corePoolSize,创建线程执行 task
if (workerCountOf(c) < corePoolSize) {
// 创建线程,并启动线程执行 command
if (addWorker(command, true))
return;
c = ctl.get();
}
// 两种情况:
// ① 线程数大于等于 corePoolSize;
// ② 线程数小于 corePoolSize,但是 addWorker() 线程池状态不符合条件,或创建线程失败或启动线程失败
// 线程池处于运行状态才能往队列添加任务
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 线程池非 RUNNING 状态
// 队列已满
// 此时使用 maximumPoolSize 作为界限判断
else if (!addWorker(command, false))
reject(command);
}
/**
* addWoker() 方法返回 false 几种情况:
* 1)线程池停止或 shutdown
* 2)ThreadFactory 创建线程失败
* @param core true:使用 corePoolSize 作为界限;false:使用 maximumPoolSize 作为界限
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// ① 线程池状态是 RUNNING,可以创建工作线程,即执行下面的 for 循环
// ② 线程池状态是 STOP、TIDYING 或 TERMINATED,直接返回 false,即不能创建工作线程执行任务
// ③ 线程池状态时 SHUTDOWN,此时 firstTask 在此种状态下应该为 null,即它不能接收新的任务
// 所以如果 firstTask 不为 null,那么可以直接返回 false
// 如果 firstTask 为 null,此时线程池可以处理队列中任务,所以如果队列为空,那么直接返回 false,否则需要创建工作线程处理队列中的任务
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// runState 为 RUNNING 或 SHUTDOWN
for (;;) {
int wc = workerCountOf(c);
// woker 线程数超过界限,返回 false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// CAS 增加 workerCount
if (compareAndIncrementWorkerCount(c))
break retry; // 失败
c = ctl.get(); // Re-read ctl
// 可能 runState 被其他线程改变,非 RUNNING 或 SHUTDOWN 状态
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 初始化 Worker,创建线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) { // 线程创建成功
// 全局锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 再次检查线程池的运行状态等
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
// 线程处于活跃状态,即线程已经开始执行或者还未死亡,正确的应线程在这里应该是还未开始执行的
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 实际会调用 Worker 类的 run() 方法
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted) //未能成功创建执行工作线程
addWorkerFailed(w); //在启动工作线程失败后,将工作线程从集合中移除
}
return workerStarted;
}
复制代码
从上述代码分析可以看出,有很多书上在分析线程池的执行原理是有问题的。
小结上述逻辑:- 线程池控制标识是原子类型,即 AtomicInteger
- execute() 方法只关注线程池核心工作流程,具体执行细节交由其他方法处理,主要是 addWorker() 方法
- 工作线程小于 corePoolSize 时,创建工作线程以及执行任务统一交给 addWorker() 方法处理
- addWorker() 方法在创建工作线程执行任务之前,需要判断线程池当前状态是否满足可以创建工作线程。
- RUNNING 状态可以接收新的任务(能创建工作线程),且能处理队列中的任务
- SHUTDOWN 状态不能接收新的任务(不能创建工作线程),但能处理队列中的任务
- 使用 CAS 计数工作线程,即更新线程池控制标识
- 任务和线程统一由其内部类 Worker 进行封装,Worker 类本身也是 Runnable
整个流程图如下,画的不太好,主要还是为了梳理逻辑,因为整个代码用到了太多 if() 语句的短路思维,当然这些主要是跟线程池的状态有关。