本节学习Executors类源码
首先看一下常用的线程池的几种创建方式
/** * 创建一个线程池,该线程池重用固定数量的线程,在共享的无界队列中运行 */ public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
/** * 创建一个线程池,该线程池只有一个工作者线程运行在无界队列中 */ public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
/** * 创建一个线程池,如果需要就创建线程, 但是会重用之前已创建的可用线程 */ public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
/** * 创建一个可定时执行任务的线程池 */ public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
下面看一下ThreadPoolExecutor类
有几个核心的参数:
/** * 阻塞队列,保存任务并交给工作者线程 */ private final BlockingQueue<Runnable> workQueue; /** * 重入锁 */ private final ReentrantLock mainLock = new ReentrantLock(); /** * 工作者线程集合 */ private final HashSet<Worker> workers = new HashSet<Worker>(); /** * Wait condition to support awaitTermination */ private final Condition termination = mainLock.newCondition(); /** * 跟踪最大线程池大小 */ private int largestPoolSize; /** * 已执行完成的线程个数 */ private long completedTaskCount; /** * 创建线程的工厂 */ private volatile ThreadFactory threadFactory; /** * 饱和策略执行者 */ private volatile RejectedExecutionHandler handler; /** * 空闲工作线程的空闲时间;超过corePoolSize的线程或者allowCoreThreadTimeOut为true的主线程使用这个作为超时时间; 否则线程一直等待任务或关闭 */ private volatile long keepAliveTime; /** * 如果为false,核心线程会一直存活 * 如果为true,核心线程使用keepAliveTime等待空闲工作者线程超时 */ private volatile boolean allowCoreThreadTimeOut; /** * 核心线程个数 */ private volatile int corePoolSize; /** * 最大线程个数 */ private volatile int maximumPoolSize; /** * 默认的拒绝策略 */ private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
/** * 构造函数,创建线程池 */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException();//非法校验 if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime);//毫秒 this.threadFactory = threadFactory; this.handler = handler; }
下面看一下线程池执行任务的过程:
/** * 执行任务 */ public void execute(Runnable command) { if (command == null)//如果任务为空,抛异常 throw new NullPointerException(); /* * 1. 如果工作者线程个数少于corePoolSize,添加一个工作者线程 * 2. 否则,如果线程池处于运行状态则尝试插入工作队列中,如果插入成功,再次检查线程池状态,如果非运行,则队列中移除任务,且拒绝该任务,如果工作者线程为0,则添加工作者线程 * 3. 如果不能插入工作队列中,则尝试创建新的线程,如果失败,则拒绝任务。 */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get();//重新检查,如果线程数超过最大,则拒绝任务,从队列中移除任务 if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
/** * 创建新的线程 */ private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c);//获取工作线程个数 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))//如果超过最大或者核心线程数,则返回false return false; if (compareAndIncrementWorkerCount(c))//线程个数+1 break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask);//创建新的线程 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock();//获取锁 try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w);//添加到工作集合中 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s;//设置最大线程数 workerAdded = true; } } finally { mainLock.unlock();//释放锁 } if (workerAdded) { t.start();//执行任务 workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w);//任务启动失败,则从任务队列中移除该任务,ctl减一 } return workerStarted; }
/** *拒绝任务 */ final void reject(Runnable command) { handler.rejectedExecution(command, this); }
默认使用的是AbortPolicy,直接抛出RejectedExecutionException异常
还有三种策略:
1.CallerRunsPolicy:如果线程池非中断状态,则直接在调用拒绝的线程上执行该任务,否则丢弃任务。
2.DiscardOldestPolicy:如果线程池在非中断状态,则从工作队列中移除最近的任务,且执行新的任务,否则丢弃任务
3. DiscardPolicy : 丢弃任务,什么也不做。
下面看一下任务的执行
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) {//如果task为空,则从队列中获取 w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run();//任务执行 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly);//从workers移除 } }