我们使用线程池一般是按照下边的规则进行的
private static ThreadPoolExecutor executors = new ThreadPoolExecutor(10, 20, 5, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(100), new ThreadPoolExecutor.CallerRunsPolicy());
public void ConsumerHandler(List<Bean> beans) {
executors.submit(new Runnable() {
@Override
public void run() {
/**
* 要执行的业务方法
**/
service.handler(beans);
}
});
}
/**
* 调用的方法
**/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
先对这几个参数进行一下说明总结:
1. corePoolSize:线程池的基本大小。当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使有其它空闲的线程,直到线程数达到corePoolSize时就不再创建,这时会把提交的新任务放到阻塞队列。如果调用了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有基本线程。
2. maximumPoolSize:线程池允许创建的最大线程数。如果阻塞队列满了,并且已经创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。
3. keepAliveTime:线程活动保持时间。指工作线程空闲后,继续保持存活的时间。默认情况下,这个参数只有在线程数大于corePoolSize时才起作用。所以,如果任务很多,且每个任务的执行时间比较短,可以调大keepAliveTime,提高线程的利用率。
4.时间单位
5. workQueue:用来保存等待执行的任务的阻塞队列。
- ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO原则对元素进行排序。
- LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序元素,吞吐量通常要高于ArrayBlockingQuene。
- SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene。
- priorityBlockingQuene:具有优先级的无界阻塞队列。
6. rejectedExecutionHandler:饱和策略。当阻塞队列满了且没有空闲的工作线程,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略在默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。不过,线程池提供了4种策略:
- AbortPolicy:直接抛出异常。
- CallerRunsPolicy:只用调用者所在的线程来运行任务。
- DiscardOldestPolicy:丢弃阻塞队列中最近的一个任务,并执行当前任务。
- DiscardPolicy:直接丢弃。
上边只限于用法,现在看一下线程池实现框架中的各个类之间的关系,图来自
从图中可以看到Executor、ExecutorService、ScheduledExecutorService是接口,ThreadPoolExecutor和ScheduledThreadPoolExecutor是线程池实现,前者是一个普通的线程池,后者一个定期调度的线程池,Executors是辅助工具,用以帮助我们定义线程池.
Exectors是java线程池的工厂类,通过它可以快速初始化一个符合业务需求的线程池,主要提供了以下几种便捷的方式:
1.newCachedThreadPool
2.newFixedThreadPool
3.newSingleThreadExecutor
4.newScheduledThreadPool
/**
* 工作线程数量几乎没有上线,因为maximumPoolSize为Integer.MAX_VALUE(2147483647)。
* 如果长时间没有提交任务,且工作线程空闲了指定的时间,则该工作线程将自动终止。如果重新提交了任务,则线程池重新创建一个工作线程。
* 它在没有任务执行时,会释放工作线程,从而释放工作线程所占用的资源。但是,但当提交新任务时,又要创建新的工作线程,有一定的系统开销。
* 另外一定要注意控制任务的数量,否则由于大量线程同时运行,很有会造成系统瘫痪。
* 另外这里没有制定线程池的处理策略,所以默认是 RejectedExecutionHandler defaultHandler =new AbortPolicy(); 直接丢弃了,下边没有指定的时候也是这样
* 要慎重使用
**/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
/**
* newFixedThreadPool:创建一个指定工作线程数的线程池,其中参数corePoolSize和maximumPoolSize相等,阻塞队列基于LinkedBlockingQuene。
* 它是一个典型且优秀的线程池,它具有线程池提高程序效率和节省创建线程时所耗的开销的优点。
* 但是在线程池空闲时,即线程池中没有可运行任务时,它不会释放工作线程,还会占用一定的系统资源。
**/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
/**
* 创建一个可以在指定时间内周期性的执行任务的线程池。在实际业务中常用的场景是周期性的同步数据。
**/
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
/**
* 创建一个可以在指定时间内周期性的执行任务的线程池。在实际业务中常用的场景是周期性的同步数据。
**/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
Exectors是一个工厂类,通过上边这些方法创建对应的线程池,主要有两个大类:ThreadPoolExecutor和ScheduledThreadPoolExecutor,通过上边的图也可以得到相关的结论,而上边这四个方法就是工厂方法;通过这种方式创建会导致一个问题就是隐藏底层的实现细节,比如默认的丢弃策略,这就非常不友好,所以我们还是采取一开始的方式去使用。下边我们主要看一下ThreadPoolExecutor的实现,先看一下里边最重要的变量ctl:
private static final int COUNT_BITS = Integer.SIZE - 3; // 29 位
private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 线程池最大容量
// runState is stored in the high-order bits
// 定义的线程池状态常量
// 111+29个0,值为 -4 + 2 + 1 = -1
private static final int RUNNING = -1 << COUNT_BITS;
// 000+29个0
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 001+29个0
private static final int STOP = 1 << COUNT_BITS;
// 010+29个0
private static final int TIDYING = 2 << COUNT_BITS;
// 011+29个0
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; } // 得到线程池状态
private static int workerCountOf(int c) { return c & CAPACITY; } // 得到线程池线程数
private static int ctlOf(int rs, int wc) { return rs | wc; } // 反向构造 ctl 的值
ThreadPoolExecutor
内部作者采用了一个 32bit
的 int
值来表示线程池的运行状态(runState)
和当前线程池中的线程数目(workerCount)
,这个变量取名叫 ctl
(control
的缩写),其中高 3bit
表示允许状态,低 29bit
表示线程数目(最多允许 2^29 - 1
个线程)。
1.COUNT_BITS,表示用于标记线程数量的位数,32-3=29位
2.CAPACITY, 表示线程池最大可以容纳的线程数量,2^29-1
3.RUNNING,表示运行状态,-1 << COUNT_BITS,前三位的值为111,后29位为0
4.SHUTDOWN,表示不接受新的任务,但是可以处理阻塞队列里的任务。0<< COUNT_BITS,前三位的值为000,后29位为0。调用shutdown()方法会置为该状态。
5.STOP,该状态不接受新的任务,不处理阻塞队列里的任务,中断正在处理的任务。1<< COUNT_BITS,前三位的值为001,后29位为0。调用shutdownNow()方法会置为该状态
6.TIDYING,表示过渡状态,2<< COUNT_BITS,前三位的值为010,后29位为0。此时表示所有的任务都执行完了,当前线程池已经没有有效的线程,并且将要调用terminated方法
7.TERMINATED,表示终止状态,3<< COUNT_BITS,前三位的值为011,后29位为0
8.runStateOf(int c) ,获取线程池状态,这里c为ctl变量,CAPACITY取反结果是前三位为1,后29位为0,与ctl与操作即可得到状态
9.workerCountOf(int c), 与runStateOf(int c) 相反取后29位,即线程数量
10.ctlOf(int rs, int wc),基于状态和线程数量构造一个ctl变量
对于状态可以简单理解为:RUNNING为-1,SHUTDOWN为0,STOP为1,TIDYING为2,TERMINATED为3。RUNNING变为SHUTDOWN或者STOP后,再变为TIDYING,再变为TERMINATED。ThreadPoolExecutor的关键步骤见下图:
上图总结了 ThreadPoolExecutor 源码中的关键性步骤,正好对应我们此次解析的核心源码(上图出处见水印)。
1.execute 方法用来向线程池提交 task,这是用户使用线程池的第一步。如果线程池内未达到 corePoolSize 则新建一个线程,将该 task 设置为这个线程的 firstTask,然后加入 workerSet 等待调度,这步需要获取全局锁 mainLock
2.已达到 corePoolSize 后,将 task 放入阻塞队列
3.若阻塞队列放不下,则新建新的线程来处理,这一步也需要获取全局锁 mainLock
4.当前线程池 workerCount 超出 maxPoolSize 后用 rejectHandler 来处理
我们可以看到,线程池的设计使得在 2 步骤时避免了使用全局锁,只需要塞进队列返回等待异步调度就可以,仅剩下 1 和 3 创建线程时需要获取全局锁,这有利于线程池的效率提升,因为一个线程池总是大部分时间在步骤 2 上,否则这线程池也没什么存在的意义。为了帮助我们理解,这里列出来其他的一些变量:
// 阻塞队列
private final BlockingQueue<Runnable> workQueue;
// 可重入锁
private final ReentrantLock mainLock = new ReentrantLock();
// 存放工作线程集合
private final HashSet<Worker> workers = new HashSet<Worker>();
// 终止条件
private final Condition termination = mainLock.newCondition();
// 最大线程池容量
private int largestPoolSize;
// 已完成任务数量
private long completedTaskCount;
// 线程工厂
private volatile ThreadFactory threadFactory;
// 拒绝执行处理器
private volatile RejectedExecutionHandler handler;
// 线程等待运行时间
private volatile long keepAliveTime;
// 是否运行核心线程超时
private volatile boolean allowCoreThreadTimeOut;
// 核心池的大小
private volatile int corePoolSize;
// 最大线程池大小
private volatile int maximumPoolSize;
// 默认拒绝执行处理器
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
//
private static final RuntimePermission shutdownPerm =
new RuntimePermission("modifyThread");
...
AbstractExecutorService提供了最常用的三个添加任务到线程成的方法:
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
可以看到最终它们都是调用了execute
方法,ThreadPoolExecutor中execute的实现如下:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
/**
* 如果当前线程数小于 corePoolSize
**/
if (workerCountOf(c) < corePoolSize) {
/**
* 调用 addWorker 方法新建线程,如果新建成功返回 true,那么 execute 方法结束
* 添加进去的线程在这里执行
**/
if (addWorker(command, true))
return;
/**
* 这里意味着 addWorker 失败,向下执行,因为 addWorker 可能改变 ctl 的值,
* 所以这里重新获取下 ctl
**/
c = ctl.get();
}
/**
* 到这步要么是 corePoolSize 满了,要么是 addWorker 失败了
* 前者很好理解,后者为什么会失败呢?addWorker 中会讲
* 如果线程池状态为 RUNNING 且 task 插入 Queue 成功
**/
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
/**
* 如果已不处于 RUNNING 状态,那么删除已经入队的 task,然后执行拒绝策略
* 这里主要是担心并发场景下有别的线程改变了线程池状态,所以 double-check 下
**/
if (! isRunning(recheck) && remove(command))
reject(command);
/**
* 这个分支有点难以理解,意为如果当前 workerCount=0 的话就创建一个线程
* 那为什么方法开头的那个 addWorker(command, true) 会返回 false 呢,其实
* 这里有个场景就是 newCachedThreadPool,corePoolSize=0,maxPoolSize=MAX 的场景,
* 就会进到这个分支,以 maxPoolSize 为界创建临时线程,firstTask=null
**/
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
/**
* 这个分支很好理解,workQueue 满了那么要根据 maxPoolSize 创建线程了
* 如果没法创建说明 maxPoolSize 满了,执行拒绝策略
**/
else if (!addWorker(command, false))
reject(command);
}
在看下AddWorker:
/**
* core 表示以 corePoolSize 还是 maxPoolSize 为界
**/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
/**
* 看看 addWorker 什么时候返回 false
* 这里的 if 逻辑有点难懂,用下数学上的分配率,将第一个逻辑表达式放进括号里就好懂了
* 1、rs >= SHUTDOWN && rs != SHUTDOWN 其实就表示当线程池状态是 STOP、TIDYING, 或 TERMINATED 的时候,当然不能添加 worker 了,任务都不执行了还想加 worker?
* 2、rs >= SHUTDOWN && firstTask != null 表示当提交一个非空任务,但线程池状态已经不是 RUNNING 的时候,当然也不能 addWorker,因为你最多只能执行完 Queue 中已有的任务
* 3、rs >= SHUTDOWN && workQueue.isEmpty() 如果 Queue 已经空了,那么不允许新增
* 需要注意的是,如果 rs=SHUTDOWN && firstTask=null 或者 rs=SHUTDOWN && workQueue 非空的情况下,还是可以新增 worker 的,需要创建临时线程处理 Queue 里的任务
**/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
/**
* 这里也是一个返回 false 的情况,但很简单,就是数目溢出了
**/
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
/**
* 未超过限制则尝试把线程数加1,成功跳出retry循环
**/
if (compareAndIncrementWorkerCount(c))
break retry;
/**
* CAS 失败的话,check 下目前线程池状态,如果发生改变就回到外层 loop 再来一遍,这个也好理解,否则单纯 CAS 失败但是线程池状态不变的话,就只要继续内层 loop 就行了
**/
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
/**
* 这是全局锁,必须持有才能进行 addWorker 操作
**/
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 启动worker代理的线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
方法中retry: for (;;) {...}
的内容主要是用于判断是否线程池已经关闭,以及线程数量是否超过限制。若未关闭,未超过限制则把线程数加1。firstTask为null的时候, w.thread不为null,所以firstTask是否在addWorker中还是没有区别,那只能更进一步看看worker里对firstTask是如何处理的。看下worker
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
...
}
从上面可以看出,工作线程Worker继承了同步器AQS,对同步器AQS不了解的可以看看深入浅出java同步器,同时还实现Runable接口,为什么要这么设计?因为在线程池创建工作线程worker成功后,直接调用work.start()方法启动该线程(即在worker实例中初始化的线程),并在runWorker方法中传递了自身实例,接下去让我们看看线程池的核心方法runWorker:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts,创建worker时状态设置为-1了,此时设置为1
boolean completedAbruptly = true; //task是否意外终止,意外终止为true,反之false
try {
//优先运行初始化时的firstTask, 如果firstTask已经执行了则从队列取
while (task != null || (task = getTask()) != null) {
w.lock(); //获取到task后锁定,独占worker,保证线程安全
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); //空方法,用于子类扩展
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);//空方法,用于子类扩展
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//移除执行完成的worker
processWorkerExit(w, completedAbruptly);
}
}
从阻塞队列中获取任务。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
//无限循环,确保操作成功
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//当线程池处于STOP及以上状态时,线程数减一,该线程不使用。
//当线程处于SHUTDOWN 状态时,并且workQueue请求队列为空,释放该线程。
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//获取当前线程数
int wc = workerCountOf(c);
// Are workers subject to culling?
//如果调用allowCoreThreadTimeOut方法设置为true,则所有线程都有超时时间。
//如果当前线程数大于核心线程数则该线程有超时时间。
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//worker数量大于maximumPoolSize
if ((wc > maximumPoolSize || (timed && timedOut))
//workerCount大于1或者worker阻塞队列为空(在阻塞队列不为空时,需要保证至少有一个wc)
&& (wc > 1 || workQueue.isEmpty())) {
// 比较并减少workerCount
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
//poll方法从阻塞队列中取出对象,如果队列为空,则当前线程阻塞keepAliveTime时间再尝试取出,还是没有就返回null,记录超时状态,在重新进入for循环时才试图终结Worker。
//take()方法没有超时时间,会一直获取。也就是说在这里不断获取任务,
//也就是如果线程池处于RUNNING、SHUTDOWN状态时,只要等待队列不为空,那么线程就会一直执行。这也就是线程重用的原理。
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
此函数用于从workerQueue阻塞队列中获取Runnable对象,由于是阻塞队列,所以支持有限时间等待(poll)和无限时间等待(take)。在该函数中还会响应shutDown和、shutDownNow函数的操作,若检测到线程池处于SHUTDOWN或STOP状态,则会返回null,而不再返回阻塞队列中的Runnalbe对象。
runWorker方法的finally语句块中有一个processWorkerExit方法,processWorkerExit方法是在worker退出时调用到的方法,而引起worker退出的主要因素如下:
1.阻塞队列已经为空,即没有任务可以运行了。
2.调用了shutDown或shutDownNow函数
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//如果线程被中断,则需要减少workCount
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
//获取锁
mainLock.lock();
try {
//将worker完成的任务数添加到总的完成任务中
completedTaskCount += w.completedTasks;
//从workers中移除worker
workers.remove(w);
} finally {
//释放锁
mainLock.unlock();
}
//这个方法是尝试终止的方法下面会介绍
tryTerminate();
//获取线程池控制状态
int c = ctl.get();
//小于STOP的运行状态
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//添加worker
addWorker(null, false);
}
}
此函数会根据是否中断了空闲线程来确定是否减少workerCount的值,并且将worker从workers集合中移除并且会尝试终止线程池。
shutdown是把线程池状态转为SHUTDOWN,这时等待队列中的任务可以继续执行,但是不会接受新任务了,通过中断方式停止空闲的(根据没有获取锁来确定)线程。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//检查shutdown权限
checkShutdownAccess();
//把线程池的状态置为SHUTDOWN
advanceRunState(SHUTDOWN);
//中断所有空闲线程
interruptIdleWorkers();
//调用shutdown方法
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
tryTerminate
该方法是一个终止线程池的方法。
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 如果处于这三种情况不需要关闭线程池
// 1. Running 状态
// 2. SHUTDOWN 状态并且任务队列不为空,不能终止
// 3. TIDYING 或者 TERMINATE 状态,说明已经在关闭了 不需要重复关闭
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 进入到关闭线程池的代码,如果线程池中还有线程,则需要打断线程
if (workerCountOf(c) != 0) { // Eligible to terminate 可以关闭池子
// 打断闲置线程,只打断一个
interruptIdleWorkers(ONLY_ONE);
return;
// 如果有两个以上怎么办?只打断一个?
// 这里只打断一个是因为 worker 回收的时候都会进入到该方法中来,可以回去再看看
// runWorker方法最后的代码
}
// 线程已经回收完毕,准备关闭线程池
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();// 加锁
try {
// 将状态改变为 TIDYING 并且即将调用 terminated
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated(); // 终止线程池
} finally {
ctl.set(ctlOf(TERMINATED, 0)); // 改变状态
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
// 如果终止失败会重试
}
// else retry on failed CAS
}
}
总结:HashSet<Worker>类型的workers和BlockingQueue<Runnable>类型的workQueue,分别存放coresize对应的工作线程和maxsize对应的阻塞线程,在execute这个方法中判断心来的线程是直接创建线程执行,还是加入到workQueue中去,这块的逻辑可以看上边的代码;在addWorker方法中创建一个新的Worker加入到workers中去,调用Worker中的runWorker方法启动线程,在这里核心判断是task != null || (task = getTask()) != null,首先看workers中的task是不是有值,没有的话再取getTask(workQueue)队列中的值,如果当前调用都为空的话执行processWorkerExit,总数减掉一个worker,否则的话在创建一个空的worker保持总量worker的总量不变,这就能实现core是复用的效果了。ThreadPoolExecutor是Java中线程池的实现类,Executors工具类中有多个它的工厂方法(newFixedThreadPool等),本身池的思想是享元模式的应用,这里实际上JDK通过ThreadPoolExecutor使得客户端代码和Thread解耦,我们不需要再直接控制Thread的了(ThreadPoolExecutor可以通过设置ThreadFactory定制Thread)。另外这里使用的设计模式有:
1、工厂方法-Exectors
2、模版方法-ThreadPoolExecutor.Worker
3、Master-worker方式--ThreadPoolExecutor.Worker
3、策略-ThreadPoolExecutor中的四种拒绝策略
4、命令-ThreadPoolExecutor.execute
ThreadPoolExecutor:调用者,它持有了一个命令队列,客户端代码可以向它提交要执行的任务(命令);
BlockingQueue:任务阻塞队列,它实际上就是模式中的“命令队列”;
Runnable:任务抽象,也就是“命令”;
ConcreteRunnable:具体的任务类,它持有一个Receiver;
Receiver:被调用者,也就是具体做事情的人,不过在具体的应用时,常常会将Runnable实现类直接实现具体的逻辑和这个角色有重合,不过设计模式重点在于结合场景并不是教条,不用纠结于此。
参考
http://www.linkedkeeper.com/138.html
http://blogxin.cn/2016/11/27/JUC-ThreadPoolExecutor-%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90/
https://feiybox.com/2018/11/18/ThreadPoolExecutor源码分析/
http://fengwanxingzhou.coding.me/线程池ThreadPoolExecutor源码分析/
https://www.cnblogs.com/liuyun1995/p/9305273.html
http://penghb.com/2017/11/02/java/threadPoolExecutor/
https://www.cayun.me/java/ThreadPoolExecutor/
http://mingxinglai.com/cn/2016/05/java-executor/#top
https://extremej.itscoder.com/threadpoolexecutor_source/
https://www.cnblogs.com/chrischennx/p/9600156.html
https://chenjiayang.me/2019/02/04/threadPoolExecutor/