一. 概述
- ThReadPoolExecutor作为java.util.concurrent包对外提供基础实现, 以内部线程池的形式对外提供管理任务执行, 线程调度, 线程池管理等等服务.
- Executors方法提供的线程服务, 都是通过参数设置来实现不同的线程池机制.
二. 核心构造方法讲解
下面是ThreadPoolExecutor最核心的构造方法
public ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null ||
threadFactory == null ||
handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
构造方法参数讲解
参数名 | 作用 |
---|---|
corePoolSize | 核心线程池大小 |
maximumPoolSize | 最大线程池大小 |
keepAliveTime | 线程池中超过corePoolSize数目的空闲线程最大存活时间;可以allowCoreThreadTimeOut(true)使得核心线程有效时间 |
TimeUnit | keepAliveTime时间单位 |
workQueue | 阻塞任务队列 |
threadFactory | 新建线程工厂 |
RejectedExecutionHandler | 当提交任务数超过maxmumPoolSize+workQueue之和时,任务会交给RejectedExecutionHandler来处理 |
重点讲解
其中比较让人容易误解的是:corePoolSize,maximumPoolSize,WorkQueue之间关系
- 当线程池小于corePoolSize时, 新提交的任务将创建一个新线程执行任务, 即使此时线程池中存在空闲线程.
- 当线程池打到corePoolSize时, 新提交任务将被放入workQueue中, 等待线程池中任务调度执行
- 当workQueue已满, 且maximumPoolSize > corePoolSize时, 新提交的任务会创建新线程执行任务
- 当提交任务数超过maximumPoolSize时, 新提交任务由RejectedExecutionHandler处理.
- 当线程池中超过corePoolSize线程, 空闲时间达到keepAliveTime时, 关闭空闲线程.
- 当设置allowCoreThreadTimeOut(True)时, 线程池中corePoolSIze线程空闲时间达到keepAliveTime也将关闭
线程管理机制图
三. Executors提供的线程池配置方案
- 构造一个固定线程数目的线程池, 配置的corePoolSize和maximumPoolSize大小相同, 同时使用了一个无界LinkedBlockingQueue存放阻塞任务, 因此多余的任务将存在阻塞队列, 不会由RejectedExecutionHandler处理.
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
- 构造一个缓冲功能的线程池, 配置corePoolSize=0, maximumPoolSize=Integer.MAX_VALUE, keepAliveTime=60s, 以及一个无容量的阻塞队列SynchronizeQueue, 因此任务提交之后, 将会创建新的线程执行; 线程空闲超过60s将会销毁.
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(
0,
Integer.MAX_VALUE,
60L,
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- 构造一个只支持一个线程的线程池, 配置corePoolSize=maximumPoolSize=1, 无界阻塞队列LinkedBlockingQueue; 保证任务由一个线程串行执行
public static ExecutorService new SingleThreadExecutor(){
return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1,1,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}
- 构造有定时功能的线程池, 配置corePoolSize, 无界延迟阻塞队列DelayedWorkQueue;maximumPoolSize=Integer.MAX_VALUE, 由于DelayedWorkQueue是无界队列, 所以这个值是没有意义的.
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
四. 定制非阻塞线程池
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class CustomThreadPoolExecutor {
private ThreadPoolExecutor pool = null;
/**
* 线程池初始化方法
*
* corePoolSize 核心线程池大小----10
* maximumPoolSize 最大线程池大小----30
* keepAliveTime 线程池中超过corePoolSize数目的空闲线程最大存活时间----30+单位TimeUnit
* TimeUnit keepAliveTime时间单位----TimeUnit.MINUTES
* workQueue 阻塞队列----new ArrayBlockingQueue<Runnable>(10)====10容量的阻塞队列
* threadFactory 新建线程工厂----new CustomThreadFactory()====定制的线程工厂
* rejectedExecutionHandler 当提交任务数超过maxmumPoolSize+workQueue之和时,
* 即当提交第41个任务时(前面线程都没有执行完,此测试方法中用sleep(100)),
* 任务会交给RejectedExecutionHandler来处理
*/
public void init() {
pool = new ThreadPoolExecutor(
10,
30,
30,
TimeUnit.MINUTES,
new ArrayBlockingQueue<Runnable>(10),
new CustomThreadFactory(),
new CustomRejectedExecutionHandler());
}
public void destory() {
if(pool != null) {
pool.shutdownNow();
}
}
public ExecutorService getCustomThreadPoolExecutor() {
return this.pool;
}
private class CustomThreadFactory implements ThreadFactory {
private AtomicInteger count = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
String threadName = CustomThreadPoolExecutor.class.getSimpleName() + count.addAndGet(1);
System.out.println(threadName);
t.setName(threadName);
return t;
}
}
private class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 记录异常
// 报警处理等
System.out.println("error.............");
}
}
// 测试构造的线程池
public static void main(String[] args) {
CustomThreadPoolExecutor exec = new CustomThreadPoolExecutor();
// 1.初始化
exec.init();
ExecutorService pool = exec.getCustomThreadPoolExecutor();
for(int i=1; i<100; i++) {
System.out.println("提交第" + i + "个任务!");
pool.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("running=====");
}
});
}
// 2.销毁----此处不能销毁,因为任务没有提交执行完,如果销毁线程池,任务也就无法执行了
// exec.destory();
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
方法中建立一个核心线程数为30个,缓冲队列有10个的线程池。每个线程任务,执行时会先睡眠3秒,保证提交10任务时,线程数目被占用完,再提交30任务时,阻塞队列被占用完,,这样提交第41个任务是,会交给CustomRejectedExecutionHandler 异常处理类来处理。
注意:41以后提交的任务就不能正常处理了,因为,execute中提交到任务队列是用的offer方法,如上面代码,这个方法是非阻塞的,所以就会交给CustomRejectedExecutionHandler 来处理,所以对于大数据量的任务来说,这种线程池,如果不设置队列长度会OOM,设置队列长度,会有任务得不到处理,接下来我们构建一个阻塞的自定义线程池
五. 定制阻塞线程池
package com.tongbanjie.trade.test.commons;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class CustomThreadPoolExecutor {
private ThreadPoolExecutor pool = null;
/**
* 线程池初始化方法
*
* corePoolSize 核心线程池大小----1
* maximumPoolSize 最大线程池大小----3
* keepAliveTime 线程池中超过corePoolSize数目的空闲线程最大存活时间----30+单位TimeUnit
* TimeUnit keepAliveTime时间单位----TimeUnit.MINUTES
* workQueue 阻塞队列----new ArrayBlockingQueue<Runnable>(5)====5容量的阻塞队列
* threadFactory 新建线程工厂----new CustomThreadFactory()====定制的线程工厂
* rejectedExecutionHandler 当提交任务数超过maxmumPoolSize+workQueue之和时,
* 即当提交第41个任务时(前面线程都没有执行完,此测试方法中用sleep(100)),
* 任务会交给RejectedExecutionHandler来处理
*/
public void init() {
pool = new ThreadPoolExecutor(
1,
3,
30,
TimeUnit.MINUTES,
new ArrayBlockingQueue<Runnable>(5),
new CustomThreadFactory(),
new CustomRejectedExecutionHandler());
}
public void destory() {
if(pool != null) {
pool.shutdownNow();
}
}
public ExecutorService getCustomThreadPoolExecutor() {
return this.pool;
}
private class CustomThreadFactory implements ThreadFactory {
private AtomicInteger count = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
String threadName = CustomThreadPoolExecutor.class.getSimpleName() + count.addAndGet(1);
System.out.println(threadName);
t.setName(threadName);
return t;
}
}
private class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
// 核心改造点,由blockingqueue的offer改成put阻塞方法
executor.getQueue().put(r);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// 测试构造的线程池
public static void main(String[] args) {
CustomThreadPoolExecutor exec = new CustomThreadPoolExecutor();
// 1.初始化
exec.init();
ExecutorService pool = exec.getCustomThreadPoolExecutor();
for(int i=1; i<100; i++) {
System.out.println("提交第" + i + "个任务!");
pool.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println(">>>task is running=====");
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
// 2.销毁----此处不能销毁,因为任务没有提交执行完,如果销毁线程池,任务也就无法执行了
// exec.destory();
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
解释:当提交任务被拒绝时,进入拒绝机制,我们实现拒绝方法,把任务重新用阻塞提交方法put提交,实现阻塞提交任务功能,防止队列过大,OOM,提交被拒绝方法在下面
总结
- 用ThreadPoolExecutor自定义线程池, 看线程池的用途, 如果任务量不大, 可以用无界队列, 如果任务量非常大, 要用有界队列, 防止OOM
- 如果任务量很大, 还要求每个任务都处理成功, 要对提交的任务进行阻塞提交, 重写拒绝机制, 改为阻塞提交. 保证不抛弃一个任务
- 最大线程数一般设为2N+1最好,N是CPU核数
- 核心线程数, 看应用, 如果是任务, 一天跑一次, 设置为0, 合适, 因为跑完就停掉了, 如果是常用线程池, 看任务量, 是保留一个核心还是几个核心线程数
- 如果要获取任务执行结果, 用CompletionService, 但是注意, 获取任务的结果的要重新开一个线程获取, 如果在主线程获取, 就要等待任务都提交后才获取, 就会阻塞大量任务结果, 队列过大OOM, 所以最好异步开个线程获取结果.