线程池是Doug lea大神写的concurrent包中应用最广泛的一个框架,通常搭配阻塞队列一起使用,就我目前所知,ThreadPoolExecutor在Spring框架,RPC远程服务框架Dubbo,分布式协调应用服务Zookeeper中进行了广泛使用,可见ThreadPoolExecutor是多么重要的一个类。作为一名合格的程序员,我们应该更深入的了解线程池。
先看一下它的流程图。
大体过程:
1. 当前线程数<核心线程数时,创建线程执行任务。
2. 当前线程数>=核心线程数时,把新的任务放入阻塞队列。
3. 当queue已满,并且最大线程数 > 核心线程数,创建线程执行任务。
4. 当queue已满,并且最大线程数>=核心线程数,默认采取拒绝策略(RejectedExecutionHandler)。
我们来详细看一下ThreadPoolExecutor的源码。
基本属性
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } // 获取线程池运行状态 private static int workerCountOf(int c) { return c & CAPACITY; } // 获取线程数量 private static int ctlOf(int rs, int wc) { return rs | wc; }
ctl是线程池的控制状态,是AtomicInteger类型的,里面包含两部分,workcount---线程的数量,runState---线程池的运行状态。这里限制了最大线程数是2^29-1,大约500百万个线程,这也是个问题,所以ctl也可以变成AtomicLong类型的。
线程池的五种状态:
RUNNING - 接受新任务并且继续处理阻塞队列中的任务
SHUTDOWN - 不接受新任务但是会继续处理阻塞队列中的任务
STOP - 不接受新任务,不在执行阻塞队列中的任务,中断正在执行的任务
TIDYING - 所有任务都已经完成,线程数都被回收,线程会转到TIDYING状态会继续执行钩子方法
TERMINATED - 钩子方法执行完毕
线程之间的转换:
RUNNING → SHUTDOWN
可能调用了shutdown方法,也可能隐含在finalize方法中
(RUNNING or SHUTDOWN)→ SHUTDOWN
可能调用了shutdownnow方法
SHUTDOWN -> TIDYING
队列和线程池都是空
STOP -> TIDYING
线程池为空
TIDYING -> TERMINATED
terminated()钩子方法执行完成
构造方法
看一下构造方法中的几个核心参数:
corePoolSize:线程池中的核心线程数,空闲的线程也不会回收,除非把allowCoreThreadTimeOut设置为 true,这时核心线程才会被回收。
maximumPoolSize: 线程池中可以创建的最大线程数,限定为2^29-1,大约500百万个线程。需要注意的 是,当使用无界的阻塞队列的时候,maximumPoolSize就起不到作用了。
keepAliveTime: 当线程池中创建的线程超过了核心线程数的时候,这些多余的空闲线程在结束之前等待新的 任务最大的存活时间。
unit: keepAliveTime的时间单位,可以是纳秒,微秒,毫秒,秒,分钟,小时,天。
workQueue: 存放任务的队列,只有当线程数>核心线程数,才会把其他的任务放入queue,一般常 用的是queue就是ArrayBlockingQueue,LinkedBlockingQueue,SynchronousQueue, ConcurrentLinkedQueue。
threadFactory: 创建线程的工厂类。
handler: 当queue满了和线程数达到最大限制,对于继续到达的任务采取的策略。默认采取AbortPolicy , 也就是拒绝策略。
一共四种策略:
1. AbortPolicy
/** * 默认的拒绝策略,当继续有任务到来时直接抛出异常 */ public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
2. DiscardPolicy:rejectedexecution是个空方法,意味着直接抛弃该任务,不处理。
public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
3. DiscardOldestPolicy:抛弃queue中的第一个任务,再次执行该任务。
public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
4. CallerRunsPolicy: 直接由执行该方法的线程继续执行该任务,除非调用了shutdown方法,这个任务才会被丢弃,否则继续执行该任务会发生阻塞。
public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
实现RejectedExecutionHandler,也可以自定义实现策略方法。
execute方法
任务被执行的具体操作。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * 3步操作 * * 1. 如果当前运行的线程数<核心线程数,创建一个新的线程执行任务,调用addWorker方法原子性地检查 * 运行状态和线程数,通过返回false防止不需要的时候添加线程 * 2. 如果一个任务能够成功的入队,仍然需要双重检查,因为我们添加了一个线程(有可能这个线程在上次检查后就已经死亡了) * 或者进入此方法的时候调用了shutdown,所以需要重新检查线程池的状态,如果必要的话,当停止的时候要回滚入队操作, * 或者当线程池为空的话创建一个新的线程 * 3. 如果不能入队,尝试着开启一个新的线程,如果开启失败,说明线程池已经是shutdown状态或饱和了,所以拒绝执行该任务 */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
1. 检查当前线程池中的线程数是否<核心线程数,如果小于核心线程数,就调用addWorker方法创建一个新的线程执行任务,addworker中的第二个参数传入true,表示当前创建的是核心线程。如果当前线程数>=核心线程数或者创建线程失败的话,直接进入第二种情况。
2. 通过调用isRunning方法判断线程池是否还在运行,如果线程池状态不是running,那就直接退出execute方法,没有执行的必要了;如果线程池的状态是running,尝试着把任务加入到queue中,再次检查线程池的状态, 如果当前不是running,可能在入队后调用了shutdown方法,所以要在queue中移除该任务,默认采用拒绝策略直接抛出异常。如果当前线程数为0,可能把allowCoreThreadTimeOut设为了true,正好核心线程全部被回收,所以必须要创建一个空的线程,让它自己去queue中去取任务执行。
3. 如果当前线程数>核心线程数,并且入队失败,调用addWorker方法创建一个新的线程去执行任务,第二个参数是false,表示当前创建的线程不是核心线程。这种情况表示核心线程已满并且queue已满,如果当前线程数小于最大线程数,创建线程执行任务。如果当前线程数>=最大线程数,默认直接采取拒绝策略。
addWorker方法
看一下addWorker是怎么具体执行的。代码有点长,慢慢看。
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
首先判断线程池的runstate,如果runstate为shutdown,那么并不能满足第二个条件,runstate != shutdown,所以这是针对runstate不是running,shutdown的情况,当runstate>shutdown时,队列为空,此时仍然有任务的话,直接返回false,线程池已关闭,并不能在继续执行任务了。
第二个自旋操作就的目的就是对线程数量自增,由于涉及到高并发,所以采用了cas来控制,判断线程的workcount>=CAPACITY,那么直接返回false,或者通过判断是否核心线程,如果是true,判断workcount>=核心线程数,如果是false,判断workcount>=最大线程数,直接返回false。如果不满足上个条件,直接使用cas把线程数自增,退出自旋操作。
接下来创建一个worker对象。
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }
在当前线程不为空的情况下,加一把可重入锁reentrantlock,在加锁后,再次检查线程池的状态runstate,防止在获取到锁之前线程池已经关闭了,线程池的状态为running或者状态为shutdown并且任务为空的情况下,才能继续往下执行任务,这是充分必要条件。如果当前线程已经开启了,直接抛出异常,这是绝不允许的。
把worker对象放入hashset中,hashset的底层就是hashmap实现的,hashmap是线程不安全的,所以必须要加锁。
private final HashSet<Worker> workers = new HashSet<Worker>();
接着更新线程池的线程数,把workerAdded设为true,表示新添任务成功。开启当前线程去执行任务。
addWorkerFailed方法
如果添加worker失败或者开启线程失败就要调用addWorkerFailed方法移除失败的worker。
private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (w != null) workers.remove(w); decrementWorkerCount(); tryTerminate(); } finally { mainLock.unlock(); } }
首先还是获取全局锁mainlock,接着对workers集合中移除worker,workers的数量自减,tryterminate方法后面再说。
runWorker方法
重点还是看worker是怎么运行的。
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
首先还是获取当前线程,获取当前worker对象中的任务task,把当前线程的状态由-1设为0,表示可以获取锁执行任务,接下来就是一个while循环,当task不为空或者从gettask方法取出的任务不为空的时候,加锁,底层还是使用了AQS,保证了只有一个线程执行完毕其他线程才能执行。在执行任务之前,必须进行判断,线程池的状态如果>=STOP,必须中断当前线程,如果是running或者shutdown,当前线程不能被中断,防止线程池调用了shutdownnow方法必须中断所有的线程。
在处理任务之前,会执行beforeExecute方法, 在处理任务之后,执行afterExecute方法,这两个都是钩子方法,继承了ThreadPoolExecutor可以重写此方法,嵌入自定义的逻辑。一旦在任务运行的过程中,出现异常会直接抛出,所以在实际的业务中,应该使用try..catch,把这些日常加入到日志中。
任务执行完,就把task设为空,累加当前线程完成的任务数,unlock,继续从queue中取任务执行。
继续看一下getTask方法。
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
检查线程池的状态runstate,如果runstate>=STOP或者queue为空,自减workers集合的数量,直接返回null。如果runstate的状态为running或者shutdown,获取当前线程池的数量wc,这里的判断条件有点多,wc > maximumPoolSize可能调用了setMaxmumPoolSize方法导致的,timed代表空闲的线程是否超时回收,当allowCoreThreadTimeOut设为true,核心线程也开始了倒计时,如果allowCoreThreadTimeOut设为false,表示不回收核心线程,或者wc大于核心线程数,也要开始计时。 timeout表示是否超时,这里分为两种情况,一种是阻塞地获取任务,另一种是阻塞一段时间,在该时间内能否获取到任务。
如果timed为true,从queue的keepAliveTime时间内获取任务,如果timed为false,利用take方法从queue中获取任务,当queue为空的时候,会一直阻塞,直到queue不为空,当allowCoreThreadTimeOut没有设为true的时候,线程池的核心线程将会一直阻塞在这里。如果获取到的任务不为空,直接返回该任务。如果返回的任务为空,只能是在keepAliveTime没有获取到任务,所以把timeout设为true,如果在获取任务的过程中线程被中断,把timeout设为false,不管timeout设为什么值都要进行自旋。
继续看一下判断条件。wc>maximumPoolSize,timed为true并且timeout为true(这两个条件必须是相互关联的,因为timed为true采取的是poll方法获取任务)。满足任一条件并且队列为空,利用cas把当前线程池的线程数自减,直接返回null。感觉这地方需要慢慢的斟酌,不然很慢理解。
如果这时返回null,在runWorker方法中直接退出while循环。执行processWorkerExit退出当前线程。进入这个有两种情况:第一种是queue为空,没有任务可以运行了;第二种是调用了shutdown或shutdownnow方法。
processWorkerExit方法
private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }
首先判断completedAbruptly的状态,如果completedAbruptly为true,表示在runWorker中没有执行while循环,表明queue为空,直接使用cas把当前线程数自减。注意,移除worker对象也是使用了全局锁mainLock,上锁之后,把当前线程完成的任务数累加到总的已完成任务数中,在workers中移除当前worker对象,释放锁。调用tryTerminate方法来尝试中止线程池。获取当前线程池的状态,如果处于running或者shutdown,completedAbruptly为true,调用addWorker方法,创建一个新的线程,去queue取任务继续执行。如果completedAbruptly为false,表示从queue取不到任务了,也就是说queue中任务都被执行了,此时,判断allowCoreThreadTimeOut是否开启,如果开启了allowCoreThreadTimeOut,把最先线程min设为0,否则把min设为核心线程数,如果min=0并且queue队列不为空,要把min设为1,表示至少有一个线程去处理queue中的任务,如果当前线程数>=min,直接返回。
再来看一下tryTerminate方法。
tryTerminate方法
final void tryTerminate() { for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
里面是一个自旋方法,直到return退出为止。首先还是获取当前线程池的状态,如果处于running或者tidying或者是shutdown但是queue队列不为空,这时直接返回。如果当前线程数不为0,调用interruptIdleWorkers来关闭所有的空闲线程,直接返回。在满足前面两个条件的基础上,利用cas设置线程池的状态为tidying,调用terminate方法,这是个钩子方法,需要子类去重写,最后,把线程池的状态设为terminated,表示终止的钩子方法执行完毕。释放termination等待队列上的所有线程。
interruptIdleWorkers方法
private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
这个方法用来中断所有的空闲线程。遍历workers集合,如果当前线程未中断并且能够获取到锁,只有获取到锁,说明线程本没有在工作状态,所以必须中断。但是这个方法的参数为onlyOne,如果传入true,表示只中断一个线程,Doug lea大神在源码中写道,中断一个线程可以传播shutdown信号让其他线程自行中断,这个地方真是没看明白,有点匪夷所思。如果为false,那就很简单了,直接遍历中断所有的空闲线程。
写完了线程池的运行过程,最后再来看一下线程池的中止。中止线程池主要有两种方法:shutdown和shutdownnow。
shutdown方法
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); }
调用了shutdown,意味着不能在继续往queue中添加任务,也不能在接受新的任务。利用cas把当前线程池的状态设为shutdown,中断所有的空闲线程,onShutdown是一个钩子方法,是专门给ScheduledThreadPoolExecutor来实现的,再次调用tryTerminate方法来尝试中止线程池,直到queue中的任务全部处理完毕才能正常关闭。
shutdownNow方法
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
shutdownnow和shutdown方法还是很不相同的,shutdownnow先把线程池的状态设为stop,接着中断所有的线程,这是一种更加严格的状态,queue中的任务也不会在继续处理,利用list来返回所有没有执行的task,queue中的任务全部被移除,线程池也是空的,这也正是stop的特点。
从上面我们可以看出线程状态之间的转换:shutdown转变为tidying,必须满足的条件就是线程数为空并且queue为空。stop转变为tidying必须满足线程数为0。running或者shutdown但queue不为空不能转化为tidying,tidying就是个中间状态,最终会变成TERMINATED。
总结
ThreadPoolExecutor是线程池框架的核心,想彻底搞清楚还是需要花费些时间,concurrent包中的很大一部分类都是为线程池服务的,可见线程池的重要地位。所以搞懂ThreadPoolExecutor的核心思想对我们以后的提升是有很大帮助的。