在线程池方法中,有个重要的方法就是添加线程。也就是我们接下来要描述的addWorker()方法.
该方法主要做两个事情:
1).使用循环CAS操作来讲线程池的数量+1;
2).新建一个线程worker,添加到workers中并启用。
该方法中会传入两个参数,第一个是直接传入任务实列,第二个参数标识是否是核心线程。
接下来就主要关注这个方法的源码
1.该方法是个boolean返回的方法; 如果返回ture:表示创建worker成功,且线程启动成功;
如果返回false :表示创建worker失败;导致false的有多种原因
1)线程池状态rs > SHUTDOWN
2)rs == SHUTDOWN 但是队列中已经没有任务了 或者当前状态是 SHUTDOWN 且队列未空,但是 firstTask不为 null
3)当前线程已经达到指定指标(corePoolSize 或者 maximumPoolSize)
4)线程池ThreadFactory 创建的线程是 null
private boolean addWorker(Runnable firstTask, boolean core)
2.进入到该方法中,就有个for()循环的自旋;它的主要作用是 判断当前线程池是否允许创建线程的事情
for (;;)
3.深入到for之后, 先获取当前ctl的值,以及获取当前线程池运行状态
int c = ctl.get();
int rs = runStateOf(c);
4.接下来就是两个条件的判断;条件一: rs >= SHUTDOWN 为true时,表示当前线程池的状态不是Running
条件二:! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()) 为true时, 当线程池状态, rs > SHUTDOWN 或者 rs == SHUTDOWN 但是队列中已经没有任务了,或者 rs == SHOTDOWN 且 firstTask不为空
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
5.以上的代码就是判断当前线程池状态是否允许添加线程。接下来又是一个for()内部自旋, 它的主要作用是创建线程的令牌过程。
for (;;)
6.再次获取当前线程池中的线程数量,进行判断;条件一 wc >= CAPACITY;该条件目前会永远不成立,CAPACITY是一个5亿多的数据。
条件二:wc >= (core ? corePoolSize : maximumPoolSize) ; 为true时,判断当前线程数量是否 >= corePoolSize,会拿核心线程数量做限制; 为false时, 拿到的是最大线程数量的限制;
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
7.接下来,如果条件成立,表明线程数量已经 +1成功了,相当于申请到了一块令牌。 如果为false:说明其他线程修改过了ctl 值了。可能其他线程execute() 申请过令牌了。 进入到方法里面后,说明是CAS成功,申请到了令牌,同时直接跳出了 retry 外部的自旋。
if (compareAndIncrementWorkerCount(c))
break retry;
8.没有申请到令牌的,再次获取ctl值;
c = ctl.get();
9.再次进行了一次判断,判断当前线程池状态是否发生过改变,如果外部在这之前调用过shutDown, shutDown会导致状态发生改变。 如果为true, 表示状态发生了改变,返回到了外层循环。
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
10.经历过这两次for循环之后,说明允许创建线程。workerStarted 默认为false,表示 worker是否启动,ture为启动;
workerAdded 表示是否将worker添加到池子中,默认false,表示未添加; w 表示创建一个woker的一个引用
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
11.先创建worker,执行完毕后,表示创建worker完毕,将线程赋值给t
w = new Worker(firstTask);
final Thread t = w.thread;
12.进行了对t的一次判空,主要是防止ThreadFactory 被用户实现,传入null,出现控制住异常
if (t != null)
13.将全局锁的引用保存到 mainLock; 持有全局锁,可能会阻塞,知道获取成功为止,同一时刻,操作线程池内部相关的操作,都必须持有锁.
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
14.条件一: true: 表示 当前线程池为正常的RUNNING状态;条件二: (rs == SHUTDOWN && firstTask == null)true: 当前的线程状态为 SHUTDOWN ,且 firstTask 为空;
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null))
15.当线程start后,线程isAlive会返回true
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
16.添加线程池中。并获取最新的线程数,并和最大线程数进行一个判断比较,并赋值。且设置 workerAdded为true ;表示 线程已经加入到线程池中了。
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
17.通过finally 进行了解锁
mainLock.unlock();
18.workerAdded为true 表示添加worker添加成功, 进行启动线程,且标记 启动标记设置为true;
if (workerAdded) {
t.start();
workerStarted = true;
}
19.在finally中,如果整个条件成立,说明启动失败,需要做清理工作。
if (! workerStarted)
20.清理工作主要做两件事:1.释放令牌;2.当前worker 清理出workers 集合
addWorkerFailed(w);
- addWorkerFailed里面的主要逻辑,持有线程池全局锁,因为操作的是线程池相关的东西
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
22.对worker的判空,如果为true, 开始做两件事,1.将woker从workers中移除;2.将线程池计数恢复-1,相当于归还令牌,最后进行了释放锁。
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
23.最后将workerStarted的状态返回;如果满足创建worker成功,且线程启动,则返回true,否则返回false.
具体的描述如下:
// firstTask 可以为 null ,表示启动worker之后,worker自动到queue中获取,如果不是 null ,则worker执行firstTaSK
// core 采用的线程数限制,如果为true 采用核心线程数 限制; false,采用 maximumPoolSize 线程数限制
// 返回值总结:
// true: 表示创建worker成功,且线程启动
// false : 表示创建worker失败
// 1.线程池状态rs > SHUTDOWN
// 2. rs == SHUTDOWN 但是队列中已经没有任务了 或者当前状态是 SHUTDOWN 且队列未空,但是 firstTask不为 null
// 3.当前线程已经达到指定指标(corePoolSize 或者 maximumPoolSize)
// 4.线程池ThreadFactory 创建的线程是 null
private boolean addWorker(Runnable firstTask, boolean core) {
// 自旋 判断当前线程池状态是否允许创建线程的事情。
retry:
for (;;) {
// 获取当前的 ctl 值
int c = ctl.get();
// 获取当前线程池运行状态
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 条件一 : rs >= SHUTDOWN true: 表示当前线程池状态不是Running
// 条件二: ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())
// true : 当线程池状态, rs > SHUTDOWN 或者 rs == SHUTDOWN 但是队列中已经没有任务了,或者 rs == SHOTDOWN 且 firstTask不为空
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 以上代码就是判断当前线程池状态是否允许添加线程
// 内部自旋 ,获取创建线程令牌的过程
for (;;) {
// 获取当前线程池中线程数量
int wc = workerCountOf(c);
// 条件一: wc >= CAPACITY 永远不成立, CAPACITY是一个5亿多的数据
// 条件二: wc >= (core ? corePoolSize : maximumPoolSize)
// core == true ,判断当前线程数量是否 >= corePoolSize,会拿核心线程数量做限制
// core == false ,判断当前线程数量是否 >= corePoolSize,会拿最大线程数量做限制
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
// 表明当前无法添加新的线程了。
return false;
// true: 说明记录线程数量已经 +1成功了,相当于申请到了一块令牌。
// false : 说明其他线程修改过了ctl 值了。可能其他线程execute() 申请过令牌了。
if (compareAndIncrementWorkerCount(c))
// cas 成功,申请到令牌了
// 直接跳出了 retry 外部的自旋
break retry;
// 没申请到令牌,再次获取ctl的最新值
c = ctl.get(); // Re-read ctl
// 判断当前线程池状态是否发生过改变,如果外部在这之前调用过shutDown, shutDown会导致状态发生改变
if (runStateOf(c) != rs)
// 状态发生改变后,返回到外层循环
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 来到了这里
// 表示创建的worker是否已经启动, false 未启动
boolean workerStarted = false;
// 创建的 worker是否添加到池子中, 默认false, 未添加
boolean workerAdded = false;
// 表示 后面创建worker的一个引用
Worker w = null;
try {
// 创建worker,执行完后,线程创建完毕
w = new Worker(firstTask);
// 将创建的线程节点, 赋值
final Thread t = w.thread;
// 进行了一次判断,为什么要判断?
// 为了防止ThreadFactory 是一个接口,谁都可以实现,防止别人传 null ,报空指针
if (t != null) {
// 将全局锁的引用保存到 mainLock
final ReentrantLock mainLock = this.mainLock;
// 持有全局锁,可能会阻塞,知道获取成功为止,同一时刻,操作线程池内部相关的操作,都必须持有锁
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
// 再次去获取最新的ctl 值,即运行状态
int rs = runStateOf(ctl.get());
// 条件一: true: 表示 当前线程池为正常的RUNNING状态
// 条件二: (rs == SHUTDOWN && firstTask == null)
// true: 当前的线程状态为 SHUTDOWN ,且 firstTask 为空
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 当线程start后,线程isAlive会返回true ;
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 添加到线程池中
workers.add(w);
// 获取最新的 线程池中的worker的数量
int s = workers.size();
// true :说明当前线程数量时一个新高了,更新最大值
if (s > largestPoolSize)
largestPoolSize = s;
// 表示线程已经加入到线程池中了
workerAdded = true;
}
} finally {
// 解锁
mainLock.unlock();
}
// 说明当前添加worker添加是成功的
if (workerAdded) {
// 启动线程
t.start();
// 启动标记设置为true
workerStarted = true;
}
}
} finally {
// 如果整个条件成立,说明启动失败,需要做清理工作
if (! workerStarted)
// 1.释放令牌
// 2.当前worker 清理出workers 集合
addWorkerFailed(w);
}
// 返回新创建的线程是否启动
return workerStarted;
}