引言
我相信大家面试的时候都遇到过线程池有关的问题,这个问题被问到的频率很高,如果你不了解,或者仅仅是简单的了解的话,还是好好研究研究吧,因为它真的很有意思。
线程池的意义
我们有时候为了方便或者写一个测试的时候会这样启动一个线程:
new Thread(new Runnable() {
@Override
public void run() {
//处理逻辑。。。
}
}).start();
这样固然没有什么问题,但是当你有很多任务需要执行的时候,这个时候频繁的创建和销毁线程,对系统的开销和执行效率就影响很大了。如果我们使用了阿里的编码规约插件(Alibaba Java Coding Guidelines)检测你的代码的话就会提示你:不要显示创建线程,请使用线程池。如果我们创建一个线程的时间和销毁一个线程的时间之和大于执行这个任务的时间,那岂不是得不偿失,还不如直接把线程保留下来。这个时候我们就会考虑有没有一种能够将多个线程管理起来的一种方式,当然有了,就是线程池了。
线程池模型
我们了解一个事物,需要要了解它的模型,就是它的基本构成以及每一个基本构成的作用是什么,在讲解线程池模型之前,有几个相关的类我们需要关注一下:
public interface Executor {
//这是一个接口,用来执行提交的任务
void execute(Runnable command);
}
public interface ExecutorService extends Executor {
//这是一个继承了上面接口的类,里面有多个常用的API用来提交任务和中断任务等。
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
还有一个Executors类,此类中含有多个API可以获得上面的ExecutorService实例;还有一个类就是我们即将要讲解的也是最重要的ThreadPoolExecutor,它间接实现了ExecutorService接口。
我们了解线程池模型,重点了解ThreadPoolExecutor类即可,首先我们看一下此类的构造方法:
//含有五个参数的构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
//含有六个参数的构造方法(多了一个ThreadFactory)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
//含有六个参数的构造方法(多了一个RejectedExecutionHandler)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
//含有七个参数的构造方法(多了ThreadFactory和RejectedExecutionHandler)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
下面我们对这里面的参数做一个讲解:
int corePoolSize:这个是线程池的核心线程数量。当创建线程池的时候,默认是没有创建线程的,当线程池提交任务的时候,线程池开始创建线程,如果当前线程池的线程数量没有达到核心线程的数量的话,不管核心线程数量是否属于空闲状态,都会新建核心线程用来执行线程池提交的任务,核心线程一般是不会被线程池回收的,除非你设置了allowCoreThreadTimeOut值为true的话,当核心线程空闲时间大于设置值的时候就也会被回收了。如果设置了prestartAllCoreThreads方法的话,看单词意思就会明白是提前创建核心线程在线程池中了。
int maximumPoolSize:线程池的最大线程数量,包含核心线程数量。
long keepAliveTime:这个参数是指当线程池中的非核心线程空闲时间达到此设定值的时候会被销毁回收。如果你设置了allowCoreThreadTimeOut值为true的话,则当核心线程空闲时间大于设置值的时候就也会被回收了。
TimeUnit unit:这个是上面keepAliveTime参数的单位,可以是DAYS(天) ,HOURS(小时) ,MINUTES(分) ,SECONDS(秒) ,MILLISECONDS(毫秒) ,MICROSECONDS(微秒) ,NANOSECONDS (纳秒)。
BlockingQueue workQueue:直译过来就是阻塞队列,下面讲解常用的队列
1 ArrayBlockingQueue
数组有界阻塞队列:当提交的任务数小于核心线程时候,就会新建核心线程执行任务,如果线程池达到核心线程数量,那么就将任务加入到此队列中等待,如果队列也满了,就会新建非新核心线程执行任务,如果线程池中的线程数量达到了maximumPoolSize的值,说明此刻线程池已经处于饱和状态,就会抛出异常。
2 SynchronousQueue
有界的同步阻塞队列:当有任务的时候就直接将任务交给线程处理,自己不会保留任务。如果线程都处于忙碌状态,就会新创建一个线程处理这个任务。一般使用此队列的时候会设置最大线程数量maximumPoolSize=Integer.MAX_VALUE,因为这样就能避免当前线程数量达到最大线程数量的时候不会创建线程了,因为队列不存储任务,而又不能创建线程的话岂不是要出问题?
3 LinkedBlockingQueue
链表结构的无界阻塞队列:可以设置也可以不设置队列的长度,一般不要设置。当线程数量小于核心线程的时候,新提交任务会创建线程去执行,当达到核心线程数量后,就会将任务提交到队列中等待,因为队列是无界的,总线程数量不会大于核心线程数量,所以这里maximumPoolSize的值的设定就没有意义了,一般这里将corePoolSize=maximumPoolSize。ThreadFactory threadFactory
这是一个接口,我们可以实现它的方法对其进行一些操作,比如修改更加有意义的线程名字
private class CustomThreadFactory implements ThreadFactory {
private AtomicInteger count = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
String threadName = CustomThreadPoolExecutor.class.getSimpleName() + count.addAndGet(1);
System.out.println(threadName);
t.setName(threadName);
return t;
}
}
- RejectedExecutionHandler handler
这也是一个接口,就是当发生错误的时候抛出异常使用的,官方API中已经实现了几个类,其中默认的是
public static class CallerRunsPolicy implements RejectedExecutionHandler {
//这是官方默认的任务拒绝策略
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
ThreadPoolExecutor的执行策略
其实执行策略在上面讲解阻塞队列的时候说的已经比较详细了,因为执行策略其实要根据具体的队列规则进行描述,不同的队列描述是不一样的,下面做一个总结吧。
ArrayBlockingQueue 当有任务进入的时候:
1 如果线程池数量没有达到核心线程的数量,就会创建新的核心线程执行任务
2 如果线程池线程数量已经达到了核心线程数量,那么就会将任务提交到队列中等待被执行
3 如果队列也满了,将会创建非核心线程执行任务
4 如果队列满了,线程池也达到了最大线程数量,那么就会使用RejectedExecutionHandler拒绝策略。
SynchronousQueue 因为此队列不会保留任务,一般设置核心线程是0,设置最大线程数量maximumPoolSize=Integer.MAX_VALUE,这样就能避免当前线程数量达到最大线程数量的时候不会创建线程了,所以当有任务进入的时候:
1 直接创建线程执行任务
2 如果线程都处于忙碌状态,就会新创建一个线程处理这个任务
LinkedBlockingQueue 无界队列 当有任务进入的时候:
1 如果线程池数量没有达到核心线程的数量,就会创建新的核心线程执行任务
2 如果线程池线程数量已经达到了核心线程数量,那么就会将任务提交到队列中等待被执行
3 因为队列是无界队列,总线程数量不会大于核心线程数量,都被添加到队列中了。所以这里maximumPoolSize的值的设定就没有意义了,一般这里将corePoolSize=maximumPoolSize。
Executors 中常见的线程池
public class Executors {
//使用LinkedBlockingQueue作为队列
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
//使用SynchronousQueue作为队列
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
//使用LinkedBlockingQueue作为队列,只有一个核心线程和最大线程数量
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
//定时执行任务使用
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
}
上面的四个常用线程池中分别讲解:
1 newFixedThreadPool():使用的是LinkedBlockingQueue队列作为任务队列,最大线程数和核心线程数量相等,就和我们上面介绍的一致,因为队列是无界的,总线程数不会大于核心线程数量,设置最大线程数也没有意义,任务都会被加入到队列中等待执行。这样会控制并发数量,超出的线程会在队列中等待。
2 newCachedThreadPool():使用SynchronousQueue作为队列,这是有界的阻塞队列,设置核心线程为0,最大线程数量等于Interger.MAX_VALUE,那么所有进来的任务在其他线程没有空闲的时候都会新创建一个线程来执行此任务,对线程数量没有限制。
3 SingleThreadExecutor():单线程化的线程池,其实可以理解为特殊的newFixedThreadPool处理。有且仅有一个线程,按照队列的FIFO原则。
4 ScheduledThreadPool():这个支持定时周期的执行任务,比如可以实现轮播图效果。
好了,关于线程池就这么多了,完