why线程池
上一篇中有介绍了线程与进程的区别【线程与进程】;JDK中已经提供了Thread类和runnable接口来创建和启动一个线程了, 为什么还需线程池呢? 因为创建和销毁线程是需要有成本的,如果频繁的去做这个事情,那么会很浪费CPU的资源,进而影响之下的效率;极端情况下也可能引起OOM,导致系统挂掉。我们需要一个管理线程的池子来管理我们的任务,提高CPU的使用率同时又能够解决资源浪费的弊端。
ThreadPoolExecutor
在著名的concurrent包下有一个ThreadPoolExecutor类,可以看下里面的代码逻辑。我们看下这个类的构造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
参数说明:
1.corePoolSize:*corePoolSize the number of threads to keep in the pool, even if they are idle, unless {@code allowCoreThreadTimeOut} is set; 就是说核心线程数在一直保留在池子中,即使没有任务可执行; 如果设置了allowCoreThreadTimeOut,那么核心线程会超时一定时间滞后销毁掉;*
2.maximumPoolSize: the maximum number of threads to allow in the pool; 线程池中最大的线程数目。
3.keepAliveTime:when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating; 这是针对超过核心线程数的线程任务来说的, 如果超过一定时间,那么就会被回收。
4.BlockingQueue workQueue: workQueue the queue to use for holding tasks before they are
executed. This queue will hold only the {@code Runnable} tasks submitted by the {@code execute} method 这个参数设置任务队列的类型. 常见的类型:ArrayBlockingQueue,LinkedBlockingQueue,SynchronousQueue
5.RejectedExecutionHandler:handler the handler to use when execution is blocked because the thread bounds and queue capacities are reached;拒绝策略,当任务数到最大的队列大小后,定义任务处理的策略;总共有5种拒绝策略。
AbortPolicy:默认策略,不执行任务,直接抛出异常。
DiscardPolicy:直接抛弃,不执行
DiscardOldestPolicy:抛弃最老的任务,再执行
CallerRunsPolicy:由调用者本身去启线程执行。
线程池工作流程
具体参见上图:
a.任务数量是否达到核心线程大小:如果没有,则执行任务,
b.任务数量达到了核心,则将任务push入队列等待执行。
c.队列已经满了,则新建线程执行,直到最大线程数
d.如果任务队列满了, 并且达到最大线程数,则执行拒绝策略。
常见线程池
如果我们不想自己去配置线程池参数,JDK帮我们定义了几个比较有特色的池子,我们可以直接拿来用。
CachedThreadPool: These pools will typically improve the performance of programs that execute many short-lived asynchronous tasks.
优势:短生命周期的异步任务时,有优势,可以提高程序的性能;可以看到是用的SynchronousQueue。可缓存的池子,如果不存在则创建一个新的池子,如果60秒之后未有任务,则超时移除。
NewFixedThreadPool:这个会重复使用固定数量的线程 , 采用的是无边界队列,但是需要注意内存溢出的问题。
如果线程在执行过程中因为错误而中止,新线程会替代它的位置来执行后续的任务。
SingleThreadPool:只会使用单个工作线程来执行一个无边界的队列。如果线程遇到错误中止,它是无法使用替代线程的。
/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available. These pools will typically improve the performance
* of programs that execute many short-lived asynchronous tasks.
* Calls to {@code execute} will reuse previously constructed
* threads if available. If no existing thread is available, a new
* thread will be created and added to the pool. Threads that have
* not been used for sixty seconds are terminated and removed from
* the cache. Thus, a pool that remains idle for long enough will
* not consume any resources. Note that pools with similar
* properties but different details (for example, timeout parameters)
* may be created using {@link ThreadPoolExecutor} constructors.
*
* @return the newly created thread pool
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue. At any point, at most
* {@code nThreads} threads will be active processing tasks.
* If additional tasks are submitted when all threads are active,
* they will wait in the queue until a thread is available.
* If any thread terminates due to a failure during execution
* prior to shutdown, a new one will take its place if needed to
* execute subsequent tasks. The threads in the pool will exist
* until it is explicitly {@link ExecutorService#shutdown shutdown}.
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
/**
* Creates an Executor that uses a single worker thread operating
* off an unbounded queue. (Note however that if this single
* thread terminates due to a failure during execution prior to
* shutdown, a new one will take its place if needed to execute
* subsequent tasks.) Tasks are guaranteed to execute
* sequentially, and no more than one task will be active at any
* given time. Unlike the otherwise equivalent
* {@code newFixedThreadPool(1)} the returned executor is
* guaranteed not to be reconfigurable to use additional threads.
*
* @return the newly created single-threaded Executor
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
开发实践tips
1.对于任务负载比较高的应用,不合适用无界队列,会存在内存溢出的风险以及高的延迟; 所以最好将队列根据实际情况设置成有边界的队列。
2.根据场景使用合适的拒绝策略;以如果选用该策略,请注意异常的获取在execute任务的时候。
3.负载比较低的应用,可以直接用newFixTheadPool,大小可根据cpu核数确定。