持续创作,加速成长!这是我参与「掘金日新计划 · 6 月更文挑战」的第4天,点击查看活动详情
基于JDK1.8详细介绍了ThreadPoolExecutor线程池的基本属性和构造器!
1 ThreadPoolExecutor的概述
public class ThreadPoolExecutor extends AbstractExecutorService
JDK线程池的关键实现,我们常用的Executors中的预定义线程池就有这个类的实例,当然也可以通过该类的构造器来创建一个自定义的线程池,提供任务执行,线程调度,线程池管理等等服务,理解了这个类,其他线程池的实现原理就很好理解了。
CallerRunsPolicy、AbortPolicy、DiscardPolicy、DiscardOldestPolicy是四个拒绝策略类。对于正在执行的线程数大于等于maxmumPoolSize以及workQueue容量已满时提交的任务,或者线程池正在关闭时的新提交的任务,线程池将会执行拒绝策略,任务会交给RejectedExecutionHandler来处理。
Worker是对线程池中工作线程的包装,用于对于传递的任务或者任务队列中的任务进行执行、中断控制等操作。这其中还涉及到AQS,即Worker实现了简单的不可重入锁。
2 ThreadPoolExecutor的主要属性
使用一个ctl原子变量来来同时记录线程池的运行状态(runState,简称rs)和线程池中线程数量(workerCount,简称wc)。int类型转换为二进制之后的最高三位保存线程池的状态,低29位保存线程数量。刚初始化ctl的时候,rs为RUNNING状态,wc为0。
另外还具有一个ReentrantLock独占锁,当改变线程池状态,比如添加工作线程、停止线程池,或者访问线程池共享参数信息比如当前线程数量的时候,因为这涉及到多个工作线程之间的共享信息比如线程池状态、工作线程数量等参数的同步,需要获取mainLock独占锁才能进行操作。
其他的属性将会在后面的构造器部分一一详解。
/**
* 用来同时记录线程池的运行状态(runState,简称rs)和线程池中线程数量(workerCount,简称wc)。ctl的值使用AtomicInteger原子类包装,能够保证数据是线程安全的。
* int类型转换为二进制之后的最高三位保存线程池的状态,低29位保存线程数量。刚初始化ctl的时候,rs为RUNNING状态,wc为0
* ReentrantReadWriteLock也是使用一个state变量保存写锁和读锁的获取信息,ConcurrentHashMap中的甚至使用一个lockState保存三种锁状态
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
/**
* 线程数量掩码位数,int类型长度-3后的剩余位数,即wc所占位数为29
*/
private static final int COUNT_BITS = Integer.SIZE - 3;
/**
* 为了能正确保存线程数量,线程池的数量线程被限制为29位的最大值,即最大(2^29)-1个,而不是(2^31)-1个
*/
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//ThreadPoolExecutor中定义了线程池的状态,存储在ctl的高三位中,一共有五种
/**
* RUNNING状态,11100000000000000000000000000000
*/
private static final int RUNNING = -1 << COUNT_BITS;
/**
* SHUTDOWN状态:00000000000000000000000000000000
*/
private static final int SHUTDOWN = 0 << COUNT_BITS;
/**
* STOP状态:00100000000000000000000000000000
*/
private static final int STOP = 1 << COUNT_BITS;
/**
* TIDYING状态:01000000000000000000000000000000
*/
private static final int TIDYING = 2 << COUNT_BITS;
/**
* TERMINATED状态:01100000000000000000000000000000
*/
private static final int TERMINATED = 3 << COUNT_BITS;
//通过对ctl的拆解、组合,获取相关的数据
/**
* 获取ctl的高3位,线程池运行状态
*
* @param c 此时的ctl值
* @return ctl的高3位的int值
*/
private static int runStateOf(int c) {
//将c的低29位置为0并返回结果
return c & ~CAPACITY;
}
/**
* 获取ctl的低29位,线程数量
*
* @param c 此时的ctl值
* @return ctl的低29位的int值
*/
private static int workerCountOf(int c) {
//将c的高3位置为0并返回结果
return c & CAPACITY;
}
/**
* 组合rs和wc,计算ctl新值
*
* @param rs 运行状态
* @param wc 线程数量
* @return 新ctl的int值
*/
private static int ctlOf(int rs, int wc) {
//两者与运算
return rs | wc;
}
/**
* 线程任务队列,是一个阻塞队列
* 使用isEmpty来检测队列是否为空,而不是通过poll的返回值
*/
private final BlockingQueue<Runnable> workQueue;
/**
* 当改变线程池状态,比如添加工作线程、停止线程池,或者访问线程池共享参数信息比如当前线程数量的时候
* 因为这涉及到多个线程之间的共享信息比如线程池状态、工作线程数量等参数的同步,需要获取mainLock独占锁才行
*/
private final ReentrantLock mainLock = new ReentrantLock();
/**
* 包含全部工作线程的set集合
* 只有在持有mainLock锁的时候才能访问
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
/**
* 主要是为了支持awaitTermination方法,外部线程调用awaitTermination方法之后
* 会判断线程池是否是TERMINATED状态,即终止状态,如果不是则调用线程在termination条件变量中等待,直到超时或者线程完毕
*/
private final Condition termination = mainLock.newCondition();
/**
* 记录到目前为止线程池中的拥有的最大线程数量
* 只有在持有mainLock锁的时候才能访问
*/
private int largestPoolSize;
/**
* 线程池已完成的任务数量,只有在某个Worker工作线程终止时才会更新
* 只有在持有mainLock锁的时候才能访问
*/
private long completedTaskCount;
/**
* ThreadPoolExecutor中的工作线程统一使用线程工厂来创建
* threadFactory用于保存传入的线程工厂实现,具有volatile的特性
* Executors中给出了默认实现,我们可以直接使用:Executors.defaultThreadFactory()
*/
private volatile ThreadFactory threadFactory;
/**
* 对于提交任务数超过maxmumPoolSize+workQueue之和时超出的任务,或者线程池正在关闭时的新提交的任务执行的拒绝策略,
* 任务会交给RejectedExecutionHandler的handler来处理,具有volatile的特性
* ThreadPoolExecutor中提供了默认实现:AbortPolicy、CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy
*/
private volatile RejectedExecutionHandler handler;
/**
* 空闲的工作线程的等待超时时间,具有volatile的特性
* 当存在的工作线程数量大于指定核心线程数量时,那么多余的线程会使用此超时时间,超过该时间没有工作则关闭线程
* 或者如果允许CoreThreadTimeOut,那核心线程也会使用此超时时间,超过该时间没有任务则关闭线程;否则,核心将永远等待新的任务。
*/
private volatile long keepAliveTime;
/**
* 是否允许核心线程应用空闲超时时间,具有volatile的特性
* 如果为false,那么即使核心线程空闲也会永远保持活动状态(不会被销毁)
* 如果为true,那么核心线程将会应用 keepAliveTime,在指定时间内等待工作,超时则被销毁(设置成功的要求是超时时间大于0)。
*/
private volatile boolean allowCoreThreadTimeOut;
/**
* 线程池核心线程数量,具有volatile的特性
* 除非设置了allowCoreThreadTimeOut=true,那么核心线程永远不会被销毁
*/
private volatile int corePoolSize;
/**
* 线程池最大线程数量,具有volatile的特性
* 不能超过(2^29)-1
*/
private volatile int maximumPoolSize;
//下面的属性涉及到Java安全模型:https://developer.ibm.com/zh/articles/j-lo-javasecurity/
//一般人接触不到
/**
* 用于校验是否具有shutdown 和 shutdownNow 关闭线程池的操作权限
*/
private static final RuntimePermission shutdownPerm =
new RuntimePermission("modifyThread");
/**
* 授权上下文环境对象
* 在GC标记ThreadPoolExecutor对象并调用finalize方法时调用,用于释放资源
*/
private final AccessControlContext acc;
复制代码
2.1 ctl相关方法
围绕着ctl有一系列的简单的方法,它们被其他大量方法所调用,这里先介绍出来!
runStateOf(int c):获取ctl的高3位,即获取线程池运行状态值; workerCountOf(int c):获取ctl的低29位,即获取线程数量值; ctlOf(int rs, int wc):组合rs和wc,计算ctl新值; runStateLessThan(int c, int s):c的运行状态值是否小于指定状态值s; runStateAtLeast(int c, int s):c的运行状态值是否大于等于指定状态值s; isRunning(int c):c的运行状态是否是RUNNING; compareAndIncrementWorkerCount(int expect):尝试CAS的将ctl的WorkerCount线程数量部分自增1; compareAndDecrementWorkerCount(int expect):尝试CAS的将ctl的WorkerCount线程数量部分自减1; decrementWorkerCount():循环尝试CAS的将ctl的WorkerCount线程数量部分自减1,直到成功为止。只有在addWorkerFailed、processWorkerExit以及getTask方法中调用。
//通过对ctl的拆解、组合,获取相关的数据
/**
* 获取ctl的高3位,线程池运行状态
*
* @param c 此时的ctl值
* @return ctl的高3位的int值
*/
private static int runStateOf(int c) {
//将c的低29位置为0并返回结果
return c & ~CAPACITY;
}
/**
* 获取ctl的低29位,线程数量
*
* @param c 此时的ctl值
* @return ctl的低29位的int值
*/
private static int workerCountOf(int c) {
//将c的高3位置为0并返回结果
return c & CAPACITY;
}
/**
* 组合rs和wc,计算ctl新值
*
* @param rs 运行状态
* @param wc 线程数量
* @return 新ctl的int值
*/
private static int ctlOf(int rs, int wc) {
//两者与运算
return rs | wc;
}
//不需要拆解ctl进行数据比较或者更改
/**
* 运行状态值是否小于指定状态值
*
* @param c 此时的ctl
* @param s 指定状态值
* @return true 是 false 否
*/
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
/**
* 运行状态值是否大于等于指定状态值
*
* @param c 此时的ctl
* @param s 指定状态值
* @return true 是 false 否
*/
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
/**
* 运行状态是否是RUNNING
*
* @param c 此时的ctl
* @return true 是 false 否
*/
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
/**
* 尝试CAS的将ctl的WorkerCount线程数量部分自增1
*
* @param expect 预期的ctl值
* @return true 成功 false 失败
*/
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
/**
* 尝试CAS的将ctl的WorkerCount线程数量部分自减1
*
* @param expect 预期的ctl值
* @return true 成功 false 失败
*/
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
/**
* 循环尝试CAS的将ctl的WorkerCount线程数量部分自减1,直到成功为止
* 只有在addWorkerFailed、processWorkerExit以及getTask部分调用
*/
private void decrementWorkerCount() {
do {
} while (!compareAndDecrementWorkerCount(ctl.get()));
}
复制代码
2.2 线程池的状态
在上面的属性介绍中,我们知道线程池有5种状态:RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED。
RUNNING = -1 << COUNT_BITS,转换为二进制就是11100000000000000000000000000000 SHUTDOWN = 0 << COUNT_BITS,转换为二进制就是00000000000000000000000000000000 STOP = 1 << COUNT_BITS,转换为二进制就是00100000000000000000000000000000 TIDYING = 2 << COUNT_BITS,转换为二进制就是01000000000000000000000000000000 TERMINATED = 3 << COUNT_BITS,转换为二进制就是01100000000000000000000000000000
可以发现,运行状态的大小关系为:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED,这在状态转换的时候非常有用,这样可以通过大小判断状态关系。
类似于线程的状态,线程池的状态也可以转换。但是又有不同,线程状态可以循环转换、相互转换,而一旦发生线程池的状态的转换,那么该转换不可逆。下面来看看线程池状态的转换规则:
详细说明:
状态名 | 说明 | 转换 |
RUNNING | 线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。 | 新建的线程池的初始状态就是RUNNING,并且线程池中的工作线程数量为0。 |
SHUTDOWN | 线程池处在SHUTDOWN状态时,不接收新任务,但内部正在执行的任务和队列里等待的任务,会执行完,随后会清理全部工作线程。 | RUNNING状态的线程池,调用shutdown方法,或者隐式调用了finalize方法(里面有shutdown方法时线程池状态将变成SHUTDOWN。 |
STOP | 线程池处在STOP状态时,不接收新任务,不处理已添加的任务(丢弃),并且会中断正在处理的任务,随后会清理全部工作线程。 | RUNNING or SHUTDOWN状态的线程池,调用shutdownNow方法,线程池状态将变成 STOP。 |
TIDYING | 所有的任务已执行完或被终止或被丢弃,ctl记录的workerCount工作线程数量为0,线程池会变为TIDYING状态。接着会执行钩子函数terminated()。 | SHUTDOWN状态的线程池,当任务队列为空并且线程池工作线程数workerCount为0时,线程池状态就会由 SHUTDOWN 自动转换为 TIDYING状态。 STOP状态的线程池,线程池中工作线程数workerCount为0时,线程池状态就会由STOP自动转换为TIDYING状态。 |
TERMINATED | 钩子函数terminated()执行完毕,就变成TERMINATED状态,线程池彻底终止。 | TIDYING状态的线程池,在接着执行完terminated()之后,线程池状态就会由TIDYING自动转换为 TERMINATED。 |
3 ThreadPoolExecutor的构造器
ThreadPoolExecutor的构造器是创建线程池的入口,JDK提供了四个构造函数。其中参数较少的的三个构造函数内部都是调用参数最多的那一个构造函数。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue< Runnable > workQueue)
使用给定的初始参数和默认的线程工厂及默认的拒绝策略创建新的 ThreadPoolExecutor。
/**
* 使用给定的初始参数和默认的线程工厂及默认的拒绝策略创建新的 ThreadPoolExecutor。
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
//内部调用最多参数的构造器
//线程工厂传递的Executors的默认实现:Executors.defaultThreadFactory()
//拒绝策略传递的默认实现:defaultHandler
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
复制代码
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue< Runnable > workQueue,ThreadFactory threadFactory)
使用给定的初始参数和默认的拒绝策略创建新的 ThreadPoolExecutor。
/**
* 使用给定的初始参数和默认的拒绝策略创建新的 ThreadPoolExecutor。
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
//内部调用最多参数的构造器
//拒绝策略传递的默认实现:defaultHandler
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
复制代码
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue< Runnable > workQueue,RejectedExecutionHandler handler)
使用给定的初始参数和默认的线程工厂创建新的 ThreadPoolExecutor。
/**
* 使用给定的初始参数和默认的线程工厂创建新的 ThreadPoolExecutor。
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
//内部调用最多参数的构造器
//线程工厂传递的Executors的默认实现:Executors.defaultThreadFactory()
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
复制代码
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue< Runnable > workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
最后一个构造器,使用给定的初始参数创建新的 ThreadPoolExecutor,一共有7个参数。
/**
1. 使用给定的初始参数创建新的 ThreadPoolExecutor。
2. 3. @param corePoolSize 核心线程数
3. @param maximumPoolSize 最大线程数
4. @param keepAliveTime 空闲线程等待超时时间
5. @param unit 超时时间单位
6. @param workQueue 阻塞任务队列
7. @param threadFactory 线程工厂
8. @param handler 拒绝策略
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
//一系列参数校验
/*
* 如果核心线程数小于0
* 或者 如果最大线程数小于等于0
* 或者 如果最大线程数小于核心线程数
* 或者 如果空闲线程等待超时时间小于0
*
* 满足上面一项,都将抛出IllegalArgumentException异常
*/
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
/*
* 如果阻塞任务队列为null
* 或者 如果线程工厂为null
* 或者 如果拒绝策略为null
*
* 满足上面一项,都将抛出NullPointerException异常
*/
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
//初始化用于安全管理器的上下文参数
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
//初始化核心线程数
this.corePoolSize = corePoolSize;
//初始化最大线程数
this.maximumPoolSize = maximumPoolSize;
//初始化阻塞任务队列
this.workQueue = workQueue;
//初始化空闲线程等待超时时间
this.keepAliveTime = unit.toNanos(keepAliveTime);
//初始化线程工厂
this.threadFactory = threadFactory;
//初始化拒绝策略
this.handler = handler;
}
复制代码
单从构造器来说,实际上很简单,首先对参数进行校验,随后对一些全局属性进行初始化。下面我们主要来解释构造函数中的参数的含义,首先简单的认识这些属性:
- corePoolSize:线程池核心线程数。不能小于0。
- 当提交一个任务到线程池时,如果此时线程池的线程数量小于核心线程数,那么线程池会新创建一个线程来执行任务,即使此时存在空闲线程也不例外。默认情况创建0个核心线程,如果调用了线程池的prestartAllCoreThreads()方法,线程池会立即创建并启动所有核心线程。
- maximumPoolSize:线程池最大线程数。不能小于corePoolSize,不能小于等于0。
- 当workQueue(任务队列)放不下线程任务,并且已创建的线程数小于最大线程数,则线程池会再次创建新的线程执行任务。值得注意的是,如果使用了无界的任务队列(任务队列没有上限大小)这个参数就没什么效果。
- keepAliveTime:空闲线程等待超时时间;unit:keepAliveTime时间单位
- 当线程数量超过corePoolSize时,多余的空闲线程等待超时时间,即如果指定时间返回没有任务执行,那么该线程将被回收,直到数量减少到corePoolSize为止。
- 如果允许CoreThreadTimeOut(前提是keepAliveTime大于0),那核心线程也会使用此超时时间,超过该时间没有任务则关闭线程;否则,核心线程将永远等待新的任务。
- workQueue:阻塞任务队列。
- 当线程任务添加的速度超过所有核心线程执行速度时,新来的来不及执行的线程任务将被存放到workQueue阻塞任务队列中。
- 任务队列一定是阻塞队列,常见的有以下四种,实际上有很多种:
- ArrayBlockingQueue:有界阻塞任务队列,构造函数一定要传入具体队列容量。
- LinkedBlockingQueu:通常作为无界阻塞任务队列(构造函数不传大小会默认为Integer.MAX_VALUE ),当有大量任务提交时,容易造成内存耗尽。
- SynchronousQueue:一个没有容量的阻塞队列,会将任务同步交付给工作线程。
- PriorityBlockingQueue : 具有优先级的无界阻塞任务队列。
- threadFactor:线程工厂。
- 线程工厂用于创建工作线程,默认线程工厂:Executors.defaultThreadFactory。
- handler:拒绝策略。
- 对于正在执行的线程数等于maxmumPoolSize以及workQueue容量已满时提交的任务,或者线程池正在关闭时的新提交的任务,线程池将会执行拒绝策略,即这些任务都直接被非线程池线程处理了。
- ThreadPoolExecutor中提供了4种拒绝策略的实现:
- AbortPolicy:调用者的线程直接抛出异常,作为默认拒绝策略;
- CallerRunsPolicy:用调用者的线程执行任务;
- DiscardOldestPolicy:抛弃队列中最久的任务;
- DiscardPolicy:抛弃当前任务;
3.1. corePoolSize、workQueue,maximumPoolSize ,keepAliveTime,unit之间关系
- 默认情况下,新的线程池不会启动任何线程。线程数量小于corePoolSize时,新提交任务都将通过线程工厂创建一个新线程去执行,即使此时线程池中存在空闲的线程。
- 当创建的线程数量达到corePoolSize时,新提交的任务会判断如果有空闲的线程那么就让空闲线程去执行,没有空闲线程时新提交的任务将被放入workQueue中,当有线程执行完当前任务,就会从任务队列中拉取任务继续执行。
- 当workQueue已满,并且maximumPoolSize>corePoolSize时,此时新提交任务又会通过线程工厂创建新线程去执行。
- 对于线程数大于等于maxmumPoolSize以及workQueue容量已满时新提交的任务,或者线程池正在关闭时的新提交的任务,线程池将会执行拒绝策略,即这些任务都直接被非线程池线程处理了。
- 如果线程池中存在超过corePoolSize的线程数并且存在空闲线程。如果空闲线程在keepAliveTime(unit表示时间单位)时间范围内都没有工作时,将清理空闲线程,减少资源浪费,直到线程数量被清理减少至核心线程数为止,预留一定数量的核心资源。而通过调用allowCoreThreadTimeOut(true)方法(设置成功的要求是超时时间大于0),核心线程也将应用超时时间规则,即此时如果没有新任务,那么所有的线程都将可能被清理。
corePoolSize、maximumPoolSize、keepAliveTime+unit、ThreadFactory、RejectedExecutionHandler都可以在线程池启动(创建)之后动态调整!通过这些参数,线程池可以动态的调整线程数量,我们也可以创建属于自己的特殊的线程池。
unit表示keepAliveTime的时间单位,在TimeUnit类中有7种静态属性:
TimeUnit.DAYS; //天 TimeUnit.HOURS; //小时 TimeUnit.MINUTES; //分钟 TimeUnit.SECONDS; //秒 TimeUnit.MILLISECONDS; //毫秒 TimeUnit.MICROSECONDS; //微妙 TimeUnit.NANOSECONDS; //纳秒
3.2 ThreadFactory 线程工厂
线程池统一通过ThreadFactory创建新线程,可以说是工厂模式的应用。默认使用Executors.defaultThreadFactory工厂,该工厂创建的线程全部位于同一个ThreadGroup中,并且具有pool-N-thread-M的线程命名(N表示线程池工厂编号,M表示一个工厂创建的线程编号,都是自增的)和非守护进程状态。
通过提供不同的ThreadFactory,您可以更改线程的名称,线程组,优先级,守护进程状态等。如果ThreadCactory在通过从new Thread返回null询问时未能创建线程,则执行程序将继续,但可能无法执行任何任务。
也可以通过实现ThreadFactory自定义线程工厂,案例如下:
public class ThreadPool {
private static ExecutorService pool;
public static void main(String[] args) {
//自定义线程池,最大线程数为3
pool = new ThreadPoolExecutor(3, 3, 1000, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(10),
//自定义线程工厂,lambda
r -> {
System.out.println("线程" + r.hashCode() + "创建");
//线程命名
return new Thread(r, "threadPool-" + r.hashCode());
}, new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 0; i < 10; i++) {
pool.execute(new ThreadTask());
}
pool.shutdown();
}
}
class ThreadTask implements Runnable {
@Override
public void run() {
//输出执行线程的名称
System.out.println("ThreadName:" + Thread.currentThread().getName());
}
}
复制代码
上面是通过匿名内部类的lambda形式创建ThreadFactory实例的,其他框架也提供了多种现成的方式:
- spring自带的的CustomizableThreadFactory。可以通过构造器和方法设置参数。
- guava的ThreadFactoryBuilder。可以通过方法的链式调用设置参数。
- commons-lang3的BasicThreadFactory。可以通过方法的链式调用设置参数
3.3 workQueue任务队列
任务队列用于存放提交的来不及执行的任务,一定是一个阻塞队列BlockingQueue,JDK自带的阻塞队列如下:
ArrayBlockingQueue | 有界阻塞队列 |
LinkedBlockingQueue | 无界阻塞队列 |
LinkedBlockingDeque | 无界阻塞双端队列 |
SynchronousQueue | 没有容量的阻塞队列 |
DelayQueue | 支持延时操作的无界阻塞队列 |
PriorityBlockingQueue | 任务具有优先级的无界阻塞队列 |
LinkedTransferQueue | JDK1.7的新无界阻塞队列 |
根据常用阻塞队列的类型,任务队列一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列,当然还有其他的类型。
3.3.1 直接提交队列
阻塞队列设置为SynchronousQueue。前面讲过,SynchronousQueue是一个特殊的BlockingQueue,它内部没有容量,每一个入队操作都需要匹配一个等待的出队操作或者等待被后来的出队操作匹配才能返回,出队操作同理。JUC—三万字的SynchronousQueue源码深度解析。
若有新的任务需要执行时,线程池会创建新的线程,直到创建的线程数量达到corePoolSize时,如果使用SynchronousQueue队列,新任务不会被保存在任务队列中,总是会马上被执行。如果此时线程数量小于maximumPoolSize,则尝试创建新的线程执行;如果达到maximumPoolSize设置的最大值,则传递给执行拒绝策略去执行。
采用SynchronousQueue作为任务队列的线程池不能缓存任务,一个任务要么被执行要么被拒绝策略处理,这就是“直接提交”的由来。
public class SynchronousQueueThreadPool {
public static void main(String[] args) {
//maximumPoolSize设置为2,拒绝策略为AbortPolicy策略,直接抛出异常
ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS,
//采用SynchronousQueue作为任务队列,不能缓存任务
new SynchronousQueue<>(),
Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
//核心线程应用超时等待机制
pool.allowCoreThreadTimeOut(true);
//提交三个任务,明显会超出最大线程数量
for (int i = 0; i < 3; i++) {
pool.execute(new SynchronousQueueThreadTask());
}
}
}
class SynchronousQueueThreadTask implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
}
复制代码
结果:
pool-1-thread-2 pool-1-thread-1 Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.thread.test.threadpool.ThreadTaskSyn@2f0e140b rejected from java.util.concurrent.ThreadPoolExecutor@7440e464[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379) at com.thread.test.threadpool.SynchronousQueueThreadPool.main(SynchronousQueueThreadPool.java:17)
3.3.2 有界任务队列
阻塞队列设置为ArrayBlockingQueue。前面讲过,ArrayBlockingQueue是一个特殊的BlockingQueue,它必须设置容量,作为有界任务队列。JUC—ArrayBlockingQueue源码深度解析。
若有新的任务需要执行时,线程池会创建新的线程,直到创建的线程数量达到corePoolSize时,如果使用ArrayBlockingQueue队列,则会将新的任务加入到等待队列中,如果达到队列的容量,则继续添加的任务会被处理。如果此时线程数量小于maximumPoolSize,则尝试创建新的线程执行;如果达到maximumPoolSize设置的最大值,则传递给执行拒绝策略去执行。
采用ArrayBlockingQueue作为任务队列的线程池可以缓存任务,是较为常见的任务队列。
3.3.3 无界任务队列
阻塞队列设置为LinkedBlockingQueue。前面讲过,LinkedBlockingQueue是一个特殊的BlockingQueue,它虽然可以设置容量,但是不设置容量就作为无界任务队列。JUC—LinkedBlockingQueue源码深度解析。
若有新的任务需要执行时,线程池会创建新的线程,直到创建的线程数量达到corePoolSize时,如果使用LinkedBlockingQueue队列,并且没有指定容量,那么可以无限制的保存添加的新任务(实际上最大容量为 Integer.MAX_VALUE,但是基本上达不到就会抛出内存溢出异常)。
采用LinkedBlockingQueue作为任务队列的线程池可以无限缓存任务,这时,设置maximumPoolSize参数是无效的,当线程池的线程数达到corePoolSize后就不会再增加了,此时需要注意内存资源耗尽的问题。
另外,由于LinkedBlockingQueue也可以设置容量,因此也可以作为有界任务队列,并且由于它采用了两把锁,性能好于采用一把锁的ArrayBlockingQueue。
3.3.4 优先任务队列
阻塞队列设置为PriorityBlockingQueue。前面讲过,PriorityBlockingQueue是一个特殊的BlockingQueue,它可以为任务设置优先级,优先最高的任务将会先出队列被执行,它要求任务可比较大小。JUC—两万字的PriorityBlockingQueue源码深度解析。
使用PriorityBlockingQueue队列,通常设置corePoolSize为0,这样新来的任务将会直接进入PriorityBlockingQueue,如果没有指定容量,那么可以无限制的保存添加的新任务(由于底层是数组,实际上最大容量为 Integer.MAX_VALUE,但是基本上达不到就会抛出内存溢出异常)。
采用PriorityBlockingQueue作为任务队列的线程池可以无限缓存任务,如果没有设置容量,maximumPoolSize参数是无效的,当线程池的线程数达到corePoolSize后就不会再增加了,此时需要注意内存资源耗尽的问题。
public class PriorityBlockingQueueThreadPool {
public static void main(String[] args) {
//一个线程池,使用优先级的PriorityBlockingQueue阻塞队列作为任务队列
ExecutorService pool = new ThreadPoolExecutor(0, 1, 1000, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
/*
* 循环从 priority=0开始添加任务,即最先添加的任务优先级最低
* 查看输出执行情况,可以发现最后添加的任务(优先级最高)最先被执行
*/
for (int i = 0; i < 20; i++) {
pool.execute(new ThreadTask(i));
}
pool.shutdown();
}
/**
* 任务实现,同时作为Comparable的实现类
*/
static class ThreadTask implements Runnable, Comparable<ThreadTask> {
private int priority;
ThreadTask(int priority) {
this.priority = priority;
}
/**
* 当前对象和其他对象做比较,当前priority大就返回-1,当前priority小就返回1,
*
* @param o 被比较的线程任务
* @return 返回-1就表示当前任务出队列优先级更高;返回1就表示当前任务出队列优先级更低;即priority值越大,出队列的优先级越高;
*/
@Override
public int compareTo(ThreadTask o) {
return this.priority > o.priority ? -1 : 1;
}
@Override
public void run() {
System.out.println("priority:" + this.priority + ",ThreadName:" + Thread.currentThread().getName());
}
}
}
复制代码
3.4 RejectedExecutionHandler拒绝策略
对于正在执行的线程数大于等于maxmumPoolSize以及workQueue容量已满时提交的任务,或者线程池正在关闭时的新提交的任务,线程池将会执行拒绝策略,即这些任务都直接被非线程池线程处理了。
JDK内置的拒绝策略实现有4种,都位于ThreadPoolExecutor内部作为内部类。需要注意的是拒绝策略是由调用execute或者submit方法的线程去执行的,而不是线程池的线程去执行。
3.4.1 AbortPolicy 抛出异常
调用线程直接抛出异常,阻止系统正常运行。线程池的默认策略。
/**
* 默认的拒绝策略,可以看到就是AbortPolicy策略
*/
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
/**
* 调用者抛出RejectedExecutionException异常的拒绝策略
* {@code RejectedExecutionException}.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() {
}
/**
* 总是抛出RejectedExecutionException.
*
* @param r 任务
* @param e 线程池
* @throws RejectedExecutionException 异常
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
复制代码
3.4.2 CallerRunsPolicy调用者执行
只要线程池未关闭,该策略就直接在调用者线程中,运行当前被丢弃的任务,如果线程池关闭,任务将被静默丢弃。需要注意可能会阻塞main线程(调用线程)。
/**
* 只要线程池未关闭,该策略就直接在调用者线程中,运行当前被丢弃的任务,
* 如果线程池关闭(SHUTDOWN以及之后的状态),任务将被静默丢弃。
* 需要注意可能会阻塞main线程
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() {
}
/**
* 在调用方的线程中执行任务 r,除非执行器已关闭(SHUTDOWN以及之后的状态),在这种情况下,任务被丢弃。
*
* @param r 任务
* @param e 线程池
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
//判断线程池是否没有被关闭
if (!e.isShutdown()) {
//直接通过调用线程运行run方法
r.run();
}
}
}
复制代码
案例:
/**
* @author lx
*/
public class CallerRunsPolicyTest {
public static void main(String[] args) {
// 创建线程池。和新线程和最大线程数都为1,
ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
//采用直接提交任务队列,不能缓存任务
new SynchronousQueue<>(),
//拒绝策略为"CallerRunsPolicy"
new ThreadPoolExecutor.CallerRunsPolicy());
//新建10个任务,并将它们添加到线程池中。
//从结果可以看到,部分任务走了拒绝策略:由main线程(调用线程)运行,因此,需要一定要注意注意可能会阻塞main线程(调用线程)
for (int i = 0; i < 10; i++) {
Runnable myrun = new ThreadTask("task-" + i);
pool.execute(myrun);
}
System.out.println("------main线程执行到这里");
// 关闭线程池
pool.shutdown();
}
static class ThreadTask implements Runnable {
private String name;
public ThreadTask(String name) {
this.name = name;
}
@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println(this.name + " is running." + Thread.currentThread().getName());
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
复制代码
某次结果:
task-1 is running.main
task-0 is running.pool-1-thread-1
task-2 is running.main
task-3 is running.pool-1-thread-1
task-4 is running.main
task-5 is running.pool-1-thread-1
task-6 is running.main
task-7 is running.pool-1-thread-1
task-8 is running.main
------main线程执行到这里
task-9 is running.pool-1-thread-1
复制代码
3.4.3 DiscardOldestPolicy丢弃最老任务
如果线程池被关闭,那么直接丢弃任务。否则丢弃队列中最老的一个任务,也就是即将被执行的一个任务,并尝试再次提交当前任务。这种拒绝策略也有可能阻塞调用线程(队列一直是满的)。
/**
* 如果线程池被关闭,那么直接丢弃任务。
* 否则丢弃队列中最老的一个任务,也就是即将被执行的一个任务,并尝试再次提交当前任务
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() {
}
/**
* 如果线程池被关闭,那么直接丢弃任务。
* 否则丢弃队列中最老的一个任务,也就是即将被执行的一个任务,并尝试再次提交当前任务
*
* @param r 任务
* @param e 线程池
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
//判断线程池是否没有被关闭
if (!e.isShutdown()) {
//将队头任务移除队列丢弃
e.getQueue().poll();
//尝试再次提交任务
e.execute(r);
}
}
}
复制代码
3.4.4 DiscardPolicy丢弃任务
丢弃无法处理的任务,不予任何处理。如果允许任务丢失,这是最好的一种方案。
/**
* 丢弃无法处理的任务,不予任何处理。如果允许任务丢失,这是最好的一种方案。
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() {
}
/**
* 丢弃无法处理的任务,不予任何处理。如果允许任务丢失,这是最好的一种方案。
*
* @param r 任务
* @param e 线程池
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
//什么也不做
}
}
复制代码
3.4.5 自定义拒绝策略
以上内置拒绝策略均实现了RejectedExecutionHandler接口,若以上策略仍无法满足实际需要,完全可以自己实现RejectedExecutionHandler接口,并且实现rejectedExecution方法就可以了。具体的逻辑就在rejectedExecution方法里去定义就OK了。这是一种设计模式—策略模式!
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
复制代码