文章目录
1.任务拒绝策略:RejectedExecutionHandler
线程池提供了四种继承RejectedExecutionHandler的拒绝策略,我们可以分别来看一下:
1.默认拒绝策略:AbortPolicy
/**
* A handler for rejected tasks that throws a
* {@code RejectedExecutionException}.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
//可以看到这边的拒绝的方式是抛出异常
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
2.不抛错拒绝策略:DiscardPolicy
这个拒绝策略,不会执行报错,是silent版的AbortPolicy策略
/**
* A handler for rejected tasks that silently discards the
* rejected task.
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
3.移除队头元素:DiscardOldestPolicy
这个拒绝策略会把阻塞队列中等待的最后一个任务移出,然后把新任务添加进去(可能失败可能成功)
/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
//先把队头的元素移出,然后尝试入队
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
4.通过主线程直接执行:CallerRunsPolicy
/**
* A handler for rejected tasks that runs the rejected task
* directly in the calling thread of the {@code execute} method,
* unless the executor has been shut down, in which case the task
* is discarded.
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
//线程池未关闭,直接执行没有入队的线程
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
5.设置任务拒绝策略
这边可以根据业务需求自定义任务拒绝策略。
public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
if (handler == null)
throw new NullPointerException();
this.handler = handler;
}
2.线程池状态说明
这边保存了五个线程池的状态,了解线程池状态才能更好理解线程池
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
状态 | 解释 |
---|---|
RUNNING | 运行态,可处理新任务并执行队列中的任务 |
SHUTDOW | 关闭态,不接受新任务,但处理队列中的任务 |
STOP | 停止态,不接受新任务,不处理队列中任务,且打断运行中任务 |
TIDYING | 整理态,所有任务已经结束,workerCount = 0 ,将执行terminated()方法 |
TERMINATED | 结束态,terminated() 方法已完成 |
3.BlokingQueue:队列策略
1.LinkedBlockingQueue
2.SynchronousQueue
4.ThreadPoolExecutor实现
1.execute
execute本身有一个处理过程,流程如下:
1.当线程池的工作集数量小于corepoolsize的时候,尝试添加到工作集,如果成功,直接返回
2.如果添加失败(可能是这个过程中其他线程也并发添加,导致数量大于corepoosize了),这个时候需要重新获取一下工作集的数量
3.判断线程池是RUNNING状态,如果是,添加任务到阻塞队列,添加成功之后还要去判断一下线程池是否是RUNNIING状态(可能在这段时间内被shutdown了)
4.如果是RUNNIONG状态,线程池工作集为0,添加一个空的工作集是为了让他处理阻塞队列中的任务,如果不是RUNNING状态通过拒绝策略拒绝进来的任务.
5.如果第一次判断线程池状态不是RUNNING(步骤3),尝试添加到工作集,这个时候大小就是maximumPoolSize了,如果添加成功相安无事,如果添加失败通过拒绝策略处理
public void execute(Runnable command) {
//如果线程为空,报错
if (command == null) {
throw new NullPointerException();
}
//拿到RUNNING的线程数量
int c = ctl.get();
//有效线程数小于【核心池线程数】
if (workerCountOf(c) < corePoolSize) {
//添加任务成功返回
if (addWorker(command, true)) {
return;
}
//如果添加到工作线程队列失败,需要拿到RUNNING的线程数量继续往下走
//失败的原因可能是 1.线程池在添加到工作集的过程中被SHUTDOWN了 2.线程池数量超过corePoolSize,因为并发高导致
c = ctl.get();
}
//前提:有效线程数> corePoolSize ,线程池是running状态,提交到等待队列(成功)
if (isRunning(c) && workQueue.offer(command)) {
//再次获取一下运行状态
int recheck = ctl.get();
//如果不是RUNNING状态,尝试从等待队列中移除成功
//(SynchronousQueue因为是没有容量的,所以remove永远返回false)、LinkedBlockingQueue除非节点为空或者找不到对应节点才会返回false
if (!isRunning(recheck) && remove(command)) {
reject(command);
}
//这边是不满足上面的if的条件就是:isRunning(recheck) || !remove(command)才会进入,判断worker数量是否为0
//如果是running的状态,回创建一个空的worker去执行,如果不是running的状态队列为空会返回false,不会创建worker
else if (workerCountOf(recheck) == 0) {
addWorker(null, false);
}
}
//如果线程池不是running状态且进入阻塞队列失败,尝试开启maximumPoolSize(这边的第二个参数false),添加到工作集
//如果失败就启动拒绝策略
else if (!addWorker(command, false)) {
reject(command);
}
}
1.execute流程图
2.addWorker
addWoker主要分成三个部分:
1.根据线程池状态和工作集数量判断是否添加任务
2.添加任务创建新的worker的激活
3.异常要把worker移除工作集的处理
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (; ; ) {
//拿到RUNNING的线程数量
int c = ctl.get();
//获取线程池运行状态
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//如果是shutdown之后的状态,不是shutdown直接返回false,不支持新增
//否则是shutdown,但是传进来的Runnable为空并且队列不为空,要继续往下执行.说明还有任务没结束
//--- 这个地方失败可能是线程池被SHUTDOWN了
//变式 : rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty())
//变式2: rs >= SHUTDOWN && rs != SHUTDOWN || rs >= SHUTDOWN && firstTask != null || rs >= SHUTDOWN && workQueue.isEmpty())
//变式3: rs > SHUTDOWN || rs == SHUTDOWN && firstTask != null || rs == SHUTDOWN && workQueue.isEmpty())
//所以可以这样表述:如果线程池是 SHUTDOWN以上的状态(直接返回false不允许添加了) 如果线程池是SHUTDOWN状态,当添加的RUNNABLE不为空或者工作队列为空了不允许添加
//说明 当线程池SHUTDOWN了,不能添加不为null的任务,当阻塞队列为空了,也不能继续添加,直接返回false
//其他情况可以往下
if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) {
return false;
}
for (; ; ) {
//获取运行中的worker数量
int wc = workerCountOf(c);
//如果达到上限,这边的core决定是用corePoolSize 还是 maximumPoolSize来判断大小
// -- 这个地方失败的有可能是因为高并发导致线程数量超过限制
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) {
return false;
}
//没有达到上限往这边走,cas操作使工作线程数量加1,跳出循环
if (compareAndIncrementWorkerCount(c)) {
break retry;
}
// Re-read ctl
//上述的CAS尝试失败,说明数量被修改了,所以这边要重新获取一次
c = ctl.get();
//如果线程池运行状态和之前获取的不一致,返回到retry,重新进入for循环往后面走
if (runStateOf(c) != rs) {
continue retry;
}
// else CAS failed due to workerCount change; retry inner loop
}
}
//新线程启动标识
boolean workerStarted = false;
//新线程添加到workers标识
boolean workerAdded = false;
Worker w = null;
try {
//创建一个新的Worker,因为worker也是一个Runnable,在构造函数中,会把worker赋值给线程工厂,生成一个Thread(Runnable = this.worker)的线程,所以下面的start相当于启动这个worker.
w = new Worker(firstTask);
final Thread t = w.thread;
//Runnable不为空
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
//工作集的锁进行锁定
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
//重新检查线程池的运行状态,可能在获取锁的过程中线程池被关闭
int rs = runStateOf(ctl.get());
//RUNNING状态或者SHUTDOWN状态并且任务为null.
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
// precheck that t is startable
//判断线程是否启动过了,启动过了则报错,因为需要交给 工作集来处理
if (t.isAlive()) {
throw new IllegalThreadStateException();
}
//添加到工作集
workers.add(w);
//判断修改最大线程池的大小
int s = workers.size();
if (s > largestPoolSize) {
largestPoolSize = s;
}
//添加成功设置为true
workerAdded = true;
}
} finally {
//最终解锁
mainLock.unlock();
}
//如果worker添加成功
if (workerAdded) {
//worker对应的线程进行启动
t.start();
workerStarted = true;
}
}
} finally {
//这个参数是false只有一种情况就是当Runnable==null的时候
if (!workerStarted) {
addWorkerFailed(w);
}
}
return workerStarted;
}
1.是否添加任务
先来看第一部分,是否添加任务,上面的代码有两个for循环去处理,我们可以看到除了两处return false
和一处break retry
,其他的地方都是一直在尝试,所以我们其实可以这么理解,有两种情况是尝试添加失败了,有一种情况是添加成功,继续往下执行worker的处理
那么看一下第一处的失败 ,上面我们把这个表达式分解得到的结果是:
如果线程池是 SHUTDOWN以上的状态(直接返回false不允许添加了) 如果线程池是SHUTDOWN状态,当添加的RUNNABLE不为空或者工作队列为空了不允许添加
第二次的失败比较简单,超过线程池的corepoolsize或者maximumpoolsize大小的时候不能添加
成功往下走的情况是当cas添加worker集数量+1,成功的时候跳出循环。
2.添加新的worker
这边的话,会先去创建一个新的worker,Thread是从线程工厂中获取,传入的Runnable实现是worker本身,因为worker就是一个Runnable的实现类,所以可以直接扔到Thread里面去创建
这边同样会重新去判断一下线程池的状态,只有在RUNNING的时候能添加非空的任务,在SHUTDOWN的时候可以添加空任务。
当队列添加成功的时候,才会去启动对应的线程,如果添加失败,在finally的时候会处理添加worker失败的时间。这里的失败可能来自于两个地方:
1.线程本来就是激活的,抛出错误
2.不满足Running或者是shutdown状态但是添加的是空任务
只有这两种情况,才视为失败,会在finally的时候处理添加失败的操作.
Worker(Runnable firstTask) {
// inhibit interrupts until runWorker
//-1表示该worker还没有运行
setState(-1);
//设置线程任务
this.firstTask = firstTask;
//设置线程
this.thread = getThreadFactory().newThread(this);
}
3.处理添加worker失败
处理worker失败,这边会把他移出工作集,并且尝试终止线程池tryTerminate
,只是尝试,因为出错的地方有两个,有可能是线程池状态被改变了,所以需要去尝试是否可以终止,加速终止过程
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//移除worker
if (w != null) {
workers.remove(w);
}
//减少数量
decrementWorkerCount();
//尝试终止线程池
tryTerminate();
} finally {
mainLock.unlock();
}
}
4.尝试终止线程池
这边有几种情况是不需要终止的(第一个return):
1.线程池还是running状态
2.线程池已经被终止掉了,状态变成TIDYING或者TERMINATED
3.线程池状态是SHUTDOWN,但是阻塞队列不为空,等待任务执行结束再进行终止.
如果不满足上面的条件则继续往下:
此时是shutdown状态,如果工作集的数量不为0,会调用interruptIdleWorkers
方法,这个方法我们下面会讲到,这个方法传true,会尝试去中断一个等待任务的worker线程。
继续往下,如果是SHUTDOWN状态,并且阻塞队列已经清空了,先要设置线程池状态,这边提供了一个terminated
模板给子类实现。
final void tryTerminate() {
for (; ; ) {
//拿到RUNNING的线程数量
int c = ctl.get();
//1.线程还在运行 2.atleast状态是TIDYING或者TERMINATED,已经终止了 3.SHUTDOWN但是队列不为空,等待队列结束 返回
if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty())) {
return;
}
// Eligible to terminate
//SHUTDOWN状态,如果此时线程池还有线程、中断唤醒一个正在等任务的空闲worker 返回
if (workerCountOf(c) != 0) {
interruptIdleWorkers(ONLY_ONE);
return;
}
//如果状态是SHUTDOWN,workQueue也为空了
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//将线程池的ctl变成TIDYING(所有的任务被终止,workCount为0,为此状态时将会调用terminated()方法),期间ctl有变化就会失败,会再次for循环
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
//提供模板方法给子类实现
terminated();
} finally {
//设置状态为TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
//唤醒调用了 等待线程池终止的线程 awaitTermination()
termination.signalAll();
}
return;
}
} finally {
//最终解锁
mainLock.unlock();
}
// else retry on failed CAS
}
}
3.addWorker流程图
4.线程执行Worker.run
worker的run是直接调用runWorker
方法的,这边也可以分成几个步骤去理解
1.任务的获取,这边说的任务的获取包括两个方面,一个是worker本身的Runnable不为空,优先执行自己,否则要去阻塞队列里面去去获取,注意这边不是if的判断,而是一个while循环,也就是说如果Worker的Runnable执行完了,会再去队列中拿任务,直到队列里面也没有任务了.
2.任务执行
3.worker执行结束的处理
final void runWorker(Worker w) {
//拿到当前执行线程
Thread wt = Thread.currentThread();
//拿到任务,Runnable
Runnable task = w.firstTask;
w.firstTask = null;
// allow interrupts
//new Worker()是state==-1,此处是调用Worker类的tryRelease()方法,将state置为0, 而interruptIfStarted()中只有state>=0才允许调用中断
w.unlock();
boolean completedAbruptly = true;
try {
//任务不为空 或者 从阻塞队列中getTask()不为null
while (task != null || (task = getTask()) != null) {
// 锁定
w.lock();
//线程池状态大于等于STOP、或者【任务线程】有中断标识 并且当前worker线程没有中断 ,需要对当前的worker线程进行中断
//后面可以看到shutdownNow会对所有的任务进行中断状态的设置,这边的处理就是对应的shutdownNow的处理
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) {
wt.interrupt();
}
try {
//模板方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
//运行Runnable
task.run();
} catch ( )
{ ...........//省略
}
} finally {
//模板方法
afterExecute(task, thrown);
}
} finally {
task = null;
//worker的完成任务数量加一,此时是线程安全的 ,释放锁
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//每个工作线程进行处理最后都有一个处理退出的操作,主要是做一些资源的重建和计数的操作
processWorkerExit(w, completedAbruptly);
}
}
1.从队列中获取任务
这边也是一个死循环,可以看到,有两个地方return null
说明阻塞队列不返回任务了,有一个地方是返回了非空的任务,还有一个地方进行了continue
操作。我们分别来看一下:
第一个return null
,是当线程池状态大于等于STOP,或者是线程池状态等于SHUTDOWN,但是阻塞队列里面没有任务了。这边其实可以这样理解,如果是调用shutdownNow方法,会使得线程状态一下子变成STOP,那么这边阻塞队列里面的任务就不能再参与运行了,因为后面会返回队列中没有处理的这部分任务。如果是shutdown的情况,会修改为shutdown状态,阻塞队列中的任务依然会继续消耗掉。所以这边的判断条件设置成这样.
第二个return null;
当满足了上述条件:
1.工作集数量大于maximumPoolSize
2.获取超时并且阻塞队列为空
通过CAS去减少工作集数量,如果成功直接返回null,如果失败继续循环
return
非空任务的情况先从阻塞队列中获取任务,如果任务为空则设置超时,进行下一轮的循环,否则直接返回任务.
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (; ; ) {
//线程运行状态 和 RUNNING数量
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 线程池状态为shutdown,且workQueue为空
// 线程池状态大于stop(shutdownNow()会导致变成STOP)
//workcount-1,返回null
//变式: rs >= SHUTDOWN && workQueue.isEmpty() || rs >= STOP
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//获取一下工作集数量
int wc = workerCountOf(c);
// Are workers subject to culling?
//设置超时时间为true 或者 工作集数量 > corePoolSize
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//工作集数量超过 maximumPoolSize 或者 (上一次获取超时 )并且 (工作集大于1 或者 等待队列为空)
//这边虽然表达式比较复杂,但是可以理解为:超时返回null、大于maximumPoolSize返回null,等待队列为空返回null
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
//尝试将线程数减一并返回null
if (compareAndDecrementWorkerCount(c)) {
return null;
}
//尝试失败,继续循环
continue;
}
try {
//计时则在规定时间出队列,非计时通过take获取。
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
//不为空,返回Runnable,是从等待队列中返回的
if (r != null) {
return r;
}
//如果失败设置超时
timedOut = true;
} catch (InterruptedException retry) {
//被中断的话,设置非超时,被中断的
timedOut = false;
}
}
}
2.执行任务
这边在执行任务之前有一个步骤是:线程池状态大于等于STOP、或者【任务线程】有中断标识 并且当前worker线程没有中断 ,需要对当前的worker线程进行中断。后面可以看到shutdownNow会对所有的任务进行中断状态的设置,这边的处理就是对应的shutdownNow的处理
在任务执行前后提供了两个模板方法。
worker退出的处理
worker的退出这边有一个入参:completedAbruptly,如果while循环进去了,处理过最少一次任务,这个入参就是false,如果while循环没有进去的话,这个入参就是true,可以把它理解为是否突然退出。
如果是突然退出的话,worker数量需要进行-1操作,突然退出说明没有任务了或者是第一次处理任务的时候就出现异常了,所以保底起见先把工作集数量-1,如果是false,就不要执行这一步操作
再往下,这个worker退出,则需要移出工作集,真残忍.
调用tryTerminate
方法,包含两种情况,可能是线程池shutdown了,也可能是短期内确实没有任务了,参考上面描述的该方法。
再往下判断如果是RUNNING或者SHUTDOWN状态。如果不是突然退出,下面有一步操作,因为可能包含两种状态
如果是RUNNING的时候,工作集的数量是会增加的,这边判断的是否需要去处理阻塞队列中的任务(工作集中的线程数量满足处理的数量要求,这只是一次worker的退出,所以如果添加也只会添加一个),如果不需要直接返回。如果需要往下添加一个空任务的worker。
如果是SHUTDOWN的状态,一样的道理,如果阻塞队列没有任务,不需要添加空任务的worker,等待运行中的worker结束就可以了,如果阻塞队列不为空,并且线程池真正运行的数量不够,则需要添加一个,来加快SHUTDOWN的进程。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// If abrupt, then workerCount wasn't adjusted
//completedAbruptly为true时代表发生了异常,线程数减一
if (completedAbruptly) {
decrementWorkerCount();
}
//锁住,从工作集中移除
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//统计完成任务数
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
//根据当前线程状态判断是否需要结束线程池,尝试
tryTerminate();
int c = ctl.get();
//如果是RUNNING 和SHUTDOWN状态
if (runStateLessThan(c, STOP)) {
//正常没发生异常
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && !workQueue.isEmpty()) {
min = 1;
}
//不需要添加线程的工作集的情况
if (workerCountOf(c) >= min) {
// not needed
return;
}
}
addWorker(null, false);
}
}
5.Worker.run流程图
2.shutdown
shutdown方法不会影响正在执行的线程,他会等待正在执行的线程执行结束,然后终止,这边的执行步骤分别为:
1.判断是否有权限进行shutdown操作
2.死循环设置线程池的状态为shutdown
3.对还在【等待执行】的worker List 设置中断标识(不打扰在执行中的线程)
4.ScheduledThreadPoolExecutor的钩子,这边没有用到
5.尝试终止线程池:worker集清理工作等,加快终止
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
//锁定
mainLock.lock();
try {
//判断是否有权限终止
checkShutdownAccess();
//死循环设置为SHUTDOWN状态
advanceRunState(SHUTDOWN);
//对还在【等待执行】的worker List 设置中断标识(不打扰在执行中的线程),在AQS的处理中 如果有中断标识会自己中断
interruptIdleWorkers();
//ScheduledThreadPoolExecutor的钩子?后面看了再说吧
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
//解锁
mainLock.unlock();
}
//尝试终止线程池
tryTerminate();
}
我们来看一下他怎么做到不打扰正在运行中的worker的
因为worker本身继承了AQS,所以具备state,如果不是运行中的,state等于0.可以尝试设置成功,tryLock成功的前提是worker还没进入执行,还在等待.
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
//不是中断状态并且获取锁成功(这个worker可能正在等待执行),因为worker本身继承了AQS,所以具备state,如果不是运行中的,state等于0.可以尝试设置成功
if (!t.isInterrupted() && w.tryLock()) {
try {
//设置中断状态
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne){
break;
}
}
} finally {
mainLock.unlock();
}
}
3.shutdownNow
shutdownNow比较过分,直接设置线程池状态为Stop,即空任务的worker(用于处理阻塞队列的任务)都不能添加。
同样他也需要检查权限
设置Stop状态
对执行中的线程进行设置中断
因为shutdownNow不支持对阻塞队列中的任务进行处理了,所以直接弹出阻塞队列的任务,然后返回这些任务
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
//锁定
mainLock.lock();
try {
//校验SHUTDOWN权限
checkShutdownAccess();
//强行设置为STOP状态直到成功
advanceRunState(STOP);
//对正在执行的worker设置中断标识
interruptWorkers();
//弹出阻塞队列
tasks = drainQueue();
} finally {
mainLock.unlock();
}
//尝试终止
tryTerminate();
//返回阻塞队列中的任务
return tasks;
}
对执行中的线程设置中断标识:
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers){
//对正在执行的任务设置 中断状态
w.interruptIfStarted();
}
} finally {
mainLock.unlock();
}
}
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
弹出阻塞队列中的任务,并且这些在等待的任务需要返回,因为还没执行
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
q.drainTo(taskList);
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}