/**
* 一个 ExecutorService,它使用可能的几个池线程之一执行每个提交的任务,通常使用 Executors 工厂方法配置。
* 线程池可以解决两个不同问题:由于减少了每个任务调用的开销,它们通常可以在执行大量异步任务时提供增强的性能,
* 并且还可以提供绑定和管理资源(包括执行任务集时使用的线程)的方法。
* 每个 ThreadPoolExecutor 还维护着一些基本的统计数据,如完成的任务数。
*
* 为了便于跨大量上下文使用,此类提供了很多可调整的参数和扩展钩子 (hook)。但是,
* 强烈建议程序员使用较为方便的 Executors 工厂方法 Executors.newCachedThreadPool()
* (无界线程池,可以进行自动线程回收)、Executors.newFixedThreadPool(int)(固定大小线程池)和
* Executors.newSingleThreadExecutor()(单个后台线程),它们均为大多数使用场景预定义了设置。
* 否则,在手动配置和调整此类时,使用以下指导:
* 核心和最大池大小
* ThreadPoolExecutor 将根据 corePoolSize(参见 getCorePoolSize())和
* maximumPoolSize(参见 getMaximumPoolSize())设置的边界自动调整池大小。
* 当新任务在方法 execute(java.lang.Runnable) 中提交时,如果运行的线程少于 corePoolSize,
* 则创建新线程来处理请求,即使其他辅助线程是空闲的。
* 如果运行的线程多于 corePoolSize 而少于 maximumPoolSize,则仅当队列满时才创建新线程。
* 如果设置的 corePoolSize 和 maximumPoolSize 相同,则创建了固定大小的线程池。
* 如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),则允许池适应任意数量的并发任务。
* 在大多数情况下,核心和最大池大小仅基于构造来设置,不过也可以使用 setCorePoolSize(int) 和
* setMaximumPoolSize(int) 进行动态更改。
* 按需构造
* 默认情况下,即使核心线程最初只是在新任务到达时才创建和启动的,也可以使用方法 prestartCoreThread() 或
* prestartAllCoreThreads() 对其进行动态重写。如果构造带有非空队列的池,则可能希望预先启动线程。
* 创建新线程
* 使用 ThreadFactory 创建新线程。如果没有另外说明,
* 则在同一个 ThreadGroup 中一律使用 Executors.defaultThreadFactory() 创建线程,
* 并且这些线程具有相同的 NORM_PRIORITY 优先级和非守护进程状态。通过提供不同的 ThreadFactory,
* 可以改变线程的名称、线程组、优先级、守护进程状态,等等。
* 如果从 newThread 返回 null 时 ThreadFactory 未能创建线程,则执行程序将继续运行,但不能执行任何任务。
* 保持活动时间
* 如果池中当前有多于 corePoolSize 的线程,则这些多出的线程在空闲时间超过 keepAliveTime 时将会终止
* (参见 getKeepAliveTime(java.util.concurrent.TimeUnit))。
* 这提供了当池处于非活动状态时减少资源消耗的方法。如果池后来变得更为活动,则可以创建新的线程。
* 也可以使用方法 setKeepAliveTime(long, java.util.concurrent.TimeUnit) 动态地更改此参数。
* 使用 Long.MAX_VALUE TimeUnit.NANOSECONDS 的值在关闭前有效地从以前的终止状态禁用空闲线程。
* 默认情况下,保持活动策略只在有多于 corePoolSizeThreads 的线程时应用。
* 但是只要 keepAliveTime 值非 0,allowCoreThreadTimeOut(boolean) 方法也可将此超时策略应用于核心线程。
* 排队
* 所有 BlockingQueue 都可用于传输和保持提交的任务。可以使用此队列与池大小进行交互:
* 如果运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。
* 如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程。
* 如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。
* 排队有三种通用策略:
* 直接提交。工作队列的默认选项是 SynchronousQueue,它将任务直接提交给线程而不保持它们。在此,
* 如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。
* 此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。
* 直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。
* 当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
* 无界队列。使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)
* 将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。
* (因此,maximumPoolSize 的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,
* 适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,
* 当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
* 有界队列。当使用有限的 maximumPoolSizes 时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,
* 但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、
* 操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),
* 则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU 使用率较高,
* 但是可能遇到不可接受的调度开销,这样也会降低吞吐量。
* 被拒绝的任务
* 当 Executor 已经关闭,并且 Executor 将有限边界用于最大线程和工作队列容量,且已经饱和时,
* 在方法 execute(java.lang.Runnable) 中提交的新任务将被拒绝。在以上两种情况下,
* execute 方法都将调用其 RejectedExecutionHandler 的
* RejectedExecutionHandler.rejectedExecution(java.lang.Runnable,
* java.util.concurrent.ThreadPoolExecutor) 方法。下面提供了四种预定义的处理程序策略:
* 在默认的 ThreadPoolExecutor.AbortPolicy 中,
* 处理程序遭到拒绝将抛出运行时 RejectedExecutionException。
* 在 ThreadPoolExecutor.CallerRunsPolicy 中,线程调用运行该任务的 execute 本身。
* 此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
* 在 ThreadPoolExecutor.DiscardPolicy 中,不能执行的任务将被删除。
* 在 ThreadPoolExecutor.DiscardOldestPolicy 中,如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,
* 然后重试执行程序(如果再次失败,则重复此过程)。
* 定义和使用其他种类的 RejectedExecutionHandler 类也是可能的,但这样做需要非常小心,
* 尤其是当策略仅用于特定容量或排队策略时。
* 钩子 (hook) 方法
* 此类提供 protected 可重写的 beforeExecute(java.lang.Thread, java.lang.Runnable) 和
* afterExecute(java.lang.Runnable, java.lang.Throwable) 方法,这两种方法分别在执行每个任务之前和之后调用。
* 它们可用于操纵执行环境;例如,重新初始化 ThreadLocal、搜集统计信息或添加日志条目。
* 此外,还可以重写方法 terminated() 来执行 Executor 完全终止后需要完成的所有特殊处理。
* 如果钩子 (hook) 或回调方法抛出异常,则内部辅助线程将依次失败并突然终止。
* 队列维护
* 方法 getQueue() 允许出于监控和调试目的而访问工作队列。强烈反对出于其他任何目的而使用此方法。
* remove(java.lang.Runnable) 和 purge() 这两种方法可用于在取消大量已排队任务时帮助进行存储回收。
* 终止
* 程序 AND 不再引用的池没有剩余线程会自动 shutdown。如果希望确保回收取消引用的池(即使用户忘记调用 shutdown()),
* 则必须安排未使用的线程最终终止:设置适当保持活动时间,
* 使用 0 核心线程的下边界和/或设置 allowCoreThreadTimeOut(boolean)。
* 扩展示例。此类的大多数扩展可以重写一个或多个受保护的钩子 (hook) 方法。例如,下面是一个添加了简单的暂停/恢复功能的子类:
* class PausableThreadPoolExecutor extends ThreadPoolExecutor {
* private boolean isPaused;
* private ReentrantLock pauseLock = new ReentrantLock();
* private Condition unpaused = pauseLock.newCondition();
* public PausableThreadPoolExecutor(...) { super(...); }
* protected void beforeExecute(Thread t, Runnable r) {
* super.beforeExecute(t, r);
* pauseLock.lock();
* try {
* while (isPaused) unpaused.await();
* } catch(InterruptedException ie) {
* t.interrupt();
* } finally {
* pauseLock.unlock();
* }
* }
* public void pause() {
* pauseLock.lock();
* try {
* isPaused = true;
* } finally {
* pauseLock.unlock();
* }
* }
* public void resume() {
* pauseLock.lock();
* try {
* isPaused = false;
* unpaused.signalAll();
* } finally {
* pauseLock.unlock();
* }
* }
* }
*/
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
* 主池控制状态ctl是一个原子整数,它包装了两个概念字段workerCount,指示线程的有效数量;
* runState,指示是否运行,关闭等
*
* 为了将它们打包为一个int,我们将workerCount限制为(2^29)-1(约5亿个)线程,
* 而不是(2^31)-1(20亿个)可表示的线程。如果将来有问题,可以将该变量更改为AtomicLong,
* 并在以下调整shift / mask常数。但是在需要之前,使用int可以使此代码更快,更简单。
*
* workerCount是已被允许启动但不允许停止的工人数(workers)。该值可能与活动线程的实际数量暂时不同,
* 例如,当ThreadFactory在被询问时未能创建线程,并且退出线程仍在终止之前执行簿记操作时,
* 该值会有所不同。用户可见池大小报告为工作集(workers set)的当前大小。
*
* runState提供主要的生命周期控制,并具有以下值:
* 运行:接受新任务并处理排队的任务
* 关机:不接受新任务,但处理排队的任务
* 停止:不接受新任务,不处理排队的任务以及中断进行中的任务
* TIDYING:所有任务已终止,workerCount为零,
* 转换为TIDYING状态的线程将运行terminated()钩子方法
* 终止:terminated()已完成
*
* 这些值之间的数字顺序很重要,可以进行有序的比较。runState随时间单调增加,但不必达到每个状态。
* 过渡是:
* RUNNING -> SHUTDOWN 正在运行->关闭
* 在调用shutdown()时,可能隐式在finalize()中
* (RUNNING or SHUTDOWN) -> STOP (正在运行或关机)->停止
* 调用shutdownNow()时
* SHUTDOWN -> TIDYING 关机->整理
* 当队列和池都为空时
* STOP -> TIDYING 停止->整理
* 当池为空时
* TIDYING -> TERMINATED 整理->已终止
* 当Terminate()挂钩方法完成时
*
* 状态达到TERMINATED时,在awaitTermination()中等待的线程将返回。
*
* 检测从SHUTDOWN到TIDYING的转换并不像您想要的那样简单,因为在SHUTDOWN状态期间队列在非空之后可能会变空,
* 反之亦然,但是只有在看到它为空之后,我们看到workerCount 为0(有时需要重新检查-参见下文)。
*/
// ctl高3位代表当前的线程池状态,低29位代表当前的线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState存储在高位
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
/*
* 不需要解压缩ctl的位字段访问器。这些取决于位布局和workerCount永远不会为负。
*/
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
/**
* 尝试CAS递增ctl的workerCount字段。
* @param expect
* @return
*/
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
/**
* 尝试CAS递减ctl的workerCount字段。
* @param expect
* @return
*/
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
/**
* 减少ctl的workerCount字段。 仅在线程突然终止时调用此方法(请参阅processWorkerExit)。
* 其他减量在getTask中执行。
*/
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
/**
* 用于保留任务并移交给工作线程的队列。
* 我们不要求workQueue.poll()返回null必然意味着workQueue.isEmpty(),
* 因此仅依靠isEmpty来查看队列是否为空(例如,在确定是否从SHUTDOWN过渡到TIDYING时必须这样做)。
* 这可容纳特殊用途的队列,例如DelayQueues,允许poll()返回null,
* 即使它在延迟到期后稍后可能返回non-null。
*/
private final BlockingQueue<Runnable> workQueue;
/**
* 锁定时要锁定工人位置(workers set)和相关簿记。虽然我们可以使用某种并发集,但事实证明,通常最好使用锁。
* 其中一个原因是,这会序列化interruptIdleWorkers,从而避免了不必要的中断风暴,尤其是在关机期间。
* 否则,退出线程将同时中断那些尚未中断的线程。这也简化了与largestPoolSize等相关的一些统计簿记。
* 我们还将mainLock保持在shutdown和shutdownNow上,以确保工作集稳定,
* 同时分别检查中断许可和实际中断许可。
*/
private final ReentrantLock mainLock = new ReentrantLock();
/**
* 集包含池中的所有工作线程。 仅在按住mainLock时访问。
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
/**
* 等待条件以支持awaitTermination
*/
private final Condition termination = mainLock.newCondition();
/**
* 跟踪达到的最大池大小。 仅在mainLock下访问。
*/
private int largestPoolSize;
/**
* 计数器完成的任务。 仅在终止工作线程时更新。 仅在mainLock下访问。
*/
private long completedTaskCount;
/*
* 所有用户控制参数都声明为volatile,以便正在进行的操作基于最新值,但无需锁定,
* 因为没有内部不变性依赖于它们相对于其他操作的同步变化。
*/
/**
* 新线程的工厂。使用此工厂(通过方法addWorker)创建所有线程。
* 必须为所有调用程序做好准备,以使addWorker失败,这可能反映出系统或用户的策略限制了线程数。
* 即使未将其视为错误,创建线程的失败也可能导致新任务被拒绝或现有任务仍停留在队列中。
*
* 我们走得更远,即使遇到诸如OutOfMemoryError之类的错误,它们也会保留池不变式,
* 而在尝试创建线程时可能会抛出该错误。由于需要在Thread.start中分配本机堆栈,因此此类错误相当常见,
* 并且用户将需要执行清理池关闭来进行清理。可能有足够的可用内存来完成清除代码,
* 而不会遇到另一个OutOfMemoryError。
*/
private volatile ThreadFactory threadFactory;
/**
* 当执行饱和或关闭时调用处理程序。
*/
private volatile RejectedExecutionHandler handler;
/**
* 空闲线程等待工作的超时时间(以纳秒为单位)。
* 当存在多个corePoolSize或allowCoreThreadTimeOut时,线程将使用此超时。
* 否则,他们将永远等待新的工作。
*/
private volatile long keepAliveTime;
/**
* 如果为false(默认),则即使处于空闲状态,核心线程也保持活动状态。
* 如果为true,则核心线程使用keepAliveTime来超时等待工作。
*/
private volatile boolean allowCoreThreadTimeOut;
/**
* 除非设置allowCoreThreadTimeOut,
* 否则核心池大小是保持活动状态(不允许超时等)的最小工作线程数,在这种情况下,最小值为零。
*/
private volatile int corePoolSize;
/**
* 最大池大小。 请注意,实际最大值在内部受“容量”限制。
*/
private volatile int maximumPoolSize;
/**
* 默认拒绝执行处理程序
*/
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
/**
* 调用shutdown和shutdownNow所需的权限。我们还要求(请参阅checkShutdownAccess),
* 调用者必须具有实际中断工作程序集中线程的权限(由Thread.interrupt所控制,
* 该线程依赖于ThreadGroup.checkAccess,
* 而ThreadGroup.checkAccess则依赖于SecurityManager.checkAccess)。
* 仅在这些检查通过的情况下,才尝试关机。
*/
private static final RuntimePermission shutdownPerm =
new RuntimePermission("modifyThread");
/**
* 执行终结器时要使用的上下文,或者为null。
*/
private final AccessControlContext acc;
/**
* Class Worker主要维护线程运行任务的中断控制状态,以及其他次要簿记。
* 此类适时地扩展了AbstractQueuedSynchronizer以简化获取和释放围绕每个任务执行的锁。
* 这样可以防止旨在唤醒等待任务的工作线程而不是中断正在运行的任务的中断。
* 我们实现了一个简单的非可重入互斥锁,而不是使用ReentrantLock,
* 因为我们不希望辅助任务在调用诸如setCorePoolSize之类的池控制方法时能够重新获取该锁。
* 此外,为了抑制直到线程真正开始运行任务之前的中断,我们将锁定状态初始化为负值,
* 并在启动时将其清除(在runWorker中)。
*/
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/*
* state = -1 表示初始化状态
* state = 0 释放锁
* state = 1 表示获得锁状态
*/
/**
* 此类永远不会序列化,但是我们提供了serialVersionUID来禁止javac警告。
*/
private static final long serialVersionUID = 6138294804551838833L;
/** 此工作程序正在其中运行的线程。如果工厂失败,则为null。 */
final Thread thread;
/** 要运行的初始任务。 可能为null。 */
Runnable firstTask;
/** 每个线程任务计数器 */
volatile long completedTasks;
/**
* 使用给定的第一个任务和线程工厂中的线程创建。
* @param firstTask 第一项任务(如果没有则为null)
*/
Worker(Runnable firstTask) {
setState(-1); // 禁止中断,直到runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** 将主运行循环委托给外部runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// 值0代表解锁状态。
// 值1表示锁定状态。
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
/*
* 设置控制状态的方法
*/
/**
* 将runState转换为给定目标,如果至少已经有给定目标,则将其保留。
* @param targetState 所需的状态,即SHUTDOWN或STOP
* (但不是TIDYING或TERMINATED-为此使用tryTerminate)
*/
private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
/**
* 如果(关闭和池和队列为空)或(停止和池为空),则转换为TERMINATED状态。
* 如果可以终止,但workerCount非零,则中断一个空闲的worker,以确保关闭信号传播。
* 必须在可能终止的任何操作之后调用此方法-减少工作人员(worker)计数或在关闭期间从队列中删除任务。
* 该方法是非私有的,以允许从ScheduledThreadPoolExecutor访问。
*/
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 状态是否是RUNNING 或者 是否为(TIDYING、TERMINATED)
// 或者(为SHUTDOWN并且队列非空)
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // 有资格终止
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// 否则重试失败的CAS
}
}
/*
* 控制工作线程中断的方法。
*/
/**
* 如果有安全管理器,请确保调用者通常具有关闭线程的权限(请参阅shutdownPerm)。
* 如果通过,则另外确保允许调用方中断每个工作线程。
* 即使Security Manager特别对待某些线程,即使通过了首个检查,也可能不正确。
*/
private void checkShutdownAccess() {
SecurityManager security = System.getSecurityManager();
if (security != null) {
security.checkPermission(shutdownPerm);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
security.checkAccess(w.thread);
} finally {
mainLock.unlock();
}
}
}
/**
* 中断所有线程,即使处于活动状态也是如此。
* 忽略SecurityExceptions(在这种情况下,某些线程可能保持不间断)。
*/
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
/**
* 中断可能正在等待任务的线程(如未锁定所示),以便它们可以检查终止或配置更改。
* 忽略SecurityExceptions(在这种情况下,某些线程可能保持不间断)。
* @param onlyOne 如果为true,则最多中断一名工人(worker)。
* 仅当启用终止功能但还有其他工作程序时,才从tryTerminate调用此方法。
* 在这种情况下,在所有线程当前正在等待的情况下,最多只有一个等待的工作程序被中断以传播关闭信号。
* 中断任意线程可确保自关闭开始以来新到达的工作程序最终也将退出。为了保证最终终止,
* 总是只中断一个空闲的工作程序就足够了,但是shutdown()会中断所有空闲的工作程序,
* 以便多余的工作程序迅速退出,而不必等待散乱的任务完成。
*/
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
/**
* interruptIdleWorkers的常见的形式,以避免必须记住boolean变量的手段。
*/
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private static final boolean ONLY_ONE = true;
/*
* 杂项实用程序(Misc utilities),其中大多数还导出到ScheduledThreadPoolExecutor
*/
/**
* 调用给定命令的拒绝执行处理程序。
* 受Package保护,供ScheduledThreadPoolExecutor使用。
* @param command
*/
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
/**
* 在调用shutdown进行运行状态转换之后执行任何进一步的清理。
* 这里没有操作,但由ScheduledThreadPoolExecutor使用它来取消延迟的任务。
*/
void onShutdown() {
}
/**
* ScheduledThreadPoolExecutor需要进行状态检查以在关机期间启用正在运行的任务。
* @param shutdownOK 如果关闭则应返回true
* @return
*/
final boolean isRunningOrShutdown(boolean shutdownOK) {
int rs = runStateOf(ctl.get());
return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
}
/**
* 通常使用drainTo将任务队列排放到新列表中。但是,
* 如果队列是DelayQueue或其他类型的队列,但poll或drainTo可能无法删除某些元素,
* 则将它们逐个删除。
* @return
*/
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
q.drainTo(taskList); // 移除q队列中所有可用的元素,并将它们添加到taskList 中。
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}
/*
* 工作人员(workers)创建,运行和清理的方法
*/
/**
* 检查是否可以针对当前池状态和给定的边界(核心或最大值)添加新的工作程序。
* 如果是这样,将相应地调整工作程序计数,并且如果可能,将创建并启动一个新的工作程序,
* 并运行firstTask作为其第一个任务。如果池已停止或有资格关闭,则此方法返回false。
* 如果在询问时线程工厂无法创建线程,则还返回false。
* 如果线程创建失败(由于线程工厂返回null或由于异常
* (通常是Thread.start()中的OutOfMemoryError)),我们将进行干净的回滚。
* @param firstTask 新线程应首先运行的任务(如果没有,则为null)。
* 当线程数少于corePoolSize线程(在这种情况下,我们总是启动一个线程),
* 或者队列已满(在这种情况下,我们必须绕过队列),
* 使用初始的第一个任务(在execute()方法中)创建工作程序以绕过队列。 最初,
* 空闲线程通常是通过prestartCoreThread创建的,或者用于替换其他垂死的工作线程。
* @param core 如果为true,请使用corePoolSize作为绑定,否则使用maximumPoolSize。
* (此处使用布尔值指示符而不是值,以确保在检查其他池状态后读取新值)。
* @return 成功则为true
*/
private boolean addWorker(Runnable firstTask, boolean core) {
/*
* 1、添加工作线程数(cas添加工作线程数量)
*/
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 仅在必要时检查队列是否为空。
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // 重读ctl
if (runStateOf(c) != rs)
continue retry;
// 否则CAS由于workerCount更改而失败; 重试内循环
}
}
/*
* 2、创建工作线程
*/
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 按住锁时重新检查。
// 如果ThreadFactory失败或在获得锁之前关闭,请回退。
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // 预检查t是否可启动
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
/**
* 回滚工作线程创建。
* -从工人(worker)中删除工人(worker)(如果有)
* -减少工人(worker)人数
* -如果该工人(worker)的存在阻止了解雇,请重新检查是否解雇
* @param w
*/
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount(); // 减少ctl的workerCount字段。
tryTerminate(); // 如果(关闭和池和队列为空)或(停止和池为空),则转换为TERMINATED状态。
} finally {
mainLock.unlock();
}
}
/**
* 为垂死的工人进行清理和簿记。仅从辅助线程调用。除非设置了completedAbruptly,
* 否则假设已经对workerCount进行了调整以解决退出问题。如果由于用户任务异常而退出线程,
* 或者正在运行的线程少于corePoolSize或队列为非空但没有工作者,
* 则此方法从工作者集(worker set)中删除线程,并可能终止该池或替换该工作者(workers)。
* @param w 工人(worker)
* @param completedAbruptly 如果工人由于用户异常而死亡
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // 如果是突然的,则不调整workerCount
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
// 如果(关闭和池和队列为空)或(停止和池为空),则转换为TERMINATED状态。
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // 无需更换
}
addWorker(null, false);
}
}
/**
* 根据当前配置设置执行阻塞或定时等待任务,或者如果此工作程序由于以下任何原因而必须退出,则返回null:
* 1.超过了maximumPoolSize工人(worker)(由于调用setMaximumPoolSize)。
* 2.池已停止。
* 3.池已关闭,队列为空。
* 4.该工作程序超时等待任务,并且在定时等待之前和之后都将终止超时工作程序(即,
* allowCoreThreadTimeOut || workerCount> corePoolSize),
* 并且如果队列为非空,则此工作程序将终止 不是池中的最后一个线程。
* @return task务,如果工作人员必须退出,则返回null,
* 在这种情况下,workerCount递减
*/
private Runnable getTask() {
/*
* 1、首先要去队列中拿数据
* 2、非核心线程数被回收
*/
boolean timedOut = false; // 最后的poll()是否超时?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 仅在必要时检查队列是否为空。
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 工人(workers )会被淘汰吗?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
// 获取并移除workQueue的头部,在keepAliveTime前等待可用的元素(如果有必要)。
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
// 获取并移除workQueue的头部,在元素变得可用之前一直等待(如果有必要)。
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
/**
* 主工作者运行循环。 反复从队列中获取任务并执行任务,同时应对许多问题:
* 1.我们可以从一个初始任务开始,在这种情况下,我们不需要获得第一个任务。
* 否则,只要池正在运行,我们就会从getTask获取任务。如果返回null,
* 则工作程序会由于更改的池状态或配置参数而退出。其他退出是由于外部代码中的异常引发而导致的,
* 在这种情况下,completedAbruptly成立,这通常导致processWorkerExit替换此线程。
*
* 2.在运行任何任务之前,先获取锁,以防止任务执行时其他池中断,然后确保除非池正在停止,
* 否则此线程不会设置其中断。
*
* 3.每个任务运行之前都会调用beforeExecute,这可能会引发异常,在这种情况下,
* 我们将导致线程死掉(中断,带有completelyAbruptly true的循环),而不处理该任务。
*
* 4.假设beforeExecute正常完成,我们运行任务,收集其引发的任何异常以发送给afterExecute。
* 我们分别处理RuntimeException,Error(规范保证我们可以捕获它们)和任意Throwables。
* 因为我们无法在Throwables.run中抛出Throwable,
* 所以我们将它们包装在Errors中(输出到线程的UncaughtExceptionHandler)。
* 任何抛出的异常也会保守地导致线程死亡。
*
* 5. task.run完成后,我们调用afterExecute,这也可能引发异常,这也将导致线程死亡。
* 根据JLS Sec 14.20,此异常是即使task.run抛出也会生效的异常。
*
* 异常机制的最终结果是afterExecute和线程的UncaughtExceptionHandler
* 具有与我们所能提供的有关用户代码遇到的任何问题的准确信息。
* @param w the worker
*/
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts允许中断
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// 如果池正在停止,请确保线程被中断; 如果没有,请确保线程不被中断。
// 这需要在第二种情况下重新检查以处理shutdownNow竞赛,同时清除中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 在执行给定线程中的给定 Runnable 之前调用的方法。
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 基于完成执行给定 Runnable 所调用的方法。
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 为垂死的工人(Worker)进行清理和簿记。
processWorkerExit(w, completedAbruptly);
}
}
// 公共构造函数和方法
/**
* 用给定的初始参数和默认的线程工厂及被拒绝的执行处理程序创建新的 ThreadPoolExecutor。
* 使用 Executors 工厂方法之一比使用此通用构造方法方便得多。
* @param corePoolSize 池中所保存的线程数,包括空闲线程。
* @param maximumPoolSize 池中允许的最大线程数。
* @param keepAliveTime 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
* @param unit keepAliveTime 参数的时间单位。
* @param workQueue 执行前用于保持任务的队列。
* 此队列仅保持由 execute 方法提交的 Runnable 任务。
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
/**
* 用给定的初始参数和默认被拒绝的执行处理程序创建新的 ThreadPoolExecutor。
* @param corePoolSize 池中所保存的线程数,包括空闲线程。
* @param maximumPoolSize 池中允许的最大线程数。
* @param keepAliveTime 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
* @param unit keepAliveTime 参数的时间单位。
* @param workQueue 执行前用于保持任务的队列。
* 此队列仅保持由 execute 方法提交的 Runnable 任务。
* @param threadFactory 执行程序创建新线程时使用的工厂。
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
/**
* 用给定的初始参数和默认的线程工厂创建新的 ThreadPoolExecutor。
* @param corePoolSize 池中所保存的线程数,包括空闲线程。
* @param maximumPoolSize 池中允许的最大线程数。
* @param keepAliveTime 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
* @param unit keepAliveTime 参数的时间单位。
* @param workQueue 执行前用于保持任务的队列。此队列仅由保持 execute 方法提交的 Runnable 任务。
* @param handler 由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
/**
* 用给定的初始参数创建新的 ThreadPoolExecutor。
* @param corePoolSize 池中所保存的线程数,包括空闲线程。
* @param maximumPoolSize 池中允许的最大线程数。
* @param keepAliveTime 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
* @param unit keepAliveTime 参数的时间单位。
* @param workQueue 执行前用于保持任务的队列。此队列仅保持由 execute 方法提交的 Runnable 任务。
* @param threadFactory 执行程序创建新线程时使用的工厂。
* @param handler 由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// 如果 corePoolSize 或 keepAliveTime 小于 0,或者 maximumPoolSize 小于等于 0,
// 或者 corePoolSize 大于 maximumPoolSize。
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
// 如果 workQueue、threadFactory 或 handler 为 null。
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
/**
* 在将来某个时间执行给定任务。可以在新线程中或者在现有池线程中执行该任务。 如果无法将任务提交执行,
* 或者因为此执行程序已关闭,或者因为已达到其容量,则该任务由当前 RejectedExecutionHandler 处理。
* @param command 要执行的任务。
*/
public void execute(Runnable command) {
if (command == null) // 如果命令为 null
throw new NullPointerException();
/*
* 进行3个步骤:
*
* 1. 如果正在运行的线程少于corePoolSize线程,请尝试使用给定命令作为其第一个任务来启动新线程。
* 对addWorker的调用从原子上检查runState和workerCount,
* 从而通过返回false来防止在不应该添加线程的情况下发出错误警报。
*
* 2. 如果一个任务可以成功地排队,那么我们仍然需要仔细检查是否应该添加一个线程
* (因为现有线程自上次检查后就死掉了)或该池自进入此方法后就关闭了。因此,我们重新检查状态,
* 并在必要时回滚排队(如果已停止),或者在没有线程的情况下启动新线程。
*
* 3. 如果我们无法将任务排队,则尝试添加一个新线程。如果失败,我们知道我们已关闭或已饱和,
* 因此拒绝该任务。
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 是否为RUNNING状态并且将command插入workQueue
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 调用给定命令的拒绝执行处理程序。
else if (!addWorker(command, false))
reject(command);
}
/**
* 按过去执行已提交任务的顺序发起一个有序的关闭,但是不接受新任务。
* 如果已经关闭,则调用没有其他作用。
*/
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 检查权限
checkShutdownAccess();
// 将runState转换为SHUTDOWN
advanceRunState(SHUTDOWN);
// 中断可能正在等待任务的线程
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 转换为TERMINATED状态。
tryTerminate();
}
/**
* 尝试停止所有的活动执行任务、暂停等待任务的处理,并返回等待执行的任务列表。
* 在从此方法返回的任务队列中排空(移除)这些任务。 并不保证能够停止正在处理的活动执行任务,
* 但是会尽力尝试。 此实现通过 Thread.interrupt() 取消任务,
* 所以无法响应中断的任何任务可能永远无法终止。
* @return 从未开始执行的任务的列表。
*/
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 检查权限
checkShutdownAccess();
// 将runState转换为STOP
advanceRunState(STOP);
// 中断可能正在等待任务的线程
interruptWorkers();
// 使用drainTo将任务队列排放到新列表中
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 转换为TERMINATED状态。
tryTerminate();
return tasks;
}
/**
* 如果此执行程序已关闭,则返回 true。
* 覆写:ExecutorService的isShutdown()方法
* @return 如果该执行程序已关闭,则为true
*/
public boolean isShutdown() {
return ! isRunning(ctl.get());
}
/**
* 如果此执行程序处于在 shutdown 或 shutdownNow 之后正在终止但尚未完全终止的过程中,
* 则返回 true。此方法可能对调试很有用。关闭之后很长一段时间才报告返回的 true,
* 这可能表示提交的任务已经被忽略或取消中断,导致此执行程序无法正确终止。
* @return 如果正在终止但尚未完成,则返回 true
*/
public boolean isTerminating() {
int c = ctl.get();
return ! isRunning(c) && runStateLessThan(c, TERMINATED);
}
/**
* 如果关闭后所有任务都已完成,则返回 true。注意,
* 除非首先调用 shutdown 或 shutdownNow,否则 isTerminated 永不为 true。
* 覆写:ExecutorService的isTerminated()方法
* @return 如果关闭后所有任务都已完成,则返回 true
*/
public boolean isTerminated() {
return runStateAtLeast(ctl.get(), TERMINATED);
}
/**
* 请求关闭、发生超时或者当前线程中断,无论哪一个首先发生之后,都将导致阻塞,
* 直到所有任务完成执行。
* 覆写:ExecutorService的awaitTermination()方法
* @param timeout 最长等待时间
* @param unit timeout 参数的时间单位
* @return 如果此执行程序终止,则返回 true;如果终止前超时期满,则返回 false
* @throws InterruptedException 如果等待时发生中断
*/
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
/**
* 当不再引用此执行程序时,调用 shutdown。
* 覆写:Object的finalize()方法
*/
protected void finalize() {
SecurityManager sm = System.getSecurityManager();
if (sm == null || acc == null) {
shutdown();
} else {
PrivilegedAction<Void> pa = () -> { shutdown(); return null; };
AccessController.doPrivileged(pa, acc);
}
}
/**
* 设置用于创建新线程的线程工厂。
* @param threadFactory 新线程工厂
*/
public void setThreadFactory(ThreadFactory threadFactory) {
// 如果 threadFactory 为 null
if (threadFactory == null)
throw new NullPointerException();
this.threadFactory = threadFactory;
}
/**
* 返回用于创建新线程的线程工厂。
* @return 当前线程工厂
*/
public ThreadFactory getThreadFactory() {
return threadFactory;
}
/**
* 设置用于未执行任务的新处理程序。
* @param handler 新处理程序
*/
public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
if (handler == null) // 如果处理程序为 null
throw new NullPointerException();
this.handler = handler;
}
/**
* 返回用于未执行任务的当前处理程序。
* @return 当前处理程序
*/
public RejectedExecutionHandler getRejectedExecutionHandler() {
return handler;
}
/**
* 设置核心线程数。此操作将重写构造方法中设置的任何值。如果新值小于当前值,
* 则多余的现有线程将在下一次空闲时终止。如果较大,则在需要时启动新线程来执行这些排队的任务。
* @param corePoolSize 新核心大小
*/
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0) // 如果 corePoolSize 小于 0
throw new IllegalArgumentException();
int delta = corePoolSize - this.corePoolSize;
this.corePoolSize = corePoolSize;
if (workerCountOf(ctl.get()) > corePoolSize)
interruptIdleWorkers();
else if (delta > 0) {
// 我们真的不知道“需要”多少个新线程。
// 作为试探法,请预先启动足够的新工作线程(最大为新的核心大小)以处理队列中的当前任务数,
// 但是如果在此过程中队列变空则停止。
int k = Math.min(delta, workQueue.size());
while (k-- > 0 && addWorker(null, true)) {
if (workQueue.isEmpty())
break;
}
}
}
/**
* 返回核心线程数。
* @return 核心线程数
*/
public int getCorePoolSize() {
return corePoolSize;
}
/**
* 启动核心线程,使其处于等待工作的空闲状态。仅当执行新任务时,
* 此操作才重写默认的启动核心线程策略。如果已启动所有核心线程,此方法将返回 false。
* @return 如果启动了线程,则返回 true
*/
public boolean prestartCoreThread() {
return workerCountOf(ctl.get()) < corePoolSize &&
addWorker(null, true);
}
/**
* 与prestartCoreThread相同,
* 除了安排即使corePoolSize为0至少启动一个线程。
*/
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
/**
* 启动所有核心线程,使其处于等待工作的空闲状态。
* 仅当执行新任务时,此操作才重写默认的启动核心线程策略。
* @return 已启动的线程数
*/
public int prestartAllCoreThreads() {
int n = 0;
while (addWorker(null, true))
++n;
return n;
}
/**
* 如果此池允许核心线程超时和终止,如果在 keepAlive 时间内没有任务到达,
* 新任务到达时正在替换(如果需要),则返回 true。当返回 true 时,
* 适用于非核心线程的相同的保持活动策略也同样适用于核心线程。
* 当返回 false(默认值)时,由于没有传入任务,核心线程不会终止。
* @return 如果允许核心线程超时,则返回 true;否则返回 false
*/
public boolean allowsCoreThreadTimeOut() {
return allowCoreThreadTimeOut;
}
/**
* 如果在保持活动时间内没有任务到达,新任务到达时正在替换(如果需要),则设置控制核心线程是超时还是终止的策略。
* 当为 false(默认值)时,由于没有传入任务,核心线程将永远不会中止。当为 true 时,
* 适用于非核心线程的相同的保持活动策略也同样适用于核心线程。为了避免连续线程替换,
* 保持活动时间在设置为 true 时必须大于 0。通常应该在主动使用该池前调用此方法。
* @param value 如果应该超时,则为 true;否则为 false
*/
public void allowCoreThreadTimeOut(boolean value) {
// 如果 value 为 true 并且当前保持活动时间不大于 0。
if (value && keepAliveTime <= 0)
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
if (value != allowCoreThreadTimeOut) {
allowCoreThreadTimeOut = value;
if (value)
interruptIdleWorkers();
}
}
/**
* 设置允许的最大线程数。此操作将重写构造方法中设置的任何值。如果新值小于当前值,则多余的现有线程将在下一次空闲时终止。
* @param maximumPoolSize 新的最大值
*/
public void setMaximumPoolSize(int maximumPoolSize) {
// 如果新的最大值小于等于 0,或者小于核心池大小
if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
throw new IllegalArgumentException();
this.maximumPoolSize = maximumPoolSize;
if (workerCountOf(ctl.get()) > maximumPoolSize)
interruptIdleWorkers();
}
/**
* 返回允许的最大线程数。
* @return 允许的最大线程数
*/
public int getMaximumPoolSize() {
return maximumPoolSize;
}
/**
* 设置线程在终止前可以保持空闲的时间限制。如果池中的当前线程数多于核心线程数,
* 在不处理任务的情况下等待这一时间段之后,多余的线程将被终止。此操作将重写构造方法中设置的任何值。
* @param time 等待的时间。时间值 0 将导致执行任务后多余的线程立即终止。
* @param unit 时间参数的时间单位
*/
public void setKeepAliveTime(long time, TimeUnit unit) {
// 如果时间小于 0
if (time < 0)
throw new IllegalArgumentException();
// 时间为 0 和 allowsCoreThreadTimeOut
if (time == 0 && allowsCoreThreadTimeOut())
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
long keepAliveTime = unit.toNanos(time);
long delta = keepAliveTime - this.keepAliveTime;
this.keepAliveTime = keepAliveTime;
if (delta < 0)
interruptIdleWorkers();
}
/**
* 返回线程保持活动的时间,该时间就是超过核心池大小的线程可以在终止前保持空闲的时间值。
* @param unit 所需的结果时间单位
* @return 时间限制
*/
public long getKeepAliveTime(TimeUnit unit) {
return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
}
/* User-level queue utilities */
/**
* 返回此执行程序使用的任务队列。对任务队列的访问主要用于调试和监控。
* 此队列可能正处于活动使用状态中。获取任务队列不妨碍已加入队列的任务的执行。
* @return 任务队列
*/
public BlockingQueue<Runnable> getQueue() {
return workQueue;
}
/**
* 从执行程序的内部队列中移除此任务(如果存在),从而如果尚未开始,则其不再运行。
* 此方法可用作取消方案的一部分。它可能无法移除在放置到内部队列之前已经转换为其他形式的任务。
* 例如,使用 submit 输入的任务可能被转换为维护 Future 状态的形式。
* 但是,在此情况下,purge() 方法可用于移除那些已被取消的 Future。
* @param task 要移除的任务
* @return 如果已经移除任务,则返回 true
*/
public boolean remove(Runnable task) {
boolean removed = workQueue.remove(task);
tryTerminate(); // 如果SHUTDOWN且现在为空
return removed;
}
/**
* 尝试从工作队列移除所有已取消的 Future 任务。此方法可用作存储回收操作,它对功能没有任何影响。
* 取消的任务不会再次执行,但是它们可能在工作队列中累积,直到 worker 线程主动将其移除。
* 调用此方法将试图立即移除它们。但是,如果出现其他线程的干预,那么此方法移除任务将失败。
*/
public void purge() {
final BlockingQueue<Runnable> q = workQueue;
try {
Iterator<Runnable> it = q.iterator();
while (it.hasNext()) {
Runnable r = it.next();
if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
it.remove();
}
} catch (ConcurrentModificationException fallThrough) {
// 如果在遍历期间遇到干扰,请走慢的路。
// 复制以进行遍历,并调用remove删除已取消的条目。
// 慢速路径更有可能是O(N*N)。
for (Object r : q.toArray())
if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
q.remove(r);
}
tryTerminate(); // 如果SHUTDOWN且现在为空
}
/* 统计 */
/**
* 返回池中的当前线程数。
* @return 线程数。
*/
public int getPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 消除isTerminated()&& getPoolSize()> 0的罕见可能性
return runStateAtLeast(ctl.get(), TIDYING) ? 0
: workers.size();
} finally {
mainLock.unlock();
}
}
/**
* 返回主动执行任务的近似线程数。
* @return 线程数。
*/
public int getActiveCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int n = 0;
for (Worker w : workers)
if (w.isLocked())
++n;
return n;
} finally {
mainLock.unlock();
}
}
/**
* 返回曾经同时位于池中的最大线程数。
* @return 线程数。
*/
public int getLargestPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
return largestPoolSize;
} finally {
mainLock.unlock();
}
}
/**
* 返回曾计划执行的近似任务总数。因为在计算期间任务和线程的状态可能动态改变,
* 所以返回值只是一个近似值。
* @return 任务数
*/
public long getTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
long n = completedTaskCount;
for (Worker w : workers) {
n += w.completedTasks;
if (w.isLocked())
++n;
}
return n + workQueue.size();
} finally {
mainLock.unlock();
}
}
/**
* 返回已完成执行的近似任务总数。因为在计算期间任务和线程的状态可能动态改变,
* 所以返回值只是一个近似值,但是该值在整个连续调用过程中不会减少。
* @return 任务数。
*/
public long getCompletedTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
long n = completedTaskCount;
for (Worker w : workers)
n += w.completedTasks;
return n;
} finally {
mainLock.unlock();
}
}
/**
* 返回标识此池及其状态的字符串,
* 包括运行状态的指示以及估计的工作人员(worker)和任务计数。
* @return 标识此池及其状态的字符串
*/
public String toString() {
long ncompleted;
int nworkers, nactive;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
ncompleted = completedTaskCount;
nactive = 0;
nworkers = workers.size();
for (Worker w : workers) {
ncompleted += w.completedTasks;
if (w.isLocked())
++nactive;
}
} finally {
mainLock.unlock();
}
int c = ctl.get();
String rs = (runStateLessThan(c, SHUTDOWN) ? "Running" :
(runStateAtLeast(c, TERMINATED) ? "Terminated" :
"Shutting down"));
return super.toString() +
"[" + rs +
", pool size = " + nworkers +
", active threads = " + nactive +
", queued tasks = " + workQueue.size() +
", completed tasks = " + ncompleted +
"]";
}
/* Extension hooks */
/**
* 在执行给定线程中的给定 Runnable 之前调用的方法。此方法由将执行任务 r 的线程 t 调用,
* 并且可用于重新初始化 ThreadLocals 或者执行日志记录。此实现不执行任何操作,
* 但可在子类中定制。注:为了正确嵌套多个重写操作,此方法结束时,
* 子类通常应该调用 super.beforeExecute。
* @param t 将运行任务 r 的线程。
* @param r 将执行的任务。
*/
protected void beforeExecute(Thread t, Runnable r) { }
/**
* 基于完成执行给定 Runnable 所调用的方法。此方法由执行任务的线程调用。如果非 null,
* 则 Throwable 是导致执行突然终止的未捕获 RuntimeException 或 Error。
* 注:当操作显示地或者通过 submit 之类的方法包含在任务内时(如 FutureTask),
* 这些任务对象捕获和维护计算异常,因此它们不会导致突然终止,内部异常不会 传递给此方法。
*
* 此实现不执行任何操作,但可在子类中定制。注:为了正确嵌套多个重写操作,此方法开始时,
* 子类通常应该调用 super.afterExecute。
* @param r 已经完成的 runnable 线程。
* @param t 导致终止的异常;如果执行正常结束,则为 null。
*/
protected void afterExecute(Runnable r, Throwable t) { }
/**
* 当 Executor 已经终止时调用的方法。默认实现不执行任何操作。注:为了正确嵌套多个重写操作,
* 子类通常应该在此方法中调用 super.afterExecute。
*/
protected void terminated() { }
/**
* 用于被拒绝任务的处理程序,它直接在 execute 方法的调用线程中运行被拒绝的任务;
* 如果执行程序已关闭,则会丢弃该任务。
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* 创建一个 CallerRunsPolicy。
*/
public CallerRunsPolicy() { }
/**
* 执行调用者线程中的任务 r;如果执行程序已关闭,则会丢弃该任务。
*
* @param r 请求执行的可运行任务。
* @param e 试图执行此任务的执行程序。
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
/**
* 用于被拒绝任务的处理程序,它将抛出 RejectedExecutionException.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* 创建一个 AbortPolicy。
*/
public AbortPolicy() { }
/**
* 总是抛出 RejectedExecutionException。
*
* @param r 请求执行的可运行任务。
* @param e 试图执行此任务的执行程序
* @throws RejectedExecutionException 总是抛出此异常。
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
/**
* 用于被拒绝任务的处理程序,默认情况下它将丢弃被拒绝的任务。
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* 创建一个 DiscardPolicy。
*/
public DiscardPolicy() { }
/**
* 不执行任何操作,在这种情况下将放弃任务 r。
*
* @param r 请求执行的可运行任务
* @param e 试图执行此任务的执行程序
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
/**
* 用于被拒绝任务的处理程序,它放弃最旧的未处理请求,然后重试 execute;如果执行程序已关闭,则会丢弃该任务。
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* 为给定执行程序创建 DiscardOldestPolicy。
*/
public DiscardOldestPolicy() { }
/**
* 获取并忽略下一个任务,否则如果该任务立即可用,执行程序将执行该任务,
* 然后再试图重新执行任务 r;如果执行程序已关闭,则会丢弃任务 r。
*
* @param r 请求执行的可运行任务。
* @param e 试图执行此任务的执行程序。
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
}
ThreadPoolExecutor源码分析。
猜你喜欢
转载自blog.csdn.net/en_joker/article/details/105488405
今日推荐
周排行