ThreadPoolTaskExecutor是spring core包中的,而ThreadPoolExecutor是JDK中的JUC。ThreadPoolTaskExecutor是对ThreadPoolExecutor进行了封装处理。
ThreadPoolTaskExecutor的配置类 ThreadPoolTaskConfig.java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
@EnableAsync // 开启异步功能
public class ThreadPoolTaskConfig {
@Bean("threadPoolTaskExecutor")
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
// 线程池的核心线程数,线程池维护线程的最少数量,即使没有任务需要执行,也会一直存活
threadPoolTaskExecutor.setCorePoolSize(8);
// 如果设置setAllowCoreThreadTimeOut=true(默认false)时,核心线程会超时关闭
// threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true);
// 阻塞队列 当核心线程数达到最大时,新的任务会放在队列中排队等待执行
threadPoolTaskExecutor.setQueueCapacity(124);
// 最大线程池数量,当线程数>=corePoolSize,且任务队列已满时,线程池会创建新的线程来处理任务
// 任务队列已满时,且当线程数=maxPoolSize,线程池会拒绝处理任务而抛出异常
threadPoolTaskExecutor.setMaxPoolSize(64);
// 当线程空闲时间达到KeepAliveSeconds时,线程会推出,知道线程数量=CorePoolSize
// 允许线程空闲时间为60s,当MaxPoolSize的线程在空闲时间到达的时候销毁
// 如果AllowCoreThreadTimeOut=true,则会直到线程数量=0
threadPoolTaskExecutor.setKeepAliveSeconds(30);
// Spring提供的 ThreadPoolTaskExecutor线程池,是有setThreadNamePrefix方法的
// JDK提供的ThreadPoolExecutor线程池没有setThreadNamePrefix方法
threadPoolTaskExecutor.setThreadNamePrefix("XXXX自定义线程池");
// 拒绝策略
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// CallerRunsPolicy():交由调用方线程运行,比如 main 线程;如果添加到线程池失败,那么主线程会自己去执行该任务,不会等待线程池中的线程去执行
// AbortPolicy():该策略是线程池的默认策略,如果线程池队列满了丢掉这个任务并且抛出RejectedExecutionException异常。
// DiscardPolicy():如果线程池队列满了,会直接丢掉这个任务并且不会有任何异常
// DiscardOldestPolicy():丢弃队列中最老的任务,队列满了,会将最早进入队列的任务删掉腾出空间,再尝试加入队列
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
}
查看 initialize()方法,它调用的是ExecutorConfigurationSupport 的initialize()方法
/**
* Set up the ExecutorService.
*/
public void initialize() {
if (logger.isDebugEnabled()) {
logger.debug("Initializing ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
}
if (!this.threadNamePrefixSet && this.beanName != null) {
setThreadNamePrefix(this.beanName + "-");
}
this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
}
initialize()方法调用了本类的initialzeExcutor()方法,但是这个类是一个抽象类,initialzeExcutor()方法也是一个抽象方法
/**
* Create the target {@link java.util.concurrent.ExecutorService} instance.
* Called by {@code afterPropertiesSet}.
* @param threadFactory the ThreadFactory to use
* @param rejectedExecutionHandler the RejectedExecutionHandler to use
* @return a new ExecutorService instance
* @see #afterPropertiesSet()
*/
protected abstract ExecutorService initializeExecutor(
ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler);
ctrl+alt+b 查看它的实现类,它有四个实现类,其中一个就是ThreadPoolTaskExecutor,查看它的initialzeExcutor()方法,它其实内部使用了ThreadPoolExecutor。
/**
* Note: This method exposes an {@link ExecutorService} to its base class
* but stores the actual {@link ThreadPoolExecutor} handle internally.
* Do not override this method for replacing the executor, rather just for
* decorating its {@code ExecutorService} handle or storing custom state.
*/
@Override
protected ExecutorService initializeExecutor(
ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
ThreadPoolExecutor executor;
if (this.taskDecorator != null) {
executor = new ThreadPoolExecutor(
this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
queue, threadFactory, rejectedExecutionHandler) {
@Override
public void execute(Runnable command) {
Runnable decorated = taskDecorator.decorate(command);
if (decorated != command) {
decoratedTaskMap.put(decorated, command);
}
super.execute(decorated);
}
};
}
else {
executor = new ThreadPoolExecutor(
this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
queue, threadFactory, rejectedExecutionHandler);
}
if (this.allowCoreThreadTimeOut) {
executor.allowCoreThreadTimeOut(true);
}
this.threadPoolExecutor = executor;
return executor;
}
所以说:ThreadPoolTaskExecutor是对ThreadPoolExecutor进行了封装处理