Java 线程池:ThreadPoolExecutor
线程池
当需要频繁的创建多个线程进行耗时操作时,频繁创建和销毁线程对象性能较差,并且缺乏统一管理,可能会无限制创建线程,导致相互竞争资源导致死锁,因此使用线程池就可以创建多个线程并且进行管理,提交的任务会被线程池指派给其中的线程进行执行,通过线程池的统一调度和管理使得多线程的使用更简单高效。
线程池的优点
重用线程池中的线程,避免因为线程的创建和销毁所带来的性能开销.
能有效控制线程池的最大并发数,避免大量的线程之间因互相抢占系统资源而导致的阻塞现象.
能够对线程进行简单的管理,并提供定时执行以及指定间隔循环执行等功能.
ThreadPoolExecutor
ThreadPoolExecutor的功能是启动指定数量的线程以及将任务添加到一个队列中,别且将任务分发给空闲的线程。ExecutorService的生命周期包括运行、关闭、终止三种状态,创建后便进入运行状态,调用了shutdown()方法后,便进入关闭状态,此时ExecutorService不再接受新任务,但还会执行已经提交了的任务,当已提交任务都执行完后就变为终止状态。
常用ThreadPoolExecutor构造函数如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
下面解释下一下构造器中各个参数的含义:
corePoolSize:核心线程数量;
线程池启动时默认为空,当加入任务时才会创建线程,prestartAllCoreThreads()或prestartCoreThread()方法可以在没有任务到来之前就创建corePoolSize个线程或者一个线程;
maximumPoolSize:最大线程数量;
当workQueue使用无界队列时,此参数无效,当有新任务提交时,会执行以下判断:
如果运行的线程少于 corePoolSize,则创建新线程来处理任务,即使线程池中的其他线程是空闲的;
如果线程池中的线程数量大于等于 corePoolSize 且小于 maximumPoolSize,则只有当workQueue满时才创建新的线程去处理任务;
如果设置的corePoolSize 和 maximumPoolSize相同,则创建的线程池的大小是固定的,这时如果有新任务提交,若workQueue未满,则将请求放入workQueue中,等待有空闲的线程去从workQueue中取任务并处理;
如果运行的线程数量大于等于maximumPoolSize,这时如果workQueue已经满了,则通过handler所指定的策略来处理任务;
所以,任务提交时,判断的顺序为 corePoolSize –> workQueue –> maximumPoolSize。
keepAliveTime:非核心线程闲置时的超时时长;
当前线程池线程总数大于核心线程数时,闲置超过这个时长,非核心线程就会被回收.当调用allowCoreThreadTimeOut(boolean)时,keepAliveTime同样会作用于核心线程,直到线程池中的线程数为0。
unit:参数keepAliveTime的时间单位;
在TimeUnit类中有7种静态属性:
TimeUnit.DAYS; //天
TimeUnit.HOURS; //小时
TimeUnit.MINUTES; //分钟
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //纳秒
workQueue:保存等待执行的任务的阻塞队列;
当提交一个新的任务到线程池以后, 线程池会根据当前线程池中正在运行着的线程的数量来决定对该任务的处理方式,主要有以下几种处理方式:
ArrayBlockingQueue,基于数组的队列,此队列采用FIFO原则进行排序。使用该方式可以将线程池的最大线程数量限制为maximumPoolSize,这样能够降低资源的消耗,但同时这种方式也使得线程池对线程的调度变得更困难,因为线程池和队列的容量都是有限的值,所以要想使线程池处理任务的吞吐率达到一个相对合理的范围,又想使线程调度相对简单,并且还要尽可能的降低线程池对资源的消耗,就需要合理的设置这两个数量;
如果要想降低系统资源的消耗(包括CPU的使用率,操作系统资源的消耗,上下文环境切换的开销等), 可以设置较大的队列容量和较小的线程池容量, 但这样也会降低线程处理任务的吞吐量。
如果提交的任务经常发生阻塞,那么可以考虑通过调用 setMaximumPoolSize() 方法来重新设定线程池的容量。
如果队列的容量设置的较小,通常需要将线程池的容量设置大一点,这样CPU的使用率会相对的高一些。但如果线程池的容量设置的过大,则在提交的任务数量太多的情况下,并发量会增加,那么线程之间的调度就是一个要考虑的问题,因为这样反而有可能降低处理任务的吞吐量。
LinkedBlockingQueue,基于链表的队列,此队列采用FIFO原则进行排序。如果没有指定大小,则默认值是 Integer.MAX_VALUE,那么线程池中能够创建的最大线程数就是corePoolSize,并且忽略maximumPoolSize参数和handler策略;
SynchronousQueue,直接将任务提交给线程而不是加入队列,实际上此队列为空。每个插入操作必须等到另一个调用移除的操作;如果新任务来了线程池没有任何可用线程处理的话,则调用拒绝策略。如果把maximumPoolSize设置为(Integer.MAX_VALUE),加上SynchronousQueue队列,就等同于Executors.newCachedThreadPool();
PriorityBlockingQueue,具有优先级的队列,可以自定义优先级,默认是按自然排序,可能很多场合并不合适。
threadFactory:它是ThreadFactory类型的变量,用来创建新线程;
默认使用Executors.defaultThreadFactory() 来创建线程。使用默认的ThreadFactory来创建线程时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设置了线程的名称。
handler:拒绝策略;
当线程池与workQueue队列都满了的情况下,对新加任务采取的策略,有如下四种策略:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常;
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常;
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程);
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务。
有效使用线程池的准则
只要遵循几条简单的准则,线程池可以成为构建服务器应用程序的极其有效的方法:
不要对那些同步等待其它任务结果的任务排队。这可能会导致死锁,在那种死锁中,所有线程都被一些任务所占用,这些任务依次等待排队任务的结果,而这些任务又无法执行,因为所有的线程都处在忙碌状态。
在为时间可能很长的操作使用合用的线程时要小心。如果程序必须等待诸如 I/O 完成这样的某个资源,那么请指定最长的等待时间,以及随后是失效还是将任务重新排队以便稍后执行。这样做保证了:通过将某个线程释放给某个可能成功完成的任务,从而将最终取得某些进展。
理解任务。要有效地调整线程池大小,你需要理解正在排队的任务以及它们正在做什么。它们是 CPU 限制的(CPU-bound)吗?它们是 I/O 限制的(I/O-bound)吗?你的答案将影响您如何调整应用程序。如果您有不同的任务类,这些类有着截然不同的特征,那么为不同任务类设置多个工作队列可能会有意义,这样可以相应地调整每个池。
线程池的最佳大小
线程池的最佳大小取决于可用处理器的数目以及工作队列中的任务性质。
若在一个具有N个处理器的系统上只有一个工作队列,其中全部是计算性质的任务,在线程池具有N或N+1个线程时一般会获得最大的CPU利用率。
对于那些可能需要等待I/O完成的任务,需要线程池的大小超过可用处理器的数目,因为并不是所有线程都一直在工作。
线程池封装类
public final class ThreadPoolUtil {
private static final String TAG = ThreadPoolUtil.class.getSimpleName();
private ThreadPoolExecutor mThreadPool;
private volatile static ThreadPoolUtil threadPoolUtil;
/**
* 线程池单例创建方法
*/
public static ThreadPoolUtil getInstance() {
if (threadPoolUtil == null) {
synchronized (ThreadPoolUtil.class) {
if (threadPoolUtil == null) {
threadPoolUtil = new ThreadPoolUtil();
}
}
}
return threadPoolUtil;
}
/**
* 将构造方法访问修饰符设为私有,禁止任意实例化。
*/
private ThreadPoolUtil() {
//获取处理器核心数
int processorCount = Runtime.getRuntime().availableProcessors();
//线程池数采用两倍的核心数
int poolSize = processorCount * 2;
Log.i(TAG, "ThreadPoolUtil: processorCount: " + processorCount);
//指定线程池名称
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("thread-pool-%d").build();
//任务队列使用基于链表结构的队列容量20倍核心数
//拒绝策略使用默认的拒绝策略
mThreadPool = new ThreadPoolExecutor(poolSize, poolSize,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(processorCount * 20), namedThreadFactory,
new ThreadPoolExecutor.AbortPolicy());
}
/**
* 将线程池初始化,核心线程数量
*/
public void perpare() {
if (mThreadPool.isShutdown() && !mThreadPool.prestartCoreThread()) {
int startThread = mThreadPool.prestartAllCoreThreads();
Log.i(TAG, "perpare: startThread" + startThread);
}
}
/**
* 向线程池中添加任务方法
*/
public void addExecuteTask(Runnable task) {
if (task != null) {
mThreadPool.execute(task);
}
}
/**
* 判断是否是最后一个任务
*/
protected boolean isTaskEnd() {
if (mThreadPool.getActiveCount() == 0) {
return true;
} else {
return false;
}
}
/**
* 获取缓存大小
*/
public int getQueue() {
return mThreadPool.getQueue().size();
}
/**
* 获取线程池中的线程数目
*/
public int getPoolSize() {
return mThreadPool.getPoolSize();
}
/**
* 获取已完成的任务数
*/
public long getCompletedTaskCount() {
return mThreadPool.getCompletedTaskCount();
}
/**
* 关闭线程池,不在接受新的任务,会把已接受的任务执行玩
*/
public void shutdown() {
if (!mThreadPool.isShutdown()) {
mThreadPool.shutdown();
}
threadPoolUtil = null;
}
}
使用方式
//执行线程
ThreadPoolUtil.getInstance().addExecuteTask(thread);
//在子线程执行操作
ThreadPoolUtil.getInstance().addExecuteTask(() -> {
doSomeThing();
});
//关闭线程池
ThreadPoolUtil.getInstance().shutdown();
参考:
https://www.cnblogs.com/yaoxiaowen/p/6576898.html
https://juejin.im/entry/58fada5d570c350058d3aaad
https://www.cnblogs.com/dolphin0520/p/3932921.html