1. 背景及优势
1.1 背景
在日常的业务(如:抢票,购物等)开发中,为了提高并发,经常会选择多线程的方式。当有多个线程在操作时,如果系统只有一个CPU,则它根本不可能真正同时进行一个以上的线程,它只能把CPU运行时间划分成若干个时间段,再将时间 段分配给各个线程执行,在一个时间段的线程代码运行时,其它线程处于挂起状。.这种方式我们称之为并发(Concurrent)。
多线程的引入虽然使得资源利用率更好、程序设计更简单、程序响应更快,但它同时也意味着底层设计更复杂、不可避免的上下文切换的开销以及先关资源消耗的增加。没有线程池的概念时,我们通常调用new Thread()创建野线程,这样的线程缺乏管理而且可以无限制创建,他们之间相互竞争,会导致过多占用系统资源导致系统瘫痪。因此线程池应运而生。
1.2 优点
核心的思想就是把宝贵的线程资源放到一个池子中,每次使用都从里面获取,用完之后又放回池子供其他人使用。因此他的优势在于重用存在的线程,减少对象创建、消亡的开销,性能佳可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞,提供定时执行、定期执行、单线程、并发数控制等功能。
2. 线程池框架 Executor
2.1 线程池的状态
在线程池类ThreadPoolExecutor类中定义了线程的状态常量(删除了部分注释):
/**
* The main pool control state, ctl, is an atomic integer packing
* two conceptual fields
* workerCount, indicating the effective number of threads
* runState, indicating whether running, shutting down etc
* RUNNING: Accept new tasks and process queued tasks
* SHUTDOWN: Don't accept new tasks, but process queued tasks
* STOP: Don't accept new tasks, don't process queued tasks,
* and interrupt in-progress tasks
* TIDYING: All tasks have terminated, workerCount is zero,
* the thread transitioning to state TIDYING
* will run the terminated() hook method
* TERMINATED: terminated() has completed
*
* The numerical order among these values matters, to allow
* ordered comparisons. The runState monotonically increases over
* time, but need not hit each state. The transitions are:
*
* RUNNING -> SHUTDOWN
* On invocation of shutdown(), perhaps implicitly in finalize()
* (RUNNING or SHUTDOWN) -> STOP
* On invocation of shutdownNow()
* SHUTDOWN -> TIDYING
* When both queue and pool are empty
* STOP -> TIDYING
* When pool is empty
* TIDYING -> TERMINATED
* When the terminated() hook method has completed
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
有上可知ThreadPoolExecutor的ctl字段高三位保存了runState(运行状态),低29位保存了workerCount(工作线程个数)。线程池有以下五种状态:
-
RUNNING
(1)状态说明:运行状态,指可以接受任务执行队列里的任务线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0。
(2)状态切换:线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0! -
SHUTDOWN
(1) 状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。
(2) 状态切换:调用线程池的shutdown()方法时,线程池由RUNNING -> SHUTDOWN。 -
STOP
(1) 状态说明:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。
(2) 状态切换:调用线程池的shutdownNow()方法时,线程池由(RUNNING or SHUTDOWN ) -> STOP。 -
TIDYING
(1) 状态说明:当所有的任务已终止,任务数量”为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。
(2) 状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。 当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING。 -
TERMINATED
终止状态,当执行 terminated() 后会更新为这个状态。
2.2 Executor框架 介绍
Eexecutor作为灵活且强大的异步执行框架,其支持多种不同类型的任务执行策略,提供了一种标准的方法将任务的提交过程和执行过程解耦开发,基于生产者-消费者模式,其提交任务的线程相当于生产者,执行任务的线程相当于消费者,并用Runnable来表示任务,Executor的实现还提供了对生命周期的支持,以及统计信息收集,应用程序管理机制和性能监视等机制。Executor框架包括:线程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等。
- Executor 接口定义了最基本的 execute 方法,用于接收用户提交任务。
- ExecutorService 定义了线程池终止shutdown及submit提交 futureTask 任务支持的方法。(submit方法提交的是Runnable或Callable,返回了一个Future可引用的实例,可以认为是FutureTask的实例)。
- AbstractExecutorService 是抽象类,提供ExecutorService 执行方法的默认实现。其内部实现的submit方法描述为:submit(Runnable) 的实现创建了一个关联RunnableFuture 类,该类将被执行并返回。子类可以重写 newTaskFor 方法,以返回 FutureTask 之外的RunnableFuture 实现。
- ThreadPoolExecutor 是最核心的一个类,是线程池的内部实现。线程池的功能都在这里实现了,平时用的最多的基本就是这个了。其源码很精练,远没当时想象的多。
- ScheduledExecutorService接口具有为预定执行或重复执行任务而设计的方法。可以预定Runnable或Callable任务在初始的延迟之后只运行一次,也可以预定一个Runnable对象周期性的运行。
- ScheduledThreadPoolExecutor 在 ThreadPoolExecutor 的基础上提供了支持定时调度的功能。线程任务可以在一定延时时间后才被触发执行。
- Executors 线程池工厂类。
2.3 常见的创建线程池方式
- newFixedThreadPool(int nThreads) 创建固定大小的线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
- newSingleThreadExecutor() 创建单个线程的线程池
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
- newCachedThreadPool() 缓存的线程池
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- newScheduledThreadPool(int corePoolSize) 支持定时及周期性的任务执行的线程池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
可以从创建的源码发现,都是利用利用 ThreadPoolExecutor 类实现的,因此我们也可以通过ThreadPoolExecutor 来实现定义的线程池。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
核心的参数介绍:
- corePoolSize 为线程池的基本大小:当有任务过来的时候才会去创建创建线程执行任务。换个说法,线程池创建之后,线程池中的线程数为0,当任务过来就会创建一个线程去执行,直到线程数达到corePoolSize 之后,就会被到达的任务放在队列中。
- maximumPoolSize 为线程池最大线程大小:当提交任务队列已经满了,但是未达到最大线程数时便会创建新线程执行任务;如果队列满了且已经达到最大任务线程数,则会执行Handler指定的拒绝(饱和策略)。
- keepAliveTime 和 unit 则是线程空闲后的存活时间和单位:即当线程数量超过核心线程且有空闲的时候,会在存活时间之后,从线程池移除,直到线程数为核心线程数。
- workQueue 用于存放任务的阻塞队列:当线程池中的线程数超过它的corePoolSize的时候,线程会进入阻塞队列进行阻塞等待。当队列满的时候,如果未达到maximumPoolSize ,则会创建新的线程执行任务。通常可以有三种类型:1)ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;
2)LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;3)synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。 - threadFactory 线程工厂类;
- handler 当队列和最大线程池都满了之后的饱和(拒绝)策略。拒绝策略有以下四种:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的()任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
2.4 线程池任务处理策略源码分析
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();//获取当前线程池的状态
if (workerCountOf(c) < corePoolSize) {//当前线程数量小于 coreSize 时创建一个新的线程运行
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {//如果当前线程处于运行状态,并且写入阻塞队列成功
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command)) //双重检查,再次获取线程状态;如果线程状态变了(非运行状态)就需要从阻塞队列移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。
reject(command);
else if (workerCountOf(recheck) == 0) //如果当前线程池为空就新创建一个线程并执行。
addWorker(null, false);
}
else if (!addWorker(command, false)) //如果在第三步的判断为非运行状态,尝试新建线程,如果失败则执行拒绝策略
reject(command);
}
总结如下:
1.如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务;
2. 如果当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;
如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;
3. 如果线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过;
4. keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。
2.5 线程池使用其他注意点
2.5.1 如何配置线程数量
流程聊完了再来看看上文提到了几个核心参数应该如何配置呢?有一点是肯定的,线程池肯定是不是越大越好。通常我们是需要根据这批任务执行的性质来确定的。
- IO 密集型任务:由于线程并不是一直在运行,所以可以尽可能的多配置线程,比如 CPU 个数 * 2
- CPU 密集型任务(大量复杂的运算)应当分配较少的线程,比如 CPU 个数相当的大小。
当然这些都是经验值,最好的方式还是根据实际情况测试得出最佳配置。
2.5.2 优雅的关闭线程池
有运行任务自然也有关闭任务,从上文提到的 5 个状态就能看出如何来关闭线程池。其实无非就是两个方法 shutdown()/shutdownNow()。
但他们有着重要的区别:
- shutdown() 执行后停止接受新任务,会把队列的任务执行完毕。
- shutdownNow() 也是停止接受新任务,但会中断所有的任务,将线程池状态变为stop。
2.5.3 提交任务的不同方式
- execute()
Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。 - submit()
ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法。