回顾
上一篇中我们主要介绍了ThreadPoolExecutor
的继承关系以及它的核心方法execute()
,我们知道它的顶层接口是Executor
——线程执行的工具,真正的线程池接口是ExecutorService
。ThreadPoolExecutor
直接继承自抽象类AbstractExecutorService
,由AbstractExecutorService
实现ExecutorService
接口,再由ExecutorService
继承Executor
。
在execute()
方法中,根据线程池中线程的数量是否达到corePoolSize
来决定是加入到缓存作业队列还是新建工作者,直接完成任务等,换言之execute()
是ThreadPoolExecutor
执行作业的核心方法。
本篇我们对ThreadPoolExecutor
的其他方法进行解析。
addWorker方法
private boolean addWorker(Runnable firstTask, boolean core)
方法中,参数firstTask
是需要被执行的任务,core
表示当前线程数是否小于corePoolSize
。我们将源码分为两个部分进行解析
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
上述代码主要是通过CAS操作更新当前的线程数量,以上两个大的无限for循环,外层for循环在不断的检测线程池是否处于Running状态,一旦线程池不在Running则立刻返回false,这是因为处在shutDown状态的线程池是无法添加新的作业,我们在介绍shutDown()
方法时已经说明过。
内层for循环负责执行CAS操作更新池中线程数量,if (compareAndIncrementWorkerCount(c))
负责执行比较线程数与corePoolSize进行比较更新线程数量,如果执行成功的话,就会跳出外层循环;如果更新失败,则需要再次对线程池的状态进行check,这是因为CAS操作并没有进行同步,所以此时的线程池状态可能发生了改变,如果发生了改变,就需要跳出内层循环,回到retry
重新执行外层循环,如果未改变,则继续内层循环。
在线程池数量更新成功之后,可以继续执行添加线程的逻辑:
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//新建了一个工作者
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//一个可重入锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
//释放锁
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
其中在try代码块中,我们可以看到新建了一个Worker对象,这个对象内部封装了我们传入的Runnable对象,Worker
类是ThreadPoolExecutor
的内部类,实现了Runnable接口,对象持有一个Thread对象thread,thread通过传入this
创建,然后下面的代码段使用了一个可重入锁,而后对线程池状态可能产生影响的操作都在同步代码段里,这里首先判断了线程池是否处于RUNNING状态,是则将线程t加入到运行线程的Set(也就是workers
)中,并对largestPoolSize
进行更新。
添加完成之后释放锁,然后启动我们新添加的线程,到这里,线程数小于corePoolSize的情况下,我们新添加的作业就已经被启动了。